Source code for coredis.client.sentinel

from __future__ import annotations

import warnings
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any, overload

from anyio import AsyncContextManagerMixin

from coredis._utils import nativestr
from coredis.connection._tcp import TCPLocation
from coredis.exceptions import (
    ConnectionError,
    PrimaryNotFoundError,
    ResponseError,
)
from coredis.patterns.cache import AbstractCache
from coredis.pool._sentinel import SentinelConnectionPool
from coredis.retry import NoRetryPolicy, RetryPolicy
from coredis.typing import (
    AnyStr,
    AsyncGenerator,
    Generic,
    Iterable,
    Literal,
    ResponsePrimitive,
    Self,
    TypeAdapter,
)

from .basic import Redis


class Sentinel(AsyncContextManagerMixin, Generic[AnyStr]):
    """
    Entry point for talking to a group of Redis Sentinel nodes: discovers
    the primary / replicas of a monitored service and hands out
    :class:`~coredis.Redis` clients bound to them.

    Example use::

        from coredis import Sentinel
        from coredis.connection import TCPLocation

        async def test():
            async with Sentinel([TCPLocation('localhost', 26379)], stream_timeout=0.1) as sentinel:
                primary = sentinel.primary_for('my-instance', stream_timeout=0.1)
                replica = sentinel.replica_for('my-instance', stream_timeout=0.1)
                async with primary, replica:
                    await primary.set('foo', 'bar')
                    await replica.get('foo')
    """

    @overload
    def __init__(
        self: Sentinel[bytes],
        sentinels: Iterable[TCPLocation | tuple[str, int]],
        min_other_sentinels: int = ...,
        sentinel_kwargs: dict[str, Any] | None = ...,
        decode_responses: Literal[False] = ...,
        cache: AbstractCache | None = None,
        type_adapter: TypeAdapter | None = ...,
        retry_policy: RetryPolicy = ...,
        **connection_kwargs: Any,
    ) -> None: ...

    @overload
    def __init__(
        self: Sentinel[str],
        sentinels: Iterable[TCPLocation | tuple[str, int]],
        min_other_sentinels: int = ...,
        sentinel_kwargs: dict[str, Any] | None = ...,
        decode_responses: Literal[True] = ...,
        cache: AbstractCache | None = None,
        type_adapter: TypeAdapter | None = None,
        retry_policy: RetryPolicy = ...,
        **connection_kwargs: Any,
    ) -> None: ...

    def __init__(
        self,
        sentinels: Iterable[TCPLocation | tuple[str, int]],
        min_other_sentinels: int = 0,
        sentinel_kwargs: dict[str, Any] | None = None,
        decode_responses: bool = False,
        cache: AbstractCache | None = None,
        type_adapter: TypeAdapter | None = None,
        retry_policy: RetryPolicy = NoRetryPolicy(),
        **connection_kwargs: Any,
    ) -> None:
        """
        Changes
          - .. versionadded:: 3.10.0
             Accept :paramref:`cache` parameter to be used with primaries
             and replicas returned from the sentinel instance.

        :param sentinels: is a list of sentinel nodes. Each node is represented
         by a :class:`~coredis.connection.TCPLocation`. Plain
         ``(hostname, port)`` pairs are still accepted but emit a
         :exc:`DeprecationWarning`.
        :param min_other_sentinels: defines a minimum number of peers for a
         sentinel. When querying a sentinel, if it doesn't meet this threshold,
         responses from that sentinel won't be considered valid.
        :param sentinel_kwargs: is a dictionary of connection arguments used
         when connecting to sentinel instances. Any argument that can be passed
         to a normal Redis connection can be specified here. If
         :paramref:`sentinel_kwargs` is not specified, the ``connect_timeout``,
         ``stream_timeout``, ``socket_timeout``, ``socket_keepalive`` and
         ``encoding`` options specified in :paramref:`connection_kwargs` will be
         used. :paramref:`decode_responses` is always applied. The dictionary
         passed in is copied and never modified.
        :param decode_responses: Applied to the sentinel clients as well as to
         the primaries and replicas returned by this sentinel.
        :param cache: If provided the cache will be shared between both
         primaries and replicas returned by this sentinel.
        :param type_adapter: The adapter to use for serializing / deserializing
         custom types when interacting with redis commands. If provided this
         adapter will be used for both primaries and replicas returned by this
         sentinel.
        :param retry_policy: The retry policy to use when interacting with the
         sentinels and with the primary and replica instances.
        :param connection_kwargs: are keyword arguments that will be used when
         establishing a connection to a Redis server (i.e. are passed on to the
         constructor of :class:`Redis` for all primary and replicas).
        """
        # ``sentinels`` is only required to be an Iterable and is walked twice
        # below; materialise it once so a generator is not exhausted by the
        # deprecation check before the clients are built.
        sentinel_nodes = list(sentinels)

        if sentinel_kwargs is None:
            # No explicit sentinel options: inherit the transport level options
            # from connection_kwargs.
            inherited = {
                "connect_timeout",
                "stream_timeout",
                "socket_timeout",
                "socket_keepalive",
                "encoding",
            }
            sentinel_kwargs = {k: v for k, v in connection_kwargs.items() if k in inherited}
        else:
            # Copy so that injecting ``decode_responses`` below does not leak
            # into the caller's dictionary.
            sentinel_kwargs = dict(sentinel_kwargs)

        self.sentinel_kwargs = sentinel_kwargs
        # NOTE(review): stored here but not read by any method visible in this
        # class - confirm where the threshold is enforced.
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self.__cache = cache
        self.__type_adapter = type_adapter
        self.__retry_policy = retry_policy
        self.connection_kwargs["decode_responses"] = decode_responses
        self.sentinel_kwargs["decode_responses"] = decode_responses

        if any(isinstance(node, tuple) for node in sentinel_nodes):
            warnings.warn(
                "Use coredis.connection.TCPLocation to specify sentinels",
                DeprecationWarning,
                stacklevel=2,
            )
        sentinel_locations = [
            node if isinstance(node, TCPLocation) else TCPLocation(*node)
            for node in sentinel_nodes
        ]
        # One client per sentinel node.
        self.sentinels = [
            Redis(
                location.host,
                location.port,
                retry_policy=self.__retry_policy,
                **self.sentinel_kwargs,
            )
            for location in sentinel_locations
        ]

    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """
        Enters every sentinel client for the lifetime of the ``async with``
        block; the exit stack unwinds them when the block is left.
        """
        async with AsyncExitStack() as stack:
            for sentinel in self.sentinels:
                await stack.enter_async_context(sentinel)
            yield self

    def __repr__(self) -> str:
        # Sentinels whose connection pool has no (truthy) location are omitted.
        sentinels = [
            f"{location}"
            for sentinel in self.sentinels
            if (location := sentinel.connection_pool.location)
        ]
        return f"Sentinel<sentinels=[{','.join(sentinels)}]>"

    def __filter_replicas(
        self, replicas: Iterable[dict[str, ResponsePrimitive]]
    ) -> list[tuple[str, int]]:
        """
        Removes replicas that are flagged as ODOWN, SDOWN or disconnected

        :return: ``(host, port)`` pairs of the remaining replicas, in the order
         they were reported.
        """
        replicas_alive: list[tuple[str, int]] = []
        for replica in replicas:
            if replica["is_odown"] or replica["is_sdown"] or replica["is_disconnected"]:
                continue
            ip, port = replica["ip"], replica["port"]
            assert ip is not None and port is not None
            replicas_alive.append((nativestr(ip), int(port)))
        return replicas_alive

    async def discover_primary(self, service_name: str) -> tuple[str, int]:
        """
        Asks sentinel servers for the Redis primary's location corresponding to
        the service labeled :paramref:`service_name`.

        Sentinels are tried in order. One that fails with a connection error or
        timeout, or that returns no location, is skipped.

        :return: A pair (location, port) or raises
         :exc:`~coredis.exceptions.PrimaryNotFoundError` if no primary is found.
        """
        for idx, sentinel in enumerate(self.sentinels):
            try:
                primary_location = await sentinel.sentinel_get_master_addr_by_name(service_name)
            except (ConnectionError, TimeoutError):
                continue
            if not primary_location:
                continue
            # Put this sentinel at the top of the list so it is asked first
            # next time. Mutating the list mid-iteration is safe here because
            # we return immediately afterwards.
            self.sentinels.insert(0, self.sentinels.pop(idx))
            return nativestr(primary_location[0]), primary_location[1]
        raise PrimaryNotFoundError(f"No primary found for {service_name!r}")

    async def discover_replicas(self, service_name: str) -> list[tuple[str, int]]:
        """
        Returns a list of alive replicas for service :paramref:`service_name`

        Sentinels are tried in order. One that fails with a connection error,
        response error or timeout, or that reports no alive replica, is
        skipped. An empty list is returned only if no sentinel reports an alive
        replica.
        """
        for sentinel in self.sentinels:
            try:
                replicas = await sentinel.sentinel_replicas(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            alive = self.__filter_replicas(replicas)
            # A single sentinel may have a stale or partitioned view; only
            # accept its answer if it actually yields a usable replica.
            if alive:
                return alive
        return []

    def __client_for(
        self,
        service_name: str,
        is_primary: bool,
        redis_class: type[Redis[Any]],
        connection_pool_class: type[SentinelConnectionPool],
        overrides: dict[str, Any],
    ) -> Redis[Any]:
        """
        Builds a client backed by a sentinel aware pool for the primary or
        the replicas of ``service_name``. Shared by :meth:`primary_for` and
        :meth:`replica_for`.
        """
        # Per call overrides win over the instance wide connection kwargs;
        # ``is_primary`` is always forced by the calling method.
        connection_kwargs = {
            **self.connection_kwargs,
            **overrides,
            "is_primary": is_primary,
        }
        return redis_class(
            connection_pool=connection_pool_class(
                service_name,
                self,
                _cache=self.__cache,
                **connection_kwargs,
            ),
            retry_policy=self.__retry_policy,
            type_adapter=self.__type_adapter,
        )

    @overload
    def primary_for(
        self: Sentinel[bytes],
        service_name: str,
        *,
        redis_class: type[Redis[bytes]] = ...,
        connection_pool_class: type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[bytes]: ...

    @overload
    def primary_for(
        self: Sentinel[str],
        service_name: str,
        *,
        redis_class: type[Redis[str]] = ...,
        connection_pool_class: type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[str]: ...

    def primary_for(
        self,
        service_name: str,
        *,
        redis_class: type[Redis[Any]] = Redis[Any],
        connection_pool_class: type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs: Any,
    ) -> Redis[bytes] | Redis[str]:
        """
        Returns a redis client instance for the :paramref:`service_name` primary.

        A :class:`coredis.sentinel.SentinelConnectionPool` class is used to
        retrieve the primary's location before establishing a new connection.

        NOTE: If the primary's location has changed, any cached connections to
        the old primary are closed.

        By default clients will be :class:`~coredis.Redis` instances. Specify a
        different class to the :paramref:`redis_class` argument if you desire
        something different.

        The :paramref:`connection_pool_class` specifies the connection pool to
        use. The :class:`~coredis.sentinel.SentinelConnectionPool` will be used
        by default.

        All other keyword arguments are merged with any
        :paramref:`Sentinel.connection_kwargs` passed to this class and passed
        to the connection pool as keyword arguments to be used to initialize
        Redis connections.
        """
        return self.__client_for(service_name, True, redis_class, connection_pool_class, kwargs)

    @overload
    def replica_for(
        self: Sentinel[bytes],
        service_name: str,
        redis_class: type[Redis[bytes]] = ...,
        connection_pool_class: type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[bytes]: ...

    @overload
    def replica_for(
        self: Sentinel[str],
        service_name: str,
        redis_class: type[Redis[str]] = ...,
        connection_pool_class: type[SentinelConnectionPool] = ...,
        **kwargs: Any,
    ) -> Redis[str]: ...

    def replica_for(
        self,
        service_name: str,
        redis_class: type[Redis[Any]] = Redis[Any],
        connection_pool_class: type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs: Any,
    ) -> Redis[bytes] | Redis[str]:
        """
        Returns redis client instance for the :paramref:`service_name`
        replica(s).

        A SentinelConnectionPool class is used to retrieve the replica's
        location before establishing a new connection.

        By default clients will be :class:`~coredis.Redis` instances. Specify a
        different class to the :paramref:`redis_class` argument if you desire
        something different.

        The :paramref:`connection_pool_class` specifies the connection pool to
        use. The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any
        :paramref:`Sentinel.connection_kwargs` passed to this class and passed
        to the connection pool as keyword arguments to be used to initialize
        Redis connections.
        """
        return self.__client_for(service_name, False, redis_class, connection_pool_class, kwargs)