from flask import current_app
from datetime import datetime
from decimal import Decimal
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill
import pandas as pd
import os
from typing import List, Dict, Any
from app.utils.mysql_query import execute_query
from app.services.redis_service import cache_get, cache_set
from app.utils.current_date import is_current_date
from app.utils.hash_key import set_hash_key
import json


class TranscriptionReportService:
    """Builds transcription/usage reports by joining MongoDB billing data with MySQL CDR data."""

    def __init__(self, company_id: str, start_date: str, end_date: str):
        self.company_id = str(company_id)
        self.start_date = start_date
        self.end_date = end_date
        self.mongo_client = current_app.mongo_client
        self.mongo_results = []
        self.unique_ids = []

    def _fetch_mongo_data(self, page: int = 1, page_size: int = 20, all_data: bool = False) -> Dict[str, int]:
        collection = self.mongo_client["billing-api"]["api_usages"]

        # Valid products come from the pricing table (everything except "whatsapp")
        pricing_collection = self.mongo_client["billing-api"]["api_pricings"]
        result_stt = pricing_collection.find({"product": {"$nin": ["whatsapp"]}})
        products = [t["product"] for t in result_stt]

        match_stage = {
            "$match": {
                "companyId": self.company_id,
                "product": {"$in": products},
                "createdAt": {
                    "$gte": datetime.strptime(f"{self.start_date} 00:00:00", "%Y-%m-%d %H:%M:%S"),
                    "$lte": datetime.strptime(f"{self.end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
                }
            }
        }

        lookup_stage = {
            "$lookup": {
                "from": "api_pricings",
                "localField": "product",
                "foreignField": "product",
                "as": "pricing"
            }
        }

        unwind_stage = {"$unwind": "$pricing"}

        # Group by sessionId + type + product
        group_stage_1 = {
            "$group": {
                "_id": {
                    "sessionId": "$sessionId",
                    "type": "$pricing.type",
                    "product": "$product",
                    "provider": "$pricing.provider"
                },
                "usage": {"$sum": {"$toDouble": "$usage"}},
                "totalCost": {"$sum": {"$toDouble": "$total_cost"}},
                "callerIds": {"$addToSet": "$callerId"},
                "firstCreatedAt": {"$min": "$createdAt"}
            }
        }

        # Final grouping by sessionId, building usage and cost maps per type
        group_stage_2 = {
            "$group": {
                "_id": "$_id.sessionId",
                "count": {"$sum": 1},
                "firstCreatedAt": {"$first": "$firstCreatedAt"},
                "callerIds": {"$first": "$callerIds"},
                "totalCost": {"$sum": "$totalCost"},
                "usageByType": {
                    "$push": {
                        "k": "$_id.type",
                        "v": {
                            "product": "$_id.product",
                            "type": "$_id.type",
                            "provider": "$_id.provider",
                            "usage": "$usage",
                            "usageCost": "$totalCost"
                        }
                    }
                }
            }
        }

        project_stage = {
            "$project": {
                "count": 1,
                "firstCreatedAt": 1,
                "callerIds": 1,
                "totalCost": 1,
                "usageByType": {"$arrayToObject": "$usageByType"}
            }
        }

        sort_stage = {"$sort": {"firstCreatedAt": 1}}

        # Assemble the pipeline
        pipeline = [
            match_stage,
            lookup_stage,
            unwind_stage,
            group_stage_1,
            group_stage_2,
            project_stage,
            sort_stage
        ]

        if not all_data:
            pipeline.extend([
                {"$skip": (page - 1) * page_size},
                {"$limit": page_size}
            ])

        # Run the main aggregation
        self.mongo_results = list(collection.aggregate(pipeline))
        self.unique_ids = [doc["_id"] for doc in self.mongo_results]

        # Pipeline for the total count
        count_pipeline = [
            match_stage,
            lookup_stage,
            unwind_stage,
            group_stage_1,
            group_stage_2,
            {"$count": "total"}
        ]
        count_result = list(collection.aggregate(count_pipeline))
        total = count_result[0]["total"] if count_result else 0

        return {
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size
        }

    def _fetch_mysql_data(self, hit_report: bool = False) -> List[Dict[str, Any]]:
        collection = self.mongo_client["billing-api"]["api_products"]
        products = list(collection.find({}))

        # Guard: with no call IDs from Mongo there is nothing to look up, and an
        # empty or one-element Python tuple would render an invalid SQL IN clause.
        if not self.unique_ids:
            return []
        ids_clause = ", ".join(f"'{uid}'" for uid in self.unique_ids)

        sql = f"""
            SELECT uniqueid, src, dst,
                   MIN(calldate) AS start_call,
                   MAX(calldate) AS end_call,
                   SUM(CASE WHEN dstchannel LIKE 'PJSIP/%' AND lastapp = 'Queue' THEN billsec ELSE 0 END) AS total_billsec
            FROM tab_cdr
            WHERE uniqueid IN ({ids_clause})
            GROUP BY uniqueid, src, dst;
        """
        rows = execute_query(self.company_id, sql)

        if hit_report:
            for row in rows:
                row["companyId"] = self.company_id
                if rowMongo := next((m for m in self.mongo_results if m["_id"] == row["uniqueid"]), None):
                    row["custo_hit"] = f"{float(rowMongo['totalCost'])}"
                    token_output = rowMongo.get('usageByType', {}).get('output', {})
                    token_input = rowMongo.get('usageByType', {}).get('input', {})
                    row["qtd_token_input"] = token_input.get('usage', 0)
                    row["qtd_token_output"] = token_output.get('usage', 0)
                    row["total_cost_token"] = float(token_input.get('usageCost', 0) + token_output.get('usageCost', 0))
                    row["llm_provider"] = token_output.get('provider', 'unknown')
                    tts = rowMongo.get('usageByType', {}).get('stt', {})
                    row["tts_model"] = tts.get('product', 'unknown')
                    row["tts_provider"] = tts.get('provider', 'unknown')
                    row["tts_cost"] = tts.get('usageCost', 0)
                    row["tts_usage"] = tts.get('usage', 0)
                row["total_min"] = f"{(int(row['total_billsec']) / 60):.2f}"
                self.client_price_row(products, row)
                self.formate_properties(row)
        else:
            for row in rows:
                row["total_min"] = f"{(int(row['total_billsec']) / 60):.2f}"
                self.client_price_row(products, row)
                self.formate_properties(row)
        return rows

    def formate_properties(self, row):
        # Normalize values for JSON/Excel output
        for key in row:
            if isinstance(row[key], datetime):
                row[key] = row[key].isoformat(sep=' ')
            elif isinstance(row[key], Decimal):
                row[key] = float(row[key])
            elif key == "uniqueid":
                row[key] = str(row[key])

    def client_price_row(self, products, row):
        # Apply the client's per-minute price from the product price history,
        # matching the period that contains the call's start date.
        if products and products[0].get("priceHistory"):
            matched_period = None
            last_period = None
            for period in products[0]["priceHistory"]:
                last_period = period
                start = period['startDate'].date()
                end = period['endDate'].date() if period['endDate'] else None
                start_call = row['start_call'].date()
                if end:
                    if start <= start_call <= end:
                        matched_period = period
                        break
                else:
                    if start_call >= start:
                        matched_period = period
                        break

            if matched_period:
                row['client_total_cost'] = f"{(int(row['total_billsec']) / 60) * matched_period['price']}"
                row["client_price"] = matched_period['price']
            else:
                row['client_total_cost'] = f"{(int(row['total_billsec']) / 60) * last_period['price']}"
                row["client_price"] = last_period['price']

    def _create_excel(self, data: list, hit_report: bool = False) -> str:
        if hit_report:
            header_mapping = {
                "companyId": "Empresa",
                "uniqueid": "Identificador da chamada",
                "src": "Origem",
                "dst": "Destino",
                "total_billsec": "Quantidade de segundos",
                "total_min": "Duração (Em minutos)",
                "custo_hit": "Custo HIT",
                "qtd_token_input": "Quantidade de tokens(input)",
                "qtd_token_output": "Quantidade de tokens(output)",
                "total_cost_token": "Preço Final LLM",
                "llm_provider": "Provider LLM",
                "tts_model": "TTS",
                "tts_provider": "Provider TTS",
                "tts_cost": "Preço Final TTS",
                "tts_usage": "Segundos Transcritos",
                "client_total_cost": "Custo Cliente",
                "client_price": "Preço Cliente por Minuto",
                "start_call": "Inicio",
                "end_call": "Fim"
            }
        else:
            header_mapping = {
                "uniqueid": "Identificador da chamada",
                "src": "Origem",
                "dst": "Destino",
                "start_call": "Inicio da Chamada",
                "total_billsec": "Duração (Em segundos)",
                "total_min": "Duração (Em minutos)",
                "client_total_cost": "Custo Cliente",
            }

        # Filter and order the data according to header_mapping
        selected_keys = list(header_mapping.keys())
        filtered_data = [{k: row.get(k, "") for k in selected_keys} for row in data]
        df = pd.DataFrame(filtered_data, columns=selected_keys)

        # Build the Excel workbook
        wb = Workbook()
        ws = wb.active
        ws.title = "tab_cdr"

        header_font = Font(bold=True)
        yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")

        # Add the custom header row
        custom_headers = [header_mapping[col] for col in selected_keys]
        ws.append(custom_headers)
        for cell in ws[ws.max_row]:
            cell.font = header_font
            cell.fill = yellow_fill

        # Append the data rows
        for row in df.itertuples(index=False, name=None):
            ws.append(row)

        # Build the output path and save the file
        filename = f"HISTORICO-CHAMADAS-GRAVADAS-{self.start_date}_{self.end_date}.xlsx"
        BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
        reports_dir = os.path.join(BASE_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)
        path = os.path.join(reports_dir, filename)
        wb.save(path)
        return path

    def _reportDataTotalCost(self) -> Dict[str, Any]:
        sum_key = set_hash_key(f"{self.company_id}_{self.start_date}_{self.end_date}")
        ignore_cache = is_current_date(self.start_date, self.end_date)

        if not ignore_cache:
            if data := cache_get(f'report_model_usage:total_cost:{sum_key}'):
                return json.loads(data)

        self._fetch_mongo_data(all_data=True)
        mysql_data = self._fetch_mysql_data(hit_report=True)
        total_cost_hit = sum(float(item.get('custo_hit') or 0) for item in mysql_data)
        total_client_cost = sum(float(item.get('client_total_cost') or 0) for item in mysql_data)

        sum_total_cost = {
            'company_id': self.company_id,
            'start_date': self.start_date,
            'end_date': self.end_date,
            'total_cost_hit': total_cost_hit,
            'total_client_cost': total_client_cost
        }
        cache_set(f'report_model_usage:total_cost:{sum_key}', json.dumps(sum_total_cost), 86400)
        return sum_total_cost

    def reportDataXLSX(self, hit_report: bool = False) -> str:
        self._fetch_mongo_data(all_data=True)
        if hit_report:
            mysql_data = self._fetch_mysql_data(hit_report=True)
            return self._create_excel(mysql_data, hit_report=True)
        mysql_data = self._fetch_mysql_data()
        return self._create_excel(mysql_data)

    def reportData(self, page: int = 1, page_size: int = 20, hit_report: bool = False) -> Dict[str, Any]:
        mongo_data = self._fetch_mongo_data(page=page, page_size=page_size)
        if hit_report:
            mysql_data = self._fetch_mysql_data(hit_report=True)
        else:
            mysql_data = self._fetch_mysql_data()
        return {
            "pagination": mongo_data,
            "data": mysql_data,
            "cost": self._reportDataTotalCost()
        }