Browse Source

fixes & add envs for Sentinel implementation

Jan Kessler 3 months ago
parent
commit
9bf663934a
3 changed files with 44 additions and 39 deletions
  1. 4 0
      backend/open_webui/env.py
  2. 38 37
      backend/open_webui/socket/main.py
  3. 2 2
      backend/open_webui/socket/utils.py

+ 4 - 0
backend/open_webui/env.py

@@ -379,6 +379,10 @@ WEBSOCKET_MANAGER = os.environ.get("WEBSOCKET_MANAGER", "")
 WEBSOCKET_REDIS_URL = os.environ.get("WEBSOCKET_REDIS_URL", REDIS_URL)
 WEBSOCKET_REDIS_LOCK_TIMEOUT = os.environ.get("WEBSOCKET_REDIS_LOCK_TIMEOUT", 60)
 
+WEBSOCKET_SENTINEL_HOSTS = os.environ.get("WEBSOCKET_SENTINEL_HOSTS", "")
+
+WEBSOCKET_SENTINEL_PORT = os.environ.get("WEBSOCKET_SENTINEL_PORT", "26379")
+
 AIOHTTP_CLIENT_TIMEOUT = os.environ.get("AIOHTTP_CLIENT_TIMEOUT", "")
 
 if AIOHTTP_CLIENT_TIMEOUT == "":

+ 38 - 37
backend/open_webui/socket/main.py

@@ -3,7 +3,7 @@ import socketio
 import logging
 import sys
 import time
-from redis.sentinel import Sentinel
+from redis import asyncio as aioredis
 
 from open_webui.models.users import Users, UserNameResponse
 from open_webui.models.channels import Channels
@@ -26,52 +26,53 @@ from open_webui.env import (
 )
 
 class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
-    def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service_name="mymaster", db=0,
-                 username=None, password=None, channel='socketio', write_only=False, **kwargs):
+    def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
+                 username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
         """
         Initialize the Redis Sentinel Manager.
+        This implementation mostly replicates the __init__ of AsyncRedisManager and
+        overrides _redis_connect() with a version that uses Redis Sentinel
 
         :param sentinel_hosts: List of Sentinel hosts
         :param sentinel_port: Sentinel Port
-        :param redis_port: Redis Port
-        :param service_name: Master service name in Sentinel
+        :param redis_port: Redis Port (currently unsupported by aioredis!)
+        :param service: Master service name in Sentinel
         :param db: Redis database to use
-        :param username: Redis username (if any)
+        :param username: Redis username (if any) (currently unsupported by aioredis!)
         :param password: Redis password (if any)
-        :param channel: The Redis channel name
-        :param write_only: If set to True, only initialize the connection to send messages
-        :param kwargs: Additional connection arguments for Redis
+        :param channel: The channel name on which the server sends and receives
+                        notifications. Must be the same in all the servers.
+        :param write_only: If set to ``True``, only initialize to emit events. The
+                           default of ``False`` initializes the class for emitting
+                           and receiving.
+        :param redis_options: additional keyword arguments to be passed to
+                              ``aioredis.from_url()``.
         """
-        self.sentinel_addresses = [(host, sentinel_port) for host in sentinel_hosts]
-        self.redis_port=redis_port
-        self.service_name = service_name
-        self.db = db
-        self.username = username
-        self.password = password
-        self.channel = channel
-        self.write_only = write_only
-        self.redis_kwargs = kwargs
-
-        # Skip parent's init but call grandparent's init
-        socketio.AsyncManager.__init__(self)
+        self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
+        self._redis_port=redis_port
+        self._service = service
+        self._db = db
+        self._username = username
+        self._password = password
+        self._channel = channel
+        self.redis_options = redis_options or {}
+
+        # connect and call grandparent constructor
         self._redis_connect()
+        super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
 
     def _redis_connect(self):
         """Establish connections to Redis through Sentinel."""
-        sentinel = redis.sentinel.Sentinel(
-            self.sentinel_addresses,
-            port=self.redis_port,
-            db=self.db,
-            username=self.username,
-            password=self.password,
-            **self.redis_kwargs
+        sentinel = aioredis.sentinel.Sentinel(
+            self._sentinels,
+            port=self._redis_port,
+            db=self._db,
+            password=self._password,
+            **self.redis_options
         )
 
-        # Get connections to the Redis master and slave
-        self.redis = sentinel.master_for(self.service_name)
-        if not self.write_only:
-            self.pubsub = sentinel.slave_for(self.service_name).pubsub()
-            self.pubsub.subscribe(self.channel)
+        self.redis = sentinel.master_for(self._service)
+        self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
 
 
 logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
@@ -114,15 +115,15 @@ if WEBSOCKET_MANAGER == "redis":
     sentinel_hosts=WEBSOCKET_SENTINEL_HOSTS.split(',')
     sentinel_port=int(WEBSOCKET_SENTINEL_PORT)
     sentinels=[(host, sentinel_port) for host in sentinel_hosts]
-    SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels)
-    USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels)
-    USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels)
+    SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels)
+    USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels)
+    USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels)
 
     clean_up_lock = RedisLock(
         redis_url=WEBSOCKET_REDIS_URL,
         lock_name="usage_cleanup_lock",
         timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT,
-        sentinels,
+        sentinels=sentinels,
     )
     aquire_func = clean_up_lock.aquire_lock
     renew_func = clean_up_lock.renew_lock

+ 2 - 2
backend/open_webui/socket/utils.py

@@ -24,13 +24,13 @@ def get_redis_connection(redis_url, sentinels, decode_responses=True):
     if sentinels:
         redis_config = parse_redis_sentinel_url(redis_url)
         sentinel = redis.sentinel.Sentinel(
-            self.sentinels,
+            sentinels,
             port=redis_config['port'],
             db=redis_config['db'],
             username=redis_config['username'],
             password=redis_config['password'],
             decode_responses=decode_responses
-        }
+        )
 
         # Get a master connection from Sentinel
         return sentinel.master_for(redis_config['service'])