54 lines
1.8 KiB
Python
54 lines
1.8 KiB
Python
from flask_bcrypt import generate_password_hash, check_password_hash
|
|
from flask import current_app
|
|
from pymongo import MongoClient
|
|
from bson.objectid import ObjectId
|
|
from bson.errors import InvalidId
|
|
|
|
class UserModel:
|
|
def __init__(self):
|
|
client = current_app.mongo_client
|
|
self.collection = client["billing-api"]["users"]
|
|
|
|
def create_user(self, email, password):
|
|
hashed = generate_password_hash(password).decode('utf-8')
|
|
user = {"email": email, "password": hashed}
|
|
return self.collection.insert_one(user)
|
|
|
|
def find_by_email(self, email, except_user_id=None):
|
|
|
|
if except_user_id:
|
|
object_id = ObjectId(except_user_id)
|
|
return self.collection.find_one({'email': email, "_id": {"$ne": object_id}})
|
|
|
|
return self.collection.find_one({"email": email})
|
|
|
|
|
|
def get_user_by_id(self, user_id):
|
|
object_id = ObjectId(user_id)
|
|
return self.collection.find_one({"_id": object_id},{"email": 1, "roles": 1,})
|
|
|
|
def update_user(self, user_id, update_data: dict):
|
|
object_id = ObjectId(user_id)
|
|
|
|
if "password" in update_data:
|
|
update_data["password"] = generate_password_hash(update_data["password"]).decode('utf-8')
|
|
|
|
result = self.collection.update_one(
|
|
{"_id": object_id},
|
|
{"$set": update_data}
|
|
)
|
|
|
|
return result.modified_count > 0
|
|
|
|
def delete_user(self, user_id):
|
|
object_id = ObjectId(user_id)
|
|
result = self.collection.delete_one({"_id": object_id})
|
|
return result.deleted_count > 0
|
|
|
|
def list_users(self):
|
|
users = self.collection.find({}, {"email": 1, "roles": 1,})
|
|
return users
|
|
|
|
def verify_password(self, hashed_password, plain_password):
|
|
return check_password_hash(hashed_password, plain_password)
|