Connections & Connection Pooling

Connection Pools

coredis.pool

class ConnectionPool(*, connection_class: type[ConnectionT], location: Location | None = ..., max_connections: int | None = ..., timeout: float | None = ..., _cache: AbstractCache | None = ..., **connection_kwargs: Unpack[BaseConnectionParams])[source]
class ConnectionPool(*, connection_class: None = ..., location: TCPLocation, max_connections: int | None = ..., timeout: float | None = ..., _cache: AbstractCache | None = ..., **connection_kwargs: Unpack[BaseConnectionParams])
class ConnectionPool(*, connection_class: None = ..., location: UnixDomainSocketLocation, max_connections: int | None = ..., timeout: float | None = ..., _cache: AbstractCache | None = ..., **connection_kwargs: Unpack[BaseConnectionParams])
class ConnectionPool(*, connection_class: None = ..., location: None = ..., host: str = ..., port: int = ..., max_connections: int | None = ..., timeout: float | None = ..., _cache: AbstractCache | None = ..., **connection_kwargs: Unpack[BaseConnectionParams])

Blocking connection pool for single instance redis clients

Parameters:
  • connection_class – The connection class to use when creating new connections

  • max_connections – Maximum connections to grow the pool. Once the limit is reached clients will block to wait for a connection to be returned to the pool.

  • timeout – Number of seconds to block when trying to obtain a connection.

  • connection_kwargs – arguments to pass to the connection_class constructor when creating a new connection

classmethod from_url(url: str, *, decode_components: bool = False, **kwargs: Unpack[ConnectionPoolParams[Any]]) ConnectionPool[TCPConnection] | ConnectionPool[UnixDomainSocketConnection][source]
classmethod from_url(url: str, *, decode_components: bool = False, **kwargs: Unpack[ConnectionPoolParams[Any]]) Self

Returns a connection pool configured from the given URL.

For example:

  • redis://[:password]@localhost:6379/0

  • rediss://[:password]@localhost:6379/0

  • unix://[:password]@/path/to/socket.sock?db=0

Three URL schemes are supported:

  • redis:// creates a normal TCP socket connection

  • rediss:// creates a SSL wrapped TCP socket connection

  • unix:// creates a Unix Domain Socket connection

There are several ways to specify a database number. The parse function will return the first specified option:

  • A db querystring option, e.g. redis://localhost?db=0

  • If using the redis:// scheme, the path argument of the url, e.g. redis://localhost/0

  • The db argument to this function.

If none of these options are specified, db=0 is used.

The decode_components argument allows this function to work with percent-encoded URLs. If this argument is set to True all %xx escapes will be replaced by their single-character equivalents after the URL has been parsed. This only applies to the hostname, path, username & password components.

Any additional querystring arguments and keyword arguments will be passed along to the class constructor.

Note

In the case of conflicting arguments, querystring arguments always win.

async get_connection(**_: Any) ConnectionT[source]

Gets or create a connection from the pool. Be careful to only release the connection AFTER all commands are sent, or race conditions are possible.

acquire(**_: Any) AsyncGenerator[ConnectionT][source]

Gets or creates a connection from the pool, then release it afterwards. Multiplexing is automatic if you exit the context manager before waiting for command results.

Caution

Do not explicitly release connections acquired using this context manager.

release(connection: ConnectionT) None[source]

Checks connection for liveness and releases it back to the pool.

disconnect() None[source]

Disconnect all active connections in the pool

class ConnectionPoolParams[source]

Bases: BaseConnectionPoolParams[ConnectionT]

Parameters accepted by coredis.pool.ConnectionPool

stream_timeout: NotRequired[float | None]
connect_timeout: NotRequired[float | None]
encoding: NotRequired[str]
decode_responses: NotRequired[bool]
client_name: NotRequired[str | None]
noreply: NotRequired[bool]
noevict: NotRequired[bool]
notouch: NotRequired[bool]
max_idle_time: NotRequired[int | None]
processing_budget: NotRequired[CapacityLimiter]
username: NotRequired[str | None]
password: NotRequired[str | None]
credential_provider: NotRequired[AbstractCredentialProvider | None]
db: NotRequired[int | None]
ssl_context: NotRequired[SSLContext | None]
connection_class: NotRequired[type[ConnectionT]]
max_connections: NotRequired[int | None]
timeout: NotRequired[float | None]
class ClusterConnectionPool(startup_nodes: Iterable[Node | TCPLocation], *, connection_class: type[ClusterConnection] = ClusterConnection, max_connections: int | None = None, max_connections_per_node: bool = False, reinitialize_steps: int | None = None, skip_full_coverage_check: bool = False, nodemanager_follow_cluster: bool = True, readonly: bool = False, read_from_replicas: bool = False, timeout: float | None = None, gc_interval: int = 30, _cache: AbstractCache | None = None, **connection_kwargs: Unpack[BaseConnectionParams])[source]

Bases: BaseConnectionPool[ClusterConnection]

Custom connection pool for RedisCluster client

Cluster aware connection pool that tracks and manages sub pools for each node in the redis cluster

Changes
Parameters:
  • startup_nodes – The initial collection of nodes to use to map the cluster solts to individual primary & replica nodes.

  • connection_class – The connection class to use when creating new connections

  • max_connections – Maximum number of connections to allow concurrently from this client. If the value is None it will default to 64.

  • max_connections_per_node – Whether to use the value of max_connections on a per node basis or cluster wide. If False the per-node connection pools will have a maximum size of max_connections divided by the number of nodes in the cluster.

  • timeout – Number of seconds to block when trying to obtain a connection.

  • skip_full_coverage_check – Skips the check of cluster-require-full-coverage config, useful for clusters without the CONFIG command (For example with AWS Elasticache)

  • nodemanager_follow_cluster – The node manager will during initialization try the last set of nodes that it was operating on. This will allow the client to drift along side the cluster if the cluster nodes move around alot.

  • read_from_replicas – If True connections to replicas will be returned for readonly commands

  • gc_interval – Interval (in seconds) for performing a cleanup of the pool to remove any connections that are no longer in the cluster layout.

  • connection_kwargs – arguments to pass to the connection_class constructor when creating a new connection

classmethod from_url(url: str, *, decode_components: bool = False, **kwargs: Unpack[ClusterConnectionPoolParams]) Self[source]

Returns a cluster connection pool configured from the given URL.

async get_connection(node: ClusterNodeLocation | None = None, primary: bool = True, **options: Any) ClusterConnection[source]

Acquires a connection from the cluster pool. If no node is specified a random node is picked. The connection must be returned back to the pool using the release() method.

Parameters:
  • node – The node for which to get a connection from

  • primary – If False a connection from the replica will be returned

acquire(node: ClusterNodeLocation | None = None, primary: bool = True, **options: Any) AsyncGenerator[ClusterConnection][source]

Acquires a connection from the cluster pool. If no node is specified a random node is picked. The connection will be automatically released back to the pool when the context manager exits.

Parameters:
  • node – The node for which to get a connection from

  • primary – If False a connection from the replica will be returned

release(connection: ClusterConnection) None[source]

Releases the connection back to the pool

disconnect() None[source]

Disconnect all active connections in the pool

class ClusterConnectionPoolParams[source]

Bases: ConnectionPoolParams[ClusterConnection]

Parameters accepted by coredis.pool.ClusterConnectionPool

startup_nodes: NotRequired[Iterable[Node | TCPLocation]]

The initial collection of nodes to use to map the cluster solts to individual primary & replica nodes.

skip_full_coverage_check: NotRequired[bool]

Skips the check of cluster-require-full-coverage config, useful for clusters without the CONFIG command (For example with AWS Elasticache)

max_connections_per_node: NotRequired[bool]

Whether to use the value of max_connections on a per node basis or cluster wide. If False the per-node connection pools will have a maximum size of max_connections divided by the number of nodes in the cluster.

read_from_replicas: NotRequired[bool]

If True connections to replicas will be returned for readonly commands

gc_interval: NotRequired[int]

Interval (in seconds) for performing a cleanup of the pool to remove any connections that are no longer in the cluster layout.

stream_timeout: NotRequired[float | None]
connect_timeout: NotRequired[float | None]
encoding: NotRequired[str]
decode_responses: NotRequired[bool]
client_name: NotRequired[str | None]
noreply: NotRequired[bool]
noevict: NotRequired[bool]
notouch: NotRequired[bool]
max_idle_time: NotRequired[int | None]
processing_budget: NotRequired[CapacityLimiter]
username: NotRequired[str | None]
password: NotRequired[str | None]
credential_provider: NotRequired[AbstractCredentialProvider | None]
db: NotRequired[int | None]
ssl_context: NotRequired[SSLContext | None]
connection_class: NotRequired[type[ConnectionT]]
max_connections: NotRequired[int | None]
timeout: NotRequired[float | None]
class SentinelConnectionPool(service_name: StringT, sentinel_manager: Sentinel[Any], is_primary: bool = True, check_connection: bool = False, _cache: AbstractCache | None = None, **kwargs: Unpack[BaseConnectionParams])[source]

Bases: ConnectionPool[SentinelManagedConnection]

Sentinel backed connection pool.

Blocking connection pool for single instance redis clients

Parameters:
  • connection_class – The connection class to use when creating new connections

  • max_connections – Maximum connections to grow the pool. Once the limit is reached clients will block to wait for a connection to be returned to the pool.

  • timeout – Number of seconds to block when trying to obtain a connection.

  • connection_kwargs – arguments to pass to the connection_class constructor when creating a new connection

async get_primary_location() TCPLocation[source]

Returns the location of the primary instance

async get_replica() TCPLocation[source]

Returns the location of a replica using a round robin approach

acquire(**_: Any) AsyncGenerator[ConnectionT]

Gets or creates a connection from the pool, then release it afterwards. Multiplexing is automatic if you exit the context manager before waiting for command results.

Caution

Do not explicitly release connections acquired using this context manager.

disconnect() None

Disconnect all active connections in the pool

classmethod from_url(url: str, *, decode_components: bool = False, **kwargs: Unpack[ConnectionPoolParams[Any]]) Self | ConnectionPool[TCPConnection] | ConnectionPool[UnixDomainSocketConnection]

Returns a connection pool configured from the given URL.

For example:

  • redis://[:password]@localhost:6379/0

  • rediss://[:password]@localhost:6379/0

  • unix://[:password]@/path/to/socket.sock?db=0

Three URL schemes are supported:

  • redis:// creates a normal TCP socket connection

  • rediss:// creates a SSL wrapped TCP socket connection

  • unix:// creates a Unix Domain Socket connection

There are several ways to specify a database number. The parse function will return the first specified option:

  • A db querystring option, e.g. redis://localhost?db=0

  • If using the redis:// scheme, the path argument of the url, e.g. redis://localhost/0

  • The db argument to this function.

If none of these options are specified, db=0 is used.

The decode_components argument allows this function to work with percent-encoded URLs. If this argument is set to True all %xx escapes will be replaced by their single-character equivalents after the URL has been parsed. This only applies to the hostname, path, username & password components.

Any additional querystring arguments and keyword arguments will be passed along to the class constructor.

Note

In the case of conflicting arguments, querystring arguments always win.

async get_connection(**_: Any) ConnectionT

Gets or create a connection from the pool. Be careful to only release the connection AFTER all commands are sent, or race conditions are possible.

release(connection: ConnectionT) None

Checks connection for liveness and releases it back to the pool.

All connection pools derive from the same base-class:

class BaseConnectionPool(*, connection_class: type[ConnectionT], location: Location | None = None, max_connections: int | None = None, timeout: float | None = None, **connection_kwargs: Unpack[BaseConnectionParams])[source]

Base (Abstract) connection pool

Parameters:
  • connection_class – The connection class to use when creating new connections

  • max_connections – Maximum connections to grow the pool. Once the limit is reached clients will block to wait for a connection to be returned to the pool.

  • timeout – Number of seconds to block when trying to obtain a connection.

  • connection_kwargs – arguments to pass to the connection_class constructor when creating a new connection

abstractmethod async get_connection(**_: Any) ConnectionT[source]

Gets or create a connection from the pool.

abstractmethod acquire(**_: Any) AsyncContextManager[ConnectionT, bool | None][source]

Yields a connection from the pool and releases it back.

abstractmethod release(connection: ConnectionT) None[source]

Release a connection back to the pool.

abstractmethod disconnect() None[source]

Disconnect all active connections in the pool

class BaseConnectionPoolParams[source]

Bases: BaseConnectionParams, Generic[ConnectionT]

Connection pool parameters accepted by coredis.pool.BaseConnectionPool

connection_class: NotRequired[type[ConnectionT]]

The connection class to use when creating new connections

max_connections: NotRequired[int | None]

Maximum connections to grow the pool. Once the limit is reached clients will block to wait for a connection to be returned to the pool.

timeout: NotRequired[float | None]

Number of seconds to block when trying to obtain a connection.

stream_timeout: NotRequired[float | None]
connect_timeout: NotRequired[float | None]
encoding: NotRequired[str]
decode_responses: NotRequired[bool]
client_name: NotRequired[str | None]
noreply: NotRequired[bool]
noevict: NotRequired[bool]
notouch: NotRequired[bool]
max_idle_time: NotRequired[int | None]
processing_budget: NotRequired[CapacityLimiter]
username: NotRequired[str | None]
password: NotRequired[str | None]
credential_provider: NotRequired[AbstractCredentialProvider | None]
db: NotRequired[int | None]
ssl_context: NotRequired[SSLContext | None]

Connection Classes

coredis.connection

class TCPConnection(location: TCPLocation, *, socket_keepalive: bool | None = None, socket_keepalive_options: dict[int, int | bytes] | None = None, **kwargs: Unpack[BaseConnectionParams])[source]

Bases: BaseConnection

Parameters:
  • location – The location of the server this connection is connecting to

  • stream_timeout – Maximum time to wait for receiving a response for requests created through this connection.

  • connect_timeout – Maximum time to wait for establishing a connection

  • max_idle_time – Maximum idle time in seconds before the connection is closed.

  • encoding – Default encoding for command responses.

  • decode_responses – Whether to automatically decode responses.

  • credential_provider – If provided the connection handshake will include authentication using this provider.

  • username – The username to use for authenticating against the redis server

  • password – The password to use for authenticating against the redis server

  • client_name – Optional name to register with the server.

  • db – If provided the connection will immediately switch to this db as part of the handshake

  • noreply – If True, disables replies for all commands.

  • noevict – If True, sets CLIENT NO-EVICT on the connection.

  • notouch – If True, sets CLIENT NO-TOUCH on the connection.

  • ssl_context – For TLS connections, the ssl context to use when performing the TLS handshake.

  • processing_budget – limiter to throttle CPU-bound processing.

create_request(command: bytes, *args: RedisValueT, noreply: bool | None = None, decode: RedisValueT | None = None, encoding: str | None = None, raise_exceptions: bool = True, timeout: float | None = None, disconnect_on_cancellation: bool = False) Request

Queue a command to send to the server and create an associated request that can awaited for the response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request_batch(commands: Sequence[CommandRequest[Any]], timeout: float | None = None) RequestBatch

None :raises: coredis.exceptions.ConnectionError if the connection is

not usable.

invalidate(reason: str | None = None) None

Forcefully mark the connection as unusable and release any associated resources.

Call this when the connection state can no longer be trusted — for example, after a cancelled or timed-out request, or when unconsumed messages remain in the buffer. An invalidated connection should never be returned to the pool.

Note

Calling this method on an already disconnected connection is safe and has no side effects

property push_messages: AsyncGenerator[list[ResponseType], None]

Generator to retrieve RESP3 push type messages sent by the server. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

register_connect_callback(callback: Callable[[Self], None] | Callable[[Self], Awaitable[None]]) None

Registers a callback that will be executed after the initial handshake is performed and before the connection is marked usable for regular commands.

Caution

Any exception raised by a connect callback will not be handled and will result in an unusable connection.

property reusable: bool

Whether the connection can be reused

This property should be tested before returning a connection from the connection pool for reuse. In scenarios such as after using a connection for receiving push messages there might still be unconsumed messages in the internal buffer.

If the connection is not reusable, invalidate it with a call to invalidate() and discard it

async run(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) None

Establish a connection to the redis server and initiate any post connect callbacks.

Note

This method can only be called once for an BaseConnection instance. Once the connection closes either due to cancellation or errors it should be discarded.

send_command(command: bytes, *args: RedisValueT) None

Queue a command to send to the server without associating a request to it. At the moment this is only useful in pubsub scenarios where commands such as SUBSCRIBE, UNSUBSCRIBE etc do not result in a response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property streamed_messages: AsyncGenerator[ResponseType, None]

Generator to retrieve messages received from the server that were not sent as a response to a request made through this connection. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property transport_healthy: bool

Whether the underlying transport stream is healthy

async update_tracking_client(enabled: bool, client_id: int | None = None) bool

Associate this connection to client_id to relay any tracking notifications to.

property usable: bool

Whether the connection is established and initial handshakes were performed without error

class UnixDomainSocketConnection(location: UnixDomainSocketLocation, **kwargs: Unpack[BaseConnectionParams])[source]

Bases: BaseConnection

Parameters:
  • location – The location of the server this connection is connecting to

  • stream_timeout – Maximum time to wait for receiving a response for requests created through this connection.

  • connect_timeout – Maximum time to wait for establishing a connection

  • max_idle_time – Maximum idle time in seconds before the connection is closed.

  • encoding – Default encoding for command responses.

  • decode_responses – Whether to automatically decode responses.

  • credential_provider – If provided the connection handshake will include authentication using this provider.

  • username – The username to use for authenticating against the redis server

  • password – The password to use for authenticating against the redis server

  • client_name – Optional name to register with the server.

  • db – If provided the connection will immediately switch to this db as part of the handshake

  • noreply – If True, disables replies for all commands.

  • noevict – If True, sets CLIENT NO-EVICT on the connection.

  • notouch – If True, sets CLIENT NO-TOUCH on the connection.

  • ssl_context – For TLS connections, the ssl context to use when performing the TLS handshake.

  • processing_budget – limiter to throttle CPU-bound processing.

create_request(command: bytes, *args: RedisValueT, noreply: bool | None = None, decode: RedisValueT | None = None, encoding: str | None = None, raise_exceptions: bool = True, timeout: float | None = None, disconnect_on_cancellation: bool = False) Request

Queue a command to send to the server and create an associated request that can awaited for the response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request_batch(commands: Sequence[CommandRequest[Any]], timeout: float | None = None) RequestBatch

None :raises: coredis.exceptions.ConnectionError if the connection is

not usable.

invalidate(reason: str | None = None) None

Forcefully mark the connection as unusable and release any associated resources.

Call this when the connection state can no longer be trusted — for example, after a cancelled or timed-out request, or when unconsumed messages remain in the buffer. An invalidated connection should never be returned to the pool.

Note

Calling this method on an already disconnected connection is safe and has no side effects

property push_messages: AsyncGenerator[list[ResponseType], None]

Generator to retrieve RESP3 push type messages sent by the server. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

register_connect_callback(callback: Callable[[Self], None] | Callable[[Self], Awaitable[None]]) None

Registers a callback that will be executed after the initial handshake is performed and before the connection is marked usable for regular commands.

Caution

Any exception raised by a connect callback will not be handled and will result in an unusable connection.

property reusable: bool

Whether the connection can be reused

This property should be tested before returning a connection from the connection pool for reuse. In scenarios such as after using a connection for receiving push messages there might still be unconsumed messages in the internal buffer.

If the connection is not reusable, invalidate it with a call to invalidate() and discard it

async run(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) None

Establish a connection to the redis server and initiate any post connect callbacks.

Note

This method can only be called once for an BaseConnection instance. Once the connection closes either due to cancellation or errors it should be discarded.

send_command(command: bytes, *args: RedisValueT) None

Queue a command to send to the server without associating a request to it. At the moment this is only useful in pubsub scenarios where commands such as SUBSCRIBE, UNSUBSCRIBE etc do not result in a response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property streamed_messages: AsyncGenerator[ResponseType, None]

Generator to retrieve messages received from the server that were not sent as a response to a request made through this connection. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property transport_healthy: bool

Whether the underlying transport stream is healthy

async update_tracking_client(enabled: bool, client_id: int | None = None) bool

Associate this connection to client_id to relay any tracking notifications to.

property usable: bool

Whether the connection is established and initial handshakes were performed without error

class ClusterConnection(location: TCPLocation, *, read_from_replicas: bool = False, **kwargs: Unpack[BaseConnectionParams])[source]

Bases: TCPConnection

Manages TCP communication to and from a Redis Cluster node

Parameters:
  • location – The location of the server this connection is connecting to

  • stream_timeout – Maximum time to wait for receiving a response for requests created through this connection.

  • connect_timeout – Maximum time to wait for establishing a connection

  • max_idle_time – Maximum idle time in seconds before the connection is closed.

  • encoding – Default encoding for command responses.

  • decode_responses – Whether to automatically decode responses.

  • credential_provider – If provided the connection handshake will include authentication using this provider.

  • username – The username to use for authenticating against the redis server

  • password – The password to use for authenticating against the redis server

  • client_name – Optional name to register with the server.

  • db – If provided the connection will immediately switch to this db as part of the handshake

  • noreply – If True, disables replies for all commands.

  • noevict – If True, sets CLIENT NO-EVICT on the connection.

  • notouch – If True, sets CLIENT NO-TOUCH on the connection.

  • ssl_context – For TLS connections, the ssl context to use when performing the TLS handshake.

  • processing_budget – limiter to throttle CPU-bound processing.

create_request(command: bytes, *args: RedisValueT, noreply: bool | None = None, decode: RedisValueT | None = None, encoding: str | None = None, raise_exceptions: bool = True, timeout: float | None = None, disconnect_on_cancellation: bool = False) Request

Queue a command to send to the server and create an associated request that can awaited for the response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request_batch(commands: Sequence[CommandRequest[Any]], timeout: float | None = None) RequestBatch

None :raises: coredis.exceptions.ConnectionError if the connection is

not usable.

invalidate(reason: str | None = None) None

Forcefully mark the connection as unusable and release any associated resources.

Call this when the connection state can no longer be trusted — for example, after a cancelled or timed-out request, or when unconsumed messages remain in the buffer. An invalidated connection should never be returned to the pool.

Note

Calling this method on an already disconnected connection is safe and has no side effects

property push_messages: AsyncGenerator[list[ResponseType], None]

Generator to retrieve RESP3 push type messages sent by the server. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

register_connect_callback(callback: Callable[[Self], None] | Callable[[Self], Awaitable[None]]) None

Registers a callback that will be executed after the initial handshake is performed and before the connection is marked usable for regular commands.

Caution

Any exception raised by a connect callback will not be handled and will result in an unusable connection.

property reusable: bool

Whether the connection can be reused

This property should be tested before returning a connection from the connection pool for reuse. In scenarios such as after using a connection for receiving push messages there might still be unconsumed messages in the internal buffer.

If the connection is not reusable, invalidate it with a call to invalidate() and discard it

async run(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) None

Establish a connection to the redis server and initiate any post connect callbacks.

Note

This method can only be called once for an BaseConnection instance. Once the connection closes either due to cancellation or errors it should be discarded.

send_command(command: bytes, *args: RedisValueT) None

Queue a command to send to the server without associating a request to it. At the moment this is only useful in pubsub scenarios where commands such as SUBSCRIBE, UNSUBSCRIBE etc do not result in a response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property streamed_messages: AsyncGenerator[ResponseType, None]

Generator to retrieve messages received from the server that were not sent as a response to a request made through this connection. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property transport_healthy: bool

Whether the underlying transport stream is healthy

async update_tracking_client(enabled: bool, client_id: int | None = None) bool

Associate this connection to client_id to relay any tracking notifications to.

property usable: bool

Whether the connection is established and initial handshakes were performed without error

class SentinelManagedConnection(location: TCPLocation, primary_name: str, is_primary: bool, **kwargs: Unpack[BaseConnectionParams])[source]

Bases: TCPConnection

Parameters:
  • location – The location of the server this connection is connecting to

  • stream_timeout – Maximum time to wait for receiving a response for requests created through this connection.

  • connect_timeout – Maximum time to wait for establishing a connection

  • max_idle_time – Maximum idle time in seconds before the connection is closed.

  • encoding – Default encoding for command responses.

  • decode_responses – Whether to automatically decode responses.

  • credential_provider – If provided the connection handshake will include authentication using this provider.

  • username – The username to use for authenticating against the redis server

  • password – The password to use for authenticating against the redis server

  • client_name – Optional name to register with the server.

  • db – If provided the connection will immediately switch to this db as part of the handshake

  • noreply – If True, disables replies for all commands.

  • noevict – If True, sets CLIENT NO-EVICT on the connection.

  • notouch – If True, sets CLIENT NO-TOUCH on the connection.

  • ssl_context – For TLS connections, the ssl context to use when performing the TLS handshake.

  • processing_budget – limiter to throttle CPU-bound processing.

create_request(command: bytes, *args: RedisValueT, noreply: bool | None = None, decode: RedisValueT | None = None, encoding: str | None = None, raise_exceptions: bool = True, timeout: float | None = None, disconnect_on_cancellation: bool = False) Request

Queue a command to send to the server and create an associated request that can awaited for the response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request_batch(commands: Sequence[CommandRequest[Any]], timeout: float | None = None) RequestBatch

None :raises: coredis.exceptions.ConnectionError if the connection is

not usable.

invalidate(reason: str | None = None) None

Forcefully mark the connection as unusable and release any associated resources.

Call this when the connection state can no longer be trusted — for example, after a cancelled or timed-out request, or when unconsumed messages remain in the buffer. An invalidated connection should never be returned to the pool.

Note

Calling this method on an already disconnected connection is safe and has no side effects

property push_messages: AsyncGenerator[list[ResponseType], None]

Generator to retrieve RESP3 push type messages sent by the server. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

register_connect_callback(callback: Callable[[Self], None] | Callable[[Self], Awaitable[None]]) None

Registers a callback that will be executed after the initial handshake is performed and before the connection is marked usable for regular commands.

Caution

Any exception raised by a connect callback will not be handled and will result in an unusable connection.

property reusable: bool

Whether the connection can be reused

This property should be tested before returning a connection from the connection pool for reuse. In scenarios such as after using a connection for receiving push messages there might still be unconsumed messages in the internal buffer.

If the connection is not reusable, invalidate it with a call to invalidate() and discard it

async run(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) None

Establish a connection to the redis server and initiate any post connect callbacks.

Note

This method can only be called once for an BaseConnection instance. Once the connection closes either due to cancellation or errors it should be discarded.

send_command(command: bytes, *args: RedisValueT) None

Queue a command to send to the server without associating a request to it. At the moment this is only useful in pubsub scenarios where commands such as SUBSCRIBE, UNSUBSCRIBE etc do not result in a response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property streamed_messages: AsyncGenerator[ResponseType, None]

Generator to retrieve messages received from the server that were not sent as a response to a request made through this connection. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property transport_healthy: bool

Whether the underlying transport stream is healthy

async update_tracking_client(enabled: bool, client_id: int | None = None) bool

Associate this connection to client_id to relay any tracking notifications to.

property usable: bool

Whether the connection is established and initial handshakes were performed without error

All connection classes derive from the same base-class:

class BaseConnection(location: Location, *, stream_timeout: float | None = None, connect_timeout: float | None = None, max_idle_time: int | None = None, encoding: str = 'utf-8', decode_responses: bool = False, credential_provider: AbstractCredentialProvider | None = None, username: str | None = None, password: str | None = None, client_name: str | None = None, db: int | None = 0, noreply: bool = False, noevict: bool = False, notouch: bool = False, ssl_context: SSLContext | None = None, processing_budget: CapacityLimiter | None = None)[source]

Bases: TelemetryAttributeProvider

Base class for Redis connections.

Manages a low-level connection to a single Redis server: sending commands, queuing requests, receiving and parsing responses, handling RESP3 push messages, and connection lifecycle management.

Subclasses must implement the _connect() method to establish the underlying transport (TCP, UNIX socket, etc.).

Parameters:
  • location – The location of the server this connection is connecting to

  • stream_timeout – Maximum time to wait for receiving a response for requests created through this connection.

  • connect_timeout – Maximum time to wait for establishing a connection

  • max_idle_time – Maximum idle time in seconds before the connection is closed.

  • encoding – Default encoding for command responses.

  • decode_responses – Whether to automatically decode responses.

  • credential_provider – If provided the connection handshake will include authentication using this provider.

  • username – The username to use for authenticating against the redis server

  • password – The password to use for authenticating against the redis server

  • client_name – Optional name to register with the server.

  • db – If provided the connection will immediately switch to this db as part of the handshake

  • noreply – If True, disables replies for all commands.

  • noevict – If True, sets CLIENT NO-EVICT on the connection.

  • notouch – If True, sets CLIENT NO-TOUCH on the connection.

  • ssl_context – For TLS connections, the ssl context to use when performing the TLS handshake.

  • processing_budget – limiter to throttle CPU-bound processing.

property transport_healthy: bool

Whether the underlying transport stream is healthy

property usable: bool

Whether the connection is established and initial handshakes were performed without error

property reusable: bool

Whether the connection can be reused

This property should be tested before returning a connection from the connection pool for reuse. In scenarios such as after using a connection for receiving push messages there might still be unconsumed messages in the internal buffer.

If the connection is not reusable, invalidate it with a call to invalidate() and discard it

register_connect_callback(callback: Callable[[Self], None] | Callable[[Self], Awaitable[None]]) None[source]

Registers a callback that will be executed after the initial handshake is performed and before the connection is marked usable for regular commands.

Caution

Any exception raised by a connect callback will not be handled and will result in an unusable connection.

async run(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) None[source]

Establish a connection to the redis server and initiate any post connect callbacks.

Note

This method can only be called once for an BaseConnection instance. Once the connection closes either due to cancellation or errors it should be discarded.

invalidate(reason: str | None = None) None[source]

Forcefully mark the connection as unusable and release any associated resources.

Call this when the connection state can no longer be trusted — for example, after a cancelled or timed-out request, or when unconsumed messages remain in the buffer. An invalidated connection should never be returned to the pool.

Note

Calling this method on an already disconnected connection is safe and has no side effects

async update_tracking_client(enabled: bool, client_id: int | None = None) bool[source]

Associate this connection to client_id to relay any tracking notifications to.

property push_messages: AsyncGenerator[list[ResponseType], None]

Generator to retrieve RESP3 push type messages sent by the server. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

property streamed_messages: AsyncGenerator[ResponseType, None]

Generator to retrieve messages received from the server that were not sent as a response to a request made through this connection. The generator will yield each message received in order until the connection is lost and will raise an ConnectionError to signal that the generator has ended.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

send_command(command: bytes, *args: RedisValueT) None[source]

Queue a command to send to the server without associating a request to it. At the moment this is only useful in pubsub scenarios where commands such as SUBSCRIBE, UNSUBSCRIBE etc do not result in a response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request(command: bytes, *args: RedisValueT, noreply: bool | None = None, decode: RedisValueT | None = None, encoding: str | None = None, raise_exceptions: bool = True, timeout: float | None = None, disconnect_on_cancellation: bool = False) Request[source]

Queue a command to send to the server and create an associated request that can awaited for the response.

Raises:

coredis.exceptions.ConnectionError if the connection is not usable.

create_request_batch(commands: Sequence[CommandRequest[Any]], timeout: float | None = None) RequestBatch[source]

None :raises: coredis.exceptions.ConnectionError if the connection is

not usable.

class BaseConnectionParams[source]

Bases: TypedDict

The common parameters accepted by coredis.connection.BaseConnection

stream_timeout: NotRequired[float | None]

Maximum time to wait for receiving a response for requests created through this connection.

connect_timeout: NotRequired[float | None]

Maximum time to wait for establishing a connection

encoding: NotRequired[str]

Default encoding for command responses.

decode_responses: NotRequired[bool]

Whether to automatically decode responses.

client_name: NotRequired[str | None]

Optional name to register with the server.

noreply: NotRequired[bool]

If True, disables replies for all commands.

noevict: NotRequired[bool]

If True, sets CLIENT NO-EVICT on the connection.

notouch: NotRequired[bool]

If True, sets CLIENT NO-TOUCH on the connection.

max_idle_time: NotRequired[int | None]

Maximum idle time in seconds before the connection is closed.

processing_budget: NotRequired[CapacityLimiter]

Limiter to throttle CPU-bound processing.

username: NotRequired[str | None]

The username to use for authenticating against the redis server

password: NotRequired[str | None]

The password to use for authenticating against the redis server

credential_provider: NotRequired[AbstractCredentialProvider | None]

If provided the connection handshake will include authentication using this provider.

db: NotRequired[int | None]

If provided the connection will immediately switch to this db as part of the handshake

ssl_context: NotRequired[SSLContext | None]

For TLS connections, the ssl context to use when performing the TLS handshake

Location classes

All classes that accept a location parameter to configure the location of a redis server require either a:

class TCPLocation(host: str, port: int)[source]

Location of a redis instance listening on a tcp port

host: str

hostname of the server

port: int

the port the server is listening on

async check() bool[source]

Returns whether the location can be connected to

or

class UnixDomainSocketLocation(path: str)[source]

Location of a redis instance listening on a unix domain socket

path: str

The absolute path of the socket

async check() bool[source]

Returns whether the location can be connected to

If a class can handle both tcp & unix domain sockets it will accept the base class:

class Location[source]

Abstract location

abstractmethod async check() bool[source]

Returns whether the location can be connected to