messages.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. import json
  2. import time
  3. import uuid
  4. from typing import Optional
  5. from open_webui.internal.db import Base, get_db
  6. from open_webui.models.tags import TagModel, Tag, Tags
  7. from pydantic import BaseModel, ConfigDict
  8. from sqlalchemy import BigInteger, Boolean, Column, String, Text, JSON
  9. from sqlalchemy import or_, func, select, and_, text
  10. from sqlalchemy.sql import exists
  11. ####################
  12. # Message DB Schema
  13. ####################
  14. class MessageReaction(Base):
  15. __tablename__ = "message_reaction"
  16. id = Column(Text, primary_key=True)
  17. user_id = Column(Text)
  18. message_id = Column(Text)
  19. name = Column(Text)
  20. created_at = Column(BigInteger)
  21. class MessageReactionModel(BaseModel):
  22. model_config = ConfigDict(from_attributes=True)
  23. id: str
  24. user_id: str
  25. message_id: str
  26. name: str
  27. created_at: int # timestamp in epoch
  28. class Message(Base):
  29. __tablename__ = "message"
  30. id = Column(Text, primary_key=True)
  31. user_id = Column(Text)
  32. channel_id = Column(Text, nullable=True)
  33. parent_id = Column(Text, nullable=True)
  34. content = Column(Text)
  35. data = Column(JSON, nullable=True)
  36. meta = Column(JSON, nullable=True)
  37. created_at = Column(BigInteger) # time_ns
  38. updated_at = Column(BigInteger) # time_ns
  39. class MessageModel(BaseModel):
  40. model_config = ConfigDict(from_attributes=True)
  41. id: str
  42. user_id: str
  43. channel_id: Optional[str] = None
  44. parent_id: Optional[str] = None
  45. content: str
  46. data: Optional[dict] = None
  47. meta: Optional[dict] = None
  48. created_at: int # timestamp in epoch
  49. updated_at: int # timestamp in epoch
  50. ####################
  51. # Forms
  52. ####################
  53. class MessageForm(BaseModel):
  54. content: str
  55. parent_id: Optional[str] = None
  56. data: Optional[dict] = None
  57. meta: Optional[dict] = None
  58. class Reactions(BaseModel):
  59. name: str
  60. user_ids: list[str]
  61. count: int
  62. class MessageResponse(MessageModel):
  63. latest_reply_at: Optional[int]
  64. reply_count: int
  65. reactions: list[Reactions]
  66. class MessageTable:
  67. def insert_new_message(
  68. self, form_data: MessageForm, channel_id: str, user_id: str
  69. ) -> Optional[MessageModel]:
  70. with get_db() as db:
  71. id = str(uuid.uuid4())
  72. ts = int(time.time_ns())
  73. message = MessageModel(
  74. **{
  75. "id": id,
  76. "user_id": user_id,
  77. "channel_id": channel_id,
  78. "parent_id": form_data.parent_id,
  79. "content": form_data.content,
  80. "data": form_data.data,
  81. "meta": form_data.meta,
  82. "created_at": ts,
  83. "updated_at": ts,
  84. }
  85. )
  86. result = Message(**message.model_dump())
  87. db.add(result)
  88. db.commit()
  89. db.refresh(result)
  90. return MessageModel.model_validate(result) if result else None
  91. def get_message_by_id(self, id: str) -> Optional[MessageResponse]:
  92. with get_db() as db:
  93. message = db.get(Message, id)
  94. if not message:
  95. return None
  96. reactions = self.get_reactions_by_message_id(id)
  97. replies = self.get_replies_by_message_id(id)
  98. return MessageResponse(
  99. **{
  100. **MessageModel.model_validate(message).model_dump(),
  101. "latest_reply_at": replies[0].created_at if replies else None,
  102. "reply_count": len(replies),
  103. "reactions": reactions,
  104. }
  105. )
  106. def get_replies_by_message_id(self, id: str) -> list[MessageModel]:
  107. with get_db() as db:
  108. all_messages = (
  109. db.query(Message)
  110. .filter_by(parent_id=id)
  111. .order_by(Message.created_at.desc())
  112. .all()
  113. )
  114. return [MessageModel.model_validate(message) for message in all_messages]
  115. def get_reply_user_ids_by_message_id(self, id: str) -> list[str]:
  116. with get_db() as db:
  117. return [
  118. message.user_id
  119. for message in db.query(Message).filter_by(parent_id=id).all()
  120. ]
  121. def get_messages_by_channel_id(
  122. self, channel_id: str, skip: int = 0, limit: int = 50
  123. ) -> list[MessageModel]:
  124. with get_db() as db:
  125. all_messages = (
  126. db.query(Message)
  127. .filter_by(channel_id=channel_id, parent_id=None)
  128. .order_by(Message.created_at.desc())
  129. .offset(skip)
  130. .limit(limit)
  131. .all()
  132. )
  133. return [MessageModel.model_validate(message) for message in all_messages]
  134. def get_messages_by_parent_id(
  135. self, channel_id: str, parent_id: str, skip: int = 0, limit: int = 50
  136. ) -> list[MessageModel]:
  137. with get_db() as db:
  138. message = db.get(Message, parent_id)
  139. if not message:
  140. return []
  141. all_messages = (
  142. db.query(Message)
  143. .filter_by(channel_id=channel_id, parent_id=parent_id)
  144. .order_by(Message.created_at.desc())
  145. .offset(skip)
  146. .limit(limit)
  147. .all()
  148. )
  149. # If length of all_messages is less than limit, then add the parent message
  150. if len(all_messages) < limit:
  151. all_messages.append(message)
  152. return [MessageModel.model_validate(message) for message in all_messages]
  153. def update_message_by_id(
  154. self, id: str, form_data: MessageForm
  155. ) -> Optional[MessageModel]:
  156. with get_db() as db:
  157. message = db.get(Message, id)
  158. message.content = form_data.content
  159. message.data = {
  160. **(message.data if message.data else {}),
  161. **(form_data.data if form_data.data else {}),
  162. }
  163. message.meta = {
  164. **(message.meta if message.meta else {}),
  165. **(form_data.meta if form_data.meta else {}),
  166. }
  167. message.updated_at = int(time.time_ns())
  168. db.commit()
  169. db.refresh(message)
  170. return MessageModel.model_validate(message) if message else None
  171. def add_reaction_to_message(
  172. self, id: str, user_id: str, name: str
  173. ) -> Optional[MessageReactionModel]:
  174. with get_db() as db:
  175. reaction_id = str(uuid.uuid4())
  176. reaction = MessageReactionModel(
  177. id=reaction_id,
  178. user_id=user_id,
  179. message_id=id,
  180. name=name,
  181. created_at=int(time.time_ns()),
  182. )
  183. result = MessageReaction(**reaction.model_dump())
  184. db.add(result)
  185. db.commit()
  186. db.refresh(result)
  187. return MessageReactionModel.model_validate(result) if result else None
  188. def get_reactions_by_message_id(self, id: str) -> list[Reactions]:
  189. with get_db() as db:
  190. all_reactions = db.query(MessageReaction).filter_by(message_id=id).all()
  191. reactions = {}
  192. for reaction in all_reactions:
  193. if reaction.name not in reactions:
  194. reactions[reaction.name] = {
  195. "name": reaction.name,
  196. "user_ids": [],
  197. "count": 0,
  198. }
  199. reactions[reaction.name]["user_ids"].append(reaction.user_id)
  200. reactions[reaction.name]["count"] += 1
  201. return [Reactions(**reaction) for reaction in reactions.values()]
  202. def remove_reaction_by_id_and_user_id_and_name(
  203. self, id: str, user_id: str, name: str
  204. ) -> bool:
  205. with get_db() as db:
  206. db.query(MessageReaction).filter_by(
  207. message_id=id, user_id=user_id, name=name
  208. ).delete()
  209. db.commit()
  210. return True
  211. def delete_reactions_by_id(self, id: str) -> bool:
  212. with get_db() as db:
  213. db.query(MessageReaction).filter_by(message_id=id).delete()
  214. db.commit()
  215. return True
  216. def delete_replies_by_id(self, id: str) -> bool:
  217. with get_db() as db:
  218. db.query(Message).filter_by(parent_id=id).delete()
  219. db.commit()
  220. return True
  221. def delete_message_by_id(self, id: str) -> bool:
  222. with get_db() as db:
  223. db.query(Message).filter_by(id=id).delete()
  224. # Delete all reactions to this message
  225. db.query(MessageReaction).filter_by(message_id=id).delete()
  226. db.commit()
  227. return True
  228. Messages = MessageTable()