Streams

Redis Streams are a flexible data structure that can be applied to a variety of use cases, including (but not limited to) time series and Pub/Sub for event-driven architectures. The coredis clients support all stream-related commands and provide two high-level abstractions for building stream consumers.

Simple Consumer

The Consumer returned by xconsumer() can be used as an independent consumer that reads from one or more streams. Its scope is limited: it is configured with a collection of streams to read from and, by default, only returns entries added after the latest entry observed in each stream when the consumer was initialized.
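To make the default starting position concrete, here is a small in-memory model. It relies only on the Redis stream ID format, "<milliseconds>-<sequence>", compared numerically. ToyStream, ToyConsumer and parse_id are hypothetical names used for illustration; they are not part of coredis, which reads from a real Redis server instead.

```python
from dataclasses import dataclass, field


def parse_id(entry_id: str) -> tuple[int, int]:
    """Split a stream ID "<milliseconds>-<sequence>" into a comparable tuple."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


@dataclass
class ToyStream:
    """Hypothetical in-memory stand-in for a Redis stream (not part of coredis)."""

    entries: list[tuple[str, dict]] = field(default_factory=list)

    def add(self, entry_id: str, fields: dict) -> None:
        self.entries.append((entry_id, fields))

    def last_id(self) -> str:
        # "0-0" is below every valid ID, so an empty stream yields everything later
        return self.entries[-1][0] if self.entries else "0-0"

    def read_after(self, entry_id: str) -> list[tuple[str, dict]]:
        # Only entries with an ID strictly greater than entry_id are returned
        cutoff = parse_id(entry_id)
        return [(i, f) for i, f in self.entries if parse_id(i) > cutoff]


class ToyConsumer:
    """Hypothetical consumer that records each stream's latest ID when created."""

    def __init__(self, streams: dict[str, ToyStream]) -> None:
        self.streams = streams
        self.positions = {name: s.last_id() for name, s in streams.items()}

    def poll(self) -> list[tuple[str, str, dict]]:
        results = []
        for name, stream in self.streams.items():
            for entry_id, fields in stream.read_after(self.positions[name]):
                results.append((name, entry_id, fields))
                self.positions[name] = entry_id  # advance past returned entries
        return results


one, two = ToyStream(), ToyStream()
one.add("1-0", {"id": 0})  # written before the consumer exists: never returned
consumer = ToyConsumer({"one": one, "two": two})
one.add("2-0", {"id": 1})
two.add("3-0", {"id": 2})
print(consumer.poll())  # [('one', '2-0', {'id': 1}), ('two', '3-0', {'id': 2})]
print(consumer.poll())  # []
```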

Given either a regular client or a cluster client:

import coredis
client = coredis.Redis()
# or cluster
# client = coredis.RedisCluster("localhost", 7000)
  1. Create the consumer:

    consumer = client.xconsumer(streams=["one", "two", "three"])
    # or directly
    # import coredis.patterns.streams
    # consumer = coredis.patterns.streams.Consumer(client, streams=["one", "two", "three"])
    
  2. Entries can be fetched explicitly by calling get_entry():

    async with client:
        async with consumer:
            stream, entry = await consumer.get_entry()
    
  3. or, by using the consumer as an asynchronous iterator:

    async with client:
        async for stream, entry in consumer:
            ...  # do something with the entry
    

    Note

    The iterator ends as soon as a request returns no entries and nothing is left pending in the internal buffer.

    Wrapping the iteration in a loop turns the above example into an infinite iterator over the configured streams:

    async with client:
        while True:
            async for stream, entry in consumer:
                ...  # do something with the entry
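The termination rule described in the note can be modelled without a server. In this sketch ToyIterConsumer and drain_passes are hypothetical, and fetch() stands in for one request to Redis by returning canned responses. A single pass stops at the first empty response, and an outer loop (bounded here so the example terminates) picks up entries that arrive later.

```python
import asyncio
from collections import deque


class ToyIterConsumer:
    """Hypothetical async-iterable consumer with an internal buffer (not coredis).

    Each item in `responses` is what one request to the server would return:
    a possibly empty list of (stream, entry) pairs.
    """

    def __init__(self, responses):
        self._responses = deque(responses)
        self._buffer = deque()

    async def fetch(self):
        await asyncio.sleep(0)  # stand-in for network I/O
        return self._responses.popleft() if self._responses else []

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._buffer:
            self._buffer.extend(await self.fetch())
        if not self._buffer:
            # Nothing returned and nothing buffered: this pass is over
            raise StopAsyncIteration
        return self._buffer.popleft()


async def drain_passes(consumer, passes):
    seen = []
    for _ in range(passes):  # a long-running service would use `while True`
        async for stream, entry in consumer:
            seen.append((stream, entry))
    return seen


consumer = ToyIterConsumer([[("one", 1), ("two", 2)], [], [("one", 3)]])
print(asyncio.run(drain_passes(consumer, passes=3)))
# [('one', 1), ('two', 2), ('one', 3)]
```

The first pass ends at the empty second response even though ("one", 3) arrives afterwards; only the outer loop sees it.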
    

Configuration

The consumer can also be configured with a few parameters to better match the performance characteristics of your application, for example:

  1. With a blocking timeout:

    consumer = client.xconsumer(
        streams=["one", "two", "three"],
        timeout=30*1000,  # 30 seconds, expressed in milliseconds
    )
    
  2. With an internal buffer:

    consumer = client.xconsumer(
        streams=["one", "two", "three"],
        # Will fetch up to 10 extra entries per stream
        # on every request to Redis
        buffer_size=10,
        timeout=30*1000,  # 30 seconds
    )
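Why a buffer helps can be seen with a rough count of round trips. This is a back-of-the-envelope model, not coredis code: round_trips is a hypothetical helper, and it assumes each request reads every configured stream and returns up to per_request entries from each, similar to the COUNT option of XREAD. Exactly how buffer_size maps onto that count is an assumption here.

```python
import math


def round_trips(entries_per_stream: dict[str, int], per_request: int) -> int:
    """Hypothetical estimate of requests needed to drain a set of streams.

    Assumes every request reads all streams at once and returns up to
    `per_request` entries from each, so the busiest stream sets the total.
    """
    if per_request < 1:
        raise ValueError("per_request must be at least 1")
    busiest = max(entries_per_stream.values(), default=0)
    return math.ceil(busiest / per_request)


backlog = {"one": 100, "two": 40, "three": 5}
print(round_trips(backlog, per_request=1))   # 100: one entry per stream per request
print(round_trips(backlog, per_request=11))  # 10: e.g. 1 entry plus 10 buffered
```

Fewer round trips come at the cost of holding more entries in memory between requests, which is the trade-off the buffer_size parameter exposes.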
    

Group Consumer

The GroupConsumer, returned by xconsumer() when both group and consumer are provided, has the same interface as the standalone consumer. It differs significantly, however, in the use cases it is suited for. Group consumers work cooperatively: each one only fetches new entries that have not been delivered to any other consumer in the same group. As a consequence, they support checkpointing by acknowledging the stream entries they receive, and they can revisit backlogs of entries that have not yet been acknowledged, either by themselves or by other consumers in the same group.
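The bookkeeping behind a consumer group can be sketched with a dictionary. ToyGroup is a hypothetical in-memory model, not the coredis GroupConsumer: read_new mirrors XREADGROUP with the special ID ">", ack mirrors XACK, and claim loosely mirrors XCLAIM, which hands another consumer's pending entries to a new owner.

```python
class ToyGroup:
    """Hypothetical in-memory model of one consumer group on one stream.

    Not part of coredis. `last_delivered` is the group's read position and
    `pel` maps each delivered but unacknowledged entry ID to its owner.
    """

    def __init__(self, entry_ids):
        self.entry_ids = list(entry_ids)
        self.last_delivered = 0
        self.pel = {}

    def read_new(self, consumer, count):
        # Deliver entries no consumer in the group has received yet,
        # comparable to XREADGROUP with the special ID ">"
        batch = self.entry_ids[self.last_delivered:self.last_delivered + count]
        self.last_delivered += len(batch)
        for entry_id in batch:
            self.pel[entry_id] = consumer
        return batch

    def ack(self, *entry_ids):
        # Checkpoint: acknowledged entries leave the PEL, like XACK
        return sum(self.pel.pop(e, None) is not None for e in entry_ids)

    def claim(self, new_owner, old_owner):
        # Take over another consumer's unacknowledged backlog, like XCLAIM
        taken = [e for e, owner in self.pel.items() if owner == old_owner]
        for entry_id in taken:
            self.pel[entry_id] = new_owner
        return taken


group = ToyGroup(["1-0", "2-0", "3-0", "4-0"])
print(group.read_new("consumer-1", count=2))  # ['1-0', '2-0']
print(group.read_new("consumer-2", count=2))  # ['3-0', '4-0']
print(group.ack("1-0", "3-0"))  # 2
# consumer-1 never acknowledged "2-0", so consumer-2 can take it over
print(group.claim("consumer-2", "consumer-1"))  # ['2-0']
```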

The following two consumers will cooperatively consume from streams {one, two, three} without ever seeing an entry that the other has fetched:

consumer1 = client.xconsumer(
    streams=["one", "two", "three"],
    group="group-a",
    consumer="consumer-1",
    auto_acknowledge=True,
)
consumer2 = client.xconsumer(
    streams=["one", "two", "three"],
    group="group-a",
    consumer="consumer-2",
    auto_acknowledge=True,
)

# Or directly
# import coredis.patterns.streams
# consumer1 = coredis.patterns.streams.GroupConsumer(
#     client,
#     streams=["one", "two", "three"],
#     group="group-a",
#     consumer="consumer-1",
#     auto_acknowledge=True,
# )

Note

Setting auto_acknowledge=True means the consumers don't need to explicitly acknowledge the entries they fetch: received entries are treated as acknowledged on delivery, so they are never added to the group's Pending Entries List (PEL).
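The trade-off behind auto_acknowledge is that an entry which never enters the PEL cannot be redelivered if the consumer fails while processing it; Redis treats entries read with the NOACK option of XREADGROUP this way. The hypothetical function below models the difference with plain lists. It does not use coredis, and treating auto_acknowledge as equivalent to NOACK is an assumption of the sketch.

```python
def recoverable_after_crash(entry_ids, processed, auto_acknowledge):
    """Hypothetical model of a group consumer that crashes part-way through a batch.

    Only the first `processed` entries were handled and explicitly acknowledged
    before the crash. Returns the entry IDs still in the PEL, i.e. the ones a
    restarted consumer could still recover.
    """
    # With auto-acknowledgement nothing is recorded as pending on delivery
    pel = [] if auto_acknowledge else list(entry_ids)
    for entry_id in entry_ids[:processed]:
        if entry_id in pel:
            pel.remove(entry_id)  # explicit acknowledgement after processing
    return pel


batch = ["1-0", "2-0", "3-0"]
print(recoverable_after_crash(batch, processed=1, auto_acknowledge=False))  # ['2-0', '3-0']
print(recoverable_after_crash(batch, processed=1, auto_acknowledge=True))   # []
```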

  1. Add some entries to the three streams:

    async with client:
        for stream in ("one", "two", "three"):
            for i in range(10):
                await client.xadd(stream, {"id": i})
    
  2. Concurrently initiate a full drain with both consumers:

    import asyncio

    async def processor(consumer):
        return [(stream, entry) async for (stream, entry) in consumer]

    async with client:
        consumer1_results, consumer2_results = await asyncio.gather(
            processor(consumer1),
            processor(consumer2),
        )

    assert len(consumer1_results) + len(consumer2_results) == 30
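The same cooperative drain can be mimicked with plain asyncio to show why the two result lists never overlap. In this hypothetical sketch a shared deque plays the role of the group's read position; it is not how coredis or Redis implement consumer groups, only a model of the guarantee that each entry is delivered to exactly one member.

```python
import asyncio
from collections import deque


async def group_worker(stream):
    """Hypothetical group member: repeatedly take the next undelivered entry.

    Nothing is awaited between checking and popping, so each entry goes to
    exactly one worker.
    """
    received = []
    while stream:
        received.append(stream.popleft())
        await asyncio.sleep(0)  # yield so the other worker can interleave
    return received


async def main():
    stream = deque((name, i) for name in ("one", "two", "three") for i in range(10))
    return await asyncio.gather(group_worker(stream), group_worker(stream))


results1, results2 = asyncio.run(main())
print(len(results1) + len(results2))  # 30
print(set(results1).isdisjoint(results2))  # True
```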
    

Backlog management

The examples above use the most common configuration for a consumer that is a member of a consumer group. The group consumer accepts the same configuration parameters as the standalone consumer (Consumer), for example the blocking timeout and buffer size. It also has a few additional optional parameters that modify its behavior with respect to backlogs and checkpointing.

Setting the start_from_backlog parameter to True creates a consumer that first processes any entries it previously fetched but never acknowledged, before picking up any new entries from the stream:

import random

async with client:
    async with client.xconsumer(
        streams=["one"],
        group = "group-a",
        consumer = "consumer-1",
        start_from_backlog = True
    ) as consumer:
        for i in range(10):
            await client.xadd("one", {"id": i})

        # fetch all ten entries and simulate a bug occurring 50% of the time
        # when processing the entry
        async for stream, entry in consumer:
            if random.random() > 0.5:
                print("success", await client.xack(stream, consumer.group, [entry.identifier]))
            else:
                print("oh nos!")

        pending = await client.xpending("one", "group-a")
        assert pending.consumers[b"consumer-1"] > 0

    print("round two")
    # Let's pretend the consumer crashed and started again
    # and now doesn't have a bug that fails 50% of the time
    async with client.xconsumer(
        streams=["one"],
        group = "group-a",
        consumer = "consumer-1",
        start_from_backlog = True
    ) as consumer:
        async for stream, entry in consumer:
            await client.xack(stream, consumer.group, [entry.identifier])

        pending = await client.xpending("one", "group-a")
        assert pending.consumers.get(b"consumer-1") is None
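The restart behavior above can be traced deterministically with an in-memory model. ToyBacklogGroup is hypothetical and not part of coredis; it assumes that reading with start_from_backlog behaves like XREADGROUP with ID 0 (which in Redis returns the calling consumer's own pending entries) followed by a read of new entries with ID ">". A fixed set of "failed" entries replaces the random bug from the example.

```python
class ToyBacklogGroup:
    """Hypothetical consumer group model that keeps a PEL per consumer."""

    def __init__(self, entry_ids):
        self.entry_ids = list(entry_ids)
        self.last_delivered = 0
        self.pending = {}  # consumer name -> unacknowledged entry IDs

    def read(self, consumer, start_from_backlog):
        # Own pending entries first (like ID 0), then new entries (like ">")
        backlog = list(self.pending.get(consumer, [])) if start_from_backlog else []
        new = self.entry_ids[self.last_delivered:]
        self.last_delivered = len(self.entry_ids)
        self.pending.setdefault(consumer, []).extend(new)
        return backlog + new

    def ack(self, consumer, entry_id):
        self.pending[consumer].remove(entry_id)


group = ToyBacklogGroup(["1-0", "2-0", "3-0", "4-0"])
first_run = group.read("consumer-1", start_from_backlog=True)
for entry_id in first_run:
    if entry_id in ("1-0", "3-0"):  # pretend the other entries hit the bug
        group.ack("consumer-1", entry_id)

# After the "crash", the restarted consumer sees its unacknowledged entries again
second_run = group.read("consumer-1", start_from_backlog=True)
print(second_run)  # ['2-0', '4-0']
```

Without start_from_backlog the second read would return an empty list, since no new entries were added and the unacknowledged ones stay pending.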

If no checkpointing is desired, the group consumer can be initialized with auto_acknowledge set to True, which effectively results in Redis not maintaining a PEL for the entries received by the consumers in this group.