474 lines
18 KiB
Python
474 lines
18 KiB
Python
import asyncio
|
||
import uuid
|
||
from fastapi import FastAPI, HTTPException, Request, UploadFile, File
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.responses import HTMLResponse, JSONResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
import os, shutil
|
||
from sqlalchemy import text
|
||
from datetime import date
|
||
import re
|
||
from fastapi.responses import StreamingResponse
|
||
from io import BytesIO
|
||
from app.models import ParametrosFormula
|
||
from sqlalchemy.future import select
|
||
from app.database import AsyncSessionLocal
|
||
from app.models import Fatura
|
||
from app.processor import (
|
||
fila_processamento,
|
||
processar_em_lote,
|
||
status_arquivos,
|
||
limpar_arquivos_processados
|
||
)
|
||
from fastapi.responses import FileResponse
|
||
from app.models import Fatura, SelicMensal, ParametrosFormula
|
||
from datetime import date
|
||
from app.utils import avaliar_formula
|
||
|
||
|
||
app = FastAPI()
|
||
templates = Jinja2Templates(directory="app/templates")
|
||
app.mount("/static", StaticFiles(directory="app/static"), name="static")
|
||
|
||
UPLOAD_DIR = "uploads/temp"
|
||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||
|
||
def _parse_referencia(ref: str):
|
||
"""Aceita 'JAN/2024', 'JAN/24', '01/2024', '01/24', '202401'. Retorna (ano, mes)."""
|
||
meses = {'JAN':1,'FEV':2,'MAR':3,'ABR':4,'MAI':5,'JUN':6,'JUL':7,'AGO':8,'SET':9,'OUT':10,'NOV':11,'DEZ':12}
|
||
ref = (ref or "").strip().upper()
|
||
|
||
if "/" in ref:
|
||
a, b = [p.strip() for p in ref.split("/", 1)]
|
||
# mês pode vir 'JAN' ou '01'
|
||
mes = meses.get(a, None)
|
||
if mes is None:
|
||
mes = int(re.sub(r"\D", "", a) or 1)
|
||
ano = int(re.sub(r"\D", "", b) or 0)
|
||
# ano 2 dígitos -> 2000+
|
||
if ano < 100:
|
||
ano += 2000
|
||
else:
|
||
# '202401' ou '2024-01'
|
||
num = re.sub(r"\D", "", ref)
|
||
if len(num) >= 6:
|
||
ano, mes = int(num[:4]), int(num[4:6])
|
||
elif len(num) == 4: # '2024'
|
||
ano, mes = int(num), 1
|
||
else:
|
||
ano, mes = date.today().year, 1
|
||
return ano, mes
|
||
|
||
async def _carregar_selic_map(session):
|
||
res = await session.execute(text("SELECT ano, mes, percentual FROM faturas.selic_mensal"))
|
||
rows = res.mappings().all()
|
||
return {(int(r["ano"]), int(r["mes"])): float(r["percentual"]) for r in rows}
|
||
|
||
def _fator_selic_from_map(selic_map: dict, ano_inicio: int, mes_inicio: int, hoje: date) -> float:
|
||
try:
|
||
ano, mes = int(ano_inicio), int(mes_inicio)
|
||
except Exception:
|
||
return 1.0
|
||
if ano > hoje.year or (ano == hoje.year and mes > hoje.month):
|
||
return 1.0
|
||
|
||
fator = 1.0
|
||
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
|
||
perc = selic_map.get((ano, mes))
|
||
if perc is not None:
|
||
fator *= (1 + (perc / 100.0))
|
||
mes += 1
|
||
if mes > 12:
|
||
mes = 1
|
||
ano += 1
|
||
return fator
|
||
|
||
|
||
def _avaliar_formula(texto_formula: str | None, contexto: dict) -> float:
|
||
if not texto_formula:
|
||
return 0.0
|
||
expr = str(texto_formula)
|
||
|
||
# Substitui nomes de campos por valores numéricos (None -> 0)
|
||
for campo, valor in contexto.items():
|
||
v = valor
|
||
if v is None or v == "":
|
||
v = 0
|
||
# aceita vírgula como decimal vindo do banco
|
||
if isinstance(v, str):
|
||
v = v.replace(".", "").replace(",", ".") if re.search(r"[0-9],[0-9]", v) else v
|
||
expr = re.sub(rf'\b{re.escape(str(campo))}\b', str(v), expr)
|
||
|
||
try:
|
||
return float(eval(expr, {"__builtins__": {}}, {}))
|
||
except Exception:
|
||
return 0.0
|
||
|
||
@app.get("/", response_class=HTMLResponse)
|
||
async def dashboard(request: Request, cliente: str | None = None):
|
||
print("DBG /: inicio", flush=True)
|
||
try:
|
||
async with AsyncSessionLocal() as session:
|
||
print("DBG /: abrindo sessão", flush=True)
|
||
|
||
r = await session.execute(text(
|
||
"SELECT DISTINCT nome FROM faturas.faturas ORDER BY nome"
|
||
))
|
||
clientes = [c for c, in r.fetchall()]
|
||
print(f"DBG /: clientes={len(clientes)}", flush=True)
|
||
|
||
# Fórmulas
|
||
fp = await session.execute(text("""
|
||
SELECT formula FROM faturas.parametros_formula
|
||
WHERE nome = 'Cálculo PIS sobre ICMS' AND ativo = TRUE LIMIT 1
|
||
"""))
|
||
formula_pis = fp.scalar_one_or_none()
|
||
fc = await session.execute(text("""
|
||
SELECT formula FROM faturas.parametros_formula
|
||
WHERE nome = 'Cálculo COFINS sobre ICMS' AND ativo = TRUE LIMIT 1
|
||
"""))
|
||
formula_cofins = fc.scalar_one_or_none()
|
||
print(f"DBG /: tem_formulas pis={bool(formula_pis)} cofins={bool(formula_cofins)}", flush=True)
|
||
|
||
sql = "SELECT * FROM faturas.faturas"
|
||
params = {}
|
||
if cliente:
|
||
sql += " WHERE nome = :cliente"
|
||
params["cliente"] = cliente
|
||
print("DBG /: SQL faturas ->", sql, params, flush=True)
|
||
|
||
ftrs = (await session.execute(text(sql), params)).mappings().all()
|
||
print(f"DBG /: total_faturas={len(ftrs)}", flush=True)
|
||
|
||
# ===== KPIs e Séries para o dashboard =====
|
||
from collections import defaultdict
|
||
|
||
total_faturas = len(ftrs)
|
||
qtd_icms_na_base = 0
|
||
soma_corrigida = 0.0
|
||
hoje = date.today()
|
||
selic_map = await _carregar_selic_map(session)
|
||
|
||
# Séries e somatórios comerciais
|
||
serie_mensal = defaultdict(float) # {(ano, mes): valor_corrigido}
|
||
sum_por_dist = defaultdict(float) # {"distribuidora": valor_corrigido}
|
||
somatorio_v_total = 0.0
|
||
contagem_com_icms = 0
|
||
|
||
for f in ftrs:
|
||
ctx = dict(f)
|
||
|
||
# PIS/COFINS sobre ICMS
|
||
v_pis = _avaliar_formula(formula_pis, ctx)
|
||
v_cof = _avaliar_formula(formula_cofins, ctx)
|
||
v_total = max(0.0, float(v_pis or 0) + float(v_cof or 0))
|
||
|
||
# % de faturas com ICMS na base
|
||
if (v_pis or 0) > 0:
|
||
qtd_icms_na_base += 1
|
||
contagem_com_icms += 1
|
||
|
||
# referência -> (ano,mes)
|
||
try:
|
||
ano, mes = _parse_referencia(f.get("referencia"))
|
||
except Exception:
|
||
ano, mes = hoje.year, hoje.month
|
||
|
||
# SELIC
|
||
fator = _fator_selic_from_map(selic_map, ano, mes, hoje)
|
||
valor_corrigido = v_total * fator
|
||
|
||
soma_corrigida += valor_corrigido
|
||
somatorio_v_total += v_total
|
||
|
||
# séries
|
||
serie_mensal[(ano, mes)] += valor_corrigido
|
||
dist = (f.get("distribuidora") or "").strip() or "Não informado"
|
||
sum_por_dist[dist] += valor_corrigido
|
||
|
||
percentual_icms_base = (qtd_icms_na_base / total_faturas * 100.0) if total_faturas else 0.0
|
||
valor_restituicao_corrigida = soma_corrigida
|
||
valor_medio_com_icms = (somatorio_v_total / contagem_com_icms) if contagem_com_icms else 0.0
|
||
|
||
# total de clientes (distinct já carregado)
|
||
total_clientes = len(clientes)
|
||
|
||
# Série mensal – últimos 12 meses
|
||
ultimos = []
|
||
a, m = hoje.year, hoje.month
|
||
for _ in range(12):
|
||
ultimos.append((a, m))
|
||
m -= 1
|
||
if m == 0:
|
||
m = 12; a -= 1
|
||
ultimos.reverse()
|
||
|
||
serie_mensal_labels = [f"{mes:02d}/{ano}" for (ano, mes) in ultimos]
|
||
serie_mensal_valores = [round(serie_mensal.get((ano, mes), 0.0), 2) for (ano, mes) in ultimos]
|
||
|
||
# Top 5 distribuidoras
|
||
top5 = sorted(sum_por_dist.items(), key=lambda kv: kv[1], reverse=True)[:5]
|
||
top5_labels = [k for k, _ in top5]
|
||
top5_valores = [round(v, 2) for _, v in top5]
|
||
|
||
print("DBG /: calculos OK", flush=True)
|
||
|
||
|
||
print("DBG /: render template", flush=True)
|
||
return templates.TemplateResponse("dashboard.html", {
|
||
"request": request,
|
||
"clientes": clientes,
|
||
"cliente_atual": cliente or "",
|
||
"total_faturas": total_faturas,
|
||
"valor_restituicao_corrigida": valor_restituicao_corrigida,
|
||
"percentual_icms_base": percentual_icms_base,
|
||
|
||
# Novos dados para o template
|
||
"total_clientes": total_clientes,
|
||
"valor_medio_com_icms": valor_medio_com_icms,
|
||
"situacao_atual_percent": percentual_icms_base, # para gráfico de alerta
|
||
"serie_mensal_labels": serie_mensal_labels,
|
||
"serie_mensal_valores": serie_mensal_valores,
|
||
"top5_labels": top5_labels,
|
||
"top5_valores": top5_valores,
|
||
})
|
||
except Exception as e:
|
||
import traceback
|
||
print("ERR /:", e, flush=True)
|
||
traceback.print_exc()
|
||
# Página de erro amigável (sem derrubar servidor)
|
||
return HTMLResponse(
|
||
f"<pre style='padding:16px;color:#b91c1c;background:#fff1f2'>Falha no dashboard:\n{e}</pre>",
|
||
status_code=500
|
||
)
|
||
|
||
|
||
@app.get("/upload", response_class=HTMLResponse)
|
||
def upload_page(request: Request):
|
||
app_env = os.getenv("APP_ENV", "dev") # Captura variável de ambiente
|
||
return templates.TemplateResponse("upload.html", {
|
||
"request": request,
|
||
"app_env": app_env # Passa para o template
|
||
})
|
||
|
||
@app.get("/relatorios", response_class=HTMLResponse)
|
||
def relatorios_page(request: Request):
|
||
return templates.TemplateResponse("relatorios.html", {"request": request})
|
||
|
||
@app.post("/upload-files")
|
||
async def upload_files(files: list[UploadFile] = File(...)):
|
||
for file in files:
|
||
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
|
||
with open(temp_path, "wb") as f:
|
||
shutil.copyfileobj(file.file, f)
|
||
await fila_processamento.put({
|
||
"caminho_pdf": temp_path,
|
||
"nome_original": file.filename
|
||
})
|
||
return {"message": "Arquivos enviados para fila"}
|
||
|
||
@app.post("/process-queue")
|
||
async def process_queue():
|
||
resultados = await processar_em_lote()
|
||
return {"message": "Processamento concluído", "resultados": resultados}
|
||
|
||
@app.get("/get-status")
|
||
async def get_status():
|
||
files = []
|
||
for nome, status in status_arquivos.items():
|
||
if isinstance(status, dict):
|
||
files.append({
|
||
"nome": nome,
|
||
"status": status.get("status", "Erro"),
|
||
"mensagem": status.get("mensagem", "---"),
|
||
"tempo": status.get("tempo", "---"),
|
||
"tamanho": f"{status.get('tamanho', 0)} KB",
|
||
"data": status.get("data", "")
|
||
})
|
||
|
||
else:
|
||
files.append({
|
||
"nome": nome,
|
||
"status": status,
|
||
"mensagem": "---" if status == "Concluído" else status,
|
||
"tempo": "---" # ✅ AQUI também
|
||
})
|
||
is_processing = not fila_processamento.empty()
|
||
return JSONResponse(content={"is_processing": is_processing, "files": files})
|
||
|
||
|
||
@app.post("/clear-all")
|
||
async def clear_all():
|
||
limpar_arquivos_processados()
|
||
for f in os.listdir(UPLOAD_DIR):
|
||
os.remove(os.path.join(UPLOAD_DIR, f))
|
||
return {"message": "Fila e arquivos limpos"}
|
||
|
||
@app.get("/export-excel")
|
||
async def export_excel():
|
||
import pandas as pd
|
||
async with AsyncSessionLocal() as session:
|
||
# 1. Coletar faturas e tabela SELIC
|
||
faturas_result = await session.execute(select(Fatura))
|
||
faturas = faturas_result.scalars().all()
|
||
|
||
selic_result = await session.execute(select(SelicMensal))
|
||
selic_tabela = selic_result.scalars().all()
|
||
|
||
# 2. Criar mapa {(ano, mes): percentual}
|
||
selic_map = {(s.ano, s.mes): float(s.percentual) for s in selic_tabela}
|
||
hoje = date.today()
|
||
|
||
def calcular_fator_selic(ano_inicio, mes_inicio):
|
||
fator = 1.0
|
||
ano, mes = ano_inicio, mes_inicio
|
||
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
|
||
percentual = selic_map.get((ano, mes))
|
||
if percentual:
|
||
fator *= (1 + percentual / 100)
|
||
mes += 1
|
||
if mes > 12:
|
||
mes = 1
|
||
ano += 1
|
||
return fator
|
||
|
||
# 3. Buscar fórmulas exatas por nome
|
||
formula_pis_result = await session.execute(
|
||
select(ParametrosFormula.formula).where(
|
||
ParametrosFormula.nome == "Cálculo PIS sobre ICMS",
|
||
ParametrosFormula.ativo == True
|
||
).limit(1)
|
||
)
|
||
formula_cofins_result = await session.execute(
|
||
select(ParametrosFormula.formula).where(
|
||
ParametrosFormula.nome == "Cálculo COFINS sobre ICMS",
|
||
ParametrosFormula.ativo == True
|
||
).limit(1)
|
||
)
|
||
|
||
formula_pis = formula_pis_result.scalar_one_or_none()
|
||
formula_cofins = formula_cofins_result.scalar_one_or_none()
|
||
|
||
# 4. Montar dados
|
||
mes_map = {
|
||
'JAN': 1, 'FEV': 2, 'MAR': 3, 'ABR': 4, 'MAI': 5, 'JUN': 6,
|
||
'JUL': 7, 'AGO': 8, 'SET': 9, 'OUT': 10, 'NOV': 11, 'DEZ': 12
|
||
}
|
||
|
||
dados = []
|
||
for f in faturas:
|
||
try:
|
||
if "/" in f.referencia:
|
||
mes_str, ano_str = f.referencia.split("/")
|
||
mes = mes_map.get(mes_str.strip().upper())
|
||
ano = int(ano_str)
|
||
if not mes or not ano:
|
||
raise ValueError("Mês ou ano inválido")
|
||
else:
|
||
ano = int(f.referencia[:4])
|
||
mes = int(f.referencia[4:])
|
||
|
||
fator = calcular_fator_selic(ano, mes)
|
||
periodo = f"{mes:02d}/{ano} à {hoje.month:02d}/{hoje.year}"
|
||
|
||
contexto = f.__dict__
|
||
valor_pis_icms = avaliar_formula(formula_pis, contexto) if formula_pis else None
|
||
valor_cofins_icms = avaliar_formula(formula_cofins, contexto) if formula_cofins else None
|
||
dados.append({
|
||
"Nome": f.nome,
|
||
"UC": f.unidade_consumidora,
|
||
"Referência": f.referencia,
|
||
"Nota Fiscal": f.nota_fiscal,
|
||
"Valor Total": f.valor_total,
|
||
"ICMS (%)": f.icms_aliq,
|
||
"ICMS (R$)": f.icms_valor,
|
||
"Base ICMS": f.icms_base,
|
||
"PIS (%)": f.pis_aliq,
|
||
"PIS (R$)": f.pis_valor,
|
||
"Base PIS": f.pis_base,
|
||
"COFINS (%)": f.cofins_aliq,
|
||
"COFINS (R$)": f.cofins_valor,
|
||
"Base COFINS": f.cofins_base,
|
||
"Consumo (kWh)": f.consumo,
|
||
"Tarifa": f.tarifa,
|
||
"Cidade": f.cidade,
|
||
"Estado": f.estado,
|
||
"Distribuidora": f.distribuidora,
|
||
"Data Processamento": f.data_processamento,
|
||
"Fator SELIC acumulado": fator,
|
||
"Período SELIC usado": periodo,
|
||
"PIS sobre ICMS": valor_pis_icms,
|
||
"Valor Corrigido PIS (ICMS)": valor_pis_icms * fator if valor_pis_icms else None,
|
||
"COFINS sobre ICMS": valor_cofins_icms,
|
||
"Valor Corrigido COFINS (ICMS)": valor_cofins_icms * fator if valor_cofins_icms else None,
|
||
})
|
||
except Exception as e:
|
||
print(f"Erro ao processar fatura {f.nota_fiscal}: {e}")
|
||
|
||
df = pd.DataFrame(dados)
|
||
|
||
output = BytesIO()
|
||
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
|
||
df.to_excel(writer, index=False, sheet_name="Faturas Corrigidas")
|
||
|
||
output.seek(0)
|
||
return StreamingResponse(output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={
|
||
"Content-Disposition": "attachment; filename=faturas_corrigidas.xlsx"
|
||
})
|
||
|
||
|
||
from app.parametros import router as parametros_router
|
||
app.include_router(parametros_router)
|
||
|
||
def is_homolog():
|
||
return os.getenv("APP_ENV", "dev") == "homolog"
|
||
|
||
@app.post("/limpar-faturas")
|
||
async def limpar_faturas():
|
||
app_env = os.getenv("APP_ENV", "dev")
|
||
if app_env not in ["homolog", "dev", "local"]:
|
||
return JSONResponse(status_code=403, content={"message": "Operação não permitida neste ambiente."})
|
||
|
||
async with AsyncSessionLocal() as session:
|
||
print("🧪 Limpando faturas do banco...")
|
||
await session.execute(text("DELETE FROM faturas.faturas"))
|
||
await session.commit()
|
||
|
||
upload_path = os.path.join("app", "uploads")
|
||
for nome in os.listdir(upload_path):
|
||
caminho = os.path.join(upload_path, nome)
|
||
if os.path.isfile(caminho):
|
||
os.remove(caminho)
|
||
|
||
return {"message": "Faturas e arquivos apagados com sucesso."}
|
||
|
||
@app.get("/erros/download")
|
||
async def download_erros():
|
||
zip_path = os.path.join("app", "uploads", "erros", "faturas_erro.zip")
|
||
if os.path.exists(zip_path):
|
||
response = FileResponse(zip_path, filename="faturas_erro.zip", media_type="application/zip")
|
||
# ⚠️ Agendar exclusão após resposta
|
||
asyncio.create_task(limpar_erros())
|
||
return response
|
||
else:
|
||
raise HTTPException(status_code=404, detail="Arquivo de erro não encontrado.")
|
||
|
||
@app.get("/erros/log")
|
||
async def download_log_erros():
|
||
txt_path = os.path.join("app", "uploads", "erros", "erros.txt")
|
||
if os.path.exists(txt_path):
|
||
response = FileResponse(txt_path, filename="erros.txt", media_type="text/plain")
|
||
# ⚠️ Agendar exclusão após resposta
|
||
asyncio.create_task(limpar_erros())
|
||
return response
|
||
else:
|
||
raise HTTPException(status_code=404, detail="Log de erro não encontrado.")
|
||
|
||
async def limpar_erros():
|
||
await asyncio.sleep(5) # Aguarda 5 segundos para garantir que o download inicie
|
||
pasta = os.path.join("app", "uploads", "erros")
|
||
for nome in ["faturas_erro.zip", "erros.txt"]:
|
||
caminho = os.path.join(pasta, nome)
|
||
if os.path.exists(caminho):
|
||
os.remove(caminho)
|