redis.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import socketio
  2. import redis
  3. from redis import asyncio as aioredis
  4. from urllib.parse import urlparse
  5. def parse_redis_sentinel_url(redis_url):
  6. parsed_url = urlparse(redis_url)
  7. if parsed_url.scheme != "redis":
  8. raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
  9. return {
  10. "username": parsed_url.username or None,
  11. "password": parsed_url.password or None,
  12. "service": parsed_url.hostname or 'mymaster',
  13. "port": parsed_url.port or 6379,
  14. "db": int(parsed_url.path.lstrip("/") or 0),
  15. }
  16. def get_redis_connection(redis_url, sentinels, decode_responses=True):
  17. if sentinels:
  18. redis_config = parse_redis_sentinel_url(redis_url)
  19. sentinel = redis.sentinel.Sentinel(
  20. sentinels,
  21. port=redis_config['port'],
  22. db=redis_config['db'],
  23. username=redis_config['username'],
  24. password=redis_config['password'],
  25. decode_responses=decode_responses
  26. )
  27. # Get a master connection from Sentinel
  28. return sentinel.master_for(redis_config['service'])
  29. else:
  30. # Standard Redis connection
  31. return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
  32. class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
  33. def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
  34. username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
  35. """
  36. Initialize the Redis Sentinel Manager.
  37. This implementation mostly replicates the __init__ of AsyncRedisManager and
  38. overrides _redis_connect() with a version that uses Redis Sentinel
  39. :param sentinel_hosts: List of Sentinel hosts
  40. :param sentinel_port: Sentinel Port
  41. :param redis_port: Redis Port (currently unsupported by aioredis!)
  42. :param service: Master service name in Sentinel
  43. :param db: Redis database to use
  44. :param username: Redis username (if any) (currently unsupported by aioredis!)
  45. :param password: Redis password (if any)
  46. :param channel: The channel name on which the server sends and receives
  47. notifications. Must be the same in all the servers.
  48. :param write_only: If set to ``True``, only initialize to emit events. The
  49. default of ``False`` initializes the class for emitting
  50. and receiving.
  51. :param redis_options: additional keyword arguments to be passed to
  52. ``aioredis.from_url()``.
  53. """
  54. self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
  55. self._redis_port=redis_port
  56. self._service = service
  57. self._db = db
  58. self._username = username
  59. self._password = password
  60. self._channel = channel
  61. self.redis_options = redis_options or {}
  62. # connect and call grandparent constructor
  63. self._redis_connect()
  64. super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
  65. def _redis_connect(self):
  66. """Establish connections to Redis through Sentinel."""
  67. sentinel = aioredis.sentinel.Sentinel(
  68. self._sentinels,
  69. port=self._redis_port,
  70. db=self._db,
  71. password=self._password,
  72. **self.redis_options
  73. )
  74. self.redis = sentinel.master_for(self._service)
  75. self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)