from flask import current_app from datetime import datetime from decimal import Decimal from openpyxl import Workbook from openpyxl.utils.dataframe import dataframe_to_rows from openpyxl.styles import Font, PatternFill import pandas as pd import os from typing import List, Dict, Any, Optional from app.utils.mysql_query import execute_query from app.utils.calc_api_usage import calculate_api_usage class TranscriptionReportService: def __init__(self, company_id: str, start_date: str, end_date: str): self.company_id = str(company_id) self.start_date = start_date self.end_date = end_date self.end_date = end_date self.mongo_client = current_app.mongo_client self.mongo_results = [] self.unique_ids= [] def _fetch_mongo_data(self, page: int = 1, page_size: int = 20, all_data: Optional[bool] = False) -> Dict[str, int]: collection = self.mongo_client["billing-api"]["api_pricings"] result_stt = collection.find({"type": "stt"}) products = [t["product"] for t in result_stt] match_stage = { "$match": { "companyId": self.company_id, "product": {"$in": products}, "createdAt": { "$gte": datetime.strptime(f"{self.start_date} 00:00:00", "%Y-%m-%d %H:%M:%S"), "$lte": datetime.strptime(f"{self.end_date} 23:59:59", "%Y-%m-%d %H:%M:%S") } } } group_stage = { "$group": { "_id": "$sessionId", "count": {"$sum": 1}, "firstCreatedAt": {"$first": "$createdAt"}, "callerIds": {"$addToSet": "$callerId"}, "totalCost": {"$sum": {"$toDouble": "$total_cost"}}, "totalUsage": {"$sum": {"$toDouble": "$usage"}}, "price": {"$first": {"$toDouble": "$price"}}, "product": {"$first": "$product"} } } sort_stage = {"$sort": {"firstCreatedAt": 1}} pipeline = [match_stage, group_stage, sort_stage] if not all_data: # Aplica paginação se all_data for False pipeline.extend([ {"$skip": (page - 1) * page_size}, {"$limit": page_size} ]) # Executa pipeline com ou sem paginação collection = self.mongo_client["billing-api"]["api_usages"] self.mongo_results = list(collection.aggregate(pipeline)) self.unique_ids = [doc["_id"] for doc in self.mongo_results] # print("=====> mongoResults: ", self.mongo_results) # Sempre calcula o total (para controle) count_pipeline = [ match_stage, group_stage, {"$count": "total"} ] count_result = list(collection.aggregate(count_pipeline)) total = count_result[0]["total"] if count_result else 0 return { "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } def _fetch_mysql_data(self, hit_report: Optional[bool] = False)-> List[Dict[str, Any]]: sql = f"""SELECT uniqueid, src, dst, MIN(calldate) AS start_call, MAX(calldate) AS end_call, SUM(CASE WHEN dstchannel LIKE 'PJSIP/%' AND lastapp = 'Queue' THEN billsec ELSE 0 END) AS total_billsec FROM tab_cdr WHERE uniqueid IN {tuple(self.unique_ids)} GROUP BY uniqueid, src, dst;""" rows = execute_query(self.company_id, sql) if hit_report: collection = self.mongo_client["billing-api"]["api_pricings"] result_stt = collection.find({"type": "stt"}) result_stt = [{"product": t["product"], "clientPrice": t["clientPrice"]} for t in result_stt if "clientPrice" in t] for row in rows: row["companyId"] = self.company_id if rowMongo := next((m for m in self.mongo_results if m["_id"] == row["uniqueid"] ), None): row["custo_HIT"] = rowMongo["totalCost"] row["price"] = rowMongo["price"] p = [p for p in result_stt if p['product'] == rowMongo["product"]] if len(p) > 0: row["client_price"] = p[0]["clientPrice"] # row["client_total_cost"] = calculate_api_usage(float(p[0]["clientPrice"]), 60, float(rowMongo["totalUsage"])) for key in row: if isinstance(row[key], datetime): row[key] = row[key].isoformat(sep=' ') elif isinstance(row[key], Decimal): row[key] = float(row[key]) elif key == "uniqueid": row[key] = str(row[key]) else: for row in rows: row["total_min"] = f"{(int(row['total_billsec']) / 60):.2f}" del row["end_call"] for key in row: if isinstance(row[key], datetime): row[key] = row[key].isoformat(sep=' ') elif isinstance(row[key], Decimal): row[key] = float(row[key]) elif key == "uniqueid": row[key] = str(row[key]) return rows def _create_excel(self, data: list, hit_report: Optional[bool] = False) -> str: if hit_report: header_mapping = { "companyId": "companyId", "uniqueid": "sessionId", "total_billsec": "tempo (billsec)", "custo_HIT": "custo_HIT", "price": "tarifa ($/min)", "client_price": "valor_cobrado" } else: header_mapping = { "uniqueid": "Identificador da chamada", "src": "Origem", "dst": "Destino", "start_call": "Inicio da Chamada", "total_billsec": "Duração (Em segundos)", "total_min": "Duração (Em minutos)" } # Filtrar e ordenar os dados conforme header_mapping selected_keys = list(header_mapping.keys()) filtered_data = [{k: row.get(k, "") for k in selected_keys} for row in data] df = pd.DataFrame(filtered_data, columns=selected_keys) # Criação do Excel wb = Workbook() ws = wb.active ws.title = "tab_cdr" header_font = Font(bold=True) yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") # Adiciona cabeçalhos personalizados custom_headers = [header_mapping[col] for col in selected_keys] ws.append(custom_headers) for cell in ws[ws.max_row]: cell.font = header_font cell.fill = yellow_fill # Adiciona os dados for row in df.itertuples(index=False, name=None): ws.append(row) # Define caminho e salva o arquivo filename = f"HISTORICO-CHAMADAS-GRAVADAS-{self.start_date}_{self.end_date}.xlsx" BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) reports_dir = os.path.join(BASE_DIR, "reports") os.makedirs(reports_dir, exist_ok=True) path = os.path.join(reports_dir, filename) wb.save(path) return path def reportDataXLSX(self, hit_report: Optional[bool] = False) -> str: self._fetch_mongo_data(all_data=True) if hit_report: mysql_data = self._fetch_mysql_data(hit_report=True) return self._create_excel(mysql_data, hit_report=True) mysql_data = self._fetch_mysql_data() return self._create_excel(mysql_data) def reportData(self, page: int = 1, page_size: int = 20, hit_report: Optional[bool] = False) -> Dict[str, Any]: mongo_data = self._fetch_mongo_data(page=page, page_size=page_size) if hit_report: mysql_data = self._fetch_mysql_data(hit_report=True) else: mysql_data = self._fetch_mysql_data() return { "pagination": mongo_data, "data": mysql_data }