paulbergmann_mpstubs/tools/gdb/stubs/monitor.py

132 lines
4.3 KiB
Python

import asyncio
import re
from qemu.qmp import QMPClient
class Monitor:
    """Synchronous helper around QEMU's human monitor, reached through QMP.

    Each public method connects to ``socket``, issues a single
    ``human-monitor-command``, disconnects again and parses the textual
    reply, so no connection is kept open between calls.
    """

    # Size letters of the monitor's `x` / `xp` commands, keyed by byte count:
    # b = 8 bits, h = 16 bits, w = 32 bits, g = 64 bits.
    _SIZE_LETTERS = {1: 'b', 2: 'h', 4: 'w', 8: 'g'}

    # Register names looked up in the `info registers` dump.
    _GPRS = ('RAX', 'RBX', 'RCX', 'RDX', 'RSI', 'RDI',
             'RBP', 'RSP', 'R8', 'R9', 'R10', 'R11',
             'R12', 'R13', 'R14', 'R15', 'RIP')
    _CONTROL_REGS = ('CR0', 'CR2', 'CR3', 'CR4')
    _DESCRIPTOR_TABLES = ('GDT', 'IDT')

    def __init__(self, socket):
        """Remember the QMP socket address and create the (unconnected) client."""
        self.socket = socket
        self.qmp = QMPClient(f'Monitor: {socket}')

    async def _connect(self):
        """Connect the QMP client to the configured socket."""
        await self.qmp.connect(self.socket)

    async def _disconnect(self):
        """Disconnect the QMP client."""
        await self.qmp.disconnect()

    async def _hmc(self, cmd):
        """Execute ``cmd`` as a human-monitor-command and return its reply."""
        return await self.qmp.execute(
            'human-monitor-command', {'command-line': cmd})

    def _run(self, cmd):
        """Run one monitor command in its own connect/disconnect session.

        The disconnect happens in a ``finally`` clause so that a failing
        command does not leave the connection open.
        """
        async def session():
            await self._connect()
            try:
                return await self._hmc(cmd)
            finally:
                await self._disconnect()

        return asyncio.run(session())

    @staticmethod
    def _find(pattern, text, what):
        """Return the first match of ``pattern`` in ``text``.

        Raises:
            ValueError: if ``pattern`` does not occur in ``text``.
        """
        match = re.search(pattern, text)
        if match is None:
            raise ValueError(f"{what} not found in monitor output")
        return match

    @classmethod
    def _parse_cpu(cls, dump):
        """Extract the registers of one CPU from its paragraph of the dump.

        Returns a dict with lower-case register names as keys. General
        purpose and control registers map to ints; ``'gdt'`` and ``'idt'``
        map to ``(base, limit)`` tuples.
        """
        regs = {}

        # general purpose registers (single-digit names are padded: "R8 =")
        for name in cls._GPRS:
            match = cls._find(rf"{name}\s?=(?P<{name}>\w{{16}})", dump, name)
            regs[name.lower()] = int(match.group(name), 16)

        # control registers
        for name in cls._CONTROL_REGS:
            match = cls._find(rf"{name}=(?P<{name}>\w{{8,16}})", dump, name)
            regs[name.lower()] = int(match.group(name), 16)

        # descriptor tables
        for name in cls._DESCRIPTOR_TABLES:
            pattern = rf"{name}\s*=\s*(?P<{name}_base>\w{{16}})" \
                      rf"\s+(?P<{name}_limit>\w{{8}})"
            match = cls._find(pattern, dump, name)
            base = int(match.group(f"{name}_base"), 16)
            limit = int(match.group(f"{name}_limit"), 16)
            regs[name.lower()] = (base, limit)

        return regs

    def registers(self):
        """Query ``info registers -a`` and parse it per CPU.

        Returns:
            dict mapping the position of each CPU paragraph in the output
            (0, 1, ...) to the register dict built by ``_parse_cpu``.

        Raises:
            ValueError: if an expected register is missing from a paragraph.
        """
        raw = self._run('info registers -a')
        # each blank-line separated paragraph of `raw` contains the registers
        # for a logical CPU; accept both "\r\n" and "\n" line endings
        paragraphs = (p.strip() for p in re.split(r'(?:\r?\n){2,}', raw))
        dumps = (p for p in paragraphs if p)
        return {cpuid: self._parse_cpu(dump)
                for cpuid, dump in enumerate(dumps)}

    def _read_memory(self, command, addr, size):
        """Read one ``size``-byte value at ``addr`` using ``command`` (x or xp).

        Raises:
            ValueError: if ``size`` is not 1, 2, 4 or 8, or if the monitor
                reply does not look like ``<address>: 0x<value>``.
        """
        letter = self._SIZE_LETTERS.get(size)
        if letter is None:
            raise ValueError(
                f"size must be one of {sorted(self._SIZE_LETTERS)}, "
                f"got {size!r}")
        reply = self._run(f"{command}/x{letter} {addr}")
        match = re.match(r"[a-f\d]+:\s*(0x[a-f\d]+)", reply)
        if match is None:
            raise ValueError(f"unexpected monitor reply: {reply!r}")
        return int(match.group(1), 16)

    def virtual_memory(self, addr, size):
        """Return the ``size``-byte value the monitor's ``x`` command reads at ``addr``."""
        return self._read_memory('x', addr, size)

    def physical_memory(self, addr, size):
        """Return the ``size``-byte value the monitor's ``xp`` command reads at ``addr``."""
        return self._read_memory('xp', addr, size)

    def gva2gpa(self, addr):
        """Translate ``addr`` with the monitor's ``gva2gpa`` command.

        Returns:
            The address reported after ``gpa:`` as an int, or ``None`` when
            the monitor answers ``Unmapped``.

        Raises:
            ValueError: if the reply is neither of the two.
        """
        reply = self._run(f"gva2gpa {addr}")
        # compare without the trailing line ending so "\n" and "\r\n" both work
        if reply.strip() == 'Unmapped':
            return None
        match = re.match(r"gpa:\s*0x([\da-f]+)", reply)
        if match is None:
            raise ValueError(f"unexpected monitor reply: {reply!r}")
        return int(match.group(1), 16)