PubSub

PubSub(connection_pool[, ...])

Pub/Sub implementation to be used with Redis that is returned by pubsub()

ClusterPubSub(connection_pool[, ...])

Pub/Sub implementation to be used with RedisCluster that is returned by pubsub()

ShardedPubSub(connection_pool[, ...])

Sharded Pub/Sub implementation to be used with RedisCluster that is returned by sharded_pubsub()

SubscriptionCallback

Callables for message handler callbacks.

class PubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ExponentialBackoffRetryPolicy((ConnectionError,), retries=None, base_delay=0.1, max_delay=16, jitter=True), ConstantRetryPolicy((TimeoutError,), retries=2, delay=0.1)), channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, patterns: Parameters[StringT] | None = None, pattern_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)

Pub/Sub implementation to be used with Redis that is returned by pubsub()

An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels or patterns that it is subscribed to.

Recommended use:

async with coredis.Redis(decode_responses=True) as client:
    async with client.pubsub(
      ignore_subscribe_messages=True,
      channels=["channel-1", "channel-2"]
    ) as pubsub:
        async for message in pubsub:
            match message["channel"]:
                case "channel-1":
                    print("first", message["data"])
                case "channel-2":
                    print("second", message["data"])

Or to explicitly subscribe:

async with coredis.Redis(decode_responses=True) as client:
    async with client.pubsub() as pubsub:
        await pubsub.subscribe("channel-1")
        assert (await pubsub.get_message())["channel"] == "channel-1"
        async for message in pubsub:
            print(message["data"])

For more details see PubSub

Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such

Parameters:
  • connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer

  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • retry_policy – An explicit retry policy to use in the subscriber.

  • channels – Channels that the constructed PubSub instance should automatically subscribe to.

  • channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.

  • patterns – Patterns that the constructed PubSub instance should automatically subscribe to.

  • pattern_handlers – Mapping of patterns to automatically subscribe to and the associated handlers that will be invoked when a message is received on a channel matching the pattern.

  • subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.

  • unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.

  • max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the server before performing a keepalive check with a PING.
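
The default retry_policy shown in the signature pairs an exponential backoff for ConnectionError (base_delay=0.1, max_delay=16, jitter=True) with a constant delay for TimeoutError. To give a feel for what such a schedule could look like, here is a minimal sketch. It assumes the delay doubles on each attempt and that jitter draws uniformly between 0 and the capped delay; the backoff_delay helper is hypothetical and the formula coredis actually uses may differ.

```python
import random


def backoff_delay(attempt, base_delay=0.1, max_delay=16.0, jitter=False):
    """Delay in seconds before retry number `attempt` (0-based).

    Assumption: the delay doubles per attempt and is capped at max_delay.
    """
    delay = min(max_delay, base_delay * (2 ** attempt))
    if jitter:
        # Assumption: "full jitter", a uniform draw between 0 and the capped delay.
        delay = random.uniform(0, delay)
    return delay


# Delays for the first ten attempts without jitter.
schedule = [backoff_delay(n) for n in range(10)]
print(schedule)
```

Under these assumptions the cap is reached after eight doublings, so a long outage settles into one reconnect attempt roughly every 16 seconds.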

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the call blocks until a message arrives.
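
The timeout semantics described above can be pictured with a plain asyncio.Queue standing in for the connection. This is only a sketch of the documented behaviour, not coredis code; next_message and the queue are hypothetical stand-ins.

```python
import asyncio


async def next_message(queue, timeout=None):
    """Return the next queued message, or None if `timeout` seconds pass first.

    With timeout=None the call waits until a message arrives.
    """
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue = asyncio.Queue()
    # Nothing has been queued yet, so the short timeout expires.
    empty = await next_message(queue, timeout=0.01)
    await queue.put({"channel": "channel-1", "data": "hello"})
    # A message is now available and is returned immediately.
    ready = await next_message(queue, timeout=0.01)
    return empty, ready


empty, ready = asyncio.run(demo())
print(empty, ready)
```

Callers that poll with a timeout should therefore treat None as "nothing arrived yet" rather than as an error.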

async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) None

Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via get_message() or the iterator.
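
Redis patterns are glob-style (for example news.*). To get a feel for which channels a pattern would cover, Python's fnmatch module gives a close approximation. It is not identical to the server's matcher (escaping and negated character classes differ), so treat the sketch below as an illustration only; the channel names and the matching helper are made up.

```python
from fnmatch import fnmatchcase

# Hypothetical channel names used only for this illustration.
channels = ["news.sports", "news.weather", "alerts.cpu", "newsletter"]


def matching(pattern):
    """Channels from the sample list that the glob-style pattern would cover."""
    return [name for name in channels if fnmatchcase(name, pattern)]


print(matching("news.*"))
print(matching("news*"))
print(matching("alerts.?pu"))
```

Note how news* also covers newsletter, while news.* does not; including the separator in the pattern keeps a subscription from picking up unrelated channels.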

async punsubscribe(*patterns: StringT) None

Unsubscribes from the supplied patterns. If empty, unsubscribe from all patterns.

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None

Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.
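
The routing rule described above (a channel with a handler bypasses get_message() and the iterator) can be modelled without a server. The Router class below is a hypothetical stand-in, not the coredis implementation, and it only deals with synchronous handlers for brevity.

```python
class Router:
    """Toy model of handler routing for subscribed channels."""

    def __init__(self):
        self.handlers = {}  # channel name -> callable, or None
        self.pending = []   # messages left for get_message() / iteration

    def subscribe(self, *channels, **channel_handlers):
        # Positional channels get no handler; keyword channels map to a callable.
        for channel in channels:
            self.handlers.setdefault(channel, None)
        self.handlers.update(channel_handlers)

    def deliver(self, message):
        handler = self.handlers.get(message["channel"])
        if handler is not None:
            handler(message)              # consumed by the handler
        else:
            self.pending.append(message)  # surfaced to the consumer


handled = []
router = Router()
router.subscribe("plain", audited=handled.append)
router.deliver({"channel": "plain", "data": "a"})
router.deliver({"channel": "audited", "data": "b"})
print(router.pending, handled)
```

Because the channel name is passed as a keyword, names that are not valid Python identifiers (such as channel-1) have to be supplied by unpacking a dict, for example subscribe(**{"channel-1": handler}).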

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

async unsubscribe(*channels: StringT) None

Unsubscribes from the supplied channels. If none are supplied, unsubscribes from all channels.

class ClusterPubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ExponentialBackoffRetryPolicy((ConnectionError,), retries=None, base_delay=0.1, max_delay=16, jitter=True), ConstantRetryPolicy((TimeoutError,), retries=2, delay=0.1)), channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, patterns: Parameters[StringT] | None = None, pattern_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)

Pub/Sub implementation to be used with RedisCluster that is returned by pubsub()

Note

This implementation does not particularly benefit from having multiple nodes in a cluster. It subscribes to messages sent to channels using PUBLISH, which in cluster mode results in the message being broadcast to every node in the cluster. For this reason the subscribing client can connect to any node in the cluster to receive messages sent to any channel, which inherently limits the potential for scaling.

Redis 7.0 introduces Sharded Pub/Sub, which can be accessed by using sharded_pubsub() instead; it is implemented by ShardedPubSub.

An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels or patterns that it is subscribed to.

For more details see Cluster Pub/Sub

Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such

Parameters:
  • connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer

  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • retry_policy – An explicit retry policy to use in the subscriber.

  • channels – Channels that the constructed PubSub instance should automatically subscribe to.

  • channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.

  • patterns – Patterns that the constructed PubSub instance should automatically subscribe to.

  • pattern_handlers – Mapping of patterns to automatically subscribe to and the associated handlers that will be invoked when a message is received on a channel matching the pattern.

  • subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.

  • unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.

  • max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the server before performing a keepalive check with a PING.

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the call blocks until a message arrives.

async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) None

Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via get_message() or the iterator.

async punsubscribe(*patterns: StringT) None

Unsubscribes from the supplied patterns. If empty, unsubscribe from all patterns.

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None

Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

async unsubscribe(*channels: StringT) None

Unsubscribes from the supplied channels. If none are supplied, unsubscribes from all channels.

class ShardedPubSub(connection_pool: coredis.pool.ClusterConnectionPool, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = None, read_from_replicas: bool = False, channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)

Sharded Pub/Sub implementation to be used with RedisCluster that is returned by sharded_pubsub()

For details about the server architecture refer to the Redis manual entry on Sharded Pub/sub
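
In Redis Cluster a shard channel is assigned to a hash slot with the same algorithm used for keys: CRC16 of the name modulo 16384, with hash tags honoured. The sketch below computes that slot using only the standard library, which can help predict which channels end up on the same shard. channel_slot is a hypothetical helper and is not part of coredis.

```python
import binascii


def channel_slot(channel: str) -> int:
    """Hash slot (0..16383) for a key or shard channel name."""
    data = channel.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty {tag} narrows what gets hashed.
        if end > start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC16/XMODEM,
    # the CRC variant specified for Redis Cluster key slots.
    return binascii.crc_hqx(data, 0) % 16384


print(channel_slot("123456789"))
print(channel_slot("{orders}.created") == channel_slot("{orders}.cancelled"))
```

Channels that share a hash tag, such as {orders}.created and {orders}.cancelled, map to the same slot and are therefore served by the same shard.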

New in Redis version: 7.0.0

Warning

Sharded PubSub only supports subscription by channel and does NOT support pattern based subscriptions.

An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels that it is subscribed to.

For more details see Sharded Pub/Sub

Added in version 3.6.0.

Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such

Parameters:
  • connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer

  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • retry_policy – An explicit retry policy to use in the subscriber.

  • channels – Channels that the constructed PubSub instance should automatically subscribe to.

  • channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.

  • subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.

  • unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.

  • max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the cluster before performing a keepalive check with a PING.

async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) None

Parameters:
  • channels – The shard channels to subscribe to.

  • channel_handlers – Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.

async unsubscribe(*channels: StringT) None

Parameters:
  • channels – The shard channels to unsubscribe from. If none are provided, the client is unsubscribed from all shard channels it previously subscribed to.

async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) PubSubMessage | None

Gets the next message if one is available, otherwise None.

Parameters:
  • ignore_subscribe_messages – Whether to skip subscription acknowledgement messages

  • timeout – Number of seconds to wait for a message to be available on the connection. If None, the call blocks until a message arrives.

property subscribed: bool

Indicates if there are subscriptions to any channels or patterns

SubscriptionCallback = collections.abc.Callable[[coredis.response.types.PubSubMessage], collections.abc.Awaitable[None]] | collections.abc.Callable[[coredis.response.types.PubSubMessage], None]

Callables for message handler callbacks. The callbacks can be sync or async.
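
Since a handler may be either a plain function or a coroutine function, whatever dispatches it has to cope with both. One common way to do that is to call the handler and await the result only when it is awaitable. The invoke helper below is an illustrative sketch of that approach, not the coredis dispatcher; the message is a plain dict standing in for PubSubMessage.

```python
import asyncio
import inspect

seen = []


def sync_handler(message):
    seen.append(("sync", message["data"]))


async def async_handler(message):
    await asyncio.sleep(0)  # stand-in for real async work
    seen.append(("async", message["data"]))


async def invoke(handler, message):
    """Call a handler and await its result only when it is awaitable."""
    result = handler(message)
    if inspect.isawaitable(result):
        await result


async def demo():
    message = {"channel": "channel-1", "data": "hello"}
    await invoke(sync_handler, message)
    await invoke(async_handler, message)


asyncio.run(demo())
print(seen)
```

Either style satisfies the SubscriptionCallback type; a synchronous handler that does slow work would hold up the event loop, so long-running work is usually better placed in an async handler.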