transcription-cost-usage-re.../backend/app/services/report_service.py

from flask import current_app
from datetime import datetime
from decimal import Decimal
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, PatternFill
import pandas as pd
import os
from typing import List, Dict, Any, Optional
from app.utils.mysql_query import execute_query
from app.utils.calc_api_usage import calculate_api_usage
import math
class TranscriptionReportService:
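    """Builds transcription cost/usage reports for one company over a date range.

    Usage and pricing data come from the billing-api MongoDB database; call
    detail records come from the company's MySQL tab_cdr table.
    """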
def __init__(self, company_id: str, start_date: str, end_date: str):
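        """Store the company id and the report period (dates as "YYYY-MM-DD" strings)."""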
self.company_id = str(company_id)
self.start_date = start_date
self.end_date = end_date
self.mongo_client = current_app.mongo_client
self.mongo_results = []
        self.unique_ids = []
def _fetch_mongo_data(self, page: int = 1, page_size: int = 20, all_data: Optional[bool] = False) -> Dict[str, int]:
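        """Aggregate api_usages per sessionId for the configured period.

        Fills self.mongo_results and self.unique_ids as a side effect and
        returns pagination metadata (total, page, page_size, total_pages).
        """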
collection = self.mongo_client["billing-api"]["api_usages"]
        # Define the valid products from the pricing table
pricing_collection = self.mongo_client["billing-api"]["api_pricings"]
result_stt = pricing_collection.find({"product": {"$nin": ["whatsapp"]}})
products = [t["product"] for t in result_stt]
match_stage = {
"$match": {
"companyId": self.company_id,
"product": {"$in": products},
"createdAt": {
"$gte": datetime.strptime(f"{self.start_date} 00:00:00", "%Y-%m-%d %H:%M:%S"),
"$lte": datetime.strptime(f"{self.end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
}
}
}
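        # Join each usage document with its api_pricings entry (matched on product)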
lookup_stage = {
"$lookup": {
"from": "api_pricings",
"localField": "product",
"foreignField": "product",
"as": "pricing"
}
}
unwind_stage = {
"$unwind": "$pricing"
}
        # Group by sessionId + type + product
group_stage_1 = {
"$group": {
"_id": {
"sessionId": "$sessionId",
"type": "$pricing.type",
"product": "$product"
},
"usage": {"$sum": {"$toDouble": "$usage"}},
"totalCost": {"$sum": {"$toDouble": "$total_cost"}},
"callerIds": {"$addToSet": "$callerId"},
"firstCreatedAt": {"$min": "$createdAt"}
}
}
        # Final group by sessionId, building usage and cost maps
group_stage_2 = {
"$group": {
"_id": "$_id.sessionId",
"count": {"$sum": 1},
"firstCreatedAt": {"$first": "$firstCreatedAt"},
"callerIds": {"$first": "$callerIds"},
"totalCost": {"$sum": "$totalCost"},
"usageByType": {
"$push": {
"k": "$_id.type",
"v": "$usage"
}
},
"costByType": {
"$push": {
"k": "$_id.type",
"v": "$totalCost"
}
},
"usageByProduct": {
"$push": {
"k": "$_id.product",
"v": "$usage"
}
}
}
}
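        # Convert the pushed k/v arrays into plain objects for each session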
project_stage = {
"$project": {
"count": 1,
"firstCreatedAt": 1,
"callerIds": 1,
"totalCost": 1,
"usageByType": {"$arrayToObject": "$usageByType"},
"costByType": {"$arrayToObject": "$costByType"},
"usageByProduct": {"$arrayToObject": "$usageByProduct"}
}
}
sort_stage = {"$sort": {"firstCreatedAt": 1}}
        # Assemble the pipeline
pipeline = [
match_stage,
lookup_stage,
unwind_stage,
group_stage_1,
group_stage_2,
project_stage,
sort_stage
]
if not all_data:
pipeline.extend([
{"$skip": (page - 1) * page_size},
{"$limit": page_size}
])
        # Run the main aggregation
self.mongo_results = list(collection.aggregate(pipeline))
self.unique_ids = [doc["_id"] for doc in self.mongo_results]
# print("=====> self.mongo_results: ", self.mongo_results)
        # Pipeline for the total count
count_pipeline = [
match_stage,
lookup_stage,
unwind_stage,
group_stage_1,
group_stage_2,
{"$count": "total"}
]
count_result = list(collection.aggregate(count_pipeline))
total = count_result[0]["total"] if count_result else 0
return {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
    def _fetch_mysql_data(self, hit_report: Optional[bool] = False) -> List[Dict[str, Any]]:
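        """Fetch CDR rows from the company's MySQL tab_cdr for the sessionIds found in Mongo.

        With hit_report=True each row is enriched with the HIT cost and token
        counts from self.mongo_results; otherwise a total_min duration column
        is added. Both paths also apply client pricing and property formatting.
        """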
collection = self.mongo_client["billing-api"]["api_products"]
products = list(collection.find({}))
        # Build the IN clause by hand: str(tuple(...)) breaks for empty or
        # single-element id lists (trailing comma / empty parentheses).
        ids_clause = ", ".join(f"'{uid}'" for uid in self.unique_ids) or "NULL"
        sql = f"""SELECT
            uniqueid,
            src,
            dst,
            MIN(calldate) AS start_call,
            MAX(calldate) AS end_call,
            SUM(CASE
                    WHEN dstchannel LIKE 'PJSIP/%' AND lastapp = 'Queue'
                    THEN billsec
                    ELSE 0
                END) AS total_billsec
        FROM
            tab_cdr
        WHERE
            uniqueid IN ({ids_clause})
        GROUP BY
            uniqueid, src, dst;"""
rows = execute_query(self.company_id, sql)
if hit_report:
# collection = self.mongo_client["billing-api"]["api_pricings"]
# result_stt = collection.find({"type": "stt"})
# result_stt = [{"product": t["product"], "clientPrice": t["clientPrice"]} for t in result_stt if "clientPrice" in t]
for row in rows:
row["companyId"] = self.company_id
                if rowMongo := next((m for m in self.mongo_results if m["_id"] == row["uniqueid"]), None):
                    row["custo_hit"] = f"{float(rowMongo['totalCost'])}"
row["qtd_token_input"] = rowMongo.get('usageByType', {}).get('input', 0)
row["qtd_token_output"] = rowMongo.get('usageByType', {}).get('output', 0)
self.client_price_row(products, row)
self.formate_properties(row)
else:
for row in rows:
row["total_min"] = f"{(int(row['total_billsec']) / 60):.2f}"
self.client_price_row(products, row)
self.formate_properties(row)
return rows
def formate_properties(self, row):
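        """Normalize row values in place: datetimes to ISO strings, Decimals to floats, uniqueid to str."""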
for key in row:
if isinstance(row[key], datetime):
row[key] = row[key].isoformat(sep=' ')
elif isinstance(row[key], Decimal):
row[key] = float(row[key])
elif key == "uniqueid":
row[key] = str(row[key])
def client_price_row(self, products, row):
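        """Attach client_price and client_total_cost to the row.

        Uses the first product's priceHistory entry whose period contains the
        call start date; when no period matches, the last entry is used.
        """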
        if products and products[0].get("priceHistory"):
matched_period = None
last_period = None
for period in products[0]["priceHistory"]:
last_period = period
start = period['startDate'].date()
end = period['endDate'].date() if period['endDate'] else None
start_call = row['start_call'].date()
if end:
if start <= start_call <= end:
matched_period = period
break
else:
if start_call >= start:
matched_period = period
break
f"{(int(row['total_billsec']) / 60):.2f}"
if matched_period:
row['client_total_cost'] = f"{((int(row['total_billsec']) / 60) * matched_period['price'])}"
row["client_price"] = matched_period['price']
else:
row['client_total_cost'] = f"{((int(row['total_billsec']) / 60) * last_period['price'])}"
row["client_price"] = last_period['price']
def _create_excel(self, data: list, hit_report: Optional[bool] = False) -> str:
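        """Write the report rows to an .xlsx file under <backend>/reports and return its path."""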
if hit_report:
header_mapping = {
"companyId": "Empresa",
"uniqueid": "Identificador da chamada",
"src": "Origem",
"dst": "Destino",
"total_billsec": "Quantidade de segundos",
"custo_hit": "Custo HIT",
"qtd_token_input": "Quantidade de tokens(input)",
"qtd_token_output": "Quantidade de tokens(output)",
"client_total_cost": "Custo Cliente",
"client_price": "Preço Cliente por Minuto",
"start_call": "Inicio",
"end_call": "Fim"
}
else:
header_mapping = {
"uniqueid": "Identificador da chamada",
"src": "Origem",
"dst": "Destino",
"start_call": "Inicio da Chamada",
"total_billsec": "Duração (Em segundos)",
"total_min": "Duração (Em minutos)",
"client_total_cost": "Custo Cliente",
}
        # Filter and order the data according to header_mapping
selected_keys = list(header_mapping.keys())
filtered_data = [{k: row.get(k, "") for k in selected_keys} for row in data]
df = pd.DataFrame(filtered_data, columns=selected_keys)
        # Build the Excel workbook
wb = Workbook()
ws = wb.active
ws.title = "tab_cdr"
header_font = Font(bold=True)
yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
        # Add the custom header row
custom_headers = [header_mapping[col] for col in selected_keys]
ws.append(custom_headers)
for cell in ws[ws.max_row]:
cell.font = header_font
cell.fill = yellow_fill
        # Append the data rows
for row in df.itertuples(index=False, name=None):
ws.append(row)
        # Build the output path and save the file
filename = f"HISTORICO-CHAMADAS-GRAVADAS-{self.start_date}_{self.end_date}.xlsx"
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
reports_dir = os.path.join(BASE_DIR, "reports")
os.makedirs(reports_dir, exist_ok=True)
path = os.path.join(reports_dir, filename)
wb.save(path)
return path
def reportDataXLSX(self, hit_report: Optional[bool] = False) -> str:
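        """Build the complete (non-paginated) report and return the path of the generated .xlsx file."""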
self._fetch_mongo_data(all_data=True)
if hit_report:
mysql_data = self._fetch_mysql_data(hit_report=True)
return self._create_excel(mysql_data, hit_report=True)
mysql_data = self._fetch_mysql_data()
return self._create_excel(mysql_data)
def reportData(self, page: int = 1, page_size: int = 20, hit_report: Optional[bool] = False) -> Dict[str, Any]:
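        """Return one page of report rows plus pagination metadata."""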
mongo_data = self._fetch_mongo_data(page=page, page_size=page_size)
if hit_report:
mysql_data = self._fetch_mysql_data(hit_report=True)
else:
mysql_data = self._fetch_mysql_data()
return {
"pagination": mongo_data,
"data": mysql_data
}
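
# Minimal usage sketch (assumes a Flask application context exposing
# current_app.mongo_client; the company id and dates below are illustrative):
#
#   service = TranscriptionReportService("1001", "2025-06-01", "2025-06-30")
#   page = service.reportData(page=1, page_size=20, hit_report=True)
#   xlsx_path = service.reportDataXLSX(hit_report=False)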