from __future__ import annotations
import asyncio
import contextlib
import contextvars
import functools
import logging
import sys
import textwrap
import threading
import time
import traceback
import weakref
from asyncio.coroutines import _format_coroutine # type: ignore
from datetime import timedelta
from types import TracebackType
from typing import (
Any,
Awaitable,
Coroutine,
Dict,
Final,
Generator,
List,
Optional,
Sequence,
Type,
TypeVar,
cast,
)
import janus
from aiohttp import web
from prompt_toolkit.contrib.telnet.server import TelnetServer
from .exceptions import MissingTask
from .task import TracedTask, persistent_coro
from .termui.commands import interact
from .types import (
CancellationChain,
FormatItemTypes,
FormattedLiveTaskInfo,
FormattedStackItem,
FormattedTerminatedTaskInfo,
TerminatedTaskInfo,
)
from .utils import (
_extract_stack_from_exception,
_extract_stack_from_frame,
_extract_stack_from_task,
_filter_stack,
_format_filename,
_format_task,
_format_terminated_task,
_format_timedelta,
get_default_args,
)
from .webui.app import init_webui
__all__ = (
"Monitor",
"start_monitor",
)
log = logging.getLogger(__name__)
MONITOR_HOST: Final = "127.0.0.1"
MONITOR_TERMUI_PORT: Final = 20101
MONITOR_WEBUI_PORT: Final = 20102
CONSOLE_PORT: Final = 20103
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
def task_by_id(
taskid: int, loop: asyncio.AbstractEventLoop
) -> "Optional[asyncio.Task[Any]]":
tasks = asyncio.all_tasks(loop=loop)
return next(filter(lambda t: id(t) == taskid, tasks), None)
async def cancel_task(task: "asyncio.Task[Any]") -> None:
with contextlib.suppress(asyncio.CancelledError):
task.cancel()
await task
[docs]
class Monitor:
_event_loop_thread_id: Optional[int] = None
console_locals: Dict[str, Any]
_termui_tasks: weakref.WeakSet[asyncio.Task[Any]]
_created_traceback_chains: weakref.WeakKeyDictionary[
asyncio.Task[Any],
weakref.ReferenceType[asyncio.Task[Any]],
]
_created_tracebacks: weakref.WeakKeyDictionary[
asyncio.Task[Any], List[traceback.FrameSummary]
]
_terminated_tasks: Dict[str, TerminatedTaskInfo]
_terminated_history: List[str]
_termination_info_queue: janus.Queue[TerminatedTaskInfo]
_canceller_chain: Dict[str, str]
_canceller_stacks: Dict[str, List[traceback.FrameSummary] | None]
_cancellation_chain_queue: janus.Queue[CancellationChain]
def __init__(
self,
loop: asyncio.AbstractEventLoop,
*,
host: str = MONITOR_HOST,
termui_port: int = MONITOR_TERMUI_PORT,
webui_port: int = MONITOR_WEBUI_PORT,
console_port: int = CONSOLE_PORT,
console_enabled: bool = True,
hook_task_factory: bool = False,
max_termination_history: int = 1000,
locals: Optional[Dict[str, Any]] = None,
) -> None:
self._monitored_loop = loop or asyncio.get_running_loop()
self._host = host
self._termui_port = termui_port
self._webui_port = webui_port
self._console_port = console_port
self._console_enabled = console_enabled
if locals is None:
self.console_locals = {"__name__": "__console__", "__doc__": None}
else:
self.console_locals = locals
self.prompt = "monitor >>> "
log.info(
"Starting aiomonitor at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d",
{
"host": host,
"tport": termui_port,
"wport": webui_port,
},
)
self._closed = False
self._started = False
self._termui_tasks = weakref.WeakSet()
self._hook_task_factory = hook_task_factory
self._created_traceback_chains = weakref.WeakKeyDictionary()
self._created_tracebacks = weakref.WeakKeyDictionary()
self._terminated_tasks = {}
self._canceller_chain = {}
self._canceller_stacks = {}
self._terminated_history = []
self._max_termination_history = max_termination_history
self._ui_started = threading.Event()
self._ui_thread = threading.Thread(target=self._ui_main, args=(), daemon=True)
@property
def host(self) -> str:
return self._host
@property
def port(self) -> int:
return self._termui_port
def __repr__(self) -> str:
name = self.__class__.__name__
return "<{name}: {host}:{port}>".format(
name=name, host=self._host, port=self._termui_port
)
[docs]
def start(self) -> None:
assert not self._closed
assert not self._started
self._started = True
self._original_task_factory = self._monitored_loop.get_task_factory()
if self._hook_task_factory:
self._monitored_loop.set_task_factory(self._create_task)
self._event_loop_thread_id = threading.get_ident()
self._ui_thread.start()
self._ui_started.wait()
@property
def closed(self) -> bool:
return self._closed
def __enter__(self) -> Monitor:
if not self._started:
self.start()
return self
# exc_type should be Optional[Type[BaseException]], but
# this runs into https://github.com/python/typing/issues/266
# on Python 3.5.
def __exit__(
self,
exc_type: Any,
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.close()
[docs]
def close(self) -> None:
assert self._started, "The monitor must have been started to close it."
if not self._closed:
self._ui_loop.call_soon_threadsafe(
self._ui_forever_future.cancel,
)
self._monitored_loop.set_task_factory(self._original_task_factory)
self._ui_thread.join()
self._closed = True
def format_running_task_list(
self, filter_: str, persistent: bool
) -> Sequence[FormattedLiveTaskInfo]:
all_running_tasks = asyncio.all_tasks(loop=self._monitored_loop)
tasks = []
for task in sorted(all_running_tasks, key=id):
taskid = str(id(task))
if isinstance(task, TracedTask):
coro_repr = _format_coroutine(task._orig_coro).partition(" ")[0]
if persistent and task._orig_coro not in persistent_coro:
continue
else:
coro_repr = _format_coroutine(task.get_coro()).partition(" ")[0]
if persistent:
# untracked tasks should be skipped when showing persistent ones only
continue
if filter_ and (
filter_ not in coro_repr and filter_ not in task.get_name()
):
continue
creation_stack = self._created_tracebacks.get(task)
# Some values are masked as "-" when they are unavailable
# if it's the root task/coro or if the task factory is not applied.
if not creation_stack:
created_location = "-"
else:
creation_stack = _filter_stack(creation_stack)
fn = _format_filename(creation_stack[-1].filename)
lineno = creation_stack[-1].lineno
created_location = f"{fn}:{lineno}"
if isinstance(task, TracedTask):
running_since = _format_timedelta(
timedelta(
seconds=(time.perf_counter() - task._started_at),
)
)
else:
running_since = "-"
tasks.append(
FormattedLiveTaskInfo(
taskid,
task._state,
task.get_name(),
coro_repr,
created_location,
running_since,
)
)
return tasks
def format_terminated_task_list(
self, filter_: str, persistent: bool
) -> Sequence[FormattedTerminatedTaskInfo]:
terminated_tasks = self._terminated_tasks.values()
tasks = []
for item in sorted(
terminated_tasks,
key=lambda info: info.terminated_at,
reverse=True,
):
if persistent and not item.persistent:
continue
if filter_ and (filter_ not in item.coro and filter_ not in item.name):
continue
started_since = _format_timedelta(
timedelta(seconds=time.perf_counter() - item.started_at)
)
terminated_since = _format_timedelta(
timedelta(seconds=time.perf_counter() - item.terminated_at)
)
tasks.append(
FormattedTerminatedTaskInfo(
str(item.id),
item.name,
item.coro,
started_since,
terminated_since,
)
)
return tasks
async def cancel_monitored_task(self, task_id: str | int) -> str:
task_id_ = int(task_id)
task = task_by_id(task_id_, self._monitored_loop)
if task is not None:
if self._monitored_loop == asyncio.get_running_loop():
await cancel_task(task)
else:
fut = asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(
cancel_task(task), loop=self._monitored_loop
)
)
await fut
if isinstance(task, TracedTask):
coro_repr = _format_coroutine(task._orig_coro).partition(" ")[0]
else:
coro_repr = _format_coroutine(task.get_coro()).partition(" ")[0]
return coro_repr
else:
raise ValueError("Invalid or non-existent task ID", task_id)
def format_running_task_stack(
self,
task_id: str | int,
) -> Sequence[FormattedStackItem]:
depth = 0
task_id_ = int(task_id)
task = task_by_id(task_id_, self._monitored_loop)
if task is None:
raise MissingTask(task_id_)
task_chain: List[asyncio.Task[Any]] = []
while task is not None:
task_chain.append(task)
task_ref = self._created_traceback_chains.get(task)
task = task_ref() if task_ref is not None else None
prev_task = None
formatted_stack_list = []
for task in reversed(task_chain):
if depth == 0:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
(
"Stack of the root task or coroutine scheduled "
"in the event loop (most recent call last)"
),
)
)
elif depth > 0:
assert prev_task is not None
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
(
"Stack of %s when creating the next task "
"(most recent call last)" % _format_task(prev_task)
),
)
)
stack = self._created_tracebacks.get(task)
if stack is None:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
(
"No stack available (maybe it is a native code, "
"a synchronous callback function, "
"or the event loop itself)"
),
)
)
else:
stack = _filter_stack(stack)
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
textwrap.dedent("".join(traceback.format_list(stack))),
)
)
prev_task = task
depth += 1
task = task_chain[0]
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
"Stack of %s (most recent call last)" % _format_task(task),
)
)
stack = _extract_stack_from_task(task)
if not stack:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
"No stack available for %s" % _format_task(task),
)
)
else:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
textwrap.dedent("".join(traceback.format_list(stack))),
)
)
return formatted_stack_list
def format_terminated_task_stack(
self,
trace_id: str,
) -> Sequence[FormattedStackItem]:
depth = 0
tinfo_chain: List[TerminatedTaskInfo] = []
while trace_id is not None:
tinfo_chain.append(self._terminated_tasks[trace_id])
trace_id = self._canceller_chain.get(trace_id) # type: ignore
prev_tinfo = None
formatted_stack_list = []
for tinfo in reversed(tinfo_chain):
if depth == 0:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
(
"Stack of the root task or coroutine "
"scheduled in the event loop"
"(most recent call last)"
),
)
)
elif depth > 0:
assert prev_tinfo is not None
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
(
"Stack of %s when creating the next task "
"(most recent call last)"
% _format_terminated_task(prev_tinfo)
),
)
)
stack = tinfo.canceller_stack
if stack is None:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
(
"No stack available "
"(maybe it is a self-raised cancellation or exception)"
),
)
)
else:
stack = _filter_stack(stack)
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
textwrap.dedent("".join(traceback.format_list(stack))),
)
)
prev_tinfo = tinfo
depth += 1
tinfo = tinfo_chain[0]
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.HEADER,
"Stack of %s (most recent call last)" % _format_terminated_task(tinfo),
)
)
stack = tinfo.termination_stack
if not stack:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
(
"No stack available for %s (the task has run to completion)"
% _format_terminated_task(tinfo)
),
)
)
else:
formatted_stack_list.append(
FormattedStackItem(
FormatItemTypes.CONTENT,
textwrap.dedent("".join(traceback.format_list(stack))),
)
)
return formatted_stack_list
async def _coro_wrapper(self, coro: Awaitable[T_co]) -> T_co:
myself = asyncio.current_task()
assert isinstance(myself, TracedTask)
try:
return await coro
except BaseException as e:
myself._termination_stack = _extract_stack_from_exception(e)[:-1]
raise
def _create_task(
self,
loop: asyncio.AbstractEventLoop,
coro: Coroutine[Any, Any, T_co] | Generator[Any, None, T_co],
*,
name: str | None = None,
context: contextvars.Context | None = None,
) -> asyncio.Future[T_co]:
assert loop is self._monitored_loop
try:
parent_task = asyncio.current_task()
except RuntimeError:
parent_task = None
persistent = coro in persistent_coro
task = TracedTask(
self._coro_wrapper(coro), # type: ignore
termination_info_queue=self._termination_info_queue.sync_q,
cancellation_chain_queue=self._cancellation_chain_queue.sync_q,
persistent=persistent,
loop=self._monitored_loop,
name=name, # since Python 3.8
context=context, # since Python 3.11
)
task._orig_coro = cast(Coroutine[Any, Any, T_co], coro)
self._created_tracebacks[task] = _extract_stack_from_frame(sys._getframe())[
:-1
] # strip this wrapper method
if parent_task is not None:
self._created_traceback_chains[task] = weakref.ref(parent_task)
return task
def _ui_main(self) -> None:
asyncio.run(self._ui_main_async())
async def _ui_main_async(self) -> None:
loop = asyncio.get_running_loop()
self._termination_info_queue = janus.Queue()
self._cancellation_chain_queue = janus.Queue()
self._ui_loop = loop
self._ui_forever_future = loop.create_future()
self._ui_termination_handler_task = loop.create_task(
self._ui_handle_termination_updates()
)
self._ui_cancellation_handler_task = loop.create_task(
self._ui_handle_cancellation_updates()
)
telnet_server = TelnetServer(
interact=functools.partial(interact, self),
host=self._host,
port=self._termui_port,
)
webui_app = await init_webui(self)
webui_runner = web.AppRunner(webui_app)
await webui_runner.setup()
webui_site = web.TCPSite(
webui_runner,
str(self._host),
self._webui_port,
reuse_port=True,
)
await webui_site.start()
telnet_server.start()
await asyncio.sleep(0)
self._ui_started.set()
try:
await self._ui_forever_future
except asyncio.CancelledError:
pass
finally:
termui_tasks = {*self._termui_tasks}
for termui_task in termui_tasks:
termui_task.cancel()
await asyncio.gather(*termui_tasks, return_exceptions=True)
self._ui_termination_handler_task.cancel()
self._ui_cancellation_handler_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._ui_termination_handler_task
with contextlib.suppress(asyncio.CancelledError):
await self._ui_cancellation_handler_task
await telnet_server.stop()
await webui_runner.cleanup()
async def _ui_handle_termination_updates(self) -> None:
while True:
try:
update: TerminatedTaskInfo = (
await self._termination_info_queue.async_q.get()
)
except asyncio.CancelledError:
return
self._terminated_tasks[update.id] = update
if not update.persistent:
self._terminated_history.append(update.id)
# canceller stack is already put in _ui_handle_cancellation_updates()
if canceller_stack := self._canceller_stacks.pop(update.id, None):
update.canceller_stack = canceller_stack
while len(self._terminated_history) > self._max_termination_history:
removed_id = self._terminated_history.pop(0)
self._terminated_tasks.pop(removed_id, None)
self._canceller_chain.pop(removed_id, None)
self._canceller_stacks.pop(removed_id, None)
async def _ui_handle_cancellation_updates(self) -> None:
while True:
try:
update: CancellationChain = (
await self._cancellation_chain_queue.async_q.get()
)
except asyncio.CancelledError:
return
self._canceller_stacks[update.target_id] = update.canceller_stack
self._canceller_chain[update.target_id] = update.canceller_id
[docs]
def start_monitor(
loop: asyncio.AbstractEventLoop,
*,
monitor_cls: Type[Monitor] = Monitor,
host: str = MONITOR_HOST,
port: int = MONITOR_TERMUI_PORT, # kept the name for backward compatibility
console_port: int = CONSOLE_PORT,
webui_port: int = MONITOR_WEBUI_PORT,
console_enabled: bool = True,
hook_task_factory: bool = False,
max_termination_history: Optional[int] = None,
locals: Optional[Dict[str, Any]] = None,
) -> Monitor:
m = monitor_cls(
loop,
host=host,
termui_port=port,
webui_port=webui_port,
console_port=console_port,
console_enabled=console_enabled,
hook_task_factory=hook_task_factory,
max_termination_history=(
max_termination_history
or get_default_args(monitor_cls.__init__)["max_termination_history"]
),
locals=locals,
)
m.start()
return m