# Source code for aiomonitor.monitor

from __future__ import annotations

import asyncio
import contextvars
import functools
import logging
import os
import shlex
import signal
import sys
import threading
import time
import traceback
import weakref
from asyncio.coroutines import _format_coroutine  # type: ignore
from contextvars import ContextVar, copy_context
from datetime import timedelta
from types import TracebackType
from typing import (
    Any,
    Awaitable,
    Coroutine,
    Dict,
    Final,
    Generator,
    Iterable,
    List,
    Optional,
    TextIO,
    Tuple,
    Type,
    TypeVar,
    cast,
)

import click
import janus
from click.parser import split_arg_string
from click.shell_completion import CompletionItem, _resolve_context, _resolve_incomplete
from prompt_toolkit import PromptSession
from prompt_toolkit.application.current import get_app_session
from prompt_toolkit.completion import CompleteEvent, Completer, Completion
from prompt_toolkit.contrib.telnet.server import TelnetConnection, TelnetServer
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.shortcuts import print_formatted_text
from terminaltables import AsciiTable

from . import console
from .task import TracedTask, persistent_coro
from .types import CancellationChain, TerminatedTaskInfo

# NOTE(review): the original import list was truncated in this copy; it is
# rebuilt from the otherwise-undefined names this module uses.
# `AliasGroupMixin` is presumed to be the alias-aware click group -- confirm.
from .utils import (
    AliasGroupMixin,
    _extract_stack_from_exception,
    _extract_stack_from_frame,
    _extract_stack_from_task,
    _filter_stack,
    _format_filename,
    _format_task,
    _format_terminated_task,
    _format_timedelta,
    all_tasks,
    cancel_task,
    get_default_args,
    task_by_id,
)

# NOTE(review): the original tuple contents were lost in this copy; these are
# the two public entry points defined in this module -- confirm the full list.
__all__ = (
    "Monitor",
    "start_monitor",
)

log = logging.getLogger(__name__)

# Context-local state shared between the telnet session loop and the
# click command callbacks that run inside it.
current_monitor: ContextVar[Monitor] = ContextVar("current_monitor")
current_stdout: ContextVar[TextIO] = ContextVar("current_stdout")
command_done: ContextVar[asyncio.Event] = ContextVar("command_done")

# Default bind address and ports.  The host must be loopback: an empty host
# makes the telnet server (and the Python console it can spawn) listen on
# every interface.
# NOTE(review): the literal was blank in this copy; restored to loopback.
MONITOR_HOST: Final = "127.0.0.1"
MONITOR_PORT: Final = 50101
CONSOLE_PORT: Final = 50102

T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)


# Root click group for all monitor commands.  The built-in help option is
# disabled because each command installs its own `--help` handling.
# NOTE(review): the `cls=` argument was lost in this copy; the commands below
# pass `aliases=[...]`, so an alias-aware group class is required.
# `AliasGroupMixin` is presumed -- confirm.
@click.group(cls=AliasGroupMixin, add_help_option=False)
def monitor_cli():
    """
    To see the usage of each command, run them with "--help" option.
    """

def _get_current_stdout() -> TextIO:
    """Return the stream bound to ``current_stdout`` in this context.

    Falls back to ``sys.stdout`` when the context variable has no value.
    """
    stdout = current_stdout.get(None)
    # The bound stream must be returned explicitly; previously this path
    # fell through and returned None.
    return sys.stdout if stdout is None else stdout

def _get_current_stderr() -> TextIO:
    """Return the stream to use for error output in this context.

    There is no separate stderr context variable: when ``current_stdout`` is
    bound, that same stream is used; otherwise ``sys.stderr`` is returned.
    """
    stdout = current_stdout.get(None)
    # The bound stream must be returned explicitly; previously this path
    # fell through and returned None.
    return sys.stderr if stdout is None else stdout

class ClickCompleter(Completer):
    """prompt_toolkit completer backed by click's shell-completion helpers."""

    def __init__(self, root_command: click.BaseCommand) -> None:
        self._root_command = root_command

    def get_completions(
        self,
        document: Document,
        complete_event: CompleteEvent,
    ) -> Iterable[Completion]:
        """Yield completions for the word under the cursor.

        The current line is split into click arguments, resolved against the
        root command, and the resolved command/parameter is asked for
        candidates.
        """
        args = split_arg_string(document.current_line)
        incomplete = document.get_word_under_cursor()
        if incomplete and args and args[-1] == incomplete:
            # The partially typed word must not count as a finished argument.
            # NOTE(review): this statement was missing in this copy and is
            # reconstructed -- confirm.
            args.pop()
        click_ctx = _resolve_context(self._root_command, {}, "", args)
        cmd_or_param, incomplete = _resolve_incomplete(click_ctx, args, incomplete)
        completions: List[CompletionItem] = cmd_or_param.shell_complete(
            click_ctx, incomplete
        )
        start_position = document.find_backwards(incomplete) or 0
        for completion in completions:
            # NOTE(review): the Completion arguments were lost in this copy;
            # reconstructed from the values computed above -- confirm.
            yield Completion(
                completion.value,
                start_position=start_position,
                display_meta=completion.help,
            )

def complete_task_id(
    ctx: click.Context,
    param: click.Parameter,
    incomplete: str,
) -> Iterable[str]:
    """Return ids of running tasks (as strings) that start with *incomplete*."""
    # ctx here is created in the completer and does not have ctx.obj set as monitor.
    # We take the monitor instance from the global context variable instead.
    try:
        self: Monitor = current_monitor.get()
    except LookupError:
        # No monitor bound in this context: nothing to complete.
        return []
    return [
        task_id
        for task_id in map(str, sorted(map(id, all_tasks(loop=self._monitored_loop))))
        if task_id.startswith(incomplete)
    ]

def complete_trace_id(
    ctx: click.Context,
    param: click.Parameter,
    incomplete: str,
) -> Iterable[str]:
    """Return ids of recorded terminated tasks that start with *incomplete*."""
    # The monitor comes from the context variable, not from ctx.obj.
    try:
        self: Monitor = current_monitor.get()
    except LookupError:
        # No monitor bound in this context: nothing to complete.
        return []
    return [
        trace_id
        for trace_id in map(str, sorted(self._terminated_tasks.keys()))
        if trace_id.startswith(incomplete)
    ]

def complete_signal_names(
    ctx: click.Context,
    param: click.Parameter,
    incomplete: str,
) -> Iterable[str]:
    """Return names of ``signal.Signals`` members that start with *incomplete*."""
    return [sig.name for sig in signal.Signals if sig.name.startswith(incomplete)]

class Monitor:
    """Telnet-accessible inspector for the tasks of an asyncio event loop.

    The UI (telnet server and command handling) runs on its own event loop in
    a daemon thread; the monitored loop is only touched through its task
    factory and thread-safe calls.

    NOTE(review): this class was reconstructed from a copy whose line
    structure and several tokens (logging calls, ``asyncio.run``,
    ``ctx.run``, ``.id`` attribute accesses) were lost -- confirm against the
    upstream file.
    """

    _event_loop_thread_id: Optional[int] = None
    _cmd_prefix = "do_"
    _empty_result = object()

    intro = "\nAsyncio Monitor: {tasknum} task{s} running\nType help for available commands\n"  # noqa
    help_template = "{cmd_name} {arg_list}\n {doc}\n"
    help_short_template = (
        " {cmd_name}{cmd_arg_sep}{arg_list}: {doc_firstline}"  # noqa
    )

    console_locals: Dict[str, Any]
    _console_tasks: weakref.WeakSet[asyncio.Task[Any]]

    # task -> weak reference to the task that created it
    _created_traceback_chains: weakref.WeakKeyDictionary[
        asyncio.Task[Any],
        weakref.ReferenceType[asyncio.Task[Any]],
    ]
    # task -> stack captured when the task was created
    _created_tracebacks: weakref.WeakKeyDictionary[
        asyncio.Task[Any], List[traceback.FrameSummary]
    ]
    _terminated_tasks: Dict[str, TerminatedTaskInfo]
    _terminated_history: List[str]
    _termination_info_queue: janus.Queue[TerminatedTaskInfo]
    _canceller_chain: Dict[str, str]
    _canceller_stacks: Dict[str, List[traceback.FrameSummary] | None]
    _cancellation_chain_queue: janus.Queue[CancellationChain]

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        *,
        host: str = MONITOR_HOST,
        port: int = MONITOR_PORT,
        console_port: int = CONSOLE_PORT,
        console_enabled: bool = True,
        hook_task_factory: bool = False,
        max_termination_history: int = 1000,
        locals: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Configure the monitor; nothing is started until :meth:`start`.

        :param loop: event loop to monitor; a falsy value falls back to the
            running loop.
        :param host: address the telnet server (and console) binds to.
        :param port: telnet port of the monitor UI.
        :param console_port: port used for the Python console.
        :param console_enabled: whether the ``console`` command is allowed.
        :param hook_task_factory: install the tracing task factory on *loop*
            when the monitor starts.
        :param max_termination_history: maximum number of non-persistent
            terminated tasks kept in the history.
        :param locals: namespace for the Python console; a minimal default is
            used when omitted.
        """
        self._monitored_loop = loop or asyncio.get_running_loop()
        self._host = host
        self._port = port
        self._console_port = console_port
        self._console_enabled = console_enabled
        if locals is None:
            self.console_locals = {"__name__": "__console__", "__doc__": None}
        else:
            self.console_locals = locals

        self.prompt = "monitor >>> "
        log.info("Starting aiomonitor at %s:%d", host, port)

        self._closed = False
        self._started = False
        self._console_tasks = weakref.WeakSet()

        self._hook_task_factory = hook_task_factory
        self._created_traceback_chains = weakref.WeakKeyDictionary()
        self._created_tracebacks = weakref.WeakKeyDictionary()
        self._terminated_tasks = {}
        self._canceller_chain = {}
        self._canceller_stacks = {}
        self._terminated_history = []
        self._max_termination_history = max_termination_history

        self._ui_started = threading.Event()
        self._ui_thread = threading.Thread(target=self._ui_main, args=(), daemon=True)

    @property
    def host(self) -> str:
        return self._host

    @property
    def port(self) -> int:
        return self._port

    def __repr__(self) -> str:
        name = self.__class__.__name__
        return "<{name}: {host}:{port}>".format(
            name=name, host=self._host, port=self._port
        )

    def start(self) -> None:
        """Start the UI thread and, if requested, hook the task factory.

        Blocks until the UI thread signals that its server is up.
        """
        assert not self._closed
        assert not self._started
        self._started = True
        self._original_task_factory = self._monitored_loop.get_task_factory()
        if self._hook_task_factory:
            self._monitored_loop.set_task_factory(self._create_task)
        self._event_loop_thread_id = threading.get_ident()
        self._ui_thread.start()
        self._ui_started.wait()
        # Override the Click's stdout/stderr reference cache functions
        # to let them use the correct stdout handler.
        click.utils._default_text_stdout = _get_current_stdout
        click.utils._default_text_stderr = _get_current_stderr

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> Monitor:
        if not self._started:
            self.start()
        return self

    # exc_type should be Optional[Type[BaseException]], but that annotation
    # ran into a typing issue on Python 3.5 (the issue link is missing from
    # this copy).
    def __exit__(
        self,
        exc_type: Any,
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.close()

    def close(self) -> None:
        """Stop the UI thread and restore the original task factory.

        Calling it again after the monitor is closed is a no-op.
        """
        assert self._started, "The monitor must have been started to close it."
        if not self._closed:
            # The future belongs to the UI loop, so cancel it from there.
            self._ui_loop.call_soon_threadsafe(
                self._ui_forever_future.cancel,
            )
            self._ui_thread.join()
            self._monitored_loop.set_task_factory(self._original_task_factory)
            self._closed = True

    async def _coro_wrapper(self, coro: Awaitable[T_co]) -> T_co:
        """Await *coro*, recording the stack of any exception on the task."""
        myself = asyncio.current_task()
        assert isinstance(myself, TracedTask)
        try:
            return await coro
        except BaseException as e:
            myself._termination_stack = _extract_stack_from_exception(e)[:-1]
            raise

    def _create_task(
        self,
        loop: asyncio.AbstractEventLoop,
        coro: Coroutine[Any, Any, T_co] | Generator[Any, None, T_co],
        *,
        name: str | None = None,
        context: contextvars.Context | None = None,
    ) -> asyncio.Future[T_co]:
        """Task factory that creates a ``TracedTask`` and records its origin."""
        assert loop is self._monitored_loop
        try:
            parent_task = asyncio.current_task()
        except RuntimeError:
            parent_task = None
        persistent = coro in persistent_coro
        task = TracedTask(
            self._coro_wrapper(coro),  # type: ignore
            termination_info_queue=self._termination_info_queue.sync_q,
            cancellation_chain_queue=self._cancellation_chain_queue.sync_q,
            persistent=persistent,
            loop=self._monitored_loop,
            name=name,  # since Python 3.8
            context=context,  # since Python 3.11
        )
        task._orig_coro = cast(Coroutine[Any, Any, T_co], coro)
        self._created_tracebacks[task] = _extract_stack_from_frame(sys._getframe())[
            :-1
        ]  # strip this wrapper method
        if parent_task is not None:
            self._created_traceback_chains[task] = weakref.ref(parent_task)
        return task

    def _ui_main(self) -> None:
        """Thread target: run the UI coroutine on a fresh event loop."""
        asyncio.run(self._ui_main_async())

    async def _ui_main_async(self) -> None:
        """Set up the queues and telnet server, then wait until cancelled."""
        loop = asyncio.get_running_loop()
        self._termination_info_queue = janus.Queue()
        self._cancellation_chain_queue = janus.Queue()
        self._ui_loop = loop
        self._ui_forever_future = loop.create_future()
        self._ui_termination_handler_task = asyncio.create_task(
            self._ui_handle_termination_updates()
        )
        self._ui_cancellation_handler_task = asyncio.create_task(
            self._ui_handle_cancellation_updates()
        )
        telnet_server = TelnetServer(
            interact=self._interact, host=self._host, port=self._port
        )
        telnet_server.start()
        await asyncio.sleep(0)
        self._ui_started.set()
        try:
            await self._ui_forever_future
        except asyncio.CancelledError:
            pass
        finally:
            console_tasks = {*self._console_tasks}
            for console_task in console_tasks:
                console_task.cancel()
            await asyncio.gather(*console_tasks, return_exceptions=True)
            self._ui_termination_handler_task.cancel()
            self._ui_cancellation_handler_task.cancel()
            await self._ui_termination_handler_task
            await self._ui_cancellation_handler_task
            await telnet_server.stop()

    async def _ui_handle_termination_updates(self) -> None:
        """Consume termination records and keep the history bounded."""
        while True:
            try:
                update: TerminatedTaskInfo = (
                    await self._termination_info_queue.async_q.get()
                )
            except asyncio.CancelledError:
                return
            # NOTE(review): the key attribute was scrubbed in this copy;
            # `update.id` is presumed -- confirm against TerminatedTaskInfo.
            self._terminated_tasks[update.id] = update
            if not update.persistent:
                self._terminated_history.append(update.id)
            # canceller stack is already put in _ui_handle_cancellation_updates()
            if canceller_stack := self._canceller_stacks.pop(update.id, None):
                update.canceller_stack = canceller_stack
            while len(self._terminated_history) > self._max_termination_history:
                removed_id = self._terminated_history.pop(0)
                self._terminated_tasks.pop(removed_id, None)
                self._canceller_chain.pop(removed_id, None)
                self._canceller_stacks.pop(removed_id, None)

    async def _ui_handle_cancellation_updates(self) -> None:
        """Consume cancellation records: who cancelled whom, and from where."""
        while True:
            try:
                update: CancellationChain = (
                    await self._cancellation_chain_queue.async_q.get()
                )
            except asyncio.CancelledError:
                return
            self._canceller_stacks[update.target_id] = update.canceller_stack
            self._canceller_chain[update.target_id] = update.canceller_id

    def print_ok(self, msg: str) -> None:
        """Print *msg* prefixed with a green check mark."""
        print_formatted_text(
            FormattedText(
                [
                    ("ansibrightgreen", "✓ "),
                    ("", msg),
                ]
            )
        )

    def print_fail(self, msg: str) -> None:
        """Print *msg* prefixed with a red cross mark."""
        print_formatted_text(
            FormattedText(
                [
                    ("ansibrightred", "✗ "),
                    ("", msg),
                ]
            )
        )

    async def _interact(self, connection: TelnetConnection) -> None:
        """
        The interactive loop for each telnet client connection.
        """
        await asyncio.sleep(0.3)  # wait until telnet negotiation is done
        tasknum = len(all_tasks(loop=self._monitored_loop))
        s = "" if tasknum == 1 else "s"
        print(self.intro.format(tasknum=tasknum, s=s), file=connection.stdout)
        current_monitor_token = current_monitor.set(self)
        current_stdout_token = current_stdout.set(connection.stdout)
        # NOTE: prompt_toolkit's all internal console output automatically uses
        #       an internal contextvar to keep the stdout consistent with the
        #       current telnet connection.
        prompt_session: PromptSession[str] = PromptSession(
            completer=ClickCompleter(monitor_cli),
            complete_while_typing=False,
        )
        lastcmd = "noop"
        style_prompt = "#5fd7ff bold"
        try:
            while True:
                try:
                    user_input = (
                        await prompt_session.prompt_async(
                            FormattedText(
                                [
                                    (style_prompt, self.prompt),
                                ]
                            )
                        )
                    ).strip()
                except KeyboardInterrupt:
                    self.print_fail("To terminate, press Ctrl+D or type 'exit'.")
                except (EOFError, asyncio.CancelledError):
                    return
                except Exception:
                    self.print_fail(traceback.format_exc())
                else:
                    command_done_event = asyncio.Event()
                    command_done_token = command_done.set(command_done_event)
                    try:
                        # An empty line repeats the previous command.
                        if not user_input and lastcmd is not None:
                            user_input = lastcmd
                        args = shlex.split(user_input)
                        term_size = prompt_session.output.get_size()
                        # Run the command in a copy of the current context so
                        # it sees the context variables set above.
                        ctx = copy_context()
                        ctx.run(
                            monitor_cli.main,
                            args,
                            prog_name="",
                            obj=self,
                            standalone_mode=False,  # type: ignore
                            max_content_width=term_size.columns,
                        )
                        await command_done_event.wait()
                        if args[0] == "console":
                            lastcmd = "noop"
                        else:
                            lastcmd = user_input
                    except (click.BadParameter, click.UsageError) as e:
                        self.print_fail(str(e))
                    except asyncio.CancelledError:
                        return
                    except Exception:
                        self.print_fail(traceback.format_exc())
                    finally:
                        command_done.reset(command_done_token)
        finally:
            current_stdout.reset(current_stdout_token)
            current_monitor.reset(current_monitor_token)
# NOTE(review): every command below takes `ctx` as its first argument, but no
# `click.pass_context` decorator is visible in this copy.  It may have been
# lost in extraction or be supplied by the group class; the decorators are
# kept exactly as shown -- confirm against the upstream file.


def auto_command_done(cmdfunc):
    """Wrap a sync command so the ``command_done`` event is always set."""

    @functools.wraps(cmdfunc)
    def _inner(ctx: click.Context, *args, **kwargs):
        command_done_event = command_done.get()
        try:
            return cmdfunc(ctx, *args, **kwargs)
        finally:
            command_done_event.set()

    return _inner


def auto_async_command_done(cmdfunc):
    """Wrap an async command so the ``command_done`` event is always set."""

    @functools.wraps(cmdfunc)
    async def _inner(ctx: click.Context, *args, **kwargs):
        command_done_event = command_done.get()
        try:
            return await cmdfunc(ctx, *args, **kwargs)
        finally:
            command_done_event.set()

    return _inner


def custom_help_option(cmdfunc):
    """
    A custom help option to ensure setting `command_done_event`.
    """

    @auto_command_done
    def show_help(ctx: click.Context, param: click.Parameter, value: bool) -> None:
        if not value:
            return
        click.echo(ctx.get_help(), color=ctx.color)
        ctx.exit()

    return click.option(
        "--help",
        is_flag=True,
        expose_value=False,
        is_eager=True,
        callback=show_help,
        help="Show the help message",
    )(cmdfunc)


@monitor_cli.command(name="noop", hidden=True)
@auto_command_done
def do_noop(ctx: click.Context) -> None:
    pass


@monitor_cli.command(name="help", aliases=["?", "h"])
@custom_help_option
@auto_command_done
def do_help(ctx: click.Context) -> None:
    """Show the list of commands"""
    click.echo(monitor_cli.get_help(ctx))


@monitor_cli.command(name="signal")
@click.argument("signame", type=str, shell_complete=complete_signal_names)
@custom_help_option
@auto_command_done
def do_signal(ctx: click.Context, signame: str) -> None:
    """Send a Unix signal"""
    self: Monitor = ctx.obj
    if hasattr(signal, signame):
        os.kill(os.getpid(), getattr(signal, signame))
        self.print_ok(f"Sent signal to {signame} PID {os.getpid()}")
    else:
        self.print_fail(f"Unknown signal {signame}")


@monitor_cli.command(name="stacktrace", aliases=["st", "stack"])
@custom_help_option
@auto_command_done
def do_stacktrace(ctx: click.Context) -> None:
    """Print a stack trace from the event loop thread"""
    self: Monitor = ctx.obj
    stdout = _get_current_stdout()
    tid = self._event_loop_thread_id
    assert tid is not None
    frame = sys._current_frames()[tid]
    traceback.print_stack(frame, file=stdout)


@monitor_cli.command(name="cancel", aliases=["ca"])
@click.argument("taskid", shell_complete=complete_task_id)
@custom_help_option
@auto_command_done
def do_cancel(ctx: click.Context, taskid: str) -> None:
    """Cancel an indicated task"""
    self: Monitor = ctx.obj
    task_id = int(taskid)
    task = task_by_id(task_id, self._monitored_loop)
    if task:
        if self._monitored_loop == asyncio.get_running_loop():
            asyncio.create_task(cancel_task(task))
        else:
            # The task lives on another loop: cancel it there and wait for it.
            fut = asyncio.run_coroutine_threadsafe(
                cancel_task(task), loop=self._monitored_loop
            )
            fut.result(timeout=3)
        self.print_ok(f"Cancelled task {task_id}")
    else:
        self.print_fail(f"No task {task_id}")


@monitor_cli.command(name="exit", aliases=["q", "quit"])
@custom_help_option
@auto_command_done
def do_exit(ctx: click.Context) -> None:
    """Leave the monitor client session"""
    raise asyncio.CancelledError("exit by user")


@monitor_cli.command(name="console")
@custom_help_option
def do_console(ctx: click.Context) -> None:
    """Switch to async Python REPL"""
    self: Monitor = ctx.obj
    if not self._console_enabled:
        self.print_fail("Python console is disabled for this session!")
        # This command is not wrapped by auto_command_done, so the event must
        # be set here; otherwise the interactive loop waits forever.
        command_done.get().set()
        return

    @auto_async_command_done
    async def _console(ctx: click.Context) -> None:
        h, p = self._host, self._console_port
        log.info("Starting aioconsole at %s:%d", h, p)
        app_session = get_app_session()
        server = await console.start(
            self._host,
            self._console_port,
            self.console_locals,
            self._monitored_loop,
        )
        try:
            await console.proxy(
                app_session.input,
                app_session.output,
                self._host,
                self._console_port,
            )
        except asyncio.CancelledError:
            return
        finally:
            await console.close(server, self._monitored_loop)
            log.info("Terminated aioconsole at %s:%d", h, p)
            self.print_ok("The console session is closed.")

    # Since we are already inside the UI's event loop,
    # spawn the async command function as a new task and let it
    # set `command_done_event` internally.
    task = asyncio.create_task(_console(ctx))
    self._console_tasks.add(task)


@monitor_cli.command(name="ps", aliases=["p"])
@click.option("-f", "--filter", "filter_", help="filter by coroutine or task name")
@click.option("-p", "--persistent", is_flag=True, help="show only persistent tasks")
@custom_help_option
@auto_command_done
def do_ps(
    ctx: click.Context,
    filter_: str,
    persistent: bool,
) -> None:
    """Show task table"""
    headers = (
        "Task ID",
        "State",
        "Name",
        "Coroutine",
        "Created Location",
        "Since",
    )
    self: Monitor = ctx.obj
    stdout = _get_current_stdout()
    table_data: List[Tuple[str, str, str, str, str, str]] = [headers]
    all_running_tasks = all_tasks(loop=self._monitored_loop)
    for task in sorted(all_running_tasks, key=id):
        taskid = str(id(task))
        if isinstance(task, TracedTask):
            coro_repr = _format_coroutine(task._orig_coro).partition(" ")[0]
            if persistent and task._orig_coro not in persistent_coro:
                continue
        else:
            coro_repr = _format_coroutine(task.get_coro()).partition(" ")[0]
            if persistent:
                # untracked tasks should be skipped when showing persistent ones only
                continue
        if filter_ and (filter_ not in coro_repr and filter_ not in task.get_name()):
            continue
        creation_stack = self._created_tracebacks.get(task)
        # Some values are masked as "-" when they are unavailable
        # if it's the root task/coro or if the task factory is not applied.
        if not creation_stack:
            created_location = "-"
        else:
            creation_stack = _filter_stack(creation_stack)
            fn = _format_filename(creation_stack[-1].filename)
            lineno = creation_stack[-1].lineno
            created_location = f"{fn}:{lineno}"
        if isinstance(task, TracedTask):
            running_since = _format_timedelta(
                timedelta(
                    seconds=(time.perf_counter() - task._started_at),
                )
            )
        else:
            running_since = "-"
        table_data.append(
            (
                taskid,
                task._state,
                task.get_name(),
                coro_repr,
                created_location,
                running_since,
            )
        )
    table = AsciiTable(table_data)
    table.inner_row_border = False
    table.inner_column_border = False
    if filter_ or persistent:
        stdout.write(
            f"{len(all_running_tasks)} tasks running "
            f"(showing {len(table_data) - 1} tasks)\n"
        )
    else:
        stdout.write(f"{len(all_running_tasks)} tasks running\n")
    stdout.write(table.table)
    stdout.write("\n")
    stdout.flush()


@monitor_cli.command(name="ps-terminated", aliases=["pt", "pst"])
@click.option("-f", "--filter", "filter_", help="filter by coroutine or task name")
@click.option("-p", "--persistent", is_flag=True, help="show only persistent tasks")
@custom_help_option
@auto_command_done
def do_ps_terminated(
    ctx: click.Context,
    filter_: str,
    persistent: bool,
) -> None:
    """List recently terminated/cancelled tasks"""
    headers = (
        "Trace ID",
        "Name",
        "Coro",
        "Since Started",
        "Since Terminated",
    )
    self: Monitor = ctx.obj
    stdout = _get_current_stdout()
    table_data: List[Tuple[str, str, str, str, str]] = [headers]
    terminated_tasks = self._terminated_tasks.values()
    for item in sorted(
        terminated_tasks,
        key=lambda info: info.terminated_at,
        reverse=True,
    ):
        if persistent and not item.persistent:
            continue
        # NOTE(review): `item.name` / `item.id` were scrubbed in this copy and
        # are presumed from the column headers -- confirm.
        if filter_ and (filter_ not in item.coro and filter_ not in item.name):
            continue
        run_since = _format_timedelta(
            timedelta(seconds=time.perf_counter() - item.started_at)
        )
        cancelled_since = _format_timedelta(
            timedelta(seconds=time.perf_counter() - item.terminated_at)
        )
        table_data.append(
            (
                str(item.id),
                item.name,
                item.coro,
                run_since,
                cancelled_since,
            )
        )
    table = AsciiTable(table_data)
    table.inner_row_border = False
    table.inner_column_border = False
    if filter_ or persistent:
        stdout.write(
            f"{len(terminated_tasks)} tasks terminated "
            f"(showing {len(table_data) - 1} tasks)\n"
        )
    else:
        stdout.write(
            f"{len(terminated_tasks)} tasks terminated (old ones may be stripped)\n"
        )
    stdout.write(table.table)
    stdout.write("\n")
    stdout.flush()


@monitor_cli.command(name="where", aliases=["w"])
@click.argument("taskid", shell_complete=complete_task_id)
@custom_help_option
@auto_command_done
def do_where(ctx: click.Context, taskid: str) -> None:
    """Show stack frames and the task creation chain of a task"""
    self: Monitor = ctx.obj
    stdout = _get_current_stdout()
    depth = 0
    task_id = int(taskid)
    task = task_by_id(task_id, self._monitored_loop)
    if task is None:
        self.print_fail(f"No task {task_id}")
        return
    # Walk from the requested task up through the tasks that created it.
    task_chain: List[asyncio.Task[Any]] = []
    while task is not None:
        task_chain.append(task)
        task_ref = self._created_traceback_chains.get(task)
        task = task_ref() if task_ref is not None else None
    prev_task = None
    for task in reversed(task_chain):
        if depth == 0:
            stdout.write(
                "Stack of the root task or coroutine scheduled in the event loop (most recent call last):\n\n"
            )
        elif depth > 0:
            assert prev_task is not None
            stdout.write(
                "Stack of %s when creating the next task (most recent call last):\n\n"
                % _format_task(prev_task)
            )
        stack = self._created_tracebacks.get(task)
        if stack is None:
            stdout.write(
                " No stack available (maybe it is a native code, a synchronous callback function, or the event loop itself)\n"
            )
        else:
            stack = _filter_stack(stack)
            stdout.write("".join(traceback.format_list(stack)))
        prev_task = task
        depth += 1
        stdout.write("\n")
    task = task_chain[0]
    stdout.write("Stack of %s (most recent call last):\n\n" % _format_task(task))
    stack = _extract_stack_from_task(task)
    if not stack:
        stdout.write(" No stack available for %s" % _format_task(task))
    else:
        stdout.write("".join(traceback.format_list(stack)))
    stdout.write("\n")


@monitor_cli.command(name="where-terminated", aliases=["wt"])
@click.argument("trace_id", shell_complete=complete_trace_id)
@custom_help_option
@auto_command_done
def do_where_terminated(ctx: click.Context, trace_id: str) -> None:
    """Show stack frames and the termination/cancellation chain of a task"""
    self: Monitor = ctx.obj
    stdout = _get_current_stdout()
    depth = 0
    # Report an unknown id as a normal failure instead of a KeyError traceback.
    if trace_id not in self._terminated_tasks:
        self.print_fail(f"No terminated task {trace_id}")
        return
    # Walk from the requested task up through its cancellers.  The walk stops
    # at a canceller with no termination record (e.g. it has not terminated,
    # or its record was trimmed from the history).
    tinfo_chain: List[TerminatedTaskInfo] = []
    next_id: Optional[str] = trace_id
    while next_id is not None:
        chained_tinfo = self._terminated_tasks.get(next_id)
        if chained_tinfo is None:
            break
        tinfo_chain.append(chained_tinfo)
        next_id = self._canceller_chain.get(next_id)
    prev_tinfo = None
    for tinfo in reversed(tinfo_chain):
        if depth == 0:
            stdout.write(
                "Stack of the root task or coroutine scheduled in the event loop (most recent call last):\n\n"
            )
        elif depth > 0:
            assert prev_tinfo is not None
            stdout.write(
                "Stack of %s when creating the next task (most recent call last):\n\n"
                % _format_terminated_task(prev_tinfo)
            )
        stack = tinfo.canceller_stack
        if stack is None:
            stdout.write(
                " No stack available (maybe it is a self-raised cancellation or exception)\n"
            )
        else:
            stack = _filter_stack(stack)
            stdout.write("".join(traceback.format_list(stack)))
        prev_tinfo = tinfo
        depth += 1
        stdout.write("\n")
    tinfo = tinfo_chain[0]
    stdout.write(
        "Stack of %s (most recent call last):\n\n" % _format_terminated_task(tinfo)
    )
    stack = tinfo.termination_stack
    if not stack:
        stdout.write(
            " No stack available for %s (the task has run to completion)"
            % _format_terminated_task(tinfo)
        )
    else:
        stdout.write("".join(traceback.format_list(stack)))
    stdout.write("\n")


def start_monitor(
    loop: asyncio.AbstractEventLoop,
    *,
    monitor_cls: Type[Monitor] = Monitor,
    host: str = MONITOR_HOST,
    port: int = MONITOR_PORT,
    console_port: int = CONSOLE_PORT,
    console_enabled: bool = True,
    hook_task_factory: bool = False,
    max_termination_history: Optional[int] = None,
    locals: Optional[Dict[str, Any]] = None,
) -> Monitor:
    """Create a monitor for *loop*, start it, and return it.

    The keyword arguments are forwarded to *monitor_cls*.  A falsy
    ``max_termination_history`` falls back to the default declared on
    ``monitor_cls.__init__``.
    """
    m = monitor_cls(
        loop,
        host=host,
        port=port,
        console_port=console_port,
        console_enabled=console_enabled,
        hook_task_factory=hook_task_factory,
        max_termination_history=max_termination_history
        or get_default_args(monitor_cls.__init__)["max_termination_history"],
        locals=locals,
    )
    m.start()
    return m