Stream Consumers
coredis.streams
- class Consumer(client: Client[AnyStr], streams: Parameters[KeyT], buffer_size: int = 0, timeout: int | None = 0, **stream_parameters: StreamParameters)
Bases: Generic
Standalone stream consumer that starts reading from the latest entry of each stream provided in streams.
The latest entry is determined by calling coredis.Redis.xinfo_stream() and using the last-entry attribute at the point of initializing the consumer instance or on first fetch (whichever comes first). If the stream(s) do not exist at the time of consumer creation, the consumer will simply start from the minimum identifier (0-0).
- Parameters:
client – The Redis client to use
streams – The stream identifiers to consume from
buffer_size – Size of the buffer (per stream) to maintain. This translates to the maximum number of stream entries that are fetched on each request to Redis.
timeout – Maximum amount of time in milliseconds to block for new entries to appear on the streams the consumer is reading from.
stream_parameters – Mapping of optional parameters to use, by stream, for the streams provided in streams.
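The parameters above can be put together in a minimal usage sketch. It assumes a Redis server reachable on the default local address and a stream named `events` (a hypothetical name). The `take` helper is illustrative and not part of coredis; it works with any asynchronous iterator that yields `(stream, entry)` pairs, which is the interface the consumer exposes.

```python
import asyncio
from typing import Any, AsyncIterator


async def take(
    consumer: AsyncIterator[tuple[Any, Any]], limit: int
) -> list[tuple[Any, Any]]:
    """Collect at most `limit` (stream, entry) pairs from an async iterator."""
    collected: list[tuple[Any, Any]] = []
    if limit <= 0:
        return collected
    async for stream, entry in consumer:
        collected.append((stream, entry))
        if len(collected) >= limit:
            break
    return collected


async def main() -> None:
    # Imported here so that `take` stays usable without coredis installed.
    import coredis
    from coredis.streams import Consumer

    client = coredis.Redis(decode_responses=True)
    # buffer_size: fetch up to 10 entries per stream on each request.
    # timeout: block for up to 1000 ms waiting for new entries.
    consumer = Consumer(client, ["events"], buffer_size=10, timeout=1000)
    for stream, entry in await take(consumer, 5):
        print(stream, entry)
```

With a server running, `asyncio.run(main())` would print up to five entries that arrive after the consumer starts. Depending on the coredis version in use, the client may need additional setup that is not shown here.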
- async add_stream(stream: StringT, identifier: StringT | None = None) -> bool
Adds a new stream identifier to this consumer
- Parameters:
stream – The stream identifier
- Returns:
True if the stream was added successfully, False otherwise
- __aiter__() -> Consumer
Returns the instance of the consumer itself which can be iterated over
- async __anext__() -> tuple[AnyStr, StreamEntry]
Returns the next available stream entry from any of Consumer.streams.
- Raises:
StopAsyncIteration if no more entries are available
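For context, `async for` relies on Python's asynchronous iterator protocol, in which `__anext__` signals exhaustion by raising `StopAsyncIteration`. The sketch below uses a hypothetical stand-in class (`FakeConsumer`, not part of coredis) to show how such a loop ends cleanly once no entry is available. It is an illustration of the protocol, not the library's implementation.

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in that mimics the consumer's iteration interface."""

    def __init__(self, entries):
        self._entries = list(entries)

    def __aiter__(self):
        return self

    async def get_entry(self):
        # The documented return type allows None for both items,
        # which this stand-in uses to mean "nothing available".
        if not self._entries:
            return None, None
        return self._entries.pop(0)

    async def __anext__(self):
        stream, entry = await self.get_entry()
        if stream is None or entry is None:
            # Ends an `async for` loop without surfacing an error.
            raise StopAsyncIteration
        return stream, entry


async def drain(consumer):
    """Iterate until the consumer reports that no entries remain."""
    return [pair async for pair in consumer]
```

Because the loop ends on the first empty fetch, a long-running worker would typically wrap the `async for` in an outer loop or use a blocking timeout.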
- async get_entry() -> tuple[AnyStr | None, StreamEntry | None]
Fetches the next available entry from the streams specified in Consumer.streams. If any entries were previously fetched and buffered, they will be returned before making a new request to the server.
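The buffer-first behaviour described above can be illustrated with a small, synchronous toy model. Everything here is an assumption made for illustration: `BufferedReader`, `make_source`, and `batch_size` are hypothetical names, and the real consumer is asynchronous and talks to Redis rather than to an in-memory list.

```python
from collections import deque


def make_source(items):
    """Return a fetch(count) callable that hands out items in batches."""
    pending = deque(items)

    def fetch(count):
        return [pending.popleft() for _ in range(min(count, len(pending)))]

    return fetch


class BufferedReader:
    """Serve buffered items first; only fetch when the buffer is empty."""

    def __init__(self, fetch, batch_size):
        self._fetch = fetch            # stands in for a request to the server
        self._batch_size = batch_size  # maximum items requested per fetch
        self._buffer = deque()
        self.requests = 0              # number of simulated round trips

    def get_entry(self):
        if not self._buffer:
            self.requests += 1
            self._buffer.extend(self._fetch(self._batch_size))
        return self._buffer.popleft() if self._buffer else None
```

With a batch size of 2 and five items in the source, reading the first three items costs two simulated requests rather than three, which is the saving a buffer is meant to provide.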
- class GroupConsumer(client: Client[AnyStr], streams: Parameters[KeyT], group: StringT, consumer: StringT, buffer_size: int = 0, auto_create: bool = True, auto_acknowledge: bool = False, start_from_backlog: bool = False, timeout: int | None = None, **stream_parameters: StreamParameters)
Bases: Consumer
A member of a stream consumer group. The consumer has the same interface as coredis.streams.Consumer.
- Parameters:
client – The Redis client to use
streams – The stream identifiers to consume from
group – The name of the group this consumer is part of
auto_create – If True, the group will be created upon initialization or first fetch if it doesn’t already exist.
auto_acknowledge – If True, the stream entries will be fetched without needing to be acknowledged with coredis.Redis.xack() to remove them from the pending entries list.
start_from_backlog – If True, the consumer will start by fetching any pending entries from the pending entries list before considering any new messages not seen by any other consumer in the group.
buffer_size – Size of the buffer (per stream) to maintain. This translates to the maximum number of stream entries that are fetched on each request to Redis.
timeout – Maximum amount of time in milliseconds to block for new entries to appear on the streams the consumer is reading from.
stream_parameters – Mapping of optional parameters to use, by stream, for the streams provided in streams.
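As a rough sketch of how these parameters might be combined, the example below processes entries with manual acknowledgement (auto_acknowledge left as False). The stream name `events`, group name `workers`, and consumer name `worker-1` are hypothetical, and `handle_and_ack` is an illustrative helper rather than part of coredis. It only relies on the documented get_entry() interface and on each entry exposing an `identifier` attribute.

```python
import asyncio


async def handle_and_ack(consumer, ack, handler, limit):
    """Read entries until `limit` have been acknowledged or none remain.

    `ack(stream, identifier)` is an awaitable callback supplied by the caller;
    an entry is acknowledged only if `handler(entry)` returns a truthy value.
    """
    acked = []
    while len(acked) < limit:
        stream, entry = await consumer.get_entry()
        if stream is None or entry is None:
            break
        if handler(entry):
            await ack(stream, entry.identifier)
            acked.append(entry.identifier)
    return acked


async def main():
    # Imported here so the helper above can be used without coredis installed.
    import coredis
    from coredis.streams import GroupConsumer

    client = coredis.Redis(decode_responses=True)
    consumer = GroupConsumer(
        client,
        ["events"],
        group="workers",
        consumer="worker-1",
        auto_create=True,
        auto_acknowledge=False,
        timeout=1000,
    )

    async def ack(stream, identifier):
        # Removes the entry from the group's pending entries list.
        await client.xack(stream, "workers", [identifier])

    await handle_and_ack(consumer, ack, lambda entry: True, limit=10)
```

Entries that the handler rejects stay in the pending entries list, so they can be retried later, for example by a consumer created with start_from_backlog set to True.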
Warning
Providing an identifier in stream_parameters has a different meaning for a group consumer. If the value is any valid identifier other than >, the consumer will only access the history of pending messages, that is, the set of messages that were delivered to this consumer (identified by consumer) and never acknowledged.
- async add_stream(stream: StringT, identifier: StringT | None = '>') -> bool
Adds a new stream identifier to this consumer
- Parameters:
- Returns:
True if the stream was added successfully, False otherwise
Added in version 4.12.0.
- __aiter__() -> GroupConsumer
Returns the instance of the consumer itself which can be iterated over
- async __anext__() -> tuple[AnyStr, StreamEntry]
Returns the next available stream entry from any of Consumer.streams.
- Raises:
StopAsyncIteration if no more entries are available
- async get_entry() -> tuple[AnyStr | None, StreamEntry | None]
Fetches the next available entry from the streams specified in GroupConsumer.streams. If any entries were previously fetched and buffered, they will be returned before making a new request to the server.