users.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.utils import decode_token
  7. from utils.misc import get_gravatar_url
  8. from apps.web.internal.db import DB
  9. ####################
  10. # User DB Schema
  11. ####################
  12. class User(Model):
  13. id = CharField(unique=True)
  14. name = CharField()
  15. email = CharField()
  16. role = CharField()
  17. profile_image_url = CharField()
  18. timestamp = DateField()
  19. class Meta:
  20. database = DB
  21. class UserModel(BaseModel):
  22. class Config:
  23. orm_mode = True
  24. id: str
  25. name: str
  26. email: str
  27. role: str = "pending"
  28. profile_image_url: str = "/user.png"
  29. timestamp: int # timestamp in epoch
  30. ####################
  31. # Forms
  32. ####################
  33. class UserRoleUpdateForm(BaseModel):
  34. id: str
  35. role: str
  36. class UsersTable:
  37. def __init__(self, db):
  38. self.db = db
  39. self.db.create_tables([User])
  40. def insert_new_user(
  41. self, id: str, name: str, email: str, role: str = "pending"
  42. ) -> Optional[UserModel]:
  43. user = UserModel(
  44. **{
  45. "id": id,
  46. "name": name,
  47. "email": email,
  48. "role": role,
  49. "profile_image_url": get_gravatar_url(email),
  50. "timestamp": int(time.time()),
  51. }
  52. )
  53. result = User.create(**user.model_dump())
  54. if result:
  55. return user
  56. else:
  57. return None
  58. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  59. try:
  60. user = User.get(User.id == id)
  61. return UserModel(**model_to_dict(user))
  62. except:
  63. return None
  64. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  65. try:
  66. user = User.get(User.email == email)
  67. return UserModel(**model_to_dict(user))
  68. except:
  69. return None
  70. def get_user_by_token(self, token: str) -> Optional[UserModel]:
  71. data = decode_token(token)
  72. if data != None and "email" in data:
  73. return self.get_user_by_email(data["email"])
  74. else:
  75. return None
  76. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  77. return [
  78. UserModel(**model_to_dict(user))
  79. for user in User.select().limit(limit).offset(skip)
  80. ]
  81. def get_num_users(self) -> Optional[int]:
  82. return User.select().count()
  83. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  84. try:
  85. query = User.update(role=role).where(User.id == id)
  86. query.execute()
  87. user = User.get(User.id == id)
  88. return UserModel(**model_to_dict(user))
  89. except:
  90. return None
  91. Users = UsersTable(DB)