PubSub
PubSub – Pub/Sub implementation to be used with Redis.
ClusterPubSub – Pub/Sub implementation to be used with RedisCluster.
ShardedPubSub – Sharded Pub/Sub implementation to be used with RedisCluster.
SubscriptionCallback – Callables for message handler callbacks.
- class PubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ExponentialBackoffRetryPolicy((ConnectionError,), retries=None, base_delay=0.1, max_delay=16, jitter=True), ConstantRetryPolicy((TimeoutError,), retries=2, delay=0.1)), channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, patterns: Parameters[StringT] | None = None, pattern_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)
Pub/Sub implementation to be used with Redis that is returned by pubsub().
An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels or patterns that it is subscribed to.
Recommended use:
    async with coredis.Redis(decode_responses=True) as client:
        async with client.pubsub(
            ignore_subscribe_messages=True,
            channels=["channel-1", "channel-2"]
        ) as pubsub:
            async for message in pubsub:
                match message["channel"]:
                    case "channel-1":
                        print("first", message["data"])
                    case "channel-2":
                        print("second", message["data"])
Or to explicitly subscribe:
    async with coredis.Redis(decode_responses=True) as client:
        async with client.pubsub() as pubsub:
            await pubsub.subscribe("channel-1")
            assert (await pubsub.get_message())["channel"] == "channel-1"
            async for message in pubsub:
                print(message["data"])
For more details see PubSub
Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such
- Parameters:
connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer
ignore_subscribe_messages – Whether to skip subscription acknowledgement messages
retry_policy – An explicit retry policy to use in the subscriber.
channels – channels that the constructed Pubsub instance should automatically subscribe to
channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.
patterns – patterns that the constructed Pubsub instance should automatically subscribe to
pattern_handlers – Mapping of patterns to automatically subscribe to and the associated handlers that will be invoked when a message is received on a channel matching the pattern.
subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.
unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.
max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the server before performing a keepalive check with a PING.
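The default retry_policy in the signature above pairs an exponential backoff for ConnectionError (base_delay=0.1, max_delay=16, jitter=True) with a constant delay for TimeoutError. As a rough illustration of what such parameters commonly mean, the sketch below computes a capped, doubling delay schedule. The helper name backoff_delays and the exact formula (doubling per attempt, jitter drawn uniformly up to the cap) are assumptions made for illustration and may differ from what coredis actually does.

```python
import random


def backoff_delays(attempts, base_delay=0.1, max_delay=16.0, jitter=False, rng=None):
    """Return the delays for `attempts` retries (hypothetical helper).

    Assumption: the delay doubles on every attempt and is capped at
    max_delay; with jitter, each delay is drawn uniformly from [0, cap].
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        capped = min(max_delay, base_delay * (2 ** attempt))
        delays.append(rng.uniform(0, capped) if jitter else capped)
    return delays


# Without jitter the schedule grows from base_delay until it hits the cap.
schedule = backoff_delays(10)
print(schedule)
```

Under these assumptions the delay reaches the 16 second cap after eight doublings, so a subscriber that keeps failing to connect would settle into retrying at most once every 16 seconds.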
- async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) -> PubSubMessage | None
Gets the next message if one is available, otherwise None.
- async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) -> None
Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via get_message() or the iterator.
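Patterns are glob-style expressions that the Redis server matches against channel names. To get a feel for which channels a pattern would select, the sketch below approximates the matching locally with Python's fnmatch module. This is only an approximation: the helper matching_channels is hypothetical, and fnmatch differs from Redis in details such as negated character classes and backslash escaping, so the server remains the authority on what matches.

```python
from fnmatch import fnmatchcase


def matching_channels(pattern, channels):
    """Approximate which channel names a glob-style pattern would select."""
    return [name for name in channels if fnmatchcase(name, pattern)]


channels = ["news.sports", "news.weather", "alerts.cpu"]

# `*` matches any run of characters, `?` matches exactly one character.
print(matching_channels("news.*", channels))
print(matching_channels("alerts.?pu", channels))
```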
- async punsubscribe(*patterns: StringT) -> None
Unsubscribes from the supplied patterns. If none are supplied, unsubscribes from all patterns.
- async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) -> None
Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.
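The routing rule described for subscribe() can be pictured with a toy model: a message for a channel that has a handler goes to the handler, and any other message is left for the consumer to pick up. The ToyRouter class below is a hypothetical stand-in written only to illustrate that rule; it does not talk to Redis and is not how coredis is implemented.

```python
from collections import deque


class ToyRouter:
    """Hypothetical stand-in that mimics the documented routing rule."""

    def __init__(self):
        self.handlers = {}      # channel name -> callable, or None
        self.pending = deque()  # messages left for get_message()/iteration

    def subscribe(self, *channels, **channel_handlers):
        for channel in channels:
            self.handlers[channel] = None
        self.handlers.update(channel_handlers)

    def deliver(self, message):
        handler = self.handlers.get(message["channel"])
        if handler is not None:
            handler(message)              # consumed by the callback
        else:
            self.pending.append(message)  # surfaced to the consumer


seen = []
router = ToyRouter()
router.subscribe("plain", handled=seen.append)
router.deliver({"channel": "plain", "data": "a"})
router.deliver({"channel": "handled", "data": "b"})
print(list(router.pending), seen)
```

Because handlers are passed as keyword arguments, a channel name that is not a valid Python identifier (such as channel-1) has to be supplied through dictionary unpacking, for example subscribe(**{"channel-1": handler}), or through the channel_handlers mapping accepted by the constructor.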
- class ClusterPubSub(connection_pool: PoolT, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = CompositeRetryPolicy(ExponentialBackoffRetryPolicy((ConnectionError,), retries=None, base_delay=0.1, max_delay=16, jitter=True), ConstantRetryPolicy((TimeoutError,), retries=2, delay=0.1)), channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, patterns: Parameters[StringT] | None = None, pattern_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)
Pub/Sub implementation to be used with RedisCluster that is returned by pubsub().
Note
This implementation does not particularly benefit from having multiple nodes in a cluster, because it subscribes to messages sent to channels using PUBLISH, which in cluster mode results in the message being broadcast to every node in the cluster. For this reason the subscribing client can subscribe to any node in the cluster to receive messages sent to any channel, which inherently limits the potential for scaling.
Redis 7.0 introduces the concept of Sharded Pub/Sub, which can be accessed by using sharded_pubsub() instead; that method uses the implementation in ShardedPubSub.
An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels or patterns that it is subscribed to.
For more details see Cluster Pub/Sub
Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such
- Parameters:
connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer
ignore_subscribe_messages – Whether to skip subscription acknowledgement messages
retry_policy – An explicit retry policy to use in the subscriber.
channels – channels that the constructed Pubsub instance should automatically subscribe to
channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.
patterns – patterns that the constructed Pubsub instance should automatically subscribe to
pattern_handlers – Mapping of patterns to automatically subscribe to and the associated handlers that will be invoked when a message is received on a channel matching the pattern.
subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.
unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.
max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the server before performing a keepalive check with a PING.
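The max_idle_seconds parameter describes a common keepalive pattern: wait for a message, and if nothing arrives within the idle limit, probe the connection before waiting again. The sketch below shows that pattern with a plain asyncio.Queue standing in for the connection. The names read_with_keepalive and ping are hypothetical, and this is a generic illustration of the idea rather than the coredis implementation.

```python
import asyncio


async def read_with_keepalive(queue, max_idle_seconds, ping):
    """Wait for a message; call ping() each time the idle limit elapses."""
    while True:
        try:
            return await asyncio.wait_for(queue.get(), timeout=max_idle_seconds)
        except asyncio.TimeoutError:
            await ping()  # the connection was idle: check the peer is alive


async def demo():
    queue = asyncio.Queue()
    pings = []

    async def ping():
        pings.append("PING")
        if len(pings) == 2:
            queue.put_nowait("hello")  # simulate a message arriving later

    message = await read_with_keepalive(queue, 0.01, ping)
    return message, len(pings)


result = asyncio.run(demo())
print(result)
```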
- async get_message(ignore_subscribe_messages: bool = False, timeout: int | float | None = None) -> PubSubMessage | None
Gets the next message if one is available, otherwise None.
- async psubscribe(*patterns: StringT, **pattern_handlers: SubscriptionCallback | None) -> None
Subscribes to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern’s callable will be invoked automatically when a message is received on that pattern rather than producing a message via get_message() or the iterator.
- async punsubscribe(*patterns: StringT) -> None
Unsubscribes from the supplied patterns. If none are supplied, unsubscribes from all patterns.
- async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) -> None
Subscribes to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.
- class ShardedPubSub(connection_pool: coredis.pool.ClusterConnectionPool, ignore_subscribe_messages: bool = False, retry_policy: RetryPolicy | None = None, read_from_replicas: bool = False, channels: Parameters[StringT] | None = None, channel_handlers: Mapping[StringT, SubscriptionCallback] | None = None, subscription_timeout: float = 1, unsubscription_timeout: float = 0.1, max_idle_seconds: float = 15)
Sharded Pub/Sub implementation to be used with RedisCluster that is returned by sharded_pubsub().
For details about the server architecture refer to the Redis manual entry on Sharded Pub/Sub.
New in Redis version: 7.0.0
Warning
Sharded PubSub only supports subscription by channel and does NOT support pattern based subscriptions.
An instance of this class is both an async context manager (to ensure that proper clean up of connections & subscriptions happens automatically) and an async iterator to consume messages from channels that it is subscribed to.
For more details see Sharded Pub/Sub
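According to the Redis manual, shard channels are assigned to hash slots with the same algorithm used for keys: CRC16 of the name modulo 16384, with a non-empty {hash tag} hashed on its own when present. The sketch below reproduces that calculation using only the standard library, which can help when reasoning about which shard will own a channel. The helper name channel_slot is hypothetical, and nothing here is taken from the coredis source.

```python
import binascii

SLOTS = 16384  # number of hash slots in a Redis Cluster


def channel_slot(channel: str) -> int:
    """Compute the cluster hash slot for a channel (or key) name."""
    data = channel.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:  # only a non-empty {tag} is hashed on its own
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is the CRC16/XMODEM variant
    return binascii.crc_hqx(data, 0) % SLOTS


print(channel_slot("foo"))
# Channels that share a hash tag land in the same slot.
print(channel_slot("{orders}.created") == channel_slot("{orders}.shipped"))
```

Naming related shard channels with a common hash tag is one way to keep them on the same shard.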
Added in version 3.6.0.
Changed in version 6.0.0: The class supports the async context manager protocol and must always be used as such
- Parameters:
connection_pool – Connection pool used to acquire a connection to use for the pubsub consumer
ignore_subscribe_messages – Whether to skip subscription acknowledgement messages
retry_policy – An explicit retry policy to use in the subscriber.
channels – channels that the constructed Pubsub instance should automatically subscribe to
channel_handlers – Mapping of channels to automatically subscribe to and the associated handlers that will be invoked when a message is received on the specific channel.
subscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of subscriptions.
unsubscription_timeout – Maximum amount of time in seconds to wait for acknowledgement of unsubscriptions.
max_idle_seconds – Maximum duration (in seconds) to tolerate no messages from the cluster before performing a keepalive check with a PING.
- async subscribe(*channels: StringT, **channel_handlers: SubscriptionCallback | None) -> None
- Parameters:
channels – The shard channels to subscribe to.
channel_handlers – Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel’s callable will be invoked automatically when a message is received on that channel rather than producing a message via get_message() or the iterator.
- async unsubscribe(*channels: StringT) -> None
- Parameters:
channels – The shard channels to unsubscribe from. If none are provided, the client is unsubscribed from all channels it previously subscribed to.
- SubscriptionCallback = collections.abc.Callable[[coredis.response.types.PubSubMessage], collections.abc.Awaitable[None]] | collections.abc.Callable[[coredis.response.types.PubSubMessage], None]
Callables for message handler callbacks. The callbacks can be sync or async.
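Since a SubscriptionCallback may be either a plain function or a coroutine function, code that invokes one has to cope with both shapes. A minimal sketch of one way to do that is shown below: call the handler, then await the result only if it is awaitable. The helper invoke_handler and the dictionary-shaped message are assumptions for illustration; coredis may dispatch its callbacks differently.

```python
import asyncio
import inspect


async def invoke_handler(handler, message):
    """Call a handler that may be a plain function or a coroutine function."""
    result = handler(message)
    if inspect.isawaitable(result):
        await result  # async handlers return an awaitable that must be awaited


received = []


def sync_handler(message):
    received.append(("sync", message["data"]))


async def async_handler(message):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    received.append(("async", message["data"]))


async def main():
    message = {"channel": "channel-1", "data": "hi"}
    await invoke_handler(sync_handler, message)
    await invoke_handler(async_handler, message)


asyncio.run(main())
print(received)
```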