from __future__ import annotations
import dataclasses
import datetime
import re
import shlex
from typing import Pattern
from coredis.typing import (
AbstractSet,
ClassVar,
Dict,
List,
Literal,
Mapping,
NamedTuple,
Optional,
OrderedDict,
Set,
StringT,
Tuple,
TypedDict,
Union,
ValueT,
)
#: Response from `CLIENT INFO <https://redis.io/commands/client-info>`__
#:
#: - ``id``: a unique 64-bit client ID
#: - ``addr``: address/port of the client
#: - ``laddr``: address/port of local address client connected to (bind address)
#: - ``fd``: file descriptor corresponding to the socket
#: - ``name``: the name set by the client with CLIENT SETNAME
#: - ``age``: total duration of the connection in seconds
#: - ``idle``: idle time of the connection in seconds
#: - ``flags``: client flags
#: - ``db``: current database ID
#: - ``sub``: number of channel subscriptions
#: - ``psub``: number of pattern matching subscriptions
#: - ``multi``: number of commands in a MULTI/EXEC context
#: - ``qbuf``: query buffer length (0 means no query pending)
#: - ``qbuf-free``: free space of the query buffer (0 means the buffer is full)
#: - ``argv-mem``: incomplete arguments for the next command (already extracted from query buffer)
#: - ``multi-mem``: memory is used up by buffered multi commands. Added in Redis 7.0
#: - ``obl``: output buffer length
#: - ``oll``: output list length (replies are queued in this list when the buffer is full)
#: - ``omem``: output buffer memory usage
#: - ``tot-mem``: total memory consumed by this client in its various buffers
#: - ``events``: file descriptor events
#: - ``cmd``: last command played
#: - ``user``: the authenticated username of the client
#: - ``redir``: client id of current client tracking redirection
#: - ``resp``: client RESP protocol version. Added in Redis 7.0
ClientInfo = TypedDict(
    "ClientInfo",
    {
        # identity and connection endpoints
        "id": int,
        "addr": str,
        "laddr": str,
        "fd": int,
        "name": str,
        # connection timing and state
        "age": int,
        "idle": int,
        "flags": str,
        "db": int,
        # subscription counts and MULTI/EXEC state
        "sub": int,
        "psub": int,
        "multi": int,
        # query/output buffer and memory accounting
        "qbuf": int,
        "qbuf-free": int,
        "argv-mem": int,
        "multi-mem": int,
        "obl": int,
        "oll": int,
        "omem": int,
        "tot-mem": int,
        # activity, authentication, tracking redirection and protocol
        "events": str,
        "cmd": str,
        "user": str,
        "redir": int,
        "resp": str,
    },
)
#: Script/Function flags
#: See: `<https://redis.io/topics/lua-api#a-namescriptflagsa-script-flags>`__
ScriptFlag = Literal[
    # flags spelled as text
    "no-writes", "allow-oom", "allow-stale", "no-cluster",
    # the same flags spelled as bytes
    b"no-writes", b"allow-oom", b"allow-stale", b"no-cluster",
]
[docs]
class FunctionDefinition(TypedDict):
    """
    Describes a single function, in the shape reported by
    `FUNCTION LIST <https://redis.io/commands/function-list>`__
    """

    #: Name of the function
    name: StringT
    #: Description of the function
    description: StringT
    #: Flags of the function
    flags: Set[ScriptFlag]
[docs]
class LibraryDefinition(TypedDict):
    """
    Describes a library, in the shape reported by
    `FUNCTION LIST <https://redis.io/commands/function-list>`__
    """

    #: Name of the library
    name: StringT
    #: Engine the library uses
    engine: Literal["LUA"]
    #: Description of the library
    description: StringT
    #: Functions in the library, keyed by function name
    functions: Dict[StringT, FunctionDefinition]
    #: Source code of the library
    library_code: Optional[StringT]
[docs]
class ScoredMember(NamedTuple):
    """
    A sorted set member paired with its score
    """

    #: Name of the sorted set member
    member: StringT
    #: The member's score
    score: float
[docs]
class GeoCoordinates(NamedTuple):
    """
    The location of a point expressed as a (longitude, latitude) pair
    """

    #: Longitude of the location
    longitude: float
    #: Latitude of the location
    latitude: float
#: A variable length tuple of :class:`ScoredMember` entries
ScoredMembers = Tuple[ScoredMember, ...]
[docs]
class GeoSearchResult(NamedTuple):
    """
    A place returned by a geo query. Only ``name`` is always
    populated; every other field may be ``None``.
    """

    #: Name of the place
    name: StringT
    #: Distance, when available
    distance: Optional[float]
    #: GeoHash, when available
    geohash: Optional[int]
    #: Longitude/latitude of the place, when available
    coordinates: Optional[GeoCoordinates]
#: Definition of a redis command
#: See: `<https://redis.io/topics/key-specs>`__
#:
#: - ``name``: This is the command's name in lowercase
#: - ``arity``: Arity is the number of arguments a command expects.
#: - ``flags``: See `<https://redis.io/commands/command#flags>`__
#: - ``first-key``: This value identifies the position of the command's first key name argument
#: - ``last-key``: This value identifies the position of the command's last key name argument
#: - ``step``: This value is the step, or increment, between the first key and last key values
#: where the keys are.
#: - ``acl-categories``: This is an array of simple strings that are the ACL categories to which
#: the command belongs
#: - ``tips``: Helpful information about the command
#: - ``key-specifications``: This is an array consisting of the command's key specifications
#: - ``sub-commands``: This is an array containing all of the command's subcommands, if any
Command = TypedDict(
    "Command",
    {
        # basic description of the command
        "name": str,
        "arity": int,
        "flags": AbstractSet[str],
        # positions of the key name arguments
        "first-key": int,
        "last-key": int,
        "step": int,
        # optional details, ``None`` when not available
        "acl-categories": Optional[AbstractSet[str]],
        "tips": Optional[AbstractSet[str]],
        "key-specifications": Optional[
            AbstractSet[Mapping[str, Union[int, str, Mapping]]]  # type: ignore
        ],
        "sub-commands": Optional[Tuple[str, ...]],
    },
)
[docs]
class RoleInfo(NamedTuple):
    """
    Role information of a redis instance.

    Only ``role`` is mandatory, every other field defaults to ``None``.
    """

    #: The role of the instance
    role: str
    #:
    offset: Optional[int] = None
    #:
    status: Optional[str] = None
    #:
    slaves: Optional[Tuple[Dict[str, Union[str, int]], ...]] = None
    #:
    masters: Optional[Tuple[str, ...]] = None
[docs]
class StreamEntry(NamedTuple):
    """
    A single entry of a redis stream
    """

    #: Identifier of the entry
    identifier: StringT
    #: Ordered mapping of the entry's fields to their values
    field_values: OrderedDict[StringT, StringT]
#: Details of a stream
#: See: `<https://redis.io/commands/xinfo-stream>`__
StreamInfo = TypedDict(
    "StreamInfo",
    {
        # each is a stream entry or ``None``
        "first-entry": Optional[StreamEntry],
        "last-entry": Optional[StreamEntry],
        "length": int,
        "radix-tree-keys": int,
        "radix-tree-nodes": int,
        # either an integer or a list of dictionaries
        "groups": Union[int, List[Dict]],  # type: ignore
        "last-generated-id": str,
        "max-deleted-entry-id": str,
        "recorded-first-entry-id": str,
        "entries-added": int,
        "entries-read": int,
        # a tuple of stream entries or ``None``
        "entries": Optional[Tuple[StreamEntry, ...]],
    },
)
[docs]
class StreamPending(NamedTuple):
    """
    The summary form of the response from
    `XPENDING <https://redis.io/commands/xpending#summary-form-of-xpending>`__
    """

    pending: int
    minimum_identifier: StringT
    maximum_identifier: StringT
    #: Ordered mapping with an integer value per consumer
    consumers: OrderedDict[StringT, int]
[docs]
class StreamPendingExt(NamedTuple):
    """
    The extended form of the response from
    `XPENDING <https://redis.io/commands/xpending#extended-form-of-xpending>`__
    """

    identifier: StringT
    consumer: StringT
    idle: int
    delivered: int
#: Response from `SLOWLOG GET <https://redis.io/commands/slowlog-get>`__
[docs]
class SlowLogInfo(NamedTuple):
    """
    A single slow log entry
    """

    #: Unique progressive identifier of the slow log entry
    id: int
    #: Unix timestamp at which the logged command was processed
    start_time: int
    #: Time the command took to execute, in microseconds
    duration: int
    #: The arguments making up the command
    command: Tuple[StringT, ...]
    #: IP address and port of the client
    client_addr: Tuple[StringT, int]
    #: Name of the client
    client_name: str
[docs]
class LCSMatch(NamedTuple):
    """
    One match found by LCS
    """

    #: (start, end) offsets in the first string
    first: Tuple[int, int]
    #: (start, end) offsets in the second string
    second: Tuple[int, int]
    #: Length of the match, may be ``None``
    length: Optional[int]
[docs]
class LCSResult(NamedTuple):
    """
    The outcome of an `LCS <https://redis.io/commands/lcs>`__ call
    """

    #: The individual matches
    matches: Tuple[LCSMatch, ...]
    #: Length of the longest match
    length: int
[docs]
@dataclasses.dataclass
class MonitorResult:
    """
    Details of issued commands received by the client when
    listening with the `MONITOR <https://redis.io/commands/monitor>`__
    command
    """

    #: Time command was received
    time: datetime.datetime
    #: db number
    db: int
    #: (host, port) or path if the server is listening on a unix domain socket
    client_addr: Optional[Union[Tuple[str, int], str]]
    #: The type of the client that sent the command
    client_type: Literal["tcp", "unix", "lua"]
    #: The name of the command
    command: str
    #: Arguments passed to the command
    args: Optional[Tuple[str, ...]]

    # Matches the ``[<db> <client>] <command and arguments>`` part of a
    # monitor line, i.e. everything that follows the leading timestamp.
    EXPR: ClassVar[Pattern[str]] = re.compile(r"\[(\d+) (.*?)\] (.*)$")

    @classmethod
    def parse_response_string(cls, response: str) -> MonitorResult:
        """
        Build a :class:`MonitorResult` from a single line of monitor output.

        The line is expected to have the form
        ``<timestamp> [<db> <client>] <command> <arguments...>`` where
        ``<client>`` is either ``lua``, ``unix:<path>`` or ``<host>:<port>``.

        :param response: the raw line to parse
        :return: the parsed result
        :raises ValueError: if the line does not have the expected structure
         or does not contain a command
        """
        command_time, command_data = response.split(" ", 1)
        match = cls.EXPR.match(command_data)
        # An explicit check instead of ``assert``: assertions are stripped
        # when python runs with ``-O`` and must not guard external input.
        if match is None:
            raise ValueError(f"Unable to parse monitor response: {response!r}")
        db_id, client_info, raw_command = match.groups()

        # The command and its arguments are quoted, shell style tokens
        parts = shlex.split(raw_command)
        if not parts:
            raise ValueError(f"No command found in monitor response: {response!r}")

        client_addr: Optional[Union[Tuple[str, int], str]] = None
        client_type: Literal["tcp", "unix", "lua"]
        if client_info == "lua":
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_type = "unix"
            # drop the 5 character ``unix:`` prefix to keep only the path
            client_addr = client_info[5:]
        else:
            # split on the last colon only, the host itself may contain colons
            host, port = client_info.rsplit(":", 1)
            client_addr = (host, int(port))
            client_type = "tcp"

        return cls(
            time=datetime.datetime.fromtimestamp(float(command_time)),
            db=int(db_id),
            client_addr=client_addr,
            client_type=client_type,
            command=parts[0],
            args=tuple(parts[1:]),
        )
[docs]
class ClusterNode(TypedDict):
    """
    Typed dictionary describing a cluster node by its address along
    with an optional node id and server type
    """

    host: str
    port: int
    #: May be ``None``
    node_id: Optional[str]
    #: Either ``master`` or ``slave``, may be ``None``
    server_type: Optional[Literal["master", "slave"]]
[docs]
class ClusterNodeDetail(TypedDict):
    """
    Typed dictionary holding the details of a node in a cluster
    """

    # NOTE(review): the field names resemble the output of CLUSTER NODES,
    # confirm against the code that populates this structure.
    id: str
    flags: Tuple[str, ...]
    host: str
    port: int
    #: May be ``None``
    master: Optional[str]
    ping_sent: int
    pong_recv: int
    link_state: str
    slots: List[int]
    migrations: List[Dict[str, ValueT]]
[docs]
class PubSubMessage(TypedDict):
    """
    Typed dictionary describing either a server response to a
    subscription change or a message received through a subscription
    """

    #: The kind of message, one of:
    #:
    #: subscribe
    #:   Response from the server when a client subscribes to a channel(s)
    #: unsubscribe
    #:   Response from the server when a client unsubscribes from a channel(s)
    #: psubscribe
    #:   Response from the server when a client subscribes to a pattern(s)
    #: punsubscribe
    #:   Response from the server when a client unsubscribes from a pattern(s)
    #: ssubscribe
    #:   Response from the server when a client subscribes to a shard channel(s)
    #: sunsubscribe
    #:   Response from the server when a client unsubscribes from a shard channel(s)
    #: message
    #:   A message received as a result of subscribing to a channel
    #: pmessage
    #:   A message received as a result of subscribing to a pattern
    type: str
    #: The channel that was subscribed to or unsubscribed from, or the channel
    #: that a message was published to
    channel: StringT
    #: The pattern that was subscribed to or unsubscribed from, or the pattern
    #: through which a received message was routed
    pattern: Optional[StringT]
    #: - When ``type`` is one of ``{message, pmessage}`` this is the published message itself
    #: - When ``type`` is one of
    #:   ``{subscribe, psubscribe, ssubscribe, unsubscribe, punsubscribe, sunsubscribe}``
    #:   this is an :class:`int` holding the number of channels and patterns that the
    #:   connection is currently subscribed to.
    data: Union[int, StringT]