PubSub
PubSub is implemented through the following classes:
- Single Redis Server: PubSub
- Redis Cluster: ClusterPubSub
- Redis Cluster (with sharded pubsub): ShardedPubSub
An instance can be created through the coredis.Redis.pubsub(),
coredis.RedisCluster.pubsub(), or coredis.RedisCluster.sharded_pubsub() methods
exposed by the client, or directly via the specific constructors.
Subscription management
Channels or patterns can either be subscribed to on instantiation through
constructor parameters or explicitly through the subscribe()
or psubscribe() methods.
Upon instantiation:
async with client.pubsub(
    channels=["my-first-channel", "my-second-channel"], patterns=["my-*"]
) as consumer:
    ...
or explicitly:
async with client.pubsub() as consumer:
    await consumer.subscribe("my-first-channel", "my-second-channel", ...)
    await consumer.psubscribe("my-*")
The async context manager automatically manages unsubscribing and cleanup on exit:
async with client.pubsub(
    channels=["my-first-channel", "my-second-channel"], patterns=["my-*"]
) as consumer:
    async for message in consumer:
        print(message)
# remaining subscriptions are unsubscribed and the connection is released
# back to the connection pool when the context manager exits.
If desired, unsubscription can also be done explicitly by calling
unsubscribe() for channels and punsubscribe() for patterns.
In the following example, if any of the channels that the client was initially subscribed
to receives a STOP message, the consumer unsubscribes from that channel and continues
processing until there are no subscriptions left (the async iterator exits automatically
when the consumer has no subscriptions):
async with client.pubsub(
    channels=[f"channel-{i}" for i in range(10)]
) as consumer:
    async for message in consumer:
        if message["data"] == "STOP":
            await consumer.unsubscribe(message["channel"])
        else:
            print(message["data"])
Consuming Messages
Messages received on the subscribed topics or patterns can be read either by
using the pubsub instance itself as an async iterator or explicitly by calling
the get_message() method.
Every message read from a PubSub instance
will be a typed dictionary defined as:
class PubSubMessage
Bases: TypedDict

- type: str
  One of the following:
  - subscribe: Server response when a client subscribes to a channel(s)
  - unsubscribe: Server response when a client unsubscribes from a channel(s)
  - psubscribe: Server response when a client subscribes to a pattern(s)
  - punsubscribe: Server response when a client unsubscribes from a pattern(s)
  - ssubscribe: Server response when a client subscribes to a shard channel(s)
  - sunsubscribe: Server response when a client unsubscribes from a shard channel(s)
  - message: A message received from subscribing to a channel
  - pmessage: A message received from subscribing to a pattern
- channel: StringT
  The channel subscribed to or unsubscribed from, or the channel a message was published to
- pattern: StringT | None
  The pattern that was subscribed to or unsubscribed from, or to which a received message was routed
- data: int | StringT
  - If type is one of {message, pmessage}, this is the actual published message.
  - If type is one of {subscribe, psubscribe, ssubscribe, unsubscribe, punsubscribe, sunsubscribe}, this will be an int corresponding to the number of channels and patterns that the connection is currently subscribed to.
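To make the field semantics above concrete, the following minimal sketch mirrors the documented fields in a local TypedDict and separates published messages from subscription acknowledgements. The names `Message`, `PUBLISHED_TYPES` and `is_published` are illustrative and not part of coredis, and `StringT` is simplified to `str | bytes` as an assumption.

```python
from typing import Optional, TypedDict, Union


class Message(TypedDict):
    """Local stand-in mirroring the documented fields (not the coredis class)."""

    type: str
    channel: Union[str, bytes]
    pattern: Optional[Union[str, bytes]]
    data: Union[int, str, bytes]


# Message types whose ``data`` is a published payload rather than a count.
PUBLISHED_TYPES = {"message", "pmessage"}


def is_published(message: Message) -> bool:
    """Return True when ``data`` holds a published payload."""
    return message["type"] in PUBLISHED_TYPES


# A subscription acknowledgement: ``data`` is the current subscription count.
ack: Message = {"type": "subscribe", "channel": "news", "pattern": None, "data": 1}
# A message routed through a pattern subscription: ``data`` is the payload.
msg: Message = {"type": "pmessage", "channel": "news", "pattern": "n*", "data": "hi"}

print(is_published(ack), is_published(msg))
```

A check like this is one way to avoid treating the integer subscription count as application data when subscription responses are not being ignored.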
With the iterator:
await consumer.subscribe("my-channel")
async for message in consumer:
    # do something with the message
    print(message)
Note
Unsubscribing from all subscribed channels will result in the iterator
ending (i.e. raising StopAsyncIteration)
Explicitly with get_message():
while True:
    message = await consumer.get_message()
    if message:
        # do something with the message
        print(message)
Note
When using get_message() the return
could be None either if timeout is
exceeded without receiving a message or:
- if the message was a subscription / unsubscription response and the instance was created with ignore_subscribe_messages set to True
- if the message was received on a channel or pattern that has a handler registered (See Callbacks below)
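Because get_message() can return None for several reasons, a polling loop generally has to skip empty reads rather than stop on them. The sketch below shows one way to do that. `FakeConsumer` is a hypothetical stand-in so the example runs without a Redis server, and `collect_payloads` is an illustrative helper, not a coredis function; with a real pubsub instance only the consumer object would change.

```python
import asyncio
from typing import Any, List, Optional


class FakeConsumer:
    """Hypothetical stand-in for a pubsub instance; returns queued items in order."""

    def __init__(self, items: List[Optional[dict]]) -> None:
        self._items = list(items)

    async def get_message(self) -> Optional[dict]:
        # Yield to the event loop, as a real network read would.
        await asyncio.sleep(0)
        return self._items.pop(0) if self._items else None


async def collect_payloads(consumer: Any, wanted: int, max_polls: int = 100) -> list:
    """Poll get_message() until `wanted` payloads are seen or polls run out."""
    payloads = []
    for _ in range(max_polls):
        message = await consumer.get_message()
        if message is None:
            # Timeout, ignored (un)subscribe response, or handled by a callback.
            continue
        if message["type"] in ("message", "pmessage"):
            payloads.append(message["data"])
            if len(payloads) == wanted:
                break
    return payloads


consumer = FakeConsumer(
    [
        None,
        {"type": "subscribe", "channel": "c", "pattern": None, "data": 1},
        {"type": "message", "channel": "c", "pattern": None, "data": "a"},
        None,
        {"type": "message", "channel": "c", "pattern": None, "data": "b"},
    ]
)
result = asyncio.run(collect_payloads(consumer, wanted=2))
print(result)
```

The `max_polls` bound is a design choice for the sketch so that the loop always terminates; a long-running consumer would more likely loop until cancelled.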
Callbacks
coredis also allows you to register callback functions to handle published messages. Message handlers take a single argument, the message, which is a dictionary just like the examples above. To subscribe to a channel or pattern with a message handler, pass the channel or pattern name as a keyword argument with its value being the callback function.
When a message is read on a channel or pattern with a message handler, the
message dictionary is created and passed to the message handler. In this case,
a None value is returned from get_message()
since the message was already handled.
def my_handler(message):
    print('MY HANDLER:', message['data'])

await consumer.subscribe(**{'my-channel': my_handler})
# read the subscribe confirmation message
await consumer.get_message()
# {'pattern': None, 'type': 'subscribe', 'channel': 'my-channel', 'data': 1}
await client.publish('my-channel', 'awesome data')
# 1
# for the message handler to work, we need to tell the instance to read data.
# this can be done in several ways (read more below). we'll just use
# the familiar get_message() function for now
message = await consumer.get_message()
# 'MY HANDLER: awesome data'
# note here that the my_handler callback printed the string above.
# `message` is None because the message was handled by our handler.
print(message)
# None
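The routing behaviour described above (a registered handler consumes the message, so the caller sees None) can be modelled in a few lines. This is a conceptual sketch only: `HandlerRegistry` and its methods are hypothetical names and do not reflect how coredis implements dispatch internally.

```python
from typing import Callable, Dict, Optional

Handler = Callable[[dict], None]


class HandlerRegistry:
    """Conceptual model of channel-to-callback routing (not coredis internals)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def subscribe(self, **channel_handlers: Handler) -> None:
        # Keyword names are channel names; values are the callbacks.
        self._handlers.update(channel_handlers)

    def deliver(self, message: dict) -> Optional[dict]:
        handler = self._handlers.get(message["channel"])
        if handler is not None:
            handler(message)
            return None  # already handled, nothing left for the caller
        return message


seen = []
registry = HandlerRegistry()
# Channel names with dashes are not valid identifiers, hence the ** unpacking.
registry.subscribe(**{"my-channel": lambda m: seen.append(m["data"])})

handled = registry.deliver({"type": "message", "channel": "my-channel", "data": "x"})
passed = registry.deliver({"type": "message", "channel": "other", "data": "y"})
print(handled, passed["data"], seen)
```

The model also shows why the examples pass a dictionary with `**`: channel names such as `my-channel` cannot be written as literal keyword arguments.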
PubSub instances remember what channels and patterns they are subscribed to. In the event of a disconnection such as a network error or timeout, the PubSub instance will re-subscribe to all prior channels and patterns when reconnecting. Messages that were published while the client was disconnected cannot be delivered.
The Pub/Sub introspection commands PUBSUB CHANNELS, PUBSUB NUMSUB and PUBSUB NUMPAT are also supported:
await client.pubsub_channels()
# ['foo', 'bar']
await client.pubsub_numsub('foo', 'bar')
# [('foo', 9001), ('bar', 42)]
await client.pubsub_numsub('baz')
# [('baz', 0)]
await client.pubsub_numpat()
# 1204
Cluster Pub/Sub
The RedisCluster client exposes two ways of building a Pub/Sub
application.
pubsub() returns an instance of ClusterPubSub,
which exposes identical functionality to the non-clustered client. This is possible
without worrying about sharding because the PUBLISH command in clustered Redis results
in messages being broadcast to every node in the cluster.
On the consumer side, coredis simply picks a random node and consumes the messages from all subscribed channels.
This approach, though functional, offers limited opportunity for horizontal scaling, as every node in the cluster has to process the published messages for all channels.
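For contrast with the broadcast model, sharded Pub/Sub in Redis assigns each shard channel to a hash slot using the same algorithm as for keys, so only the nodes owning that slot handle its messages. The sketch below computes that slot with the standard library. `key_slot` is an illustrative helper written for this page, not a coredis function.

```python
import binascii

SLOT_COUNT = 16384  # number of hash slots in a Redis Cluster


def key_slot(name: str) -> int:
    """Compute the cluster hash slot for a key or shard channel name."""
    data = name.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # A non-empty {tag} means only the tag contents are hashed.
        if end != -1 and end != start + 1:
            data = data[start + 1 : end]
    # binascii.crc_hqx with an initial value of 0 is CRC16-CCITT (XMODEM),
    # the CRC16 variant named in the Redis Cluster specification.
    return binascii.crc_hqx(data, 0) % SLOT_COUNT


print(key_slot("foo"))
print(key_slot("{user1000}.following") == key_slot("{user1000}.followers"))
```

Channels that share a hash tag land in the same slot, which is one way to keep related shard channels on the same node.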