Stream Consumers

coredis.streams

class Consumer(client: Client[AnyStr], streams: Parameters[KeyT], buffer_size: int = 0, timeout: int | None = 0, **stream_parameters: StreamParameters)[source]

Bases: Generic

Standalone stream consumer that starts reading from the latest entry of each stream provided in streams.

The latest entry is determined by calling coredis.Redis.xinfo_stream() and using the last-entry attribute at the point of initializing the consumer instance or on first fetch (whichever comes first). If the stream(s) do not exist at the time of consumer creation, the consumer will simply start from the minimum identifier (0-0)

Parameters:
  • client – The redis client to use

  • streams – the stream identifiers to consume from

  • buffer_size – Size of buffer (per stream) to maintain. This translates to the maximum number of stream entries that are fetched on each request to redis.

  • timeout – Maximum amount of time in milliseconds to block for new entries to appear on the streams the consumer is reading from.

  • stream_parameters – Mapping of optional parameters to use by stream for the streams provided in streams.

async add_stream(stream: StringT, identifier: StringT | None = None) bool[source]

Adds a new stream identifier to this consumer

Parameters:

stream – The stream identifier

Returns:

True if the stream was added successfully, False otherwise

__aiter__() Consumer[source]

Returns the instance of the consumer itself which can be iterated over

async __anext__() tuple[AnyStr, StreamEntry][source]

Returns the next available stream entry available from any of Consumer.streams.

Raises:

StopIteration if no more entries are available

async get_entry() tuple[AnyStr | None, StreamEntry | None][source]

Fetches the next available entry from the streams specified in Consumer.streams. If there were any entries previously fetched and buffered, they will be returned before making a new request to the server.

class GroupConsumer(client: Client[AnyStr], streams: Parameters[KeyT], group: StringT, consumer: StringT, buffer_size: int = 0, auto_create: bool = True, auto_acknowledge: bool = False, start_from_backlog: bool = False, timeout: int | None = None, **stream_parameters: StreamParameters)[source]

Bases: Consumer

A member of a stream consumer group. The consumer has an identical interface as coredis.stream.Consumer.

Parameters:
  • client – The redis client to use

  • streams – The stream identifiers to consume from

  • group – The name of the group this consumer is part of

  • consumer – The unique name (within group) of the consumer

  • auto_create – If True the group will be created upon initialization or first fetch if it doesn’t already exist.

  • auto_acknowledge – If True the stream entries fetched will be fetched without needing to be acknowledged with coredis.Redis.xack() to remove them from the pending entries list.

  • start_from_backlog – If True the consumer will start by fetching any pending entries from the pending entry list before considering any new messages not seen by any other consumer in the group

  • buffer_size – Size of buffer (per stream) to maintain. This translates to the maximum number of stream entries that are fetched on each request to redis.

  • timeout – Maximum amount of time to block for new entries to appear on the streams the consumer is reading from.

  • stream_parameters – Mapping of optional parameters to use by stream for the streams provided in streams.

Warning

Providing an identifier in stream_parameters has a different meaning for a group consumer. If the value is any valid identifier other than > the consumer will only access the history of pending messages. That is, the set of messages that were delivered to this consumer (identified by consumer) and never acknowledged.

async add_stream(stream: StringT, identifier: StringT | None = '>') bool[source]

Adds a new stream identifier to this consumer

Parameters:
  • stream – The stream identifier

  • identifier – The identifier to start consuming from. For group consumers this should almost always be > (the default).

Returns:

True if the stream was added successfully, False otherwise

Added in version 4.12.0.

__aiter__() GroupConsumer[source]

Returns the instance of the consumer itself which can be iterated over

async __anext__() tuple[AnyStr, StreamEntry]

Returns the next available stream entry available from any of Consumer.streams.

Raises:

StopIteration if no more entries are available

async get_entry() tuple[AnyStr | None, StreamEntry | None][source]

Fetches the next available entry from the streams specified in GroupConsumer.streams. If there were any entries previously fetched and buffered, they will be returned before making a new request to the server.

class StreamParameters[source]

Bases: TypedDict

identifier: StringT

Starting identifier for the consumer. If not present it will start from the latest entry