forked from BSB-WS23/mpstubs
132 lines
4.3 KiB
Python
132 lines
4.3 KiB
Python
|
|
import asyncio
|
|
import re
|
|
from qemu.qmp import QMPClient
|
|
|
|
class Monitor:
|
|
def __init__(self, socket):
|
|
self.socket = socket
|
|
self.qmp = QMPClient(f'Monitor: {socket}')
|
|
pass
|
|
|
|
async def _connect(self):
|
|
await self.qmp.connect(self.socket)
|
|
|
|
async def _disconnect(self):
|
|
await self.qmp.disconnect()
|
|
|
|
async def _hmc(self, cmd):
|
|
raw = await self.qmp.execute(
|
|
'human-monitor-command', {'command-line': cmd})
|
|
return raw
|
|
|
|
def registers(self):
|
|
registers = dict()
|
|
|
|
async def query_registers():
|
|
await self._connect()
|
|
raw = await self._hmc('info registers -a')
|
|
await self._disconnect()
|
|
return raw
|
|
raw = asyncio.run(query_registers())
|
|
|
|
# each paragraph of `raw` contains the registers for a logical CPU
|
|
cpu_split = raw.split('\r\n\r\n')
|
|
cpu_split_stripped = (x.strip() for x in cpu_split)
|
|
cpu_split = [s for s in cpu_split_stripped if s]
|
|
|
|
# general purpose registers
|
|
def fetch_gpr(input):
|
|
registers = dict()
|
|
gprs = ['RAX', 'RBX', 'RCX', 'RDX', 'RSI', 'RDI',
|
|
'RBP', 'RSP', 'R8', 'R9', 'R10', 'R11',
|
|
'R12', 'R13', 'R14', 'R15', 'RIP']
|
|
for gpr in gprs:
|
|
pattern = rf"{gpr}\s?=(?P<{gpr}>\w{{16}})"
|
|
match = re.search(pattern, input)
|
|
value_raw = match.group(gpr)
|
|
value = int(value_raw, 16)
|
|
registers[gpr.lower()] = value
|
|
return registers
|
|
|
|
# control registers
|
|
def fetch_cr(input):
|
|
registers = dict()
|
|
for cr in ['CR0', 'CR2', 'CR3', 'CR4']:
|
|
pattern = rf"{cr}=(?P<{cr}>\w{{8,16}})"
|
|
match = re.search(pattern, input)
|
|
value_raw = match.group(cr)
|
|
value = int(value_raw, 16)
|
|
registers[cr.lower()] = value
|
|
return registers
|
|
|
|
# desriptor tables
|
|
def fetch_dt(input):
|
|
registers = dict()
|
|
for tbl in ['GDT', 'IDT']:
|
|
pattern = rf"{tbl}\s*=\s*(?P<{tbl}_base>\w{{16}})" \
|
|
rf"\s+(?P<{tbl}_limit>\w{{8}})"
|
|
match = re.search(pattern, input)
|
|
base_raw = match.group(f"{tbl}_base")
|
|
limit_raw = match.group(f"{tbl}_limit")
|
|
base = int(base_raw, 16)
|
|
limit = int(limit_raw, 16)
|
|
registers[tbl.lower()] = (base, limit)
|
|
return registers
|
|
|
|
registers = dict()
|
|
for cpuid, regstr in enumerate(cpu_split):
|
|
assert (regstr is not None and len(regstr) > 0)
|
|
registers[cpuid] = dict()
|
|
registers[cpuid].update(fetch_gpr(regstr))
|
|
registers[cpuid].update(fetch_cr(regstr))
|
|
registers[cpuid].update(fetch_dt(regstr))
|
|
|
|
return registers
|
|
|
|
def virtual_memory(self, addr, size):
|
|
# byte, word, double word, giant
|
|
types = {1: 'b', 2: 'w', 4: 'd', 8: 'g'}
|
|
assert (size in types)
|
|
|
|
async def query_virtual_memory():
|
|
await self._connect()
|
|
res = await self._hmc(f"x/x{types[size]} {addr}")
|
|
await self._disconnect()
|
|
return res
|
|
|
|
res = asyncio.run(query_virtual_memory())
|
|
match = re.match(r"[a-f\d]+:\s*(0x[a-f\d]+)", res)
|
|
assert (match)
|
|
return int(match.group(1), 16)
|
|
|
|
def physical_memory(self, addr, size):
|
|
# byte, word, double word, giant
|
|
types = {1: 'b', 2: 'w', 4: 'd', 8: 'g'}
|
|
assert (size in types)
|
|
|
|
async def query_physical_memory():
|
|
await self._connect()
|
|
res = await self._hmc(f"xp/x{types[size]} {addr}")
|
|
await self._disconnect()
|
|
return res
|
|
|
|
res = asyncio.run(query_physical_memory())
|
|
match = re.match(r"[a-f\d]+:\s*(0x[a-f\d]+)", res)
|
|
assert (match)
|
|
return int(match.group(1), 16)
|
|
|
|
def gva2gpa(self, addr):
|
|
async def query_gva2gpa():
|
|
await self._connect()
|
|
res = await self._hmc(f"gva2gpa {addr}")
|
|
await self._disconnect()
|
|
return res
|
|
|
|
res = asyncio.run(query_gva2gpa())
|
|
if res == 'Unmapped\r\n':
|
|
return None
|
|
match = re.match(r"gpa:\s*0x([\da-f]+)", res)
|
|
assert (match)
|
|
return int(match.group(1), 16)
|