redis.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import socketio
  2. from urllib.parse import urlparse
  3. from typing import Optional
  4. def parse_redis_service_url(redis_url):
  5. parsed_url = urlparse(redis_url)
  6. if parsed_url.scheme != "redis":
  7. raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
  8. return {
  9. "username": parsed_url.username or None,
  10. "password": parsed_url.password or None,
  11. "service": parsed_url.hostname or "mymaster",
  12. "port": parsed_url.port or 6379,
  13. "db": int(parsed_url.path.lstrip("/") or 0),
  14. }
  15. def get_redis_connection(
  16. redis_url, redis_sentinels, async_mode=False, decode_responses=True
  17. ):
  18. if async_mode:
  19. import redis.asyncio as redis
  20. # If using sentinel in async mode
  21. if redis_sentinels:
  22. redis_config = parse_redis_service_url(redis_url)
  23. sentinel = redis.sentinel.Sentinel(
  24. redis_sentinels,
  25. port=redis_config["port"],
  26. db=redis_config["db"],
  27. username=redis_config["username"],
  28. password=redis_config["password"],
  29. decode_responses=decode_responses,
  30. )
  31. return sentinel.master_for(redis_config["service"])
  32. elif redis_url:
  33. return redis.from_url(redis_url, decode_responses=decode_responses)
  34. else:
  35. return None
  36. else:
  37. import redis
  38. if redis_sentinels:
  39. redis_config = parse_redis_service_url(redis_url)
  40. sentinel = redis.sentinel.Sentinel(
  41. redis_sentinels,
  42. port=redis_config["port"],
  43. db=redis_config["db"],
  44. username=redis_config["username"],
  45. password=redis_config["password"],
  46. decode_responses=decode_responses,
  47. )
  48. return sentinel.master_for(redis_config["service"])
  49. elif redis_url:
  50. return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
  51. else:
  52. return None
  53. def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
  54. if sentinel_hosts_env:
  55. sentinel_hosts = sentinel_hosts_env.split(",")
  56. sentinel_port = int(sentinel_port_env)
  57. return [(host, sentinel_port) for host in sentinel_hosts]
  58. return []
  59. def get_sentinel_url_from_env(redis_url, sentinel_hosts_env, sentinel_port_env):
  60. redis_config = parse_redis_service_url(redis_url)
  61. username = redis_config["username"] or ""
  62. password = redis_config["password"] or ""
  63. auth_part = ""
  64. if username or password:
  65. auth_part = f"{username}:{password}@"
  66. hosts_part = ",".join(
  67. f"{host}:{sentinel_port_env}" for host in sentinel_hosts_env.split(",")
  68. )
  69. return f"redis+sentinel://{auth_part}{hosts_part}/{redis_config['db']}/{redis_config['service']}"