PubSub

PubSub is implemented through the following classes:

Creating an instance can be done through the coredis.Redis.pubsub(), coredis.RedisCluster.pubsub(), coredis.RedisCluster.sharded_pubsub() methods exposed by the client or directly via the specific constructors.

Subscription management

Channels or patterns can either be subscribed to on instantiation through constructor parameters or explicitly through the subscribe() or psubscribe() methods.

Upon instantiation:

async with client.pubsub(
    channels=["my-first-channel", "my-second-channel"], patterns=["my-*"]
) as consumer:
    ...

or explicitly:

async with client.pubsub() as consumer:
    await consumer.subscribe("my-first-channel", "my-second-channel", ...)
    await consumer.psubscribe("my-*")

The async context manager automatically manages unsubscribing and cleanup on exit:

async with client.pubsub(
    channels=["my-first-channel", "my-second-channel"], patterns=["my-*"]
) as consumer:
    async for message in consumer:
        print(message)
# remaining subscriptions are unsubscribed and connection is released
# back to the connection pool when the context manager exits.

If desired unsubscription can also be done explicitly by calling unsubscribe() for channels and punsubscribe() for patterns.

In the following example if any of the channels that the client was initially subscribed to contain a STOP message the consumer will unsubscribe from the channel and continue processing until there are no more subscriptions left (the async iterator will automatically exit when the consumer has no subscriptions):

async with client.pubsub(
    channels=[f"channel-{i}" for i in range(10)]
) as consumer:
    async for message in consumer:
        if message["data"] == "STOP":
            await consumer.unsubscribe(message["channel"])
        else:
            print(message["data"])

Consuming Messages

Messages received on the subscribed topics or patterns can be read either by using the pubsub instance itself as an async iterator or explicitly by calling the get_message() method.

Every message read from a PubSub instance will be a typed dictionary defined as:

class PubSubMessage[source]

Bases: TypedDict

type: str

One of the following:

subscribe

Server response when a client subscribes to a channel(s)

unsubscribe

Server response when a client unsubscribes from a channel(s)

psubscribe

Server response when a client subscribes to a pattern(s)

punsubscribe

Server response when a client unsubscribes from a pattern(s)

ssubscribe

Server response when a client subscribes to a shard channel(s)

sunsubscribe

Server response when a client unsubscribes from a shard channel(s)

message

A message received from subscribing to a channel

pmessage

A message received from subscribing to a pattern

channel: StringT

The channel subscribed to or unsubscribed from or the channel a message was published to

pattern: StringT | None

The pattern that was subscribed to or unsubscribed from or to which a received message was routed to

data: int | StringT
  • If type is one of {message, pmessage} this is the actual published message

  • If type is one of {subscribe, psubscribe, ssubscribe, unsubscribe, punsubscribe, sunsubscribe} this will be an int corresponding to the number of channels and patterns that the connection is currently subscribed to.

With the iterator:

await consumer.subscribe("my-channel")
async for message in consumer:
    # do something with the message

Note

Unsubscribing from all subscribed channels will result in the iterator ending (i.e. raising StopAsyncIteration)

Explicitly with get_message():

while True:
    message = await consumer.get_message()
    if message:
        # do something with the message

Note

When using get_message() the return could be None either if timeout is exceeded without receiving a message or:

  • if the message was a subscription / unsubscription response and the instance was created with ignore_subscribe_messages set to True

  • if the message was received on a channel or pattern that has a handler registered (See Callbacks below)

Callbacks

coredis also allows you to register callback functions to handle published messages. Message handlers take a single argument, the message, which is a dictionary just like the examples above. To subscribe to a channel or pattern with a message handler, pass the channel or pattern name as a keyword argument with its value being the callback function.

When a message is read on a channel or pattern with a message handler, the message dictionary is created and passed to the message handler. In this case, a None value is returned from get_message() since the message was already handled.

def my_handler(message):
    print('MY HANDLER: ', message['data'])
await consumer.subscribe(**{'my-channel': my_handler})
# read the subscribe confirmation message
await consumer.get_message()
# {'pattern': None, 'type': 'subscribe', 'channel': 'my-channel', 'data': 1L}
await client.publish('my-channel', 'awesome data')
# 1

# for the message handler to work, we need tell the instance to read data.
# this can be done in several ways (read more below). we'll just use
# the familiar get_message() function for now
await message = consumer.get_message()
# 'MY HANDLER:  awesome data'

# note here that the my_handler callback printed the string above.
# `message` is None because the message was handled by our handler.
print(message)
# None

PubSub instances remember what channels and patterns they are subscribed to. In the event of a disconnection such as a network error or timeout, the PubSub instance will re-subscribe to all prior channels and patterns when reconnecting. Messages that were published while the client was disconnected cannot be delivered.

The Pub/Sub support commands PUBSUB-CHANNELS, PUBSUB-NUMSUB and PUBSUB-NUMPAT are also supported:

await client.pubsub_channels()
# ['foo', 'bar']
await client.pubsub_numsub('foo', 'bar')
# [('foo', 9001), ('bar', 42)]
await client.pubsub_numsub('baz')
# [('baz', 0)]
await client.pubsub_numpat()
# 1204

Cluster Pub/Sub

The RedisCluster client exposes two ways of building a Pub/Sub application.

pubsub() returns an instance of ClusterPubSub which exposes identical functionality to the non clustered client. This is possible without worrying about sharding as the PUBLISH command in clustered redis results in messages being broadcasted to every node in the cluster.

On the consumer side of the equation coredis simply picks a random node and consumes the messages from all subscribed topics.

This approach, though functional does pose limited opportunity for horizontal scaling as all the nodes in the cluster will have to process the published messages for all channels.

Sharded Pub/Sub

Redis Cluster also supports Sharded Pub/Sub through the SSUBSCRIBE, SUNSUBSCRIBE and SPUBLISH commands which restricts publishing of messages to individual shards based on the same algorithm used to route keys to shards.

Note

There is no corresponding support for pattern based subscriptions (as you might have guessed, it wouldn’t be possible to shard those).

Access to Sharded Pub/Sub is available through the sharded_pubsub() method which exposes the same api and functionality (except for pattern support) as the other previously mentioned PubSub classes.

To publish a messages that is meant to be consumed by a Sharded Pub/Sub consumer use spublish() instead of publish()

Sharded Pub/Sub can provide much better performance as each node in the cluster only routes messages for channels that reside on the node (which in turn means that coredis can use a dedicated connection per node to drain messages).

Additionally, the read_from_replicas parameter can be set to True when constructing a ShardedPubSub instance to further increase throughput by letting the consumer use read replicas.