Source code for coredis.commands.request

from __future__ import annotations

import copy
import functools
from types import GenericAlias
from typing import Any, cast, get_origin

from coredis._protocols import CommandResolver
from coredis.globals import COMMAND_FLAGS, READONLY_COMMANDS, ROUTING_STRATEGIES
from coredis.response._callbacks import NoopCallback
from coredis.retry import RetryPolicy
from coredis.typing import (
    Awaitable,
    Callable,
    ExecutionParameters,
    Generator,
    Key,
    RedisValueT,
    ResponseType,
    Serializable,
    TypeAdapter,
    TypeIs,
    TypeVar,
    ValueT,
)

from .constants import CommandFlag

#: Covariant type used for generalizing
#: :class:`~coredis.commands.request.CommandRequest`
CommandResponseT = TypeVar("CommandResponseT", covariant=True)

# Type of the value produced by the ``transformer`` given to
# ``CommandRequest.transform`` / ``TransformedCommandRequest``.
TransformedResponse = TypeVar("TransformedResponse")
# Module-level adapter instance used as the default ``type_adapter`` of
# ``CommandRequest.__init__`` when the caller does not supply one.
empty_adapter = TypeAdapter()


def is_type_like(obj: object) -> TypeIs[type[Any]]:
    """
    Tell deserialization targets apart from callable transformers.

    An object is considered type-like when it is a class, a builtin
    generic alias (e.g. ``list[int]``) or anything for which
    :func:`typing.get_origin` reports an origin (e.g. ``typing.List[int]``).
    Everything else (plain functions, lambdas, arbitrary values) is not.
    """
    # Classes and builtin generic aliases are recognised directly.
    if isinstance(obj, (type, GenericAlias)):
        return True
    # Remaining typing constructs are recognised through their origin.
    return get_origin(obj) is not None


class CommandRequest(Awaitable[CommandResponseT]):
    # The class is slotted, so every attribute (including the lazily
    # populated private caches prefixed with ``_``) has to be listed here.
    __slots__ = (
        "name",
        "arguments",
        "serialized_arguments",
        "execution_parameters",
        "executor",
        "callback",
        "decode",
        "blocking",
        "readonly",
        "noreply",
        "routing_strategy",
        "resolver",
        "type_adapter",
        "kwargs",
        "_response",
        "_keys",
        "_affected_slots",
        "_slots_to_keys",
        "_key_indices",
    )

    # The private slots below are intentionally left unset by ``__init__``;
    # an unset slot raises ``AttributeError`` which is what the lazy
    # accessors rely on to detect "not computed yet".
    _response: Awaitable[CommandResponseT]
    _keys: tuple[RedisValueT, ...]
    _affected_slots: tuple[int, ...]
    _slots_to_keys: dict[int, list[tuple[int, RedisValueT]]]
    _key_indices: tuple[int, ...]

    def __init__(
        self,
        name: bytes,
        *arguments: ValueT | Key,
        callback: Callable[..., CommandResponseT] = NoopCallback(),
        execution_parameters: ExecutionParameters,
        resolver: CommandResolver | None = None,
        type_adapter: TypeAdapter = empty_adapter,
        **kwargs: Any,
    ) -> None:
        """
        The default command request object which is returned by all
        methods mirroring redis commands.

        :param name: The name of the command
        :param arguments: All arguments (in redis format) to be passed to the command
        :param callback: The callback to be used to transform the RESP response
        :param execution_parameters: Any additional parameters to be passed to
         :meth:`coredis.Redis.execute_command`
        :param resolver: Callable that is invoked with this request by
         :meth:`run` and returns the awaitable response.
        :param type_adapter: Adapter used to serialize any argument that is
         a :class:`~coredis.typing.Serializable`.
        :param kwargs: Extra keyword arguments, stored as-is on ``kwargs``.
        """
        self.name = name
        self.callback = callback
        self.execution_parameters = execution_parameters or {}
        self.arguments = arguments
        self.blocking = CommandFlag.BLOCKING in COMMAND_FLAGS[name]
        self.readonly = name in READONLY_COMMANDS
        # Read from the normalised mapping so a falsy/None value for
        # ``execution_parameters`` cannot raise here.
        self.noreply = self.execution_parameters.get("noreply", False)
        self.decode = self.execution_parameters.get("decode", None)
        self.routing_strategy = ROUTING_STRATEGIES.get(name)
        self.type_adapter = type_adapter
        self.resolver = resolver
        self.kwargs = kwargs
        # Wire-ready view of the arguments: serializable values go through
        # the type adapter, ``Key`` wrappers are unwrapped, the rest is kept.
        self.serialized_arguments = tuple(
            self.type_adapter.serialize(k)
            if isinstance(k, Serializable)
            else k.key
            if isinstance(k, Key)
            else k
            for k in arguments
        )

    # NOTE: the derived views below are memoised per instance in private
    # slots. A ``functools.cache`` on these methods would key a module-global
    # cache on ``self`` and keep every request object alive indefinitely.

    @property
    def keys(self) -> tuple[RedisValueT, ...]:
        """The raw keys of all :class:`Key` arguments, in argument order."""
        try:
            return self._keys
        except AttributeError:
            self._keys = tuple(k.key for k in self.arguments if isinstance(k, Key))
            return self._keys

    @property
    def affected_slots(self) -> tuple[int, ...]:
        """The distinct slots of all :class:`Key` arguments."""
        try:
            return self._affected_slots
        except AttributeError:
            self._affected_slots = tuple(
                {k.slot for k in self.arguments if isinstance(k, Key)}
            )
            return self._affected_slots

    @property
    def slots_to_keys(self) -> dict[int, list[tuple[int, RedisValueT]]]:
        """
        Mapping of slot to ``(position, key)`` pairs where ``position`` is
        the index of the key among the :class:`Key` arguments only (not
        among all arguments).
        """
        try:
            return self._slots_to_keys
        except AttributeError:
            mapping: dict[int, list[tuple[int, RedisValueT]]] = {}
            key_arguments = (k for k in self.arguments if isinstance(k, Key))
            for idx, k in enumerate(key_arguments):
                mapping.setdefault(k.slot, []).append((idx, k.key))
            self._slots_to_keys = mapping
            return self._slots_to_keys

    @property
    def key_indices(self) -> tuple[int, ...]:
        """Positions (within ``arguments``) of all :class:`Key` arguments."""
        try:
            return self._key_indices
        except AttributeError:
            self._key_indices = tuple(
                idx for idx, arg in enumerate(self.arguments) if isinstance(arg, Key)
            )
            return self._key_indices

    def run(self) -> Awaitable[CommandResponseT]:
        """
        Hand the request to its resolver (at most once per instance) and
        return the resulting awaitable. Subsequent calls return the same
        awaitable.
        """
        assert self.resolver
        if not hasattr(self, "_response"):
            self._response = self.resolver(self)
        return self._response

    def retry(
        self,
        policy: RetryPolicy,
        failure_hook: Callable[..., Awaitable[Any] | None]
        | dict[type[BaseException], Callable[..., Awaitable[None] | None]]
        | None = None,
    ) -> CommandRequest[CommandResponseT]:
        """
        :param policy: Retry policy to use
        :param failure_hook: if provided and is a callable it will be
         called after catching any retryable exception and before retrying.
         If it is a mapping of exception types to callables, the first
         exception type that is a parent of any encountered exception
         will be called.

        :return: A retryable version of the command object

        Calling ``retry`` is essentially the same as explicitly using a retry
        policy when calling a redis command. For example the following two
        examples that try to push to a list up to 2 times if a
        :exc:`~coredis.exceptions.RedisError` is encountered, are equivalent.

        With :meth:`retry`::

            async with coredis.Redis() as client:
                await client.lpush("mylist", [1,2,3]).retry(
                    coredis.retry.ConstantRetryPolicy(
                        (coredis.exceptions.RedisError,), retries=2, delay=1
                    ),
                )

        Explicitly retrying::

            retry_policy = coredis.retry.ConstantRetryPolicy(
                (coredis.exceptions.RedisError,), retries=2, delay=1
            )
            async with coredis.Redis() as client:
                await retry_policy.call_with_retries(
                    lambda: client.lpush("mylist", [1,2,3])
                )
        """
        return RetryableCommandRequest(
            self,
            policy=policy,
            failure_hook=failure_hook,
        )

    def transform(
        self,
        transformer: type[TransformedResponse] | Callable[[CommandResponseT], TransformedResponse],
    ) -> CommandRequest[TransformedResponse]:
        """
        :param transformer: A type that was registered with the client
         using :meth:`~coredis.typing.TypeAdapter.register_deserializer`
         or decorated by :meth:`~coredis.typing.TypeAdapter.deserializer`
         or a callable that takes a single argument (the original response)
         and returns the transformed response.

        :return: a command request object that when awaited will return the
         transformed response

        For example when used with a redis command::

            client = coredis.Redis(....)
            @client.type_adapter.deserializer
            def _(value: bytes) -> int:
                return int(value)

            async with client:
                await client.set("fubar", 1)
                raw: bytes = await client.get("fubar")
                int_value: int = await client.get("fubar").transform(int)
                float_value: float = await client.get("fubar").transform(
                    lambda value: float(value)
                )
        """
        return TransformedCommandRequest(
            self,
            transformer=transformer,
        )

    def raw(self, decode_response: bool | None = None) -> CommandRequest[ResponseType]:
        """
        :param decode_response: Whether to decode any bulk strings from the
         server. If ``None`` the setting the client was configured with will
         be used.

        :return: a command request object that when awaited will return the
         original response from the server without any callback applied to
         transform the response

        For example when used with a redis command::

            client = coredis.Redis(....)
            async with client:
                assert True == await client.set("fubar", 1)
                assert b"OK" == await client.set("fubar", 1).raw(decode_response=False)
                assert "OK" == await client.set("fubar", 1).raw(decode_response=True)
        """
        # No ``callback`` is passed, so the new request falls back to the
        # default (no-op) callback and yields the untransformed response.
        request: CommandRequest[ResponseType] = CommandRequest(
            self.name,
            *self.arguments,
            execution_parameters=(
                self.execution_parameters
                if decode_response is None
                else {**self.execution_parameters, **{"decode": decode_response}}
            ),
            resolver=self.resolver,
            type_adapter=self.type_adapter,
        )
        # Carry over the routing strategy so that an explicit ``route(...)``
        # made before ``raw()`` is not silently discarded. When ``route`` was
        # never called this is the same value ``__init__`` already assigned.
        request.routing_strategy = self.routing_strategy
        return request

    def route(self, by: bytes | str | int) -> CommandRequest[CommandResponseT]:
        """
        Explicitly set the node the command should be routed to in cluster mode.

        :param by: either a key to hash by or a slot

        :return: this same request object (mutated in place)

        For example to explicitly route the scan call to the
        cluster node serving slot "0"::

            client = coredis.RedisCluster(...)
            async with client:
                cursor = None
                while cursor != 0:
                    cursor, keys = await client.scan(cursor).route(0)
        """
        # Imported lazily inside the method rather than at module level.
        from coredis.commands._routing import ExplicitSlotStrategy

        self.routing_strategy = ExplicitSlotStrategy(by)
        return self

    def __await__(self) -> Generator[Any, Any, CommandResponseT]:
        """Awaiting the request runs it and awaits the resolver's response."""
        return self.run().__await__()
class TransformedCommandRequest(CommandRequest[TransformedResponse]):
    def __init__(
        self,
        parent: CommandRequest[CommandResponseT],
        transformer: type[TransformedResponse] | Callable[[CommandResponseT], TransformedResponse],
    ) -> None:
        """
        A command request object that has a transformed response based on the
        :paramref:`transformer` parameter.

        :param parent: The original command request that the transformer will
         be applied to.
        :param transformer: A type that was registered with the client
         using :meth:`~coredis.typing.TypeAdapter.register_deserializer`
         or decorated by :meth:`~coredis.typing.TypeAdapter.deserializer`
         or a callable that takes a single argument (the original response)
         and returns the transformed response.
        """
        super().__init__(
            parent.name,
            *parent.arguments,
            execution_parameters=parent.execution_parameters,
            type_adapter=parent.type_adapter,
        )
        self.parent = parent
        self.transformer = transformer
        # Type-like transformers are resolved through the type adapter's
        # registered deserializers; anything else is called directly with
        # the parent's response.
        self.transform_func = cast(
            Callable[..., TransformedResponse],
            (
                functools.partial(self.type_adapter.deserialize, return_type=self.transformer)
                if is_type_like(self.transformer)
                else self.transformer
            ),
        )

    def route(self, by: bytes | str | int) -> CommandRequest[TransformedResponse]:
        """
        Explicitly set the node the command should be routed to in cluster mode.

        :param by: either a key to hash by or a slot

        :return: this same request object
        """
        super().route(by)
        # Awaiting this object awaits ``self.parent``, so the routing has to
        # be applied to the parent as well to have any effect.
        self.parent.route(by)
        return self

    def __await__(self) -> Generator[Any, Any, TransformedResponse]:
        """Await the parent request and apply the transformer to its result."""

        async def _parent() -> TransformedResponse:
            original = await self.parent
            return self.transform_func(original)

        return _parent().__await__()


class RetryableCommandRequest(CommandRequest[CommandResponseT]):
    def __init__(
        self,
        request: CommandRequest[CommandResponseT],
        policy: RetryPolicy,
        failure_hook: Callable[..., Awaitable[Any] | None]
        | dict[type[BaseException], Callable[..., Awaitable[None] | None]]
        | None = None,
        **kwargs: Any,
    ) -> None:
        """
        A retryable command request object.

        :param request: The original request to retry
        :param policy: The retry policy to use when executing the command
        :param failure_hook: if provided and is a callable it will be
         called after catching any retryable exception and before retrying.
         If it is a mapping of exception types to callables, the first
         exception type that is a parent of any encountered exception
         will be called.
        :param kwargs: Extra keyword arguments forwarded to
         :class:`CommandRequest`.
        """
        self.policy = policy
        self.failure_hook = failure_hook
        super().__init__(
            request.name,
            *request.arguments,
            callback=request.callback,
            execution_parameters=request.execution_parameters,
            resolver=request.resolver,
            policy=self.policy,
            type_adapter=request.type_adapter,
            failure_hook=self.failure_hook,
            **kwargs,
        )
        # ``super().__init__`` resets the routing strategy to the command's
        # default; keep whatever the wrapped request was using so that an
        # explicit ``route(...)`` made before ``retry(...)`` is honoured.
        self.routing_strategy = request.routing_strategy

    def __await__(self) -> Generator[Any, Any, CommandResponseT]:
        """Run the command under the retry policy and await the outcome."""
        # ``run()`` remembers its awaitable on the instance, so every attempt
        # is executed on a fresh shallow copy rather than on ``self``.
        return self.policy.call_with_retries(
            lambda: copy.copy(self).run(), failure_hook=self.failure_hook
        ).__await__()