1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import socketio
- import redis
- from redis import asyncio as aioredis
- from urllib.parse import urlparse
- def parse_redis_sentinel_url(redis_url):
- parsed_url = urlparse(redis_url)
- if parsed_url.scheme != "redis":
- raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
- return {
- "username": parsed_url.username or None,
- "password": parsed_url.password or None,
- "service": parsed_url.hostname or 'mymaster',
- "port": parsed_url.port or 6379,
- "db": int(parsed_url.path.lstrip("/") or 0),
- }
- def get_redis_connection(redis_url, sentinels, decode_responses=True):
- if sentinels:
- redis_config = parse_redis_sentinel_url(redis_url)
- sentinel = redis.sentinel.Sentinel(
- sentinels,
- port=redis_config['port'],
- db=redis_config['db'],
- username=redis_config['username'],
- password=redis_config['password'],
- decode_responses=decode_responses
- )
- # Get a master connection from Sentinel
- return sentinel.master_for(redis_config['service'])
- else:
- # Standard Redis connection
- return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
- def get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT):
- sentinel_hosts=SENTINEL_HOSTS.split(',')
- sentinel_port=int(SENTINEL_PORT)
- return [(host, sentinel_port) for host in sentinel_hosts]
- class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
- def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
- username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
- """
- Initialize the Redis Sentinel Manager.
- This implementation mostly replicates the __init__ of AsyncRedisManager and
- overrides _redis_connect() with a version that uses Redis Sentinel
- :param sentinel_hosts: List of Sentinel hosts
- :param sentinel_port: Sentinel Port
- :param redis_port: Redis Port (currently unsupported by aioredis!)
- :param service: Master service name in Sentinel
- :param db: Redis database to use
- :param username: Redis username (if any) (currently unsupported by aioredis!)
- :param password: Redis password (if any)
- :param channel: The channel name on which the server sends and receives
- notifications. Must be the same in all the servers.
- :param write_only: If set to ``True``, only initialize to emit events. The
- default of ``False`` initializes the class for emitting
- and receiving.
- :param redis_options: additional keyword arguments to be passed to
- ``aioredis.from_url()``.
- """
- self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
- self._redis_port=redis_port
- self._service = service
- self._db = db
- self._username = username
- self._password = password
- self._channel = channel
- self.redis_options = redis_options or {}
- # connect and call grandparent constructor
- self._redis_connect()
- super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
- def _redis_connect(self):
- """Establish connections to Redis through Sentinel."""
- sentinel = aioredis.sentinel.Sentinel(
- self._sentinels,
- port=self._redis_port,
- db=self._db,
- password=self._password,
- **self.redis_options
- )
- self.redis = sentinel.master_for(self._service)
- self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
|