Source code for coredis.connection._sentinel

from __future__ import annotations

from typing import cast

from coredis.exceptions import StalePrimaryError
from coredis.typing import ResponseType, Unpack

from ._base import BaseConnectionParams
from ._tcp import TCPConnection, TCPLocation


[docs] class SentinelManagedConnection(TCPConnection): def __init__( self, location: TCPLocation, primary_name: str, is_primary: bool, **kwargs: Unpack[BaseConnectionParams], ): self.primary_name = primary_name self.is_primary = is_primary super().__init__(location=location, **kwargs) if self.is_primary: self.register_connect_callback(self._verify_primary_role) async def _verify_primary_role(self, connection: SentinelManagedConnection) -> None: role = cast(list[ResponseType], await connection.create_request(b"ROLE", decode=True)) if not role[0] == "master": raise StalePrimaryError( f"Primary {self.primary_name} at {self.location} reports a role of {role[0]!r}" ) def __repr__(self) -> str: host_info = f",host={self.location.host},port={self.location.port}" return f"{type(self).__name__}<service={self.primary_name}{host_info}>"