123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import socketio
- import redis
- from redis import asyncio as aioredis
- from urllib.parse import urlparse
- def parse_redis_sentinel_url(redis_url):
- parsed_url = urlparse(redis_url)
- if parsed_url.scheme != "redis":
- raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
- return {
- "username": parsed_url.username or None,
- "password": parsed_url.password or None,
- "service": parsed_url.hostname or "mymaster",
- "port": parsed_url.port or 6379,
- "db": int(parsed_url.path.lstrip("/") or 0),
- }
- def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
- if redis_sentinels:
- redis_config = parse_redis_sentinel_url(redis_url)
- sentinel = redis.sentinel.Sentinel(
- redis_sentinels,
- port=redis_config["port"],
- db=redis_config["db"],
- username=redis_config["username"],
- password=redis_config["password"],
- decode_responses=decode_responses,
- )
- # Get a master connection from Sentinel
- return sentinel.master_for(redis_config["service"])
- else:
- # Standard Redis connection
- return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
- def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
- if sentinel_hosts_env:
- sentinel_hosts = sentinel_hosts_env.split(",")
- sentinel_port = int(sentinel_port_env)
- return [(host, sentinel_port) for host in sentinel_hosts]
- return []
- class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
- def __init__(
- self,
- sentinel_hosts,
- sentinel_port=26379,
- redis_port=6379,
- service="mymaster",
- db=0,
- username=None,
- password=None,
- channel="socketio",
- write_only=False,
- logger=None,
- redis_options=None,
- ):
- """
- Initialize the Redis Sentinel Manager.
- This implementation mostly replicates the __init__ of AsyncRedisManager and
- overrides _redis_connect() with a version that uses Redis Sentinel
- :param sentinel_hosts: List of Sentinel hosts
- :param sentinel_port: Sentinel Port
- :param redis_port: Redis Port (currently unsupported by aioredis!)
- :param service: Master service name in Sentinel
- :param db: Redis database to use
- :param username: Redis username (if any) (currently unsupported by aioredis!)
- :param password: Redis password (if any)
- :param channel: The channel name on which the server sends and receives
- notifications. Must be the same in all the servers.
- :param write_only: If set to ``True``, only initialize to emit events. The
- default of ``False`` initializes the class for emitting
- and receiving.
- :param redis_options: additional keyword arguments to be passed to
- ``aioredis.from_url()``.
- """
- self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
- self._redis_port = redis_port
- self._service = service
- self._db = db
- self._username = username
- self._password = password
- self._channel = channel
- self.redis_options = redis_options or {}
- # connect and call grandparent constructor
- self._redis_connect()
- super(socketio.AsyncRedisManager, self).__init__(
- channel=channel, write_only=write_only, logger=logger
- )
- def _redis_connect(self):
- """Establish connections to Redis through Sentinel."""
- sentinel = aioredis.sentinel.Sentinel(
- self._sentinels,
- port=self._redis_port,
- db=self._db,
- password=self._password,
- **self.redis_options,
- )
- self.redis = sentinel.master_for(self._service)
- self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
|