from __future__ import annotations
import copy
import functools
from types import GenericAlias
from typing import Any, cast, get_origin
from coredis._protocols import CommandResolver
from coredis.globals import COMMAND_FLAGS, READONLY_COMMANDS, ROUTING_STRATEGIES
from coredis.response._callbacks import NoopCallback
from coredis.retry import RetryPolicy
from coredis.typing import (
Awaitable,
Callable,
ExecutionParameters,
Generator,
Key,
RedisValueT,
ResponseType,
Serializable,
TypeAdapter,
TypeIs,
TypeVar,
ValueT,
)
from .constants import CommandFlag
#: Covariant type used for generalizing :class:`~coredis.command.CommandRequest`
CommandResponseT = TypeVar("CommandResponseT", covariant=True)
#: Type of the value produced by a :class:`TransformedCommandRequest`
#: once its transformer has been applied to the parent's response.
TransformedResponse = TypeVar("TransformedResponse")
#: Module-level :class:`~coredis.typing.TypeAdapter` instance (constructed with
#: no arguments) used as the default ``type_adapter`` of :class:`CommandRequest`.
empty_adapter = TypeAdapter()
def is_type_like(obj: object) -> TypeIs[type[Any]]:
    """
    Tell whether ``obj`` should be handled as a deserialization target
    (a type) instead of as a callable transformer.

    An object counts as type-like when it is a class, a
    :class:`types.GenericAlias` (e.g. ``list[int]``), or anything for which
    :func:`typing.get_origin` reports an origin (e.g. ``typing.List[int]``).
    """
    # Plain classes and builtin parametrized generics are the common case.
    if isinstance(obj, (type, GenericAlias)):
        return True
    # Everything else is type-like only if ``typing`` can name its origin.
    return get_origin(obj) is not None
[docs]
class CommandRequest(Awaitable[CommandResponseT]):
    """
    Awaitable description of a single redis command invocation.

    Awaiting the request calls :meth:`run`, which hands the request to its
    ``resolver`` and awaits whatever the resolver returns.
    """

    __slots__ = (
        "name",
        "arguments",
        "serialized_arguments",
        "execution_parameters",
        "executor",
        "callback",
        "decode",
        "blocking",
        "readonly",
        "noreply",
        "routing_strategy",
        "resolver",
        "type_adapter",
        "kwargs",
        "_response",
        # Per-instance memo slots backing the derived key properties below.
        # They replace a ``functools.cache`` on the methods, which would hold
        # a strong reference to every request in a module-global cache.
        "_keys",
        "_affected_slots",
        "_slots_to_keys",
        "_key_indices",
    )
    _response: Awaitable[CommandResponseT]
    _keys: tuple[RedisValueT, ...]
    _affected_slots: tuple[int, ...]
    _slots_to_keys: dict[int, list[tuple[int, RedisValueT]]]
    _key_indices: tuple[int, ...]

    def __init__(
        self,
        name: bytes,
        *arguments: ValueT | Key,
        callback: Callable[..., CommandResponseT] = NoopCallback(),
        execution_parameters: ExecutionParameters,
        resolver: CommandResolver | None = None,
        type_adapter: TypeAdapter = empty_adapter,
        **kwargs: Any,
    ) -> None:
        """
        The default command request object which is returned by all
        methods mirroring redis commands.

        :param name: The name of the command
        :param arguments: All arguments (in redis format) to be passed to the command
        :param callback: The callback to be used to transform the RESP response
        :param execution_parameters: Any additional parameters to be passed to
         :meth:`coredis.Redis.execute_command`. The ``noreply`` and ``decode``
         entries are also mirrored onto the request as attributes.
        :param resolver: Callable that is invoked with this request by
         :meth:`run` and returns the awaitable response.
        :param type_adapter: Adapter used to serialize any
         :class:`~coredis.typing.Serializable` arguments.
        :param kwargs: Extra keyword arguments, stored unchanged on ``kwargs``.
        """
        self.name = name
        self.callback = callback
        self.execution_parameters = execution_parameters or {}
        self.arguments = arguments
        self.blocking = CommandFlag.BLOCKING in COMMAND_FLAGS[name]
        self.readonly = name in READONLY_COMMANDS
        # Read from the normalized mapping so a falsy/missing
        # ``execution_parameters`` cannot fail on ``.get``.
        self.noreply = self.execution_parameters.get("noreply", False)
        self.decode = self.execution_parameters.get("decode", None)
        self.routing_strategy = ROUTING_STRATEGIES.get(name)
        self.type_adapter = type_adapter
        self.resolver = resolver
        self.kwargs = kwargs
        # Wire form of the arguments: Serializable values go through the type
        # adapter, Key wrappers are unwrapped, everything else is passed as is.
        self.serialized_arguments = tuple(
            self.type_adapter.serialize(k)
            if isinstance(k, Serializable)
            else k.key
            if isinstance(k, Key)
            else k
            for k in arguments
        )

    @property
    def keys(self) -> tuple[RedisValueT, ...]:
        """
        The raw key of every :class:`Key` argument, in argument order.
        Computed on first access and memoized on the instance.
        """
        if not hasattr(self, "_keys"):
            self._keys = tuple(k.key for k in self.arguments if isinstance(k, Key))
        return self._keys

    @property
    def affected_slots(self) -> tuple[int, ...]:
        """
        The distinct slots of all :class:`Key` arguments (built from a set,
        so the order is not meaningful). Memoized on the instance.
        """
        if not hasattr(self, "_affected_slots"):
            self._affected_slots = tuple(
                {k.slot for k in self.arguments if isinstance(k, Key)}
            )
        return self._affected_slots

    @property
    def slots_to_keys(self) -> dict[int, list[tuple[int, RedisValueT]]]:
        """
        Mapping of slot to a list of ``(index, key)`` pairs, where ``index``
        is the position of the key among the :class:`Key` arguments only
        (not among all arguments). Memoized on the instance.
        """
        if not hasattr(self, "_slots_to_keys"):
            mapping: dict[int, list[tuple[int, RedisValueT]]] = {}
            key_arguments = (k for k in self.arguments if isinstance(k, Key))
            for idx, k in enumerate(key_arguments):
                mapping.setdefault(k.slot, []).append((idx, k.key))
            self._slots_to_keys = mapping
        return self._slots_to_keys

    @property
    def key_indices(self) -> tuple[int, ...]:
        """
        Positions within ``arguments`` that hold a :class:`Key`.
        Memoized on the instance.
        """
        if not hasattr(self, "_key_indices"):
            self._key_indices = tuple(
                idx for idx, arg in enumerate(self.arguments) if isinstance(arg, Key)
            )
        return self._key_indices

    def run(self) -> Awaitable[CommandResponseT]:
        """
        Hand the request to its resolver and return the resulting awaitable.

        The awaitable is stored on the instance, so repeated calls return the
        same object instead of invoking the resolver again.
        """
        assert self.resolver
        if not hasattr(self, "_response"):
            self._response = self.resolver(self)
        return self._response

    def retry(
        self,
        policy: RetryPolicy,
        failure_hook: Callable[..., Awaitable[Any] | None]
        | dict[type[BaseException], Callable[..., Awaitable[None] | None]]
        | None = None,
    ) -> CommandRequest[CommandResponseT]:
        """
        :param policy: Retry policy to use
        :param failure_hook: if provided and is a callable it will be
         called after catching any retryable exception and before retrying. If it is a mapping
         of exception types to callables, the first exception type that is a parent
         of any encountered exception will be called.
        :return: A retryable version of the command object

        Calling ``retry`` is essentially the same as explicitly using
        a retry policy when calling a redis command. For example the following two
        examples that try to push to a list up to 2 times if a :exc:`~coredis.exceptions.RedisError`
        is encountered, are equivalent.

        With :meth:`retry`::

            async with coredis.Redis() as client:
                await client.lpush("mylist", [1,2,3]).retry(
                    coredis.retry.ConstantRetryPolicy(
                        (coredis.exceptions.RedisError,), retries=2, delay=1
                    ),
                )

        Explicitly retrying::

            retry_policy = coredis.retry.ConstantRetryPolicy(
                (coredis.exceptions.RedisError,), retries=2, delay=1
            )
            async with coredis.Redis() as client:
                await retry_policy.call_with_retries(lambda: client.lpush("mylist", [1,2,3]))
        """
        return RetryableCommandRequest(
            self,
            policy=policy,
            failure_hook=failure_hook,
        )

    def raw(self, decode_response: bool | None = None) -> CommandRequest[ResponseType]:
        """
        :param decode_response: Whether to decode any bulk strings from the server.
         If ``None`` the setting the client was configured with will be used.
        :return: a command request object that when awaited will return the
         original response from the server without any callback applied to transform
         the response

        For example when used with a redis command::

            client = coredis.Redis(....)
            async with client:
                assert True == await client.set("fubar", 1)
                assert b"OK" == await client.set("fubar", 1).raw(decode_response=False)
                assert "OK" == await client.set("fubar", 1).raw(decode_response=True)
        """
        request: CommandRequest[ResponseType] = CommandRequest(
            self.name,
            *self.arguments,
            execution_parameters=(
                self.execution_parameters
                if decode_response is None
                else {**self.execution_parameters, **{"decode": decode_response}}
            ),
            resolver=self.resolver,
            type_adapter=self.type_adapter,
        )
        # The constructor re-derives the routing strategy from the command
        # name; keep the one on this request so that an earlier ``route()``
        # call is not silently dropped.
        request.routing_strategy = self.routing_strategy
        return request

    def route(self, by: bytes | str | int) -> CommandRequest[CommandResponseT]:
        """
        Explicitly set the node the command should be routed to in cluster mode.

        :param by: either a key to hash by or a slot
        :return: this same request object (mutated in place)

        For example to explicitly route the scan call to the cluster node serving slot "0"::

            client = coredis.RedisCluster(...)
            async with client:
                cursor = None
                while cursor != 0:
                    cursor, keys = await client.scan(cursor).route(0)
        """
        from coredis.commands._routing import ExplicitSlotStrategy

        self.routing_strategy = ExplicitSlotStrategy(by)
        return self

    def __await__(self) -> Generator[Any, Any, CommandResponseT]:
        return self.run().__await__()
class TransformedCommandRequest(CommandRequest[TransformedResponse]):
    def __init__(
        self,
        parent: CommandRequest[CommandResponseT],
        transformer: type[TransformedResponse] | Callable[[CommandResponseT], TransformedResponse],
    ) -> None:
        """
        A command request object whose response is the parent's response
        after being passed through the :paramref:`transformer`.

        :param parent: The original command request that the transformer will
         be applied to.
        :param transformer: Either a type that was registered with the client
         using :meth:`~coredis.typing.TypeAdapter.register_deserializer`
         or decorated by :meth:`~coredis.typing.TypeAdapter.deserializer`,
         or a callable that takes a single argument (the original response)
         and returns the transformed response.
        """
        super().__init__(
            parent.name,
            *parent.arguments,
            execution_parameters=parent.execution_parameters,
            type_adapter=parent.type_adapter,
        )
        self.parent = parent
        self.transformer = transformer

        # Type-like transformers are resolved through the type adapter's
        # deserializer; anything else is called directly with the response.
        apply: Any
        if is_type_like(transformer):
            apply = functools.partial(self.type_adapter.deserialize, return_type=transformer)
        else:
            apply = transformer
        self.transform_func = cast(Callable[..., TransformedResponse], apply)

    def __await__(self) -> Generator[Any, Any, TransformedResponse]:
        async def _await_and_transform() -> TransformedResponse:
            # The parent request does the actual work; this object only
            # post-processes the value it resolves to.
            response = await self.parent
            return self.transform_func(response)

        return _await_and_transform().__await__()
class RetryableCommandRequest(CommandRequest[CommandResponseT]):
    def __init__(
        self,
        request: CommandRequest[CommandResponseT],
        policy: RetryPolicy,
        failure_hook: Callable[..., Awaitable[Any] | None]
        | dict[type[BaseException], Callable[..., Awaitable[None] | None]]
        | None = None,
        **kwargs: Any,
    ) -> None:
        """
        A retryable command request object.

        :param request: The original request to retry
        :param policy: The retry policy to use when executing the command
        :param failure_hook: if provided and is a callable it will be
         called after catching any retryable exception and before retrying. If it is a mapping
         of exception types to callables, the first exception type that is a parent
         of any encountered exception will be called.
        :param kwargs: Extra keyword arguments forwarded to the
         :class:`CommandRequest` initializer alongside ``policy`` and
         ``failure_hook``.
        """
        self.policy = policy
        self.failure_hook = failure_hook
        super().__init__(
            request.name,
            *request.arguments,
            callback=request.callback,
            execution_parameters=request.execution_parameters,
            resolver=request.resolver,
            policy=self.policy,
            type_adapter=request.type_adapter,
            failure_hook=self.failure_hook,
            **kwargs,
        )
        # The base initializer re-derives the routing strategy from the
        # command name; carry over the wrapped request's strategy so that
        # ``request.route(...).retry(...)` keeps its explicit routing.
        self.routing_strategy = request.routing_strategy

    def __await__(self) -> Generator[Any, Any, CommandResponseT]:
        # ``run`` stores its awaitable on the instance it is called on, so
        # each attempt is made on a fresh shallow copy of this request rather
        # than on this object itself.
        return self.policy.call_with_retries(
            lambda: copy.copy(self).run(), failure_hook=self.failure_hook
        ).__await__()