2025-06-24 19:08:08 +00:00
|
|
|
from flask_restx import Resource
|
|
|
|
from flask import request, current_app
|
2025-06-09 11:13:05 +00:00
|
|
|
from flask_jwt_extended import create_access_token
|
|
|
|
from app.db.models import UserModel
|
2025-06-24 19:08:08 +00:00
|
|
|
from app.docs.auth_models import auth_ns, signup_model, signin_model
|
|
|
|
from app.schemas.auth_sigin_schema import SigInRequest
|
|
|
|
from app.schemas.auth_sigup_schema import SigUpRequest
|
2025-06-09 11:13:05 +00:00
|
|
|
|
|
|
|
@auth_ns.route('/signup')
|
|
|
|
class SignUp(Resource):
|
|
|
|
@auth_ns.expect(signup_model)
|
|
|
|
def post(self):
|
|
|
|
data = request.get_json()
|
2025-06-24 19:08:08 +00:00
|
|
|
|
|
|
|
validated = SigUpRequest(**data)
|
|
|
|
|
|
|
|
roles = data.get("roles", [])
|
|
|
|
|
2025-06-09 11:13:05 +00:00
|
|
|
user_model = UserModel()
|
2025-06-24 19:08:08 +00:00
|
|
|
if user_model.find_by_email(validated.email):
|
2025-06-09 11:13:05 +00:00
|
|
|
return {'message': 'User already exists'}, 400
|
2025-06-24 19:08:08 +00:00
|
|
|
|
|
|
|
result = user_model.create_user(validated.email, validated.password)
|
|
|
|
|
|
|
|
user_model.update_user(result.inserted_id, {"email": validated.email, "roles": roles})
|
2025-06-09 11:13:05 +00:00
|
|
|
|
2025-06-24 19:08:08 +00:00
|
|
|
return {'message': 'success'}, 201
|
2025-06-09 11:13:05 +00:00
|
|
|
|
|
|
|
@auth_ns.route('/login')
|
|
|
|
class Login(Resource):
|
2025-06-24 19:08:08 +00:00
|
|
|
@auth_ns.expect(signin_model)
|
2025-06-09 11:13:05 +00:00
|
|
|
def post(self):
|
|
|
|
data = request.get_json()
|
2025-06-24 19:08:08 +00:00
|
|
|
|
|
|
|
validated = SigInRequest(**data)
|
|
|
|
|
2025-06-09 11:13:05 +00:00
|
|
|
user_model = UserModel()
|
2025-06-24 19:08:08 +00:00
|
|
|
user = user_model.find_by_email(validated.email)
|
2025-06-09 11:13:05 +00:00
|
|
|
|
|
|
|
if not user or not user_model.verify_password(user['password'], data['password']):
|
2025-06-24 19:08:08 +00:00
|
|
|
return {'message': 'Invalid credentials'}, 401
|
|
|
|
|
|
|
|
roles = user.get("roles", []) if user else []
|
|
|
|
|
|
|
|
access_token = create_access_token(
|
|
|
|
identity=user['email'],
|
|
|
|
additional_claims={"roles": roles}
|
|
|
|
)
|
2025-06-09 11:13:05 +00:00
|
|
|
|
|
|
|
return {'access_token': access_token}, 200
|