redis.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import socketio
  2. import redis
  3. from redis import asyncio as aioredis
  4. from urllib.parse import urlparse
  5. def parse_redis_sentinel_url(redis_url):
  6. parsed_url = urlparse(redis_url)
  7. if parsed_url.scheme != "redis":
  8. raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
  9. return {
  10. "username": parsed_url.username or None,
  11. "password": parsed_url.password or None,
  12. "service": parsed_url.hostname or 'mymaster',
  13. "port": parsed_url.port or 6379,
  14. "db": int(parsed_url.path.lstrip("/") or 0),
  15. }
  16. def get_redis_connection(redis_url, sentinels, decode_responses=True):
  17. if sentinels:
  18. redis_config = parse_redis_sentinel_url(redis_url)
  19. sentinel = redis.sentinel.Sentinel(
  20. sentinels,
  21. port=redis_config['port'],
  22. db=redis_config['db'],
  23. username=redis_config['username'],
  24. password=redis_config['password'],
  25. decode_responses=decode_responses
  26. )
  27. # Get a master connection from Sentinel
  28. return sentinel.master_for(redis_config['service'])
  29. else:
  30. # Standard Redis connection
  31. return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
  32. def get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT):
  33. sentinel_hosts=SENTINEL_HOSTS.split(',')
  34. sentinel_port=int(SENTINEL_PORT)
  35. return [(host, sentinel_port) for host in sentinel_hosts]
  36. class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
  37. def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
  38. username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
  39. """
  40. Initialize the Redis Sentinel Manager.
  41. This implementation mostly replicates the __init__ of AsyncRedisManager and
  42. overrides _redis_connect() with a version that uses Redis Sentinel
  43. :param sentinel_hosts: List of Sentinel hosts
  44. :param sentinel_port: Sentinel Port
  45. :param redis_port: Redis Port (currently unsupported by aioredis!)
  46. :param service: Master service name in Sentinel
  47. :param db: Redis database to use
  48. :param username: Redis username (if any) (currently unsupported by aioredis!)
  49. :param password: Redis password (if any)
  50. :param channel: The channel name on which the server sends and receives
  51. notifications. Must be the same in all the servers.
  52. :param write_only: If set to ``True``, only initialize to emit events. The
  53. default of ``False`` initializes the class for emitting
  54. and receiving.
  55. :param redis_options: additional keyword arguments to be passed to
  56. ``aioredis.from_url()``.
  57. """
  58. self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
  59. self._redis_port=redis_port
  60. self._service = service
  61. self._db = db
  62. self._username = username
  63. self._password = password
  64. self._channel = channel
  65. self.redis_options = redis_options or {}
  66. # connect and call grandparent constructor
  67. self._redis_connect()
  68. super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
  69. def _redis_connect(self):
  70. """Establish connections to Redis through Sentinel."""
  71. sentinel = aioredis.sentinel.Sentinel(
  72. self._sentinels,
  73. port=self._redis_port,
  74. db=self._db,
  75. password=self._password,
  76. **self.redis_options
  77. )
  78. self.redis = sentinel.master_for(self._service)
  79. self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)