Source code for coredis.sentinel

from __future__ import annotations

import random
import ssl
import weakref
from typing import Any, cast, overload

from coredis import Redis
from coredis._utils import nativestr
from coredis.cache import AbstractCache
from coredis.connection import Connection
from coredis.exceptions import (
    ConnectionError,
    PrimaryNotFoundError,
    ReplicaNotFoundError,
    ResponseError,
    TimeoutError,
)
from coredis.pool import ConnectionPool
from coredis.typing import (
    AnyStr,
    Dict,
    Generic,
    Iterable,
    List,
    Literal,
    Optional,
    StringT,
    Tuple,
    Type,
    Union,
)


class SentinelManagedConnection(Connection, Generic[AnyStr]):
    """
    Connection that asks its owning :class:`SentinelConnectionPool` which
    address to use every time it needs to (re)connect.
    """

    def __init__(
        self,
        connection_pool: SentinelConnectionPool,
        host: str = "127.0.0.1",
        port: int = 6379,
        username: Optional[str] = None,
        password: Optional[str] = None,
        db: int = 0,
        stream_timeout: Optional[float] = None,
        connect_timeout: Optional[float] = None,
        ssl_context: Optional[ssl.SSLContext] = None,
        encoding: str = "utf-8",
        decode_responses: bool = False,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Dict[int, Union[int, bytes]]] = None,
        *,
        client_name: Optional[str] = None,
        protocol_version: Literal[2, 3] = 2,
    ):
        """
        :param connection_pool: the sentinel pool that owns this connection;
         only a weak proxy to it is stored on the instance.

        All remaining arguments are forwarded unchanged to the parent
        connection class.
        """
        # Stored before the parent initializer runs, as a weak proxy rather
        # than a strong reference to the pool.
        self.connection_pool: SentinelConnectionPool = weakref.proxy(connection_pool)
        super().__init__(
            host=host,
            port=port,
            username=username,
            password=password,
            db=db,
            stream_timeout=stream_timeout,
            connect_timeout=connect_timeout,
            ssl_context=ssl_context,
            encoding=encoding,
            decode_responses=decode_responses,
            socket_keepalive=socket_keepalive,
            socket_keepalive_options=socket_keepalive_options,
            client_name=client_name,
            protocol_version=protocol_version,
        )

    def __repr__(self) -> str:
        # The host/port part is only rendered when a host is set.
        location = f",host={self.host},port={self.port}" if self.host else ""
        service = self.connection_pool.service_name
        return f"{type(self).__name__}<service={service}{location}>"

    async def connect_to(self, address: Tuple[str, int]) -> None:
        """
        Point this connection at :paramref:`address` (a ``(host, port)``
        pair) and establish it using the parent class' ``connect``.
        """
        host, port = address
        self.host, self.port = host, port
        await super().connect()

    async def connect(self) -> None:
        """
        Connect to the address supplied by the owning pool.

        Nothing happens if the connection is already established. For a
        primary pool the address returned by the pool's
        ``get_primary_address`` is used. Otherwise each address returned by
        the pool's ``rotate_replicas`` is tried in order until one connects.

        :raises ReplicaNotFoundError: if every candidate address failed with
         a :exc:`~coredis.exceptions.ConnectionError`.
        """
        if self.is_connected:
            return None

        if self.connection_pool.is_primary:
            primary = await self.connection_pool.get_primary_address()
            await self.connect_to(primary)
            return None

        candidates = await self.connection_pool.rotate_replicas()
        for candidate in candidates:
            try:
                await self.connect_to(candidate)
            except ConnectionError:
                # This candidate is unreachable; try the next one.
                continue
            else:
                return None
        # Reached only when none of the candidates accepted a connection.
        raise ReplicaNotFoundError
class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    The pool does not connect to a fixed address. Its connections ask the
    pool for the address to use, and the pool in turn asks the
    :class:`Sentinel` instance it was created with.
    """

    # Class-level defaults so both attributes exist even if ``reset`` has
    # not run yet; ``reset`` sets them back to ``None``.
    # Address of the primary last handed out by ``get_primary_address``
    # (only tracked when the pool is a primary pool).
    primary_address: Optional[Tuple[str, int]] = None
    # Index of the replica that was listed first by the latest
    # ``rotate_replicas`` call.
    replica_counter: Optional[int] = None

    def __init__(
        self,
        service_name: StringT,
        sentinel_manager: Sentinel[Any],
        is_primary: bool = True,
        check_connection: bool = True,
        **kwargs: Any,
    ):
        """
        :param service_name: name of the service monitored by the sentinels.
        :param sentinel_manager: the :class:`Sentinel` instance used to
         discover primary and replica addresses.
        :param is_primary: whether connections of this pool should target
         the primary (``True``) or the replicas (``False``).
        :param check_connection: stored on the pool as ``check_connection``.
        :param kwargs: forwarded to the parent pool; ``connection_class``
         defaults to :class:`SentinelManagedConnection`.
        """
        self.is_primary = is_primary
        kwargs["connection_class"] = cast(
            Type[Connection],
            kwargs.get(
                "connection_class", SentinelManagedConnection[AnyStr]  # type: ignore
            ),
        )
        super().__init__(**kwargs)
        # Every connection created by this pool receives the pool itself so
        # that it can look up the address to connect to.
        self.connection_kwargs["connection_pool"] = self
        self.service_name = nativestr(service_name)
        self.sentinel_manager = sentinel_manager
        self.check_connection = check_connection

    def __repr__(self) -> str:
        role = "primary" if self.is_primary else "replica"
        return f"{type(self).__name__}<service={self.service_name}({role})>"

    def reset(self) -> None:
        """Reset the parent pool and forget the cached sentinel state."""
        super().reset()
        self.primary_address = None
        self.replica_counter = None

    async def get_primary_address(self) -> Tuple[str, int]:
        """
        Ask the sentinel manager for the current primary of the service.

        For a primary pool the returned address is remembered. When it
        differs from a previously remembered one, all connections of the
        pool are disconnected.

        :return: the ``(host, port)`` pair of the primary.
        """
        primary_address = await self.sentinel_manager.discover_primary(
            self.service_name
        )
        if self.is_primary and primary_address != self.primary_address:
            if self.primary_address is not None:
                # Primary address changed, disconnect all clients in this pool
                self.disconnect()
            # Remember the new address. Without this the stale address would
            # keep comparing unequal, and every subsequent call would
            # disconnect the pool again.
            self.primary_address = primary_address
        return primary_address

    async def rotate_replicas(self) -> List[Tuple[str, int]]:
        """
        Round-robin replicas balancer

        :return: the replica addresses reported by the sentinel manager,
         ordered so that consecutive calls start at consecutive replicas.
         If no replica is reported, a single-element list containing the
         primary address is returned instead.
        :raises ReplicaNotFoundError: if neither a replica nor a primary
         could be found.
        """
        replicas = await self.sentinel_manager.discover_replicas(self.service_name)
        if replicas:
            count = len(replicas)
            if self.replica_counter is None:
                self.replica_counter = random.randint(0, count - 1)
            # Advance by exactly one position per call. Stepping the counter
            # once per replica would bring it back to where it started and
            # make every call return the same ordering.
            self.replica_counter = (self.replica_counter + 1) % count
            first = self.replica_counter
            return [replicas[(first + offset) % count] for offset in range(count)]
        # Fallback to primary
        try:
            return [await self.get_primary_address()]
        except PrimaryNotFoundError:
            pass
        raise ReplicaNotFoundError(f"No replica found for {self.service_name!r}")
class Sentinel(Generic[AnyStr]):
    """
    Example use::

        from coredis.sentinel import Sentinel
        sentinel = Sentinel([('localhost', 26379)], stream_timeout=0.1)
        async def test():
            primary = await sentinel.primary_for('my-instance', stream_timeout=0.1)
            await primary.set('foo', 'bar')
            replica = await sentinel.replica_for('my-instance', stream_timeout=0.1)
            await replica.get('foo')

    """

    @overload
    def __init__(
        self: Sentinel[bytes],
        sentinels: Iterable[Tuple[str, int]],
        min_other_sentinels: int = ...,
        sentinel_kwargs: Optional[Dict[str, Any]] = ...,
        decode_responses: Literal[False] = ...,
        cache: Optional[AbstractCache] = None,
        **connection_kwargs: Any,
    ) -> None:
        ...

    @overload
    def __init__(
        self: Sentinel[str],
        sentinels: Iterable[Tuple[str, int]],
        min_other_sentinels: int = ...,
        sentinel_kwargs: Optional[Dict[str, Any]] = ...,
        decode_responses: Literal[True] = ...,
        cache: Optional[AbstractCache] = None,
        **connection_kwargs: Any,
    ) -> None:
        ...

    def __init__(
        self,
        sentinels: Iterable[Tuple[str, int]],
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Dict[str, Any]] = None,
        decode_responses: bool = False,
        cache: Optional[AbstractCache] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Changes
          - .. versionadded:: 3.10.0
               Accept :paramref:`cache` parameter to be used with primaries
               and replicas returned from the sentinel instance.

        :param sentinels: is a list of sentinel nodes. Each node is represented by
         a pair (hostname, port).
        :param min_other_sentinels: defined a minimum number of peers for a sentinel.
         When querying a sentinel, if it doesn't meet this threshold, responses
         from that sentinel won't be considered valid.
        :param sentinel_kwargs: is a dictionary of connection arguments used when
         connecting to sentinel instances. Any argument that can be passed to
         a normal Redis connection can be specified here. The dictionary is
         copied and never modified.
         If :paramref:`sentinel_kwargs` is not specified (or empty),
         the ``stream_timeout``, ``socket_timeout``, ``socket_keepalive``,
         ``encoding`` and ``protocol_version`` options specified in
         :paramref:`connection_kwargs` will be used. ``decode_responses`` is
         always taken from :paramref:`decode_responses`.
        :param decode_responses: applied to both the sentinel clients and the
         connection arguments used for primaries and replicas.
        :param cache: If provided the cache will be shared between both primaries and
         replicas returned by this sentinel.
        :param connection_kwargs: are keyword arguments that will be used when
         establishing a connection to a Redis server (i.e. are passed on to the
         constructor of :class:`Redis` for all primary and replicas).
        """
        if sentinel_kwargs:
            # Work on a copy so that injecting ``decode_responses`` below
            # does not modify the dictionary owned by the caller.
            sentinel_kwargs = dict(sentinel_kwargs)
        else:
            # No explicit sentinel options: inherit the transport related
            # options from connection_kwargs. ``stream_timeout`` is the
            # timeout option named in the parameter documentation above;
            # ``socket_timeout`` is kept for backward compatibility.
            inherited = {
                "stream_timeout",
                "socket_timeout",
                "socket_keepalive",
                "encoding",
                "protocol_version",
            }
            sentinel_kwargs = {
                key: value
                for key, value in connection_kwargs.items()
                if key in inherited
            }
        self.sentinel_kwargs = sentinel_kwargs
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self.__cache = cache
        self.connection_kwargs["decode_responses"] = decode_responses
        self.sentinel_kwargs["decode_responses"] = decode_responses
        self.sentinels = [
            Redis(hostname, port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]

    def __repr__(self) -> str:
        sentinel_addresses: List[str] = [
            "{}:{}".format(
                sentinel.connection_pool.connection_kwargs["host"],
                sentinel.connection_pool.connection_kwargs["port"],
            )
            for sentinel in self.sentinels
        ]
        return "{}<sentinels=[{}]>".format(
            type(self).__name__, ",".join(sentinel_addresses)
        )

    def __check_primary_state(
        self,
        state: Dict[str, Union[int, bool, str]],
    ) -> bool:
        """
        Return ``True`` if :paramref:`state` describes a primary that is not
        flagged SDOWN/ODOWN and that was reported by a sentinel knowing at
        least ``min_other_sentinels`` peers.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        if int(state["num-other-sentinels"]) < self.min_other_sentinels:
            return False
        return True

    def __filter_replicas(
        self, replicas: Iterable[Dict[str, Union[str, int, bool]]]
    ) -> List[Tuple[str, int]]:
        """Removes replicas that are in an ODOWN or SDOWN state"""
        return [
            (nativestr(replica["ip"]), int(replica["port"]))
            for replica in replicas
            if not (replica["is_odown"] or replica["is_sdown"])
        ]

    async def discover_primary(self, service_name: str) -> Tuple[str, int]:
        """
        Asks sentinel servers for the Redis primary's address corresponding
        to the service labeled :paramref:`service_name`.

        Sentinels that fail with a connection or timeout error are skipped.

        :return: A pair (address, port) or raises
         :exc:`~coredis.exceptions.PrimaryNotFoundError` if no primary is found.
        """
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                primaries = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError):
                continue
            state = primaries.get(service_name)
            if state and self.__check_primary_state(state):
                # Put this sentinel at the top of the list so that it is
                # queried first next time.
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )
                return nativestr(state["ip"]), int(state["port"])
        raise PrimaryNotFoundError(f"No primary found for {service_name!r}")

    async def discover_replicas(self, service_name: str) -> List[Tuple[str, int]]:
        """
        Returns a list of alive replicas for service :paramref:`service_name`

        The sentinels are queried in order and the first non-empty list of
        replicas that are neither SDOWN nor ODOWN is returned. Sentinels that
        fail with a connection, response or timeout error are skipped. An
        empty list is returned when no sentinel reports a usable replica.
        """
        for sentinel in self.sentinels:
            try:
                replicas = await sentinel.sentinel_replicas(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            filtered_replicas = self.__filter_replicas(replicas)
            if filtered_replicas:
                return filtered_replicas
        return []

    @overload
    def primary_for(
        self: Sentinel[bytes],
        service_name: str,
        *,
        redis_class: Type[Redis[bytes]] = ...,
        connection_pool_class: Type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[bytes]:
        ...

    @overload
    def primary_for(
        self: Sentinel[str],
        service_name: str,
        *,
        redis_class: Type[Redis[str]] = ...,
        connection_pool_class: Type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[str]:
        ...

    def primary_for(
        self,
        service_name: str,
        *,
        redis_class: Type[Redis[Any]] = Redis[Any],
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs: Any,
    ) -> Union[Redis[bytes], Redis[str]]:
        """
        Returns a redis client instance for the :paramref:`service_name` primary.

        A :class:`coredis.sentinel.SentinelConnectionPool` class is used to
        retrieve the primary's address before establishing a new connection.

        NOTE: If the primary's address has changed, any cached connections to
        the old primary are closed.

        By default clients will be a :class:`~coredis.Redis` instances.
        Specify a different class to the :paramref:`redis_class` argument if you desire
        something different.

        The :paramref:`connection_pool_class` specifies the connection pool to use.
        The :class:`~coredis.sentinel.SentinelConnectionPool` will be used by default.

        All other keyword arguments are merged with any :paramref:`Sentinel.connection_kwargs`
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call keyword arguments take precedence over the defaults given
        # to the sentinel; ``is_primary`` is always forced.
        connection_kwargs = {**self.connection_kwargs, **kwargs, "is_primary": True}
        return redis_class(
            connection_pool=connection_pool_class(
                service_name,
                self,
                **connection_kwargs,
            ),
            cache=self.__cache,
        )

    @overload
    def replica_for(
        self: Sentinel[bytes],
        service_name: str,
        redis_class: Type[Redis[bytes]] = ...,
        connection_pool_class: Type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[bytes]:
        ...

    @overload
    def replica_for(
        self: Sentinel[str],
        service_name: str,
        redis_class: Type[Redis[str]] = ...,
        connection_pool_class: Type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[str]:
        ...

    def replica_for(
        self,
        service_name: str,
        redis_class: Type[Redis[Any]] = Redis[Any],
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs: Any,
    ) -> Union[Redis[bytes], Redis[str]]:
        """
        Returns redis client instance for the :paramref:`service_name` replica(s).

        A :class:`coredis.sentinel.SentinelConnectionPool` class is used to
        retrieve the replica's address before establishing a new connection.

        By default clients will be a :class:`~coredis.Redis` instance.
        Specify a different class to the :paramref:`redis_class` argument if you desire
        something different.

        The :paramref:`connection_pool_class` specifies the connection pool to use.
        The :class:`~coredis.sentinel.SentinelConnectionPool` will be used by default.

        All other keyword arguments are merged with any :paramref:`Sentinel.connection_kwargs`
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call keyword arguments take precedence over the defaults given
        # to the sentinel; ``is_primary`` is always forced.
        connection_kwargs = {**self.connection_kwargs, **kwargs, "is_primary": False}
        return redis_class(
            connection_pool=connection_pool_class(
                service_name,
                self,
                **connection_kwargs,
            ),
            cache=self.__cache,
        )