John Snow 3bc72e3aed python/aqmp: add __del__ method to legacy interface
asyncio can complain *very* loudly if you forget to back out of things
gracefully before the garbage collector starts destroying objects that
contain live references to asyncio Tasks.

The usual fix is just to remember to call aqmp.disconnect(), but for the
sake of the legacy wrapper and quick, one-off scripts where a graceful
shutdown is not necessarily of paramount imporance, add a courtesy
cleanup that will trigger prior to seeing screenfuls of confusing
asyncio tracebacks.

Note that we can't *always* save you from yourself; depending on when
the GC runs, you might just seriously be out of luck. The best we can do
in this case is to gently remind you to clean up after yourself.

(Still much better than multiple pages of incomprehensible python
warnings for the crime of forgetting to put your toys away.)

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Beraldo Leal <bleal@redhat.com>
2022-01-21 16:01:31 -05:00

157 lines
4.4 KiB
Python

"""
Sync QMP Wrapper
This class pretends to be qemu.qmp.QEMUMonitorProtocol.
"""
import asyncio
from typing import (
Awaitable,
List,
Optional,
TypeVar,
Union,
)
import qemu.qmp
from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
from .error import AQMPError
from .protocol import Runstate
from .qmp_client import QMPClient
# pylint: disable=missing-docstring
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
def __init__(self, address: SocketAddrT,
server: bool = False,
nickname: Optional[str] = None):
# pylint: disable=super-init-not-called
self._aqmp = QMPClient(nickname)
self._aloop = asyncio.get_event_loop()
self._address = address
self._timeout: Optional[float] = None
_T = TypeVar('_T')
def _sync(
self, future: Awaitable[_T], timeout: Optional[float] = None
) -> _T:
return self._aloop.run_until_complete(
asyncio.wait_for(future, timeout=timeout)
)
def _get_greeting(self) -> Optional[QMPMessage]:
if self._aqmp.greeting is not None:
# pylint: disable=protected-access
return self._aqmp.greeting._asdict()
return None
# __enter__ and __exit__ need no changes
# parse_address needs no changes
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
self._aqmp.await_greeting = negotiate
self._aqmp.negotiate = negotiate
self._sync(
self._aqmp.connect(self._address)
)
return self._get_greeting()
def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
self._aqmp.await_greeting = True
self._aqmp.negotiate = True
self._sync(
self._aqmp.accept(self._address),
timeout
)
ret = self._get_greeting()
assert ret is not None
return ret
def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
return dict(
self._sync(
# pylint: disable=protected-access
# _raw() isn't a public API, because turning off
# automatic ID assignment is discouraged. For
# compatibility with iotests *only*, do it anyway.
self._aqmp._raw(qmp_cmd, assign_id=False),
self._timeout
)
)
# Default impl of cmd() delegates to cmd_obj
def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
return self._sync(
self._aqmp.execute(cmd, kwds),
self._timeout
)
def pull_event(self,
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
if not wait:
# wait is False/0: "do not wait, do not except."
if self._aqmp.events.empty():
return None
# If wait is 'True', wait forever. If wait is False/0, the events
# queue must not be empty; but it still needs some real amount
# of time to complete.
timeout = None
if wait and isinstance(wait, float):
timeout = wait
return dict(
self._sync(
self._aqmp.events.get(),
timeout
)
)
def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
events = [dict(x) for x in self._aqmp.events.clear()]
if events:
return events
event = self.pull_event(wait)
return [event] if event is not None else []
def clear_events(self) -> None:
self._aqmp.events.clear()
def close(self) -> None:
self._sync(
self._aqmp.disconnect()
)
def settimeout(self, timeout: Optional[float]) -> None:
self._timeout = timeout
def send_fd_scm(self, fd: int) -> None:
self._aqmp.send_fd_scm(fd)
def __del__(self) -> None:
if self._aqmp.runstate == Runstate.IDLE:
return
if not self._aloop.is_running():
self.close()
else:
# Garbage collection ran while the event loop was running.
# Nothing we can do about it now, but if we don't raise our
# own error, the user will be treated to a lot of traceback
# they might not understand.
raise AQMPError(
"QEMUMonitorProtocol.close()"
" was not called before object was garbage collected"
)