Source code for coredis.commands.monitor

from __future__ import annotations

import asyncio
import threading
from asyncio import AbstractEventLoop, CancelledError
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any

from coredis.commands.constants import CommandName
from coredis.exceptions import RedisError
from coredis.response.types import MonitorResult
from coredis.typing import AnyStr, Callable, Generator, Generic, Optional, TypeVar

if TYPE_CHECKING:
    import coredis.client
    import coredis.connection

MonitorT = TypeVar("MonitorT", bound="Monitor[Any]")


[docs] class Monitor(Generic[AnyStr]): """ Monitor is useful for handling the ``MONITOR`` command to the redis server. It can be used as an infinite async iterator:: async for command in client.monitor(): print(command.time, command.client_type, command.command, command.args) Alternatively, each command can be fetched explicitly:: monitor = client.monitor() command1 = await monitor.get_command() command2 = await monitor.get_command() monitor.stop() """ def __init__(self, client: coredis.client.Client[AnyStr]): self.client: coredis.client.Client[AnyStr] = client self.encoding = client.encoding self.connection: Optional[coredis.connection.Connection] = None self.monitoring = False def __aiter__(self) -> Monitor[AnyStr]: return self async def __anext__(self) -> MonitorResult: """ Infinite iterator that streams back the next command processed by the monitored server. """ return await self.get_command() def __await__(self: MonitorT) -> Generator[Any, None, MonitorT]: return self.__start_monitor().__await__()
[docs] async def get_command(self) -> MonitorResult: """ Wait for the next command issued and return the details """ await self.__start_monitor() assert self.connection response = await self.connection.fetch_push_message(block=True) if isinstance(response, bytes): response = response.decode(self.encoding) assert isinstance(response, str) return MonitorResult.parse_response_string(response)
[docs] async def stop(self) -> None: """ Stop monitoring by issuing a ``RESET`` command and release the connection. """ return await self.__stop_monitoring()
[docs] def run_in_thread( self, response_handler: Callable[[MonitorResult], None], loop: Optional[AbstractEventLoop] = None, ) -> MonitorThread: """ Runs the monitor in a :class:`MonitorThread` and invokes :paramref:`response_handler` for every command received. To stop the processing call :meth:`MonitorThread.stop` on the instance returned by this method. """ monitor_thread = MonitorThread( self, loop or asyncio.get_event_loop(), response_handler ) monitor_thread.start() return monitor_thread
async def __connect(self) -> None: if self.connection is None: self.connection = await self.client.connection_pool.get_connection() async def __start_monitor(self: MonitorT) -> MonitorT: if self.monitoring: return self await self.__connect() assert self.connection request = await self.connection.create_request( CommandName.MONITOR, decode=False ) response = await request if not response == b"OK": # noqa raise RedisError(f"Failed to start MONITOR {response!r}") self.monitoring = True return self async def __stop_monitoring(self) -> None: if self.connection: request = await self.connection.create_request( CommandName.RESET, decode=False ) response = await request if not response == CommandName.RESET: # noqa raise RedisError("Failed to reset connection") self.__reset() def __reset(self) -> None: if self.connection: self.connection.disconnect() self.client.connection_pool.release(self.connection) self.monitoring = False self.connection = None
[docs] class MonitorThread(threading.Thread): """ Thread to be used to run monitor """ def __init__( self, monitor: Monitor[Any], loop: asyncio.events.AbstractEventLoop, response_handler: Callable[[MonitorResult], None], ): self._monitor = monitor self._loop = loop self._response_handler = response_handler self._future: Optional[Future[None]] = None super().__init__()
[docs] def run(self) -> None: self._future = asyncio.run_coroutine_threadsafe(self._run(), self._loop)
async def _run(self) -> None: try: async for command in self._monitor: self._response_handler(command) except CancelledError: await self._monitor.stop() def stop(self) -> None: if self._future: self._future.cancel()