redis.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import socketio
  2. import redis
  3. from redis import asyncio as aioredis
  4. from urllib.parse import urlparse
  def parse_redis_sentinel_url(redis_url):
      """Split a ``redis://`` URL into the settings used for a Sentinel setup.

      In Sentinel mode the host component of the URL is used as the name of
      the monitored master service rather than as a network host.

      :param redis_url: URL of the form
                        ``redis://[user[:password]@]service[:port][/db]``.
      :return: dict with the keys ``username``, ``password``, ``service``,
               ``port`` and ``db``. Missing credentials become ``None``; the
               service defaults to ``"mymaster"``, the port to ``6379`` and
               the db to ``0``.
      :raises ValueError: if the scheme is not ``redis`` or the path is not an
                          integer database index.
      """
      # Local import so this block stays self-contained with respect to the
      # file's import section.
      from urllib.parse import unquote

      parsed_url = urlparse(redis_url)
      if parsed_url.scheme != "redis":
          raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")

      # urlparse() leaves percent-escapes in the userinfo part untouched, so
      # credentials containing reserved characters (e.g. "@" written as
      # "%40") must be decoded here before being handed to the client.
      username = unquote(parsed_url.username) if parsed_url.username else None
      password = unquote(parsed_url.password) if parsed_url.password else None

      return {
          "username": username,
          "password": password,
          "service": parsed_url.hostname or "mymaster",
          "port": parsed_url.port or 6379,
          "db": int(parsed_url.path.lstrip("/") or 0),
      }
  def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
      """Return a synchronous Redis client, going through Sentinel if configured.

      :param redis_url: Redis URL. Without sentinels it is passed to
                        ``redis.Redis.from_url``; with sentinels it is parsed
                        by ``parse_redis_sentinel_url``.
      :param redis_sentinels: sentinel addresses handed to
                              ``redis.sentinel.Sentinel``; a falsy value
                              selects the plain connection.
      :param decode_responses: forwarded unchanged to the client that is built.
      :return: the client from ``from_url`` or the master client obtained from
               Sentinel.
      """
      if not redis_sentinels:
          # No sentinels configured: plain connection straight from the URL.
          return redis.Redis.from_url(redis_url, decode_responses=decode_responses)

      settings = parse_redis_sentinel_url(redis_url)
      sentinel_client = redis.sentinel.Sentinel(
          redis_sentinels,
          port=settings["port"],
          db=settings["db"],
          username=settings["username"],
          password=settings["password"],
          decode_responses=decode_responses,
      )
      # Ask Sentinel for a client bound to the master of the named service.
      return sentinel_client.master_for(settings["service"])
  def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
      """Build ``(host, port)`` sentinel tuples from environment-style strings.

      :param sentinel_hosts_env: comma-separated host names; a falsy value
                                 means no sentinels are configured.
      :param sentinel_port_env: port shared by all hosts, convertible with
                                ``int()``. Only evaluated when hosts are given.
      :return: list of ``(host, port)`` tuples, empty when no hosts are given.
      :raises ValueError: if the port cannot be parsed as an integer.
      """
      if not sentinel_hosts_env:
          return []

      sentinel_port = int(sentinel_port_env)
      # Tolerate "a, b" and trailing commas: surrounding whitespace is removed
      # and empty entries are dropped so they never become bogus addresses.
      stripped_hosts = (host.strip() for host in sentinel_hosts_env.split(","))
      return [(host, sentinel_port) for host in stripped_hosts if host]
  class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
      def __init__(
          self,
          sentinel_hosts,
          sentinel_port=26379,
          redis_port=6379,
          service="mymaster",
          db=0,
          username=None,
          password=None,
          channel="socketio",
          write_only=False,
          logger=None,
          redis_options=None,
      ):
          """
          Set up a Socket.IO client manager backed by Redis Sentinel.

          The constructor stores the connection settings, connects through
          ``_redis_connect()`` and then calls the constructor of the class
          *after* ``AsyncRedisManager`` in the MRO, so the parent's own
          ``__init__`` is deliberately skipped.

          :param sentinel_hosts: Iterable of Sentinel host names
          :param sentinel_port: Port paired with every Sentinel host
          :param redis_port: Redis port, forwarded as ``port`` to the Sentinel
                             client (the original note here says this is
                             currently unsupported by aioredis)
          :param service: Master service name in Sentinel
          :param db: Redis database to use
          :param username: Redis username (if any); stored on the instance but
                           not forwarded by ``_redis_connect()``
          :param password: Redis password (if any)
          :param channel: The channel name on which the server sends and
                          receives notifications. Must be the same in all the
                          servers.
          :param write_only: If set to ``True``, only initialize to emit
                             events. The default of ``False`` initializes the
                             class for emitting and receiving.
          :param logger: Passed through to the base manager constructor.
          :param redis_options: additional keyword arguments forwarded to the
                                ``aioredis.sentinel.Sentinel`` constructor.
          """
          self.redis_options = redis_options if redis_options else {}
          self._sentinels = [(hostname, sentinel_port) for hostname in sentinel_hosts]
          self._service = service
          self._redis_port = redis_port
          self._db = db
          self._username = username
          self._password = password
          self._channel = channel

          # Connect first, then initialise the grandparent: starting the MRO
          # lookup after AsyncRedisManager bypasses its __init__.
          self._redis_connect()
          super(socketio.AsyncRedisManager, self).__init__(
              channel=channel, write_only=write_only, logger=logger
          )

      def _redis_connect(self):
          """Create the Sentinel-backed Redis client and its pub/sub handle."""
          sentinel_client = aioredis.sentinel.Sentinel(
              self._sentinels,
              port=self._redis_port,
              db=self._db,
              password=self._password,
              **self.redis_options,
          )
          master = sentinel_client.master_for(self._service)
          self.redis = master
          self.pubsub = master.pubsub(ignore_subscribe_messages=True)