transcription-cost-usage-re.../backend/app/routes/usage_routes.py

199 lines
6.7 KiB
Python
Raw Normal View History

2025-06-09 11:13:05 +00:00
import os
from bson import json_util
from flask_restx import Resource
from flask_jwt_extended import jwt_required
from app.schemas.prices_schema import GetTranscriptionRequest
from app.schemas.transcription_schema import TranscriptionRequest
from app.schemas.model_price_update_schema import UpdateModelPriceRequest
from app.schemas.usage_cost_update_schema import UpdateUsageCostRequest
from flask import current_app, request, send_file, after_this_request
from app.services.report_service import TranscriptionReportService
from app.services.mongo_billing_service import MongoBillingService
from app.docs.usage_models import (
usage_cost_model,
model_price_update,
model_prices_query_params,
transcription_data_query_params, usage_ns)
@usage_ns.route('/export/trascription')
class TranscriptionExport(Resource):
@usage_ns.doc(security='Bearer Auth')
@usage_ns.doc(params=transcription_data_query_params)
@usage_ns.response(200, 'success')
@usage_ns.response(400, 'Validation error')
@usage_ns.response(404, 'File not found')
@jwt_required()
def get(self):
"""
Export transcription report in XLSX.
"""
data = {
"company_id": request.args.get("companyId", type=str),
"start_date": request.args.get("startDate"),
"end_date": request.args.get("endDate"),
"who": request.args.get("who", type=str)
}
validated = TranscriptionRequest(**data)
service = TranscriptionReportService(
validated.company_id,
validated.start_date,
validated.end_date
)
if validated.who == "hit":
report_path = service.reportDataXLSX(hit_report=True)
else:
report_path = service.reportDataXLSX()
if not os.path.exists(report_path):
return {"error": "File not found"}, 404
@after_this_request
def remove_file(response):
try:
os.remove(report_path)
except Exception as delete_error:
current_app.logger.warning(f"Error trying to delete file: {delete_error}")
return response
return send_file(
path_or_file=report_path,
as_attachment=True,
download_name=os.path.basename(report_path),
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
@usage_ns.route('/data/trascription')
class TranscriptionUsageData(Resource):
@usage_ns.doc(security='Bearer Auth')
@usage_ns.doc(params=transcription_data_query_params)
@usage_ns.response(200, 'success')
@usage_ns.response(400, 'Validation error')
@jwt_required()
def get(self):
"""
Get transcription report data.
"""
data = {
"company_id": request.args.get("companyId", type=str),
"start_date": request.args.get("startDate"),
"end_date": request.args.get("endDate"),
"who": request.args.get("who", type=str)
}
validated = TranscriptionRequest(**data)
page = request.args.get("page", default=1, type=int)
page_size = request.args.get("page_size", default=20, type=int)
service = TranscriptionReportService(
validated.company_id,
validated.start_date,
validated.end_date
)
if validated.who == "hit":
result = service.reportData(page=page, page_size=page_size,hit_report=True)
else:
result = service.reportData(page=page, page_size=page_size)
return {"success": True, "data": {"data": result["data"], "pagination": result["pagination"]}}, 200
@usage_ns.route('/model/prices')
class TranscriptionModelPrices(Resource):
@usage_ns.doc(security='Bearer Auth')
@usage_ns.doc(params=model_prices_query_params)
@usage_ns.response(200, 'success')
@usage_ns.response(400, 'Validation error')
@jwt_required()
def get(self):
"""
Get model pricing and supplier information
"""
data = {
"type": request.args.get("type", "").split(",") if request.args.get("type") else [],
"provider": request.args.get("provider", "").split(",") if request.args.get("provider") else []
}
validated = GetTranscriptionRequest(**data)
mongo = current_app.mongo_client
collection = mongo["billing-api"]["api_pricings"]
query = {"product": {"$nin": ["whatsapp",]}}
if validated.type:
query["type"] = {"$in": validated.type}
if validated.provider:
query["provider"] = {"$in": validated.provider}
prices_cursor = collection.find(query)
prices_list = list(prices_cursor)
return current_app.response_class(
response=json_util.dumps({"success": True, "data": prices_list}),
mimetype='application/json'
)
@usage_ns.route('/cost')
class UpdateUsageCost(Resource):
@usage_ns.doc(security='Bearer Auth')
@usage_ns.expect(usage_cost_model)
@usage_ns.response(200, 'success')
@usage_ns.response(400, 'Validation error')
@jwt_required()
def patch(self):
"""
Updates the total cost of using products by company
"""
data = request.get_json()
validated = UpdateUsageCostRequest(**data)
service = MongoBillingService()
count_udpated = service.update_usage_total_cost(
product=validated.product,
start_date=validated.start_date,
end_date=validated.end_date,
price=validated.price,
billing_unit=validated.billing_unit,
company_ids=validated.company_ids,
)
return {"success": True, "docs_updated": count_udpated}, 200
@usage_ns.route('/model/prices/<string:id>')
@usage_ns.param('id', 'ID of the model to be updated')
class UpdateModelPrice(Resource):
@usage_ns.doc(security='Bearer Auth')
@usage_ns.expect(model_price_update)
@usage_ns.response(200, 'Sucess')
@usage_ns.response(400, 'Validation error')
@jwt_required()
def patch(self, id):
"""
Updates the price of a specific model by ID.
Only the fields provided in the body will be changed.
"""
data = request.get_json()
data["id"] = id
validated = UpdateModelPriceRequest(**data)
del data["id"]
service = MongoBillingService()
service.update_model_policy_price(validated.id, data)
return {"success": True}, 200