redis.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import socketio
  2. import redis
  3. from redis import asyncio as aioredis
  4. from urllib.parse import urlparse
  5. def parse_redis_sentinel_url(redis_url):
  6. parsed_url = urlparse(redis_url)
  7. if parsed_url.scheme != "redis":
  8. raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
  9. return {
  10. "username": parsed_url.username or None,
  11. "password": parsed_url.password or None,
  12. "service": parsed_url.hostname or 'mymaster',
  13. "port": parsed_url.port or 6379,
  14. "db": int(parsed_url.path.lstrip("/") or 0),
  15. }
  16. def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
  17. if redis_sentinels:
  18. redis_config = parse_redis_sentinel_url(redis_url)
  19. sentinel = redis.sentinel.Sentinel(
  20. redis_sentinels,
  21. port=redis_config['port'],
  22. db=redis_config['db'],
  23. username=redis_config['username'],
  24. password=redis_config['password'],
  25. decode_responses=decode_responses
  26. )
  27. # Get a master connection from Sentinel
  28. return sentinel.master_for(redis_config['service'])
  29. else:
  30. # Standard Redis connection
  31. return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
  32. def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
  33. if sentinel_hosts_env:
  34. sentinel_hosts=sentinel_hosts_env.split(',')
  35. sentinel_port=int(sentinel_port_env)
  36. return [(host, sentinel_port) for host in sentinel_hosts]
  37. return []
  38. class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
  39. def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
  40. username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
  41. """
  42. Initialize the Redis Sentinel Manager.
  43. This implementation mostly replicates the __init__ of AsyncRedisManager and
  44. overrides _redis_connect() with a version that uses Redis Sentinel
  45. :param sentinel_hosts: List of Sentinel hosts
  46. :param sentinel_port: Sentinel Port
  47. :param redis_port: Redis Port (currently unsupported by aioredis!)
  48. :param service: Master service name in Sentinel
  49. :param db: Redis database to use
  50. :param username: Redis username (if any) (currently unsupported by aioredis!)
  51. :param password: Redis password (if any)
  52. :param channel: The channel name on which the server sends and receives
  53. notifications. Must be the same in all the servers.
  54. :param write_only: If set to ``True``, only initialize to emit events. The
  55. default of ``False`` initializes the class for emitting
  56. and receiving.
  57. :param redis_options: additional keyword arguments to be passed to
  58. ``aioredis.from_url()``.
  59. """
  60. self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
  61. self._redis_port=redis_port
  62. self._service = service
  63. self._db = db
  64. self._username = username
  65. self._password = password
  66. self._channel = channel
  67. self.redis_options = redis_options or {}
  68. # connect and call grandparent constructor
  69. self._redis_connect()
  70. super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
  71. def _redis_connect(self):
  72. """Establish connections to Redis through Sentinel."""
  73. sentinel = aioredis.sentinel.Sentinel(
  74. self._sentinels,
  75. port=self._redis_port,
  76. db=self._db,
  77. password=self._password,
  78. **self.redis_options
  79. )
  80. self.redis = sentinel.master_for(self._service)
  81. self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)