users.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. from pymongo import ReturnDocument
  4. import time
  5. from utils import decode_token
  6. from config import DB
  7. ####################
  8. # User DB Schema
  9. ####################
  10. class UserModel(BaseModel):
  11. id: str
  12. name: str
  13. email: str
  14. role: str = "user"
  15. created_at: int # timestamp in epoch
####################
# Users table
####################
  19. class UsersTable:
  20. def __init__(self, db):
  21. self.db = db
  22. self.table = db.users
  23. def insert_new_user(
  24. self, id: str, name: str, email: str, role: str = "user"
  25. ) -> Optional[UserModel]:
  26. user = UserModel(
  27. **{
  28. "id": id,
  29. "name": name,
  30. "email": email,
  31. "role": role,
  32. "created_at": int(time.time()),
  33. }
  34. )
  35. result = self.table.insert_one(user.model_dump())
  36. if result:
  37. return user
  38. else:
  39. return None
  40. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  41. user = self.table.find_one({"email": email}, {"_id": False})
  42. if user:
  43. return UserModel(**user)
  44. else:
  45. return None
  46. def get_user_by_token(self, token: str) -> Optional[UserModel]:
  47. data = decode_token(token)
  48. if data != None and "email" in data:
  49. return self.get_user_by_email(data["email"])
  50. else:
  51. return None
  52. def get_users(self, skip: int = 0, limit: int = 50) -> Optional[UserModel]:
  53. return [
  54. UserModel(**user)
  55. for user in list(self.table.find({}, {"_id": False}))
  56. .skip(skip)
  57. .limit(limit)
  58. ]
  59. Users = UsersTable(DB)