[docs]classMonitor(Generic[AnyStr]):""" Monitor is useful for handling the ``MONITOR`` command to the redis server. It can be used as an infinite async iterator:: async for command in client.monitor(): print(command.time, command.client_type, command.command, command.args) Alternatively, each command can be fetched explicitly:: monitor = client.monitor() command1 = await monitor.get_command() command2 = await monitor.get_command() monitor.stop() """def__init__(self,client:coredis.client.Client[AnyStr]):self.client:coredis.client.Client[AnyStr]=clientself.encoding=client.encodingself.connection:Optional[coredis.connection.Connection]=Noneself.monitoring=Falsedef__aiter__(self)->Monitor[AnyStr]:returnselfasyncdef__anext__(self)->MonitorResult:""" Infinite iterator that streams back the next command processed by the monitored server. """returnawaitself.get_command()def__await__(self:MonitorT)->Generator[Any,None,MonitorT]:returnself.__start_monitor().__await__()
[docs]asyncdefget_command(self)->MonitorResult:""" Wait for the next command issued and return the details """awaitself.__start_monitor()assertself.connectionresponse=awaitself.connection.fetch_push_message(block=True)ifisinstance(response,bytes):response=response.decode(self.encoding)assertisinstance(response,str)returnMonitorResult.parse_response_string(response)
[docs]asyncdefstop(self)->None:""" Stop monitoring by issuing a ``RESET`` command and release the connection. """returnawaitself.__stop_monitoring()
[docs]defrun_in_thread(self,response_handler:Callable[[MonitorResult],None],loop:Optional[AbstractEventLoop]=None,)->MonitorThread:""" Runs the monitor in a :class:`MonitorThread` and invokes :paramref:`response_handler` for every command received. To stop the processing call :meth:`MonitorThread.stop` on the instance returned by this method. """monitor_thread=MonitorThread(self,looporasyncio.get_event_loop(),response_handler)monitor_thread.start()returnmonitor_thread
asyncdef__connect(self)->None:ifself.connectionisNone:self.connection=awaitself.client.connection_pool.get_connection()asyncdef__start_monitor(self:MonitorT)->MonitorT:ifself.monitoring:returnselfawaitself.__connect()assertself.connectionrequest=awaitself.connection.create_request(CommandName.MONITOR,decode=False)response=awaitrequestifnotresponse==b"OK":# noqaraiseRedisError(f"Failed to start MONITOR {response!r}")self.monitoring=Truereturnselfasyncdef__stop_monitoring(self)->None:ifself.connection:request=awaitself.connection.create_request(CommandName.RESET,decode=False)response=awaitrequestifnotresponse==CommandName.RESET:# noqaraiseRedisError("Failed to reset connection")self.__reset()def__reset(self)->None:ifself.connection:self.connection.disconnect()self.client.connection_pool.release(self.connection)self.monitoring=Falseself.connection=None
[docs]classMonitorThread(threading.Thread):""" Thread to be used to run monitor """def__init__(self,monitor:Monitor[Any],loop:asyncio.events.AbstractEventLoop,response_handler:Callable[[MonitorResult],None],):self._monitor=monitorself._loop=loopself._response_handler=response_handlerself._future:Optional[Future[None]]=Nonesuper().__init__()