All checks were successful
continuous-integration/drone/push Build is passing
285 lines
11 KiB
Python
285 lines
11 KiB
Python
import asyncio
|
|
import uuid
|
|
from fastapi import FastAPI, HTTPException, Request, UploadFile, File
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse, JSONResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
import os, shutil
|
|
from sqlalchemy import text
|
|
from fastapi.responses import StreamingResponse
|
|
from io import BytesIO
|
|
import pandas as pd
|
|
from app.models import ParametrosFormula
|
|
from sqlalchemy.future import select
|
|
from app.database import AsyncSessionLocal
|
|
from app.models import Fatura
|
|
from app.processor import (
|
|
fila_processamento,
|
|
processar_em_lote,
|
|
status_arquivos,
|
|
limpar_arquivos_processados
|
|
)
|
|
from app.parametros import router as parametros_router
|
|
from fastapi.responses import FileResponse
|
|
from app.models import Fatura, SelicMensal, ParametrosFormula
|
|
from datetime import date
|
|
from app.utils import avaliar_formula
|
|
|
|
|
|
|
|
app = FastAPI()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
app.mount("/static", StaticFiles(directory="app/static"), name="static")
|
|
|
|
UPLOAD_DIR = "uploads/temp"
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def dashboard(request: Request):
|
|
indicadores = [
|
|
{"titulo": "Total de Faturas", "valor": 124},
|
|
{"titulo": "Faturas com ICMS", "valor": "63%"},
|
|
{"titulo": "Valor Total", "valor": "R$ 280.000,00"},
|
|
]
|
|
|
|
analise_stf = {
|
|
"antes": {"percentual_com_icms": 80, "media_valor": 1200},
|
|
"depois": {"percentual_com_icms": 20, "media_valor": 800},
|
|
}
|
|
|
|
return templates.TemplateResponse("dashboard.html", {
|
|
"request": request,
|
|
"cliente_atual": "",
|
|
"clientes": ["Cliente A", "Cliente B"],
|
|
"indicadores": indicadores,
|
|
"analise_stf": analise_stf
|
|
})
|
|
|
|
@app.get("/upload", response_class=HTMLResponse)
|
|
def upload_page(request: Request):
|
|
app_env = os.getenv("APP_ENV", "dev") # Captura variável de ambiente
|
|
return templates.TemplateResponse("upload.html", {
|
|
"request": request,
|
|
"app_env": app_env # Passa para o template
|
|
})
|
|
|
|
@app.get("/relatorios", response_class=HTMLResponse)
|
|
def relatorios_page(request: Request):
|
|
return templates.TemplateResponse("relatorios.html", {"request": request})
|
|
|
|
@app.post("/upload-files")
|
|
async def upload_files(files: list[UploadFile] = File(...)):
|
|
for file in files:
|
|
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
|
|
with open(temp_path, "wb") as f:
|
|
shutil.copyfileobj(file.file, f)
|
|
await fila_processamento.put({
|
|
"caminho_pdf": temp_path,
|
|
"nome_original": file.filename
|
|
})
|
|
return {"message": "Arquivos enviados para fila"}
|
|
|
|
@app.post("/process-queue")
|
|
async def process_queue():
|
|
resultados = await processar_em_lote()
|
|
return {"message": "Processamento concluído", "resultados": resultados}
|
|
|
|
@app.get("/get-status")
|
|
async def get_status():
|
|
files = []
|
|
for nome, status in status_arquivos.items():
|
|
if isinstance(status, dict):
|
|
files.append({
|
|
"nome": nome,
|
|
"status": status.get("status", "Erro"),
|
|
"mensagem": status.get("mensagem", "---"),
|
|
"tempo": status.get("tempo", "---"),
|
|
"tamanho": f"{status.get('tamanho', 0)} KB",
|
|
"data": status.get("data", "")
|
|
})
|
|
|
|
else:
|
|
files.append({
|
|
"nome": nome,
|
|
"status": status,
|
|
"mensagem": "---" if status == "Concluído" else status,
|
|
"tempo": "---" # ✅ AQUI também
|
|
})
|
|
is_processing = not fila_processamento.empty()
|
|
return JSONResponse(content={"is_processing": is_processing, "files": files})
|
|
|
|
|
|
@app.post("/clear-all")
|
|
async def clear_all():
|
|
limpar_arquivos_processados()
|
|
for f in os.listdir(UPLOAD_DIR):
|
|
os.remove(os.path.join(UPLOAD_DIR, f))
|
|
return {"message": "Fila e arquivos limpos"}
|
|
|
|
@app.get("/export-excel")
|
|
async def export_excel():
|
|
async with AsyncSessionLocal() as session:
|
|
# 1. Coletar faturas e tabela SELIC
|
|
faturas_result = await session.execute(select(Fatura))
|
|
faturas = faturas_result.scalars().all()
|
|
|
|
selic_result = await session.execute(select(SelicMensal))
|
|
selic_tabela = selic_result.scalars().all()
|
|
|
|
# 2. Criar mapa {(ano, mes): percentual}
|
|
selic_map = {(s.ano, s.mes): float(s.percentual) for s in selic_tabela}
|
|
hoje = date.today()
|
|
|
|
def calcular_fator_selic(ano_inicio, mes_inicio):
|
|
fator = 1.0
|
|
ano, mes = ano_inicio, mes_inicio
|
|
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
|
|
percentual = selic_map.get((ano, mes))
|
|
if percentual:
|
|
fator *= (1 + percentual / 100)
|
|
mes += 1
|
|
if mes > 12:
|
|
mes = 1
|
|
ano += 1
|
|
return fator
|
|
|
|
# 3. Buscar fórmulas exatas por nome
|
|
formula_pis_result = await session.execute(
|
|
select(ParametrosFormula.formula).where(
|
|
ParametrosFormula.nome == "Cálculo PIS sobre ICMS",
|
|
ParametrosFormula.ativo == True
|
|
).limit(1)
|
|
)
|
|
formula_cofins_result = await session.execute(
|
|
select(ParametrosFormula.formula).where(
|
|
ParametrosFormula.nome == "Cálculo COFINS sobre ICMS",
|
|
ParametrosFormula.ativo == True
|
|
).limit(1)
|
|
)
|
|
|
|
formula_pis = formula_pis_result.scalar_one_or_none()
|
|
formula_cofins = formula_cofins_result.scalar_one_or_none()
|
|
|
|
# 4. Montar dados
|
|
mes_map = {
|
|
'JAN': 1, 'FEV': 2, 'MAR': 3, 'ABR': 4, 'MAI': 5, 'JUN': 6,
|
|
'JUL': 7, 'AGO': 8, 'SET': 9, 'OUT': 10, 'NOV': 11, 'DEZ': 12
|
|
}
|
|
|
|
dados = []
|
|
for f in faturas:
|
|
try:
|
|
if "/" in f.referencia:
|
|
mes_str, ano_str = f.referencia.split("/")
|
|
mes = mes_map.get(mes_str.strip().upper())
|
|
ano = int(ano_str)
|
|
if not mes or not ano:
|
|
raise ValueError("Mês ou ano inválido")
|
|
else:
|
|
ano = int(f.referencia[:4])
|
|
mes = int(f.referencia[4:])
|
|
|
|
fator = calcular_fator_selic(ano, mes)
|
|
periodo = f"{mes:02d}/{ano} à {hoje.month:02d}/{hoje.year}"
|
|
|
|
contexto = f.__dict__
|
|
valor_pis_icms = avaliar_formula(formula_pis, contexto) if formula_pis else None
|
|
valor_cofins_icms = avaliar_formula(formula_cofins, contexto) if formula_cofins else None
|
|
dados.append({
|
|
"Nome": f.nome,
|
|
"UC": f.unidade_consumidora,
|
|
"Referência": f.referencia,
|
|
"Nota Fiscal": f.nota_fiscal,
|
|
"Valor Total": f.valor_total,
|
|
"ICMS (%)": f.icms_aliq,
|
|
"ICMS (R$)": f.icms_valor,
|
|
"Base ICMS": f.icms_base,
|
|
"PIS (%)": f.pis_aliq,
|
|
"PIS (R$)": f.pis_valor,
|
|
"Base PIS": f.pis_base,
|
|
"COFINS (%)": f.cofins_aliq,
|
|
"COFINS (R$)": f.cofins_valor,
|
|
"Base COFINS": f.cofins_base,
|
|
"Consumo (kWh)": f.consumo,
|
|
"Tarifa": f.tarifa,
|
|
"Cidade": f.cidade,
|
|
"Estado": f.estado,
|
|
"Distribuidora": f.distribuidora,
|
|
"Data Processamento": f.data_processamento,
|
|
"Fator SELIC acumulado": fator,
|
|
"Período SELIC usado": periodo,
|
|
"PIS sobre ICMS": valor_pis_icms,
|
|
"Valor Corrigido PIS (ICMS)": valor_pis_icms * fator if valor_pis_icms else None,
|
|
"COFINS sobre ICMS": valor_cofins_icms,
|
|
"Valor Corrigido COFINS (ICMS)": valor_cofins_icms * fator if valor_cofins_icms else None,
|
|
})
|
|
except Exception as e:
|
|
print(f"Erro ao processar fatura {f.nota_fiscal}: {e}")
|
|
|
|
df = pd.DataFrame(dados)
|
|
|
|
output = BytesIO()
|
|
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
|
|
df.to_excel(writer, index=False, sheet_name="Faturas Corrigidas")
|
|
|
|
output.seek(0)
|
|
return StreamingResponse(output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={
|
|
"Content-Disposition": "attachment; filename=faturas_corrigidas.xlsx"
|
|
})
|
|
|
|
|
|
from app.parametros import router as parametros_router
|
|
app.include_router(parametros_router)
|
|
|
|
def is_homolog():
|
|
return os.getenv("APP_ENV", "dev") == "homolog"
|
|
|
|
@app.post("/limpar-faturas")
|
|
async def limpar_faturas():
|
|
app_env = os.getenv("APP_ENV", "dev")
|
|
if app_env not in ["homolog", "dev", "local"]:
|
|
return JSONResponse(status_code=403, content={"message": "Operação não permitida neste ambiente."})
|
|
|
|
async with AsyncSessionLocal() as session:
|
|
print("🧪 Limpando faturas do banco...")
|
|
await session.execute(text("DELETE FROM faturas.faturas"))
|
|
await session.commit()
|
|
|
|
upload_path = os.path.join("app", "uploads")
|
|
for nome in os.listdir(upload_path):
|
|
caminho = os.path.join(upload_path, nome)
|
|
if os.path.isfile(caminho):
|
|
os.remove(caminho)
|
|
|
|
return {"message": "Faturas e arquivos apagados com sucesso."}
|
|
|
|
@app.get("/erros/download")
|
|
async def download_erros():
|
|
zip_path = os.path.join("app", "uploads", "erros", "faturas_erro.zip")
|
|
if os.path.exists(zip_path):
|
|
response = FileResponse(zip_path, filename="faturas_erro.zip", media_type="application/zip")
|
|
# ⚠️ Agendar exclusão após resposta
|
|
asyncio.create_task(limpar_erros())
|
|
return response
|
|
else:
|
|
raise HTTPException(status_code=404, detail="Arquivo de erro não encontrado.")
|
|
|
|
@app.get("/erros/log")
|
|
async def download_log_erros():
|
|
txt_path = os.path.join("app", "uploads", "erros", "erros.txt")
|
|
if os.path.exists(txt_path):
|
|
response = FileResponse(txt_path, filename="erros.txt", media_type="text/plain")
|
|
# ⚠️ Agendar exclusão após resposta
|
|
asyncio.create_task(limpar_erros())
|
|
return response
|
|
else:
|
|
raise HTTPException(status_code=404, detail="Log de erro não encontrado.")
|
|
|
|
async def limpar_erros():
|
|
await asyncio.sleep(5) # Aguarda 5 segundos para garantir que o download inicie
|
|
pasta = os.path.join("app", "uploads", "erros")
|
|
for nome in ["faturas_erro.zip", "erros.txt"]:
|
|
caminho = os.path.join(pasta, nome)
|
|
if os.path.exists(caminho):
|
|
os.remove(caminho)
|