PubSub

PubSub(connection_pool[, ...])

Pub/Sub implementation to be used with coredis.Redis that is returned by coredis.Redis.pubsub()

ClusterPubSub(connection_pool[, ...])

Pub/Sub implementation to be used with coredis.RedisCluster that is returned by coredis.RedisCluster.pubsub()

ShardedPubSub(connection_pool[, ...])

Sharded Pub/Sub implementation to be used with coredis.RedisCluster that is returned by coredis.RedisCluster.sharded_pubsub()

PubSubWorkerThread(pubsub[, poll_timeout])

SubscriptionCallback

Callables for message handler callbacks.

class PubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ConstantRetryPolicy((ConnectionError,), 3, 0.1), ConstantRetryPolicy((TimeoutError,), 2, 0.1)))[source]

Pub/Sub implementation to be used with coredis.Redis that is returned by coredis.Redis.pubsub()

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the command will block forever.
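
A polling loop built on get_message() might look like the sketch below. It is an illustration only: FakePubSub is a hypothetical in-memory stand-in that mimics the documented get_message() signature so the snippet runs without a server, and the dictionary shape of the messages is an assumption as well. With a real client, the object returned by coredis.Redis.pubsub() would be passed to drain() instead.

```python
import asyncio
from collections import deque


class FakePubSub:
    """Hypothetical stand-in mimicking the documented get_message signature."""

    def __init__(self, messages):
        self._queue = deque(messages)

    async def get_message(self, ignore_subscribe_messages=False, timeout=None):
        # Skip acknowledgement messages when asked to; return None when empty.
        while self._queue:
            message = self._queue.popleft()
            if ignore_subscribe_messages and message["type"] == "subscribe":
                continue
            return message
        return None


async def drain(pubsub, timeout=0.1):
    """Collect messages until get_message reports that none is available."""
    received = []
    while True:
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=timeout
        )
        if message is None:
            break
        received.append(message)
    return received


fake = FakePubSub(
    [
        {"type": "subscribe", "channel": "news", "data": 1},
        {"type": "message", "channel": "news", "data": "hello"},
        {"type": "message", "channel": "news", "data": "world"},
    ]
)
drained = asyncio.run(drain(fake))
print([m["data"] for m in drained])
```

The loop passes a finite timeout on purpose: as documented above, a timeout of None blocks until a message arrives, so the loop would never see the None that ends it.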

async listen() PubSubMessage | None

Listens for messages on channels this client has been subscribed to

async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) None

Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via listen().

async punsubscribe(*patterns: StringT) None

Unsubscribes from the supplied patterns. If empty, unsubscribes from all patterns.

reset() None

Clear subscriptions and disconnect and release any connection(s) back to the connection pool.
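
One way to make sure reset() is always reached is a try/finally block, sketched below. FakePubSub is a hypothetical stand-in that only records the calls made on it, and consume() and failing_work() are illustrative helpers, not part of coredis.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in that records the calls made on it."""

    def __init__(self):
        self.calls = []

    async def subscribe(self, *channels, **channel_handlers):
        self.calls.append("subscribe")

    def reset(self):
        self.calls.append("reset")


async def consume(pubsub, channel, work):
    """Subscribe, run `work`, and always call reset() afterwards."""
    await pubsub.subscribe(channel)
    try:
        return await work(pubsub)
    finally:
        # Runs on success and on failure alike.
        pubsub.reset()


async def failing_work(pubsub):
    raise RuntimeError("handler failed")


async def main():
    fake = FakePubSub()
    try:
        await consume(fake, "news", failing_work)
    except RuntimeError:
        pass
    return fake.calls


calls = asyncio.run(main())
print(calls)
```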

run_in_thread(poll_timeout: float = 1.0) PubSubWorkerThread

Run the listeners in a thread. For each message received on a subscribed channel or pattern the registered handlers will be invoked.

To stop listening, invoke stop() on the returned instance.
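
Pairing run_in_thread() with stop() can be wrapped in a context manager, as in the sketch below. It relies only on the two calls documented above. FakePubSub and FakeWorker are hypothetical stand-ins so the snippet runs without a server, and listening() is an illustrative helper that coredis does not provide.

```python
from contextlib import contextmanager


class FakeWorker:
    """Hypothetical stand-in for the object returned by run_in_thread()."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


class FakePubSub:
    """Hypothetical stand-in exposing the documented run_in_thread signature."""

    def __init__(self):
        self.worker = None
        self.poll_timeout = None

    def run_in_thread(self, poll_timeout=1.0):
        self.poll_timeout = poll_timeout
        self.worker = FakeWorker()
        return self.worker


@contextmanager
def listening(pubsub, poll_timeout=1.0):
    """Start the worker and guarantee stop() is called on exit."""
    worker = pubsub.run_in_thread(poll_timeout=poll_timeout)
    try:
        yield worker
    finally:
        worker.stop()


fake = FakePubSub()
with listening(fake, poll_timeout=0.5) as worker:
    running_inside = not worker.stopped
print(running_inside, fake.worker.stopped)
```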

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None

Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via listen() or get_message().
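
The two ways of passing channels can be combined in one call, as sketched below. FakePubSub is a hypothetical stand-in that records its arguments so the snippet runs without a server, and on_sports() is an illustrative handler. Channel names that are not valid Python identifiers, such as "news.sports", cannot be written as name=value and have to be unpacked from a dict.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in that records what subscribe() was given."""

    def __init__(self):
        self.handlers = {}

    async def subscribe(self, *channels, **channel_handlers):
        # Channels passed positionally have no handler attached.
        for channel in channels:
            self.handlers[channel] = None
        self.handlers.update(channel_handlers)


def on_sports(message):
    print("sports:", message)


async def main(pubsub):
    # "alerts" has no handler, so its messages would be retrieved through
    # listen() or get_message(). "news.sports" is routed to on_sports.
    await pubsub.subscribe("alerts", **{"news.sports": on_sports})


fake = FakePubSub()
asyncio.run(main(fake))
print(sorted(fake.handlers))
```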

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

async unsubscribe(*channels: StringT) None

Unsubscribes from the supplied channels. If empty, unsubscribes from all channels.

class ClusterPubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ConstantRetryPolicy((ConnectionError,), 3, 0.1), ConstantRetryPolicy((TimeoutError,), 2, 0.1)))[source]

Pub/Sub implementation to be used with coredis.RedisCluster that is returned by coredis.RedisCluster.pubsub()

Note

This implementation does not particularly benefit from having multiple nodes in a cluster. It subscribes to messages sent to channels using PUBLISH, which in cluster mode results in the message being broadcast to every node in the cluster. The subscribing client can therefore subscribe to any node in the cluster to receive messages sent to any channel, which inherently limits the potential for scaling.

Redis 7.0 introduces the concept of Sharded Pub/Sub, which can be accessed by using coredis.RedisCluster.sharded_pubsub() instead. That method uses the implementation in coredis.commands.ShardedPubSub.

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the command will block forever.

async listen() PubSubMessage | None

Listens for messages on channels this client has been subscribed to

async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) None

Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via listen().

async punsubscribe(*patterns: StringT) None

Unsubscribes from the supplied patterns. If empty, unsubscribes from all patterns.

reset() None

Clear subscriptions and disconnect and release any connection(s) back to the connection pool.

run_in_thread(poll_timeout: float = 1.0) PubSubWorkerThread

Run the listeners in a thread. For each message received on a subscribed channel or pattern the registered handlers will be invoked.

To stop listening, invoke stop() on the returned instance.

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None

Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via listen() or get_message().

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

async unsubscribe(*channels: StringT) None

Unsubscribes from the supplied channels. If empty, unsubscribes from all channels.

class ShardedPubSub(connection_pool: coredis.pool.ClusterConnectionPool, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = None, read_from_replicas: bool = False)[source]

Sharded Pub/Sub implementation to be used with coredis.RedisCluster that is returned by coredis.RedisCluster.sharded_pubsub()

For details about the server architecture refer to the Redis manual entry on Sharded Pub/sub.

New in Redis version: 7.0.0

Warning

Sharded PubSub only supports subscription by channel and does NOT support pattern based subscriptions.

Added in version 3.6.0.

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None[source]

Parameters:
  • channels – The shard channels to subscribe to.

  • channel_handlers – Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via listen() or get_message().

async unsubscribe(*channels: StringT) None[source]

Parameters:
  • channels – The shard channels to unsubscribe from. If none are provided, the client is unsubscribed from all channels it previously subscribed to.

reset() None[source]

Clear subscriptions and disconnect and release any connection(s) back to the connection pool.

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the command will block forever.

async listen() PubSubMessage | None

Listens for messages on channels this client has been subscribed to

run_in_thread(poll_timeout: float = 1.0) PubSubWorkerThread

Run the listeners in a thread. For each message received on a subscribed channel or pattern the registered handlers will be invoked.

To stop listening, invoke stop() on the returned instance.

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

class PubSubWorkerThread(pubsub: BasePubSub[Any, Any], poll_timeout: float = 1.0)[source]

Bases: Thread

stop() None[source]

Stop the worker thread from processing any more messages

SubscriptionCallback

Callables for message handler callbacks. The callbacks can be sync or async.

alias of Callable[[PubSubMessage], Awaitable[None]] | Callable[[PubSubMessage], None]
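
Because the alias admits both plain and coroutine callables, code that invokes a handler has to cope with either kind. The sketch below shows one generic way to do that with inspect.isawaitable. It does not show how coredis dispatches handlers internally, which this reference does not cover. invoke() is a hypothetical helper, and the dictionary used as the message is an assumed shape.

```python
import asyncio
import inspect

seen = []


def sync_handler(message):
    seen.append(("sync", message["data"]))


async def async_handler(message):
    await asyncio.sleep(0)
    seen.append(("async", message["data"]))


async def invoke(handler, message):
    """Call a handler and await the result only if it is awaitable."""
    result = handler(message)
    if inspect.isawaitable(result):
        await result


async def main():
    # Assumed message shape, for illustration only.
    message = {"type": "message", "channel": "news", "data": "hello"}
    await invoke(sync_handler, message)
    await invoke(async_handler, message)


asyncio.run(main())
print(seen)
```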