# knowledge.py
  1. import json
  2. import logging
  3. import time
  4. from typing import Optional
  5. import uuid
  6. from open_webui.internal.db import Base, get_db
  7. from open_webui.env import SRC_LOG_LEVELS
  8. from open_webui.models.files import FileMetadataResponse
  9. from open_webui.models.groups import Groups
  10. from open_webui.models.users import Users, UserResponse
  11. from pydantic import BaseModel, ConfigDict
  12. from sqlalchemy import BigInteger, Column, String, Text, JSON
  13. from open_webui.utils.access_control import has_access
  14. log = logging.getLogger(__name__)
  15. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  16. ####################
  17. # Knowledge DB Schema
  18. ####################
  19. class Knowledge(Base):
  20. __tablename__ = "knowledge"
  21. id = Column(Text, unique=True, primary_key=True)
  22. user_id = Column(Text)
  23. name = Column(Text)
  24. description = Column(Text)
  25. data = Column(JSON, nullable=True)
  26. meta = Column(JSON, nullable=True)
  27. access_control = Column(JSON, nullable=True) # Controls data access levels.
  28. # Defines access control rules for this entry.
  29. # - `None`: Public access, available to all users with the "user" role.
  30. # - `{}`: Private access, restricted exclusively to the owner.
  31. # - Custom permissions: Specific access control for reading and writing;
  32. # Can specify group or user-level restrictions:
  33. # {
  34. # "read": {
  35. # "group_ids": ["group_id1", "group_id2"],
  36. # "user_ids": ["user_id1", "user_id2"]
  37. # },
  38. # "write": {
  39. # "group_ids": ["group_id1", "group_id2"],
  40. # "user_ids": ["user_id1", "user_id2"]
  41. # }
  42. # }
  43. created_at = Column(BigInteger)
  44. updated_at = Column(BigInteger)
  45. class KnowledgeModel(BaseModel):
  46. model_config = ConfigDict(from_attributes=True)
  47. id: str
  48. user_id: str
  49. name: str
  50. description: str
  51. data: Optional[dict] = None
  52. meta: Optional[dict] = None
  53. access_control: Optional[dict] = None
  54. created_at: int # timestamp in epoch
  55. updated_at: int # timestamp in epoch
  56. ####################
  57. # Forms
  58. ####################
  59. class KnowledgeUserModel(KnowledgeModel):
  60. user: Optional[UserResponse] = None
  61. class KnowledgeResponse(KnowledgeModel):
  62. files: Optional[list[FileMetadataResponse | dict]] = None
  63. class KnowledgeUserResponse(KnowledgeUserModel):
  64. files: Optional[list[FileMetadataResponse | dict]] = None
  65. class KnowledgeForm(BaseModel):
  66. name: str
  67. description: str
  68. data: Optional[dict] = None
  69. access_control: Optional[dict] = None
  70. class KnowledgeTable:
  71. def insert_new_knowledge(
  72. self, user_id: str, form_data: KnowledgeForm
  73. ) -> Optional[KnowledgeModel]:
  74. with get_db() as db:
  75. knowledge = KnowledgeModel(
  76. **{
  77. **form_data.model_dump(),
  78. "id": str(uuid.uuid4()),
  79. "user_id": user_id,
  80. "created_at": int(time.time()),
  81. "updated_at": int(time.time()),
  82. }
  83. )
  84. try:
  85. result = Knowledge(**knowledge.model_dump())
  86. db.add(result)
  87. db.commit()
  88. db.refresh(result)
  89. if result:
  90. return KnowledgeModel.model_validate(result)
  91. else:
  92. return None
  93. except Exception:
  94. return None
  95. def get_knowledge_bases(self) -> list[KnowledgeUserModel]:
  96. with get_db() as db:
  97. all_knowledge = (
  98. db.query(Knowledge).order_by(Knowledge.updated_at.desc()).all()
  99. )
  100. user_ids = list(set(knowledge.user_id for knowledge in all_knowledge))
  101. users = Users.get_users_by_user_ids(user_ids) if user_ids else []
  102. users_dict = {user.id: user for user in users}
  103. knowledge_bases = []
  104. for knowledge in all_knowledge:
  105. user = users_dict.get(knowledge.user_id)
  106. knowledge_bases.append(
  107. KnowledgeUserModel.model_validate(
  108. {
  109. **KnowledgeModel.model_validate(knowledge).model_dump(),
  110. "user": user.model_dump() if user else None,
  111. }
  112. )
  113. )
  114. return knowledge_bases
  115. def check_access_by_user_id(self, id, user_id, permission="write") -> bool:
  116. knowledge = self.get_knowledge_by_id(id)
  117. if not knowledge:
  118. return False
  119. if knowledge.user_id == user_id:
  120. return True
  121. user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user_id)}
  122. return has_access(user_id, permission, knowledge.access_control, user_group_ids)
  123. def get_knowledge_bases_by_user_id(
  124. self, user_id: str, permission: str = "write"
  125. ) -> list[KnowledgeUserModel]:
  126. knowledge_bases = self.get_knowledge_bases()
  127. user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user_id)}
  128. return [
  129. knowledge_base
  130. for knowledge_base in knowledge_bases
  131. if knowledge_base.user_id == user_id
  132. or has_access(
  133. user_id, permission, knowledge_base.access_control, user_group_ids
  134. )
  135. ]
  136. def get_knowledge_by_id(self, id: str) -> Optional[KnowledgeModel]:
  137. try:
  138. with get_db() as db:
  139. knowledge = db.query(Knowledge).filter_by(id=id).first()
  140. return KnowledgeModel.model_validate(knowledge) if knowledge else None
  141. except Exception:
  142. return None
  143. def update_knowledge_by_id(
  144. self, id: str, form_data: KnowledgeForm, overwrite: bool = False
  145. ) -> Optional[KnowledgeModel]:
  146. try:
  147. with get_db() as db:
  148. knowledge = self.get_knowledge_by_id(id=id)
  149. db.query(Knowledge).filter_by(id=id).update(
  150. {
  151. **form_data.model_dump(),
  152. "updated_at": int(time.time()),
  153. }
  154. )
  155. db.commit()
  156. return self.get_knowledge_by_id(id=id)
  157. except Exception as e:
  158. log.exception(e)
  159. return None
  160. def update_knowledge_data_by_id(
  161. self, id: str, data: dict
  162. ) -> Optional[KnowledgeModel]:
  163. try:
  164. with get_db() as db:
  165. knowledge = self.get_knowledge_by_id(id=id)
  166. db.query(Knowledge).filter_by(id=id).update(
  167. {
  168. "data": data,
  169. "updated_at": int(time.time()),
  170. }
  171. )
  172. db.commit()
  173. return self.get_knowledge_by_id(id=id)
  174. except Exception as e:
  175. log.exception(e)
  176. return None
  177. def delete_knowledge_by_id(self, id: str) -> bool:
  178. try:
  179. with get_db() as db:
  180. db.query(Knowledge).filter_by(id=id).delete()
  181. db.commit()
  182. return True
  183. except Exception:
  184. return False
  185. def delete_all_knowledge(self) -> bool:
  186. with get_db() as db:
  187. try:
  188. db.query(Knowledge).delete()
  189. db.commit()
  190. return True
  191. except Exception:
  192. return False
  193. Knowledges = KnowledgeTable()