Source code for coredis.modules.graph

from __future__ import annotations

from datetime import timedelta

from ..commands._utils import normalized_milliseconds
from ..commands._wrappers import ClusterCommandConfig
from ..commands.constants import CommandGroup, CommandName, NodeFlag
from ..response._callbacks import (
    ClusterEnsureConsistent,
    ClusterMergeSets,
    ListCallback,
    SetCallback,
    SimpleStringCallback,
)
from ..tokens import PrefixToken, PureToken
from ..typing import (
    AnyStr,
    CommandArgList,
    Dict,
    KeyT,
    List,
    Literal,
    Optional,
    Parameters,
    ResponsePrimitive,
    Set,
    StringT,
    Tuple,
    Union,
    ValueT,
)
from .base import Module, ModuleGroup, module_command
from .response._callbacks.graph import (
    ConfigGetCallback,
    GraphSlowLogCallback,
    QueryCallback,
)
from .response.types import GraphQueryResult, GraphSlowLogInfo


class RedisGraph(Module[AnyStr]):
    """
    Static metadata describing the RedisGraph module.
    """

    # Short and human readable identifiers of the module.
    NAME = "graph"
    FULL_NAME = "RedisGraph"
    # Location of the upstream documentation for the module.
    DOCUMENTATION_URL = "https://redis.io/docs/stack/graph/"
    # Free-form summary; the literal is kept exactly as-is since it is
    # runtime data rather than a comment.
    DESCRIPTION = """RedisGraph is a queryable Property Graph database that uses sparse matrices 
to represent the adjacency matrix in graphs and linear algebra to query the graph.
    """


class Graph(ModuleGroup[AnyStr]):
    """
    Asynchronous wrappers for the commands in :attr:`CommandGroup.GRAPH`
    belonging to the :class:`RedisGraph` module.

    Every method assembles the argument list for one command and hands it to
    ``execute_module_command`` together with the callback used to convert
    the reply.
    """

    MODULE = RedisGraph
    COMMAND_GROUP = CommandGroup.GRAPH

    @module_command(
        CommandName.GRAPH_QUERY,
        module=MODULE,
        version_introduced="1.0.0",
        group=COMMAND_GROUP,
    )
    async def query(
        self,
        graph: KeyT,
        query: StringT,
        timeout: Optional[Union[int, timedelta]] = None,
    ) -> GraphQueryResult[AnyStr]:
        """
        Executes the given query against a specified graph

        :param graph: The name of the graph to query.
        :param query: The query to execute.
        :param timeout: The maximum amount of time (milliseconds) to wait for
         the query to complete. A :class:`~datetime.timedelta` is accepted too.
        :return: The result set of the executed query.
        """
        # The TIMEOUT clause is only sent when a timeout was supplied.
        timeout_args: CommandArgList = (
            []
            if timeout is None
            else [PrefixToken.TIMEOUT, normalized_milliseconds(timeout)]
        )
        # ``--compact`` is always appended as the final argument.
        command_args: CommandArgList = [graph, query, *timeout_args, b"--compact"]
        return await self.execute_module_command(
            CommandName.GRAPH_QUERY,
            *command_args,
            callback=QueryCallback[AnyStr](graph),
            query=query,
        )

    @module_command(
        CommandName.GRAPH_RO_QUERY,
        module=MODULE,
        version_introduced="2.2.8",
        group=COMMAND_GROUP,
    )
    async def ro_query(
        self,
        graph: KeyT,
        query: StringT,
        timeout: Optional[Union[int, timedelta]] = None,
    ) -> GraphQueryResult[AnyStr]:
        """
        Executes a given read only query against a specified graph

        :param graph: The name of the graph to query.
        :param query: The query to execute.
        :param timeout: The maximum amount of time (milliseconds) to wait for
         the query to complete. A :class:`~datetime.timedelta` is accepted too.
        :return: The result set for the read-only query or an error if a write
         query was given.
        """
        # The TIMEOUT clause is only sent when a timeout was supplied.
        timeout_args: CommandArgList = (
            []
            if timeout is None
            else [PrefixToken.TIMEOUT, normalized_milliseconds(timeout)]
        )
        # ``--compact`` is always appended as the final argument.
        command_args: CommandArgList = [graph, query, *timeout_args, b"--compact"]
        return await self.execute_module_command(
            CommandName.GRAPH_RO_QUERY,
            *command_args,
            callback=QueryCallback[AnyStr](graph),
            query=query,
        )

    @module_command(
        CommandName.GRAPH_DELETE,
        module=MODULE,
        version_introduced="1.0.0",
        group=COMMAND_GROUP,
    )
    async def delete(self, graph: KeyT) -> bool:
        """
        Completely removes the graph and all of its entities

        :param graph: The name of the graph to be deleted.
        """
        # The callback is set up to prefix-match the reply against
        # ``Graph removed``.
        removed_callback = SimpleStringCallback(
            prefix_match=True, ok_values={"Graph removed"}
        )
        return await self.execute_module_command(
            CommandName.GRAPH_DELETE, graph, callback=removed_callback
        )

    @module_command(
        CommandName.GRAPH_EXPLAIN,
        module=MODULE,
        version_introduced="2.0.0",
        group=COMMAND_GROUP,
    )
    async def explain(self, graph: KeyT, query: StringT) -> List[AnyStr]:
        """
        Constructs a query execution plan for the given :paramref:`graph` and
        :paramref:`query`, but does not execute it.

        :param graph: The name of the graph to execute the query on.
        :param query: The query to construct the execution plan for.
        :return: A list of strings representing the query execution plan.
        """
        plan_callback = ListCallback[AnyStr]()
        return await self.execute_module_command(
            CommandName.GRAPH_EXPLAIN, graph, query, callback=plan_callback
        )

    @module_command(
        CommandName.GRAPH_PROFILE,
        module=MODULE,
        version_introduced="2.0.0",
        group=COMMAND_GROUP,
    )
    async def profile(
        self,
        graph: KeyT,
        query: StringT,
        timeout: Optional[Union[int, timedelta]] = None,
    ) -> List[AnyStr]:
        """
        Executes a query and returns an execution plan augmented with metrics
        for each operation's execution

        :param graph: The name of the graph to execute the query on.
        :param query: The query to execute and return a profile for.
        :param timeout: Optional timeout for the query execution in
         milliseconds. A :class:`~datetime.timedelta` is accepted too.
        :return: A list of strings representing the query execution plan,
         with details on results produced by and time spent in each operation.
        """
        # The TIMEOUT clause is only sent when a timeout was supplied.
        timeout_args: CommandArgList = (
            []
            if timeout is None
            else [PrefixToken.TIMEOUT, normalized_milliseconds(timeout)]
        )
        command_args: CommandArgList = [graph, query, *timeout_args]
        return await self.execute_module_command(
            CommandName.GRAPH_PROFILE,
            *command_args,
            callback=ListCallback[AnyStr](),
        )

    @module_command(
        CommandName.GRAPH_SLOWLOG,
        module=MODULE,
        version_introduced="2.0.12",
        arguments={"reset": {"version_introduced": "2.12.0"}},
        group=COMMAND_GROUP,
    )
    async def slowlog(
        self, graph: KeyT, reset: bool = False
    ) -> Union[Tuple[GraphSlowLogInfo, ...], bool]:
        """
        Returns a list containing up to 10 of the slowest queries issued
        against the given graph

        :param graph: The name of the graph
        :param reset: If ``True``, the slowlog will be reset
        :return: The slowlog for the given graph or ``True`` if the slowlog
         was reset
        """
        # Reading and resetting use the same command but need different
        # callbacks, hence the two separate invocations.
        if not reset:
            return await self.execute_module_command(
                CommandName.GRAPH_SLOWLOG,
                graph,
                callback=GraphSlowLogCallback(),
            )
        return await self.execute_module_command(
            CommandName.GRAPH_SLOWLOG,
            graph,
            PureToken.RESET,
            callback=SimpleStringCallback(),
        )

    @module_command(
        CommandName.GRAPH_CONFIG_GET,
        module=MODULE,
        version_introduced="2.2.11",
        group=COMMAND_GROUP,
        cluster=ClusterCommandConfig(
            route=NodeFlag.RANDOM,
        ),
    )
    async def config_get(
        self, name: StringT
    ) -> Union[Dict[AnyStr, ResponsePrimitive], ResponsePrimitive]:
        """
        Retrieves a RedisGraph configuration

        :param name: The name of the configuration parameter to retrieve.
        :return: The value of the configuration parameter. If
         :paramref:`name` is ``*``, a mapping of all configuration parameters
         to their values
        """
        config_callback = ConfigGetCallback[AnyStr]()
        return await self.execute_module_command(
            CommandName.GRAPH_CONFIG_GET, name, callback=config_callback
        )

    @module_command(
        CommandName.GRAPH_CONFIG_SET,
        module=MODULE,
        version_introduced="2.2.11",
        group=COMMAND_GROUP,
        cluster=ClusterCommandConfig(
            route=NodeFlag.PRIMARIES,
            combine=ClusterEnsureConsistent[AnyStr](),
        ),
    )
    async def config_set(self, name: StringT, value: ValueT) -> bool:
        """
        Updates a RedisGraph configuration

        :param name: The name of the configuration parameter to set.
        :param value: The value to set the configuration parameter to.
        :return: True if the configuration parameter was set successfully,
         False otherwise.
        """
        status_callback = SimpleStringCallback()
        return await self.execute_module_command(
            CommandName.GRAPH_CONFIG_SET, name, value, callback=status_callback
        )

    @module_command(
        CommandName.GRAPH_LIST,
        module=MODULE,
        version_introduced="2.4.3",
        group=COMMAND_GROUP,
        cluster=ClusterCommandConfig(
            route=NodeFlag.PRIMARIES,
            combine=ClusterMergeSets[AnyStr](),
        ),
    )
    async def list(self) -> Set[AnyStr]:
        """
        Lists all graph keys in the keyspace

        :return: The set of graph keys in the keyspace.
        """
        keys_callback = SetCallback[AnyStr]()
        return await self.execute_module_command(
            CommandName.GRAPH_LIST, callback=keys_callback
        )

    @module_command(
        CommandName.GRAPH_CONSTRAINT_DROP,
        module=MODULE,
        version_introduced="2.12.0",
        group=COMMAND_GROUP,
    )
    async def constraint_drop(
        self,
        graph: KeyT,
        type: Literal[PureToken.MANDATORY, PureToken.UNIQUE],
        node: Optional[StringT] = None,
        relationship: Optional[StringT] = None,
        properties: Optional[Parameters[StringT]] = None,
    ) -> bool:
        """
        Deletes a constraint from specified graph

        :param graph: The name of the RedisGraph.
        :param type: The type of constraint to drop.
        :param node: The name of the node to drop the constraint from
        :param relationship: The name of the relationship to drop the
         constraint from
        :param properties: The properties to drop the constraint from
        :return: True if the constraint was successfully dropped, False
         otherwise.
        """
        command_args: CommandArgList = [graph, type]
        # Optional clauses are emitted in a fixed order: NODE, RELATIONSHIP
        # and finally PROPERTIES.
        for prefix, target in (
            (PrefixToken.NODE, node),
            (PrefixToken.RELATIONSHIP, relationship),
        ):
            if target is not None:
                command_args.extend((prefix, target))
        if properties:
            # Materialise the properties so the count can precede the names.
            property_names: List[StringT] = [*properties]
            command_args.append(PrefixToken.PROPERTIES)
            command_args.append(len(property_names))
            command_args.extend(property_names)
        return await self.execute_module_command(
            CommandName.GRAPH_CONSTRAINT_DROP,
            *command_args,
            callback=SimpleStringCallback(),
        )

    @module_command(
        CommandName.GRAPH_CONSTRAINT_CREATE,
        module=MODULE,
        version_introduced="2.12.0",
        group=COMMAND_GROUP,
    )
    async def constraint_create(
        self,
        graph: KeyT,
        type: Literal[PureToken.MANDATORY, PureToken.UNIQUE],
        node: Optional[StringT] = None,
        relationship: Optional[StringT] = None,
        properties: Optional[Parameters[StringT]] = None,
    ) -> bool:
        """
        Creates a constraint on specified graph

        :param graph: The name of the graph.
        :param type: The type of constraint to create.
        :param node: The label of the node to apply the constraint to.
        :param relationship: The type of relationship to apply the constraint
         to.
        :param properties: The properties to apply the constraint to.
        :return: True if the constraint was created successfully, False
         otherwise.
        """
        command_args: CommandArgList = [graph, type]
        # Optional clauses are emitted in a fixed order: NODE, RELATIONSHIP
        # and finally PROPERTIES.
        for prefix, target in (
            (PrefixToken.NODE, node),
            (PrefixToken.RELATIONSHIP, relationship),
        ):
            if target is not None:
                command_args.extend((prefix, target))
        if properties:
            # Materialise the properties so the count can precede the names.
            property_names: List[StringT] = [*properties]
            command_args.append(PrefixToken.PROPERTIES)
            command_args.append(len(property_names))
            command_args.extend(property_names)
        # The callback is configured with ``PENDING`` as its accepted value.
        pending_callback = SimpleStringCallback(ok_values={"PENDING"})
        return await self.execute_module_command(
            CommandName.GRAPH_CONSTRAINT_CREATE,
            *command_args,
            callback=pending_callback,
        )