redis.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import inspect
  2. from urllib.parse import urlparse
  3. import logging
  4. import redis
  5. from open_webui.env import REDIS_SENTINEL_MAX_RETRY_COUNT
  6. log = logging.getLogger(__name__)
  7. class SentinelRedisProxy:
  8. def __init__(self, sentinel, service, *, async_mode: bool = True, **kw):
  9. self._sentinel = sentinel
  10. self._service = service
  11. self._kw = kw
  12. self._async_mode = async_mode
  13. def _master(self):
  14. return self._sentinel.master_for(self._service, **self._kw)
  15. def __getattr__(self, item):
  16. master = self._master()
  17. orig_attr = getattr(master, item)
  18. if not callable(orig_attr):
  19. return orig_attr
  20. FACTORY_METHODS = {"pipeline", "pubsub", "monitor", "client", "transaction"}
  21. if item in FACTORY_METHODS:
  22. return orig_attr
  23. if self._async_mode:
  24. async def _wrapped(*args, **kwargs):
  25. for i in range(REDIS_SENTINEL_MAX_RETRY_COUNT):
  26. try:
  27. method = getattr(self._master(), item)
  28. result = method(*args, **kwargs)
  29. if inspect.iscoroutine(result):
  30. return await result
  31. return result
  32. except (
  33. redis.exceptions.ConnectionError,
  34. redis.exceptions.ReadOnlyError,
  35. ) as e:
  36. if i < REDIS_SENTINEL_MAX_RETRY_COUNT - 1:
  37. log.debug(
  38. "Redis sentinel fail-over (%s). Retry %s/%s",
  39. type(e).__name__,
  40. i + 1,
  41. REDIS_SENTINEL_MAX_RETRY_COUNT,
  42. )
  43. continue
  44. log.error(
  45. "Redis operation failed after %s retries: %s",
  46. REDIS_SENTINEL_MAX_RETRY_COUNT,
  47. e,
  48. )
  49. raise e from e
  50. return _wrapped
  51. else:
  52. def _wrapped(*args, **kwargs):
  53. for i in range(REDIS_SENTINEL_MAX_RETRY_COUNT):
  54. try:
  55. method = getattr(self._master(), item)
  56. return method(*args, **kwargs)
  57. except (
  58. redis.exceptions.ConnectionError,
  59. redis.exceptions.ReadOnlyError,
  60. ) as e:
  61. if i < REDIS_SENTINEL_MAX_RETRY_COUNT - 1:
  62. log.debug(
  63. "Redis sentinel fail-over (%s). Retry %s/%s",
  64. type(e).__name__,
  65. i + 1,
  66. REDIS_SENTINEL_MAX_RETRY_COUNT,
  67. )
  68. continue
  69. log.error(
  70. "Redis operation failed after %s retries: %s",
  71. REDIS_SENTINEL_MAX_RETRY_COUNT,
  72. e,
  73. )
  74. raise e from e
  75. return _wrapped
  76. def parse_redis_service_url(redis_url):
  77. parsed_url = urlparse(redis_url)
  78. if parsed_url.scheme != "redis":
  79. raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
  80. return {
  81. "username": parsed_url.username or None,
  82. "password": parsed_url.password or None,
  83. "service": parsed_url.hostname or "mymaster",
  84. "port": parsed_url.port or 6379,
  85. "db": int(parsed_url.path.lstrip("/") or 0),
  86. }
  87. def get_redis_connection(
  88. redis_url, redis_sentinels, async_mode=False, decode_responses=True
  89. ):
  90. if async_mode:
  91. import redis.asyncio as redis
  92. # If using sentinel in async mode
  93. if redis_sentinels:
  94. redis_config = parse_redis_service_url(redis_url)
  95. sentinel = redis.sentinel.Sentinel(
  96. redis_sentinels,
  97. port=redis_config["port"],
  98. db=redis_config["db"],
  99. username=redis_config["username"],
  100. password=redis_config["password"],
  101. decode_responses=decode_responses,
  102. )
  103. return SentinelRedisProxy(
  104. sentinel,
  105. redis_config["service"],
  106. async_mode=async_mode,
  107. )
  108. elif redis_url:
  109. return redis.from_url(redis_url, decode_responses=decode_responses)
  110. else:
  111. return None
  112. else:
  113. import redis
  114. if redis_sentinels:
  115. redis_config = parse_redis_service_url(redis_url)
  116. sentinel = redis.sentinel.Sentinel(
  117. redis_sentinels,
  118. port=redis_config["port"],
  119. db=redis_config["db"],
  120. username=redis_config["username"],
  121. password=redis_config["password"],
  122. decode_responses=decode_responses,
  123. )
  124. return SentinelRedisProxy(
  125. sentinel,
  126. redis_config["service"],
  127. async_mode=async_mode,
  128. )
  129. elif redis_url:
  130. return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
  131. else:
  132. return None
  133. def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
  134. if sentinel_hosts_env:
  135. sentinel_hosts = sentinel_hosts_env.split(",")
  136. sentinel_port = int(sentinel_port_env)
  137. return [(host, sentinel_port) for host in sentinel_hosts]
  138. return []
  139. def get_sentinel_url_from_env(redis_url, sentinel_hosts_env, sentinel_port_env):
  140. redis_config = parse_redis_service_url(redis_url)
  141. username = redis_config["username"] or ""
  142. password = redis_config["password"] or ""
  143. auth_part = ""
  144. if username or password:
  145. auth_part = f"{username}:{password}@"
  146. hosts_part = ",".join(
  147. f"{host}:{sentinel_port_env}" for host in sentinel_hosts_env.split(",")
  148. )
  149. return f"redis+sentinel://{auth_part}{hosts_part}/{redis_config['db']}/{redis_config['service']}"