from __future__ import annotations
from typing import Any
from deprecated.sphinx import versionadded
from coredis._utils import EncodingInsensitiveDict, nativestr
from coredis.client import Client
from coredis.exceptions import (
ResponseError,
StreamConsumerInitializationError,
StreamDuplicateConsumerGroupError,
)
from coredis.response.types import StreamEntry
from coredis.tokens import PureToken
from coredis.typing import (
AnyStr,
ClassVar,
Dict,
Generator,
Generic,
KeyT,
List,
Optional,
Parameters,
Set,
StringT,
Tuple,
TypedDict,
ValueT,
)
class StreamParameters(TypedDict):
    """Optional per-stream configuration accepted by stream consumers."""

    #: Identifier from which the consumer begins reading this stream.
    #: When absent, consumption starts from the latest entry.
    identifier: StringT
class State(TypedDict, total=False):
    """Internal per-stream cursor state tracked by a consumer."""

    #: Last identifier handed out (or the configured starting point).
    identifier: Optional[StringT]
    #: Whether the stream is currently being replayed from the pending
    #: entries list (group consumers only).
    pending: Optional[bool]
class Consumer(Generic[AnyStr]):
    """
    Standalone stream consumer that reads entries from one or more redis
    streams, buffering up to :paramref:`Consumer.buffer_size` extra entries
    per stream between requests to the server.
    """

    #: Per-stream cursor state (last identifier consumed, pending flag).
    state: Dict[KeyT, State]
    #: Identifier used for a stream that has no recorded cursor yet.
    DEFAULT_START_ID: ClassVar[bytes] = b"0-0"

    def __init__(
        self,
        client: Client[AnyStr],
        streams: Parameters[KeyT],
        buffer_size: int = 0,
        timeout: Optional[int] = 0,
        **stream_parameters: StreamParameters,
    ):
        """
        Standalone stream consumer that starts reading from the latest entry
        of each stream provided in :paramref:`streams`.

        The latest entry is determined by calling :meth:`coredis.Redis.xinfo_stream`
        and using the :data:`last-entry` attribute
        at the point of initializing the consumer instance or on first fetch (whichever comes
        first). If the stream(s) do not exist at the time of consumer creation, the
        consumer will simply start from the minimum identifier (``0-0``)

        :param client: The redis client to use
        :param streams: the stream identifiers to consume from
        :param buffer_size: Size of buffer (per stream) to maintain. This
         translates to the maximum number of stream entries that are fetched
         on each request to redis.
        :param timeout: Maximum amount of time in milliseconds to block for new
         entries to appear on the streams the consumer is reading from.
        :param stream_parameters: Mapping of optional parameters to use
         by stream for the streams provided in :paramref:`streams`.
        """
        self.client: Client[AnyStr] = client
        self.streams: Set[KeyT] = set(streams)
        # Encoding-insensitive so ``str`` and ``bytes`` stream names address
        # the same state entry.
        self.state: Dict[StringT, State] = EncodingInsensitiveDict(
            {stream: stream_parameters.get(nativestr(stream), {}) for stream in streams}
        )
        # Entries fetched from the server but not yet handed out by
        # :meth:`get_entry`.
        self.buffer: Dict[AnyStr, List[StreamEntry]] = EncodingInsensitiveDict({})
        self.buffer_size = buffer_size
        self.timeout = timeout
        self._initialized = False
        self._initialized_streams: Dict[StringT, bool] = {}

    def chunk_streams(self) -> List[Dict[ValueT, StringT]]:
        """
        Group the consumer's streams into per-request mappings of
        ``{stream: starting identifier}`` suitable for ``XREAD``.
        """
        # Local import to avoid a circular import at module load time.
        import coredis.client

        if isinstance(self.client, coredis.client.RedisCluster):
            # Cluster mode: one stream per request, since the streams may
            # be served by different nodes.
            return [
                {
                    stream: self.state[stream].get("identifier", None)
                    or self.DEFAULT_START_ID
                }
                for stream in self.streams
            ]
        else:
            # Single node: all streams can be fetched in one request.
            return [
                {
                    stream: self.state[stream].get("identifier", None)
                    or self.DEFAULT_START_ID
                    for stream in self.streams
                }
            ]

    async def initialize(self, partial: bool = False) -> "Consumer[AnyStr]":
        """
        Resolve the starting identifier for each stream. With ``partial=True``
        only streams not yet initialized (e.g. added via :meth:`add_stream`)
        are processed.
        """
        if self._initialized and not partial:
            return self
        for stream in self.streams:
            if partial and self._initialized_streams.get(stream):
                continue
            try:
                info = await self.client.xinfo_stream(stream)
                if info:
                    last_entry = info["last-entry"]
                    if last_entry:
                        # Start after the latest existing entry unless the
                        # caller supplied an explicit starting identifier.
                        self.state[stream].setdefault(
                            "identifier", last_entry.identifier
                        )
            except ResponseError:
                # Stream does not exist yet; consumption will fall back to
                # DEFAULT_START_ID via chunk_streams().
                pass
            self._initialized_streams[stream] = True
        self._initialized = True
        return self

    async def add_stream(
        self, stream: StringT, identifier: Optional[StringT] = None
    ) -> bool:
        """
        Adds a new stream identifier to this consumer

        :param stream: The stream identifier
        :param identifier: The identifier to start consuming from. If not
         provided the consumer starts from the latest entry (or ``0-0`` if
         the stream does not exist yet).
        :return: ``True`` if the stream was added successfully, ``False`` otherwise
        """
        self.streams.add(stream)
        self.state.setdefault(stream, {"identifier": identifier} if identifier else {})
        await self.initialize(partial=True)
        return stream in self._initialized_streams

    def __await__(self) -> Generator[Any, None, Consumer[AnyStr]]:
        # Allow ``consumer = await Consumer(...)`` to eagerly initialize.
        return self.initialize().__await__()

    def __aiter__(self) -> Consumer[AnyStr]:
        """
        Returns the instance of the consumer itself which can be iterated over
        """
        return self

    async def __anext__(self) -> Tuple[AnyStr, StreamEntry]:
        """
        Returns the next available stream entry available from any of
        :paramref:`Consumer.streams`.

        :raises: :exc:`StopAsyncIteration` if no more entries are available
        """
        stream, entry = await self.get_entry()
        if not (stream and entry):
            raise StopAsyncIteration()
        return stream, entry

    async def get_entry(self) -> Tuple[Optional[AnyStr], Optional[StreamEntry]]:
        """
        Fetches the next available entry from the streams specified in
        :paramref:`Consumer.streams`. If there were any entries
        previously fetched and buffered, they will be returned before
        making a new request to the server.
        """
        await self.initialize()
        cur = None
        cur_stream = None
        # Serve from the local buffer first, if any stream has queued entries.
        for stream, buffer_entries in list(self.buffer.items()):
            if buffer_entries:
                cur_stream, cur = stream, self.buffer[stream].pop(0)
                break
        else:
            # Buffer exhausted: fetch up to buffer_size + 1 entries per
            # stream (one to return now, the rest to buffer).
            consumed_entries: Dict[AnyStr, Tuple[StreamEntry, ...]] = {}
            for chunk in self.chunk_streams():
                consumed_entries.update(
                    await self.client.xread(
                        chunk,
                        count=self.buffer_size + 1,
                        block=(
                            self.timeout
                            if (self.timeout and self.timeout > 0)
                            else None
                        ),
                    )
                    or {}
                )
            for stream, entries in consumed_entries.items():
                if entries:
                    if not cur:
                        # The first non-empty stream supplies the entry
                        # returned from this call; the rest is buffered.
                        cur = entries[0]
                        cur_stream = stream
                        if entries[1:]:
                            self.buffer.setdefault(stream, []).extend(entries[1:])
                    else:
                        self.buffer.setdefault(stream, []).extend(entries)
        if cur and cur_stream:
            # Advance the cursor so the next request resumes past this entry.
            self.state[cur_stream]["identifier"] = cur.identifier
        return cur_stream, cur
class GroupConsumer(Consumer[AnyStr]):
    """
    A member of a redis stream consumer group, exposing the same iteration
    interface as :class:`Consumer` while tracking group membership,
    acknowledgement and pending-entry replay.
    """

    #: ``>`` means "entries never delivered to any other consumer in the group".
    DEFAULT_START_ID: ClassVar[bytes] = b">"

    def __init__(
        self,
        client: Client[AnyStr],
        streams: Parameters[KeyT],
        group: StringT,
        consumer: StringT,
        buffer_size: int = 0,
        auto_create: bool = True,
        auto_acknowledge: bool = False,
        start_from_backlog: bool = False,
        timeout: Optional[int] = None,
        **stream_parameters: StreamParameters,
    ):
        """
        A member of a stream consumer group. The consumer has an identical
        interface as :class:`coredis.stream.Consumer`.

        :param client: The redis client to use
        :param streams: The stream identifiers to consume from
        :param group: The name of the group this consumer is part of
        :param consumer: The unique name (within :paramref:`group`) of the consumer
        :param auto_create: If True the group will be created upon initialization
         or first fetch if it doesn't already exist.
        :param auto_acknowledge: If ``True`` the stream entries fetched will be fetched
         without needing to be acknowledged with :meth:`coredis.Redis.xack` to remove
         them from the pending entries list.
        :param start_from_backlog: If ``True`` the consumer will start by fetching any pending
         entries from the pending entry list before considering any new messages
         not seen by any other consumer in the :paramref:`group`
        :param buffer_size: Size of buffer (per stream) to maintain. This
         translates to the maximum number of stream entries that are fetched
         on each request to redis.
        :param timeout: Maximum amount of time to block for new
         entries to appear on the streams the consumer is reading from.
        :param stream_parameters: Mapping of optional parameters to use
         by stream for the streams provided in :paramref:`streams`.

        .. warning:: Providing an ``identifier`` in ``stream_parameters`` has a different
           meaning for a group consumer. If the value is any valid identifier other than ``>``
           the consumer will only access the history of pending messages. That is, the set of
           messages that were delivered to this consumer (identified by :paramref:`consumer`)
           and never acknowledged.
        """
        super().__init__(
            client,  # type: ignore[arg-type]
            streams,
            buffer_size,
            timeout,
            **stream_parameters,
        )
        self.group = group
        self.consumer = consumer
        self.auto_create = auto_create
        self.auto_acknowledge = auto_acknowledge
        self.start_from_backlog = start_from_backlog

    async def initialize(self, partial: bool = False) -> "GroupConsumer[AnyStr]":
        """
        Verify (or, when :paramref:`GroupConsumer.auto_create` is set, create)
        the consumer group on each stream and seed per-stream cursors.

        :raises: :exc:`StreamConsumerInitializationError` if the group is
         absent on any stream and ``auto_create`` is disabled.
        """
        if not self._initialized or partial:
            group_presence: Dict[KeyT, bool] = {
                stream: stream in self._initialized_streams for stream in self.streams
            }
            for stream in self.streams:
                try:
                    if self._initialized_streams.get(stream):
                        continue
                    # The group exists iff exactly one group reported by
                    # XINFO GROUPS matches our group name.
                    group_presence[stream] = (
                        len(
                            [
                                info
                                for info in [
                                    EncodingInsensitiveDict(d)
                                    for d in await self.client.xinfo_groups(stream)
                                ]
                                if nativestr(info["name"]) == self.group
                            ]
                        )
                        == 1
                    )
                    if group_presence[stream] and self.start_from_backlog:
                        # Replay this consumer's pending entries first,
                        # starting from the smallest identifier.
                        self.state[stream]["pending"] = True
                        self.state[stream]["identifier"] = "0-0"
                except ResponseError:
                    # Stream doesn't exist yet; default to new deliveries only.
                    self.state[stream].setdefault("identifier", ">")
            if not (self.auto_create or all(group_presence.values())):
                # Fix: previously this subtracted the absent-group set from
                # self.streams, which yielded the streams where the group DOES
                # exist and reported the wrong streams in the error message.
                missing_streams = {
                    stream
                    for stream, present in group_presence.items()
                    if not present
                }
                raise StreamConsumerInitializationError(
                    f"Consumer group: {self.group!r} does not exist for streams: {missing_streams}"
                )
            for stream in self.streams:
                if self.auto_create and not group_presence.get(stream):
                    try:
                        await self.client.xgroup_create(
                            stream, self.group, PureToken.NEW_ID, mkstream=True
                        )
                    except StreamDuplicateConsumerGroupError:  # noqa
                        # Benign race: another consumer created the group first.
                        pass
                self._initialized_streams[stream] = True
                self.state[stream].setdefault("identifier", ">")
            self._initialized = True
        return self

    @versionadded(version="4.12.0")
    async def add_stream(
        self, stream: StringT, identifier: Optional[StringT] = ">"
    ) -> bool:
        """
        Adds a new stream identifier to this consumer

        :param stream: The stream identifier
        :param identifier: The identifier to start consuming from. For group
         consumers this should almost always be ``>`` (the default).
        :return: ``True`` if the stream was added successfully, ``False`` otherwise
        """
        return await super().add_stream(stream, identifier)

    def __await__(self) -> Generator[Any, None, GroupConsumer[AnyStr]]:
        # Allow ``consumer = await GroupConsumer(...)`` to eagerly initialize.
        return self.initialize().__await__()

    def __aiter__(self) -> GroupConsumer[AnyStr]:
        """
        Returns the instance of the consumer itself which can be iterated over
        """
        return self

    async def get_entry(self) -> Tuple[Optional[AnyStr], Optional[StreamEntry]]:
        """
        Fetches the next available entry from the streams specified in
        :paramref:`GroupConsumer.streams`. If there were any entries
        previously fetched and buffered, they will be returned before
        making a new request to the server.
        """
        await self.initialize()
        cur = None
        cur_stream = None
        # Serve from the local buffer first, if any stream has queued entries.
        for stream, buffer_entries in list(self.buffer.items()):
            if buffer_entries:
                cur_stream, cur = stream, self.buffer[stream].pop(0)
                break
        else:
            consumed_entries: Dict[AnyStr, Tuple[StreamEntry, ...]] = {}
            for chunk in self.chunk_streams():
                consumed_entries.update(
                    await self.client.xreadgroup(
                        self.group,
                        self.consumer,
                        count=self.buffer_size + 1,
                        block=(
                            self.timeout
                            if (self.timeout and self.timeout > 0)
                            else None
                        ),
                        noack=self.auto_acknowledge,
                        streams=chunk,
                    )
                    or {}
                )
            for stream, entries in consumed_entries.items():
                if entries:
                    if not cur:
                        cur = entries[0]
                        cur_stream = stream
                        if entries[1:]:
                            self.buffer.setdefault(stream, []).extend(entries[1:])
                    else:
                        self.buffer.setdefault(stream, []).extend(entries)
                    if self.state[stream].get("pending"):
                        # While replaying the backlog the cursor must advance
                        # explicitly (``>`` semantics don't apply).
                        self.state[stream]["identifier"] = entries[-1].identifier
                else:
                    if self.state[stream].get("pending"):
                        # Backlog exhausted: switch this stream back to new
                        # deliveries (``>``) and retry immediately.
                        self.state[stream].pop("identifier", None)
                        self.state[stream].pop("pending", None)
                        if not cur:
                            return await self.get_entry()
        return cur_stream, cur