123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- import json
- import uuid
- from open_webui.utils.redis import get_redis_connection
- from open_webui.env import REDIS_KEY_PREFIX
- from typing import Optional, List, Tuple
- import pycrdt as Y
- class RedisLock:
- def __init__(
- self,
- redis_url,
- lock_name,
- timeout_secs,
- redis_sentinels=[],
- redis_cluster=False,
- ):
- self.lock_name = lock_name
- self.lock_id = str(uuid.uuid4())
- self.timeout_secs = timeout_secs
- self.lock_obtained = False
- self.redis = get_redis_connection(
- redis_url,
- redis_sentinels,
- redis_cluster=redis_cluster,
- decode_responses=True,
- )
- def aquire_lock(self):
- # nx=True will only set this key if it _hasn't_ already been set
- self.lock_obtained = self.redis.set(
- self.lock_name, self.lock_id, nx=True, ex=self.timeout_secs
- )
- return self.lock_obtained
- def renew_lock(self):
- # xx=True will only set this key if it _has_ already been set
- return self.redis.set(
- self.lock_name, self.lock_id, xx=True, ex=self.timeout_secs
- )
- def release_lock(self):
- lock_value = self.redis.get(self.lock_name)
- if lock_value and lock_value == self.lock_id:
- self.redis.delete(self.lock_name)
- class RedisDict:
- def __init__(self, name, redis_url, redis_sentinels=[], redis_cluster=False):
- self.name = name
- self.redis = get_redis_connection(
- redis_url,
- redis_sentinels,
- redis_cluster=redis_cluster,
- decode_responses=True,
- )
- def __setitem__(self, key, value):
- serialized_value = json.dumps(value)
- self.redis.hset(self.name, key, serialized_value)
- def __getitem__(self, key):
- value = self.redis.hget(self.name, key)
- if value is None:
- raise KeyError(key)
- return json.loads(value)
- def __delitem__(self, key):
- result = self.redis.hdel(self.name, key)
- if result == 0:
- raise KeyError(key)
- def __contains__(self, key):
- return self.redis.hexists(self.name, key)
- def __len__(self):
- return self.redis.hlen(self.name)
- def keys(self):
- return self.redis.hkeys(self.name)
- def values(self):
- return [json.loads(v) for v in self.redis.hvals(self.name)]
- def items(self):
- return [(k, json.loads(v)) for k, v in self.redis.hgetall(self.name).items()]
- def get(self, key, default=None):
- try:
- return self[key]
- except KeyError:
- return default
- def clear(self):
- self.redis.delete(self.name)
- def update(self, other=None, **kwargs):
- if other is not None:
- for k, v in other.items() if hasattr(other, "items") else other:
- self[k] = v
- for k, v in kwargs.items():
- self[k] = v
- def setdefault(self, key, default=None):
- if key not in self:
- self[key] = default
- return self[key]
- class YdocManager:
- def __init__(
- self,
- redis=None,
- redis_key_prefix: str = f"{REDIS_KEY_PREFIX}:ydoc:documents",
- ):
- self._updates = {}
- self._users = {}
- self._redis = redis
- self._redis_key_prefix = redis_key_prefix
- async def append_to_updates(self, document_id: str, update: bytes):
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:updates"
- await self._redis.rpush(redis_key, json.dumps(list(update)))
- else:
- if document_id not in self._updates:
- self._updates[document_id] = []
- self._updates[document_id].append(update)
- async def get_updates(self, document_id: str) -> List[bytes]:
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:updates"
- updates = await self._redis.lrange(redis_key, 0, -1)
- return [bytes(json.loads(update)) for update in updates]
- else:
- return self._updates.get(document_id, [])
- async def document_exists(self, document_id: str) -> bool:
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:updates"
- return await self._redis.exists(redis_key) > 0
- else:
- return document_id in self._updates
- async def get_users(self, document_id: str) -> List[str]:
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:users"
- users = await self._redis.smembers(redis_key)
- return list(users)
- else:
- return self._users.get(document_id, [])
- async def add_user(self, document_id: str, user_id: str):
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:users"
- await self._redis.sadd(redis_key, user_id)
- else:
- if document_id not in self._users:
- self._users[document_id] = set()
- self._users[document_id].add(user_id)
- async def remove_user(self, document_id: str, user_id: str):
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:users"
- await self._redis.srem(redis_key, user_id)
- else:
- if document_id in self._users and user_id in self._users[document_id]:
- self._users[document_id].remove(user_id)
- async def remove_user_from_all_documents(self, user_id: str):
- if self._redis:
- keys = await self._redis.keys(f"{self._redis_key_prefix}:*")
- for key in keys:
- if key.endswith(":users"):
- await self._redis.srem(key, user_id)
- document_id = key.split(":")[-2]
- if len(await self.get_users(document_id)) == 0:
- await self.clear_document(document_id)
- else:
- for document_id in list(self._users.keys()):
- if user_id in self._users[document_id]:
- self._users[document_id].remove(user_id)
- if not self._users[document_id]:
- del self._users[document_id]
- await self.clear_document(document_id)
- async def clear_document(self, document_id: str):
- document_id = document_id.replace(":", "_")
- if self._redis:
- redis_key = f"{self._redis_key_prefix}:{document_id}:updates"
- await self._redis.delete(redis_key)
- redis_users_key = f"{self._redis_key_prefix}:{document_id}:users"
- await self._redis.delete(redis_users_key)
- else:
- if document_id in self._updates:
- del self._updates[document_id]
- if document_id in self._users:
- del self._users[document_id]
|