Files
app_faturas/app/processor.py
2025-07-28 22:47:31 -03:00

133 lines
4.4 KiB
Python

import logging
import os
import shutil
import asyncio
from sqlalchemy.future import select
from app.utils import extrair_dados_pdf
from app.database import AsyncSessionLocal
from app.models import Fatura, LogProcessamento
import time
import traceback
logger = logging.getLogger(__name__)
UPLOADS_DIR = os.path.join("app", "uploads")
TEMP_DIR = os.path.join(UPLOADS_DIR, "temp")
fila_processamento = asyncio.Queue()
status_arquivos = {}
def remover_arquivo_temp(caminho_pdf):
try:
if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf:
os.remove(caminho_pdf)
logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}")
except Exception as e:
logger.warning(f"Falha ao remover arquivo temporário: {e}")
def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal):
try:
extensao = os.path.splitext(nome_original)[1].lower()
nome_destino = f"{nota_fiscal}{extensao}"
destino_final = os.path.join(UPLOADS_DIR, nome_destino)
shutil.copy2(caminho_pdf_temp, destino_final)
return destino_final
except Exception as e:
logger.error(f"Erro ao salvar em uploads: {e}")
return caminho_pdf_temp
async def process_single_file(caminho_pdf_temp: str, nome_original: str):
inicio = time.perf_counter()
async with AsyncSessionLocal() as session:
try:
dados = extrair_dados_pdf(caminho_pdf_temp)
dados['arquivo_pdf'] = nome_original
# Verifica se a fatura já existe
existente_result = await session.execute(
select(Fatura).filter_by(
nota_fiscal=dados['nota_fiscal'],
unidade_consumidora=dados['unidade_consumidora']
)
)
if existente_result.scalar_one_or_none():
duracao = round(time.perf_counter() - inicio, 2)
remover_arquivo_temp(caminho_pdf_temp)
return {
"status": "Duplicado",
"dados": dados,
"tempo": f"{duracao}s"
}
# Salva arquivo final
caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal'])
dados['link_arquivo'] = caminho_final
# Salva fatura
fatura = Fatura(**dados)
session.add(fatura)
await session.commit()
remover_arquivo_temp(caminho_pdf_temp)
duracao = round(time.perf_counter() - inicio, 2)
return {
"status": "Concluído",
"dados": dados,
"tempo": f"{duracao}s"
}
except Exception as e:
erro_str = traceback.format_exc()
duracao = round(time.perf_counter() - inicio, 2)
await session.rollback()
remover_arquivo_temp(caminho_pdf_temp)
print(f"\n📄 ERRO no arquivo: {nome_original}")
print(f"⏱ Tempo até erro: {duracao}s")
print(f"❌ Erro detalhado:\n{erro_str}")
return {
"status": "Erro",
"mensagem": str(e),
"tempo": f"{duracao}s",
"trace": erro_str
}
async def processar_em_lote():
import traceback # para exibir erros
resultados = []
while not fila_processamento.empty():
item = await fila_processamento.get()
try:
resultado = await process_single_file(item['caminho_pdf'], item['nome_original'])
status_arquivos[item['nome_original']] = {
"status": resultado.get("status"),
"mensagem": resultado.get("mensagem", ""),
"tempo": resultado.get("tempo", "---")
}
resultados.append(status_arquivos[item['nome_original']])
except Exception as e:
status_arquivos[item['nome_original']] = {
"status": "Erro",
"mensagem": str(e),
"tempo": "---"
}
resultados.append({
"nome": item['nome_original'],
"status": "Erro",
"mensagem": str(e)
})
print(f"Erro ao processar {item['nome_original']}: {e}")
print(traceback.format_exc())
return resultados
def limpar_arquivos_processados():
status_arquivos.clear()
while not fila_processamento.empty():
fila_processamento.get_nowait()