Document and test remaining attributes/methods; fix PyPy tests by not calling unbind() from tp_dealloc; misc other cleanups

This commit is contained in:
Joshua Oreman 2022-01-13 17:09:51 -07:00
parent 2c782b9619
commit 62da18ee37
5 changed files with 248 additions and 143 deletions

View File

@ -90,13 +90,9 @@ To install from source::
API
===
``NetfilterQueue.COPY_NONE``
``NetfilterQueue.COPY_META``
``NetfilterQueue.COPY_PACKET``
``NetfilterQueue.COPY_NONE``, ``NetfilterQueue.COPY_META``, ``NetfilterQueue.COPY_PACKET``
These constants specify how much of the packet should be given to the
script- nothing, metadata, or the whole packet.
script: nothing, metadata, or the whole packet.
NetfilterQueue objects
----------------------
@ -104,7 +100,7 @@ NetfilterQueue objects
A NetfilterQueue object represents a single queue. Configure your queue with
a call to ``bind``, then start receiving packets with a call to ``run``.
``QueueHandler.bind(queue_num, callback[, max_len[, mode[, range[, sock_len]]]])``
``NetfilterQueue.bind(queue_num, callback, max_len=1024, mode=COPY_PACKET, range=65535, sock_len=...)``
Create and bind to the queue. ``queue_num`` uniquely identifies this
queue for the kernel. It must match the ``--queue-num`` in your iptables
rule, but there is no ordering requirement: it's fine to either ``bind()``
@ -118,22 +114,23 @@ a call to ``bind``, then start receiving packets with a call to ``run``.
the source and destination IPs of an IPv4 packet, ``range`` could be 20.
``sock_len`` sets the receive socket buffer size.
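For illustration, here is a minimal sketch of binding a queue and processing
packets. It assumes an iptables rule such as
``iptables -A INPUT -j NFQUEUE --queue-num 1`` is already installed; the
queue number and callback are placeholders::

    from netfilterqueue import NetfilterQueue

    def print_and_accept(pkt):
        # Inspect the packet, then let it continue on its way
        print(pkt)
        pkt.accept()

    nfq = NetfilterQueue()
    nfq.bind(1, print_and_accept)  # 1 must match --queue-num in the rule
    try:
        nfq.run()
    finally:
        nfq.unbind()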
``QueueHandler.unbind()``
``NetfilterQueue.unbind()``
Remove the queue. Packets matched by your iptables rule will be dropped.
``QueueHandler.get_fd()``
``NetfilterQueue.get_fd()``
Get the file descriptor of the socket used to receive queued
packets and send verdicts. If you're using an async event loop,
you can poll this FD for readability and call ``run(False)`` every
time data appears on it.
``QueueHandler.run([block])``
``NetfilterQueue.run(block=True)``
Send packets to your callback. By default, this method blocks, running
until an exception is raised (such as by Ctrl+C). Set
block=False to process the pending messages without waiting for more.
You can get the file descriptor of the socket with the ``get_fd`` method.
``block=False`` to process the pending messages without waiting for more;
in conjunction with the ``get_fd`` method, you can use this to integrate
with async event loops.
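As a sketch of that event-loop integration (mirroring what this project's
test suite does with Trio; the queue number and callback are placeholders)::

    import trio
    from netfilterqueue import NetfilterQueue

    async def process_queue(nfq):
        while True:
            # Wait until the queue's socket is readable, then drain it
            await trio.lowlevel.wait_readable(nfq.get_fd())
            nfq.run(block=False)

    nfq = NetfilterQueue()
    nfq.bind(1, lambda pkt: pkt.accept())
    try:
        trio.run(process_queue, nfq)
    finally:
        nfq.unbind()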
``QueueHandler.run_socket(socket)``
``NetfilterQueue.run_socket(socket)``
Send packets to your callback, but use the supplied socket instead of
recv, so that, for example, gevent can monkeypatch it. You can make a
socket with ``socket.fromfd(nfqueue.get_fd(), socket.AF_NETLINK, socket.SOCK_RAW)``
@ -148,6 +145,8 @@ Objects of this type are passed to your callback.
Return the packet's payload as a bytes object. The returned value
starts with the IP header. You must call ``retain()`` if you want
to be able to ``get_payload()`` after your callback has returned.
If you have already called ``set_payload()``, then ``get_payload()``
returns what you passed to ``set_payload()``.
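For example, a hypothetical callback that retains packets so their payloads
can still be read after the callback has returned::

    captured = []

    def cb(pkt):
        pkt.retain()           # copy the payload so it outlives the callback
        captured.append(pkt)
        pkt.accept()

    # later, outside the callback:
    # captured[0].get_payload() still works because retain() was called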
``Packet.set_payload(payload)``
Set the packet payload. Call this before ``accept()`` if you want to
@ -166,12 +165,46 @@ Objects of this type are passed to your callback.
rules. ``mark`` is a 32-bit number.
``Packet.get_mark()``
Get the mark already on the packet (either the one you set using
Get the mark on the packet (either the one you set using
``set_mark()``, or the one it arrived with if you haven't called
``set_mark()``).
``Packet.get_hw()``
Return the hardware address as a Python string.
Return the source hardware address of the packet as a Python
bytestring, or ``None`` if the source hardware address was not
captured (as is the case for packets captured by the ``OUTPUT`` or
``PREROUTING`` hooks). For example, on Ethernet the result will be a six-byte
MAC address. The destination hardware address is not available
because it is determined in the kernel only after packet filtering
is complete.
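A sketch of formatting the result as a colon-separated MAC string (assuming
an Ethernet source; only the first six bytes are used in case the returned
value is padded)::

    def cb(pkt):
        hw = pkt.get_hw()
        if hw is not None:
            mac = ":".join("%02x" % byte for byte in hw[:6])
            print("source MAC:", mac)
        pkt.accept()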
``Packet.get_timestamp()``
Return the time at which this packet was received by the kernel,
as a floating-point Unix timestamp with microsecond precision
(comparable to the result of ``time.time()``, for example).
Packets captured by the ``OUTPUT`` or ``POSTROUTING`` hooks
do not have a timestamp, and ``get_timestamp()`` will return 0.0
for them.
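For instance, a sketch that logs the kernel receive time, treating 0.0 as
"no timestamp available"::

    from datetime import datetime, timezone

    def cb(pkt):
        ts = pkt.get_timestamp()
        if ts:
            when = datetime.fromtimestamp(ts, tz=timezone.utc)
            print("received at", when.isoformat())
        pkt.accept()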
``Packet.id``
The identifier assigned to this packet by the kernel. Typically
the first packet received by your queue will have ID 1, and later ones
count up from there.
``Packet.hw_protocol``
The link-layer protocol for this packet. For example, IPv4 packets
on Ethernet would have this set to the EtherType for IPv4, which is
``0x0800``.
``Packet.mark``
The mark that had been assigned to this packet when it was enqueued.
Unlike the result of ``get_mark()``, this does not change if you call
``set_mark()``.
``Packet.hook``
The netfilter hook (iptables chain, roughly) that diverted this packet
into our queue. Values 0 through 4 correspond to PREROUTING, INPUT,
FORWARD, OUTPUT, and POSTROUTING respectively.
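A sketch that reads these attributes together; the hook-name table below
just restates the mapping described above::

    HOOK_NAMES = {
        0: "PREROUTING", 1: "INPUT", 2: "FORWARD", 3: "OUTPUT", 4: "POSTROUTING"
    }

    def cb(pkt):
        print("packet %d: hook=%s proto=0x%04x mark=%d" % (
            pkt.id, HOOK_NAMES.get(pkt.hook, "?"), pkt.hw_protocol, pkt.mark))
        pkt.accept()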
``Packet.retain()``
Allocate a copy of the packet payload for use after the callback
@ -249,20 +282,39 @@ The fields are:
Limitations
===========
* Compiled with a 4096-byte buffer for packets, so it probably won't work on
loopback or Ethernet with jumbo packets. If this is a problem, either lower
MTU on your loopback, disable jumbo packets, or get Cython,
change ``DEF BufferSize = 4096`` in ``netfilterqueue.pyx``, and rebuild.
* Full libnetfilter_queue API is not yet implemented:
* We use a fixed-size 4096-byte buffer for packets, so you are likely
to see truncation on loopback and on Ethernet with jumbo packets.
If this is a problem, either lower the MTU on your loopback, disable
jumbo packets, or get Cython, change ``DEF BufferSize = 4096`` in
``netfilterqueue.pyx``, and rebuild.
* Omits methods for getting information about the interface a packet has
arrived on or is leaving on
* Probably other stuff is omitted too
* Not all information available from libnetfilter_queue is exposed:
missing pieces include packet input/output network interface names,
checksum offload flags, UID/GID and security context data
associated with the packet (if any).
* Not all information available from the kernel is even processed by
libnetfilter_queue: missing pieces include additional link-layer
header data for some packets (including VLAN tags), connection-tracking
state, and incoming packet length (if truncated for queueing).
* We do not expose the libnetfilter_queue interface for changing queue flags.
Most of these pertain to other features we don't support (listed above),
but there's one that could set the queue to accept (rather than drop)
packets received when it's full.
Source
======
https://github.com/kti/python-netfilterqueue
https://github.com/oremanj/python-netfilterqueue
Authorship
==========
python-netfilterqueue was originally written by Matthew Fox of
Kerkhoff Technologies, Inc. Since 2022 it has been maintained by
Joshua Oreman of Hudson River Trading LLC. Both authors wish to
thank their employers for their support of open source.
License
=======

View File

@ -172,7 +172,6 @@ cdef class NetfilterQueue:
cdef object user_callback # User callback
cdef nfq_handle *h # Handle to NFQueue library
cdef nfq_q_handle *qh # A handle to the queue
cdef bint unbinding
cdef class Packet:
cdef NetfilterQueue _queue
@ -180,6 +179,7 @@ cdef class Packet:
# false otherwise
cdef bint _mark_is_set # True if a mark has been given, false otherwise
cdef bint _hwaddr_is_set
cdef bint _timestamp_is_set
cdef u_int32_t _given_mark # Mark given to packet
cdef bytes _given_payload # New payload of packet, or null
cdef bytes _owned_payload
@ -197,7 +197,6 @@ cdef class Packet:
cdef u_int8_t hw_addr[8]
# TODO: implement these
#cdef u_int8_t hw_addr[8] # A eui64-formatted address?
#cdef readonly u_int32_t nfmark
#cdef readonly u_int32_t indev
#cdef readonly u_int32_t physindev

View File

@ -24,18 +24,6 @@ DEF SockCopySize = MaxCopySize + SockOverhead
# Socket queue should hold max number of packets of copysize bytes
DEF SockRcvSize = DEFAULT_MAX_QUEUELEN * SockCopySize // 2
cdef extern from *:
"""
static void do_write_unraisable(PyObject* obj) {
PyObject *ty, *val, *tb;
PyErr_GetExcInfo(&ty, &val, &tb);
PyErr_Restore(ty, val, tb);
PyErr_WriteUnraisable(obj);
}
"""
cdef void do_write_unraisable(msg)
from cpython.exc cimport PyErr_CheckSignals
# A negative return value from this callback will stop processing and
@ -45,17 +33,15 @@ cdef int global_callback(nfq_q_handle *qh, nfgenmsg *nfmsg,
"""Create a Packet and pass it to appropriate callback."""
cdef NetfilterQueue nfqueue = <NetfilterQueue>data
cdef object user_callback = <object>nfqueue.user_callback
if user_callback is None:
# Queue is being unbound; we can't send a verdict at this point
# so just ignore the packet. The kernel will drop it once we
# unbind.
return 1
packet = Packet()
packet.set_nfq_data(nfqueue, nfa)
try:
user_callback(packet)
except BaseException as exc:
if nfqueue.unbinding == True:
do_write_unraisable(
"netfilterqueue callback during unbind"
)
else:
raise
finally:
packet.drop_refs()
return 1
@ -104,7 +90,7 @@ cdef class Packet:
cdef drop_refs(self):
"""
Called at the end of the user_callback, when the storage passed to
set_nfq_data() is about to be deallocated.
set_nfq_data() is about to be reused.
"""
self.payload = NULL
@ -139,7 +125,11 @@ cdef class Packet:
self._verdict_is_set = True
def get_hw(self):
"""Return the hardware address as Python string."""
"""Return the packet's source MAC address as a Python bytestring, or
None if it's not available.
"""
if not self._hwaddr_is_set:
return None
cdef object py_string
py_string = PyBytes_FromStringAndSize(<char*>self.hw_addr, 8)
return py_string
@ -209,11 +199,17 @@ cdef class NetfilterQueue:
if nfq_bind_pf(self.h, af) < 0:
raise OSError("Failed to bind family %s. Are you root?" % af)
def __dealloc__(self):
def __del__(self):
# unbind() can result in invocations of global_callback, so we
# must do it in __del__ (when this is still a valid
# NetfilterQueue object) rather than __dealloc__
self.unbind()
def __dealloc__(self):
# Don't call nfq_unbind_pf unless you want to disconnect any other
# processes using this libnetfilter_queue on this protocol family!
nfq_close(self.h)
if self.h != NULL:
nfq_close(self.h)
def bind(self, int queue_num, object user_callback,
u_int32_t max_len=DEFAULT_MAX_QUEUELEN,
@ -254,12 +250,9 @@ cdef class NetfilterQueue:
def unbind(self):
"""Destroy the queue."""
self.user_callback = None
if self.qh != NULL:
self.unbinding = True
try:
nfq_destroy_queue(self.qh)
finally:
self.unbinding = False
nfq_destroy_queue(self.qh)
self.qh = NULL
# See warning about nfq_unbind_pf in __dealloc__ above.
@ -288,9 +281,7 @@ cdef class NetfilterQueue:
PyErr_CheckSignals()
continue
raise OSError(errno, "recv failed")
rv = nfq_handle_packet(self.h, buf, rv)
if rv < 0:
raise OSError(errno, "nfq_handle_packet failed")
nfq_handle_packet(self.h, buf, rv)
def run_socket(self, s):
"""Accept packets using socket.recv so that, for example, gevent can monkeypatch it."""
@ -299,11 +290,6 @@ cdef class NetfilterQueue:
while True:
try:
buf = s.recv(BufferSize)
rv = len(buf)
if rv >= 0:
nfq_handle_packet(self.h, buf, rv)
else:
break
except socket.error as e:
err = e.args[0]
if err == ENOBUFS:
@ -315,6 +301,8 @@ cdef class NetfilterQueue:
else:
# This is bad. Let the caller handle it.
raise e
else:
nfq_handle_packet(self.h, buf, len(buf))
PROTOCOLS = {
0: "HOPOPT",

View File

@ -8,7 +8,7 @@ import trio
import unshare
import netfilterqueue
from functools import partial
from typing import AsyncIterator, Callable, Optional
from typing import AsyncIterator, Callable, Optional, Tuple
from async_generator import asynccontextmanager
from pytest_trio.enable_trio_mode import *
@ -124,6 +124,7 @@ class Harness:
def __init__(self):
self._received = {}
self._conn = {}
self.dest_addr = {}
self.failed = False
async def _run_peer(self, idx: int, *, task_status):
@ -188,8 +189,11 @@ class Harness:
start_nursery.start_soon(nursery.start, self._manage_peer, 1)
start_nursery.start_soon(nursery.start, self._manage_peer, 2)
# Tell each peer about the other one's port
await self._conn[2].send(await self._received[1].receive())
await self._conn[1].send(await self._received[2].receive())
for idx in (1, 2):
self.dest_addr[idx] = (
PEER_IP[idx], int(await self._received[idx].receive())
)
await self._conn[3 - idx].send(b"%d" % self.dest_addr[idx][1])
yield
self._conn[1].shutdown(socket.SHUT_WR)
self._conn[2].shutdown(socket.SHUT_WR)
@ -201,6 +205,47 @@ class Harness:
f"Peer {idx} received unexepcted packet {remainder!r}"
)
def bind_queue(
self,
cb: Callable[[netfilterqueue.Packet], None],
*,
queue_num: int = -1,
**options: int,
) -> Tuple[int, netfilterqueue.NetfilterQueue]:
nfq = netfilterqueue.NetfilterQueue()
# Use a smaller socket buffer to avoid a warning in CI
options.setdefault("sock_len", 131072)
if queue_num >= 0:
nfq.bind(queue_num, cb, **options)
else:
for queue_num in range(16):
try:
nfq.bind(queue_num, cb, **options)
break
except Exception as ex:
last_error = ex
else:
raise RuntimeError(
"Couldn't bind any netfilter queue number between 0-15"
) from last_error
return queue_num, nfq
@asynccontextmanager
async def enqueue_packets_to(
self, idx: int, queue_num: int, *, forwarded: bool = True
) -> AsyncIterator[None]:
if forwarded:
chain = "FORWARD"
else:
chain = "OUTPUT"
rule = f"{chain} -d {PEER_IP[idx]} -j NFQUEUE --queue-num {queue_num}"
await trio.run_process(f"/sbin/iptables -A {rule}".split())
try:
yield
finally:
await trio.run_process(f"/sbin/iptables -D {rule}".split())
@asynccontextmanager
async def capture_packets_to(
self,
@ -209,33 +254,13 @@ class Harness:
["trio.MemorySendChannel[netfilterqueue.Packet]", netfilterqueue.Packet],
None,
] = _default_capture_cb,
*,
queue_num: int = -1,
**options: int,
) -> AsyncIterator["trio.MemoryReceiveChannel[netfilterqueue.Packet]"]:
packets_w, packets_r = trio.open_memory_channel(math.inf)
nfq = netfilterqueue.NetfilterQueue()
# Use a smaller socket buffer to avoid a warning in CI
options.setdefault("sock_len", 131072)
if queue_num >= 0:
nfq.bind(queue_num, partial(cb, packets_w), **options)
else:
for queue_num in range(16):
try:
nfq.bind(queue_num, partial(cb, packets_w), **options)
break
except Exception as ex:
last_error = ex
else:
raise RuntimeError(
"Couldn't bind any netfilter queue number between 0-15"
) from last_error
queue_num, nfq = self.bind_queue(partial(cb, packets_w), **options)
try:
rule = f"-d {PEER_IP[idx]} -j NFQUEUE --queue-num {queue_num}"
await trio.run_process(f"/sbin/iptables -A FORWARD {rule}".split())
try:
async with self.enqueue_packets_to(idx, queue_num):
async with packets_w, trio.open_nursery() as nursery:
@nursery.start_soon
@ -246,8 +271,6 @@ class Harness:
yield packets_r
nursery.cancel_scope.cancel()
finally:
await trio.run_process(f"/sbin/iptables -D FORWARD {rule}".split())
finally:
nfq.unbind()

View File

@ -1,9 +1,13 @@
import gc
import struct
import trio
import trio.testing
import pytest
import signal
import socket
import sys
import time
import weakref
from netfilterqueue import NetfilterQueue
@ -84,6 +88,77 @@ async def test_rewrite_reorder(harness):
await harness.expect(2, b"numero uno", b"three", b"TWO", b"four")
async def test_mark_repeat(harness):
counter = 0
timestamps = []
def cb(chan, pkt):
nonlocal counter
assert pkt.get_mark() == counter
timestamps.append(pkt.get_timestamp())
if counter < 5:
counter += 1
pkt.set_mark(counter)
pkt.repeat()
assert pkt.get_mark() == counter
else:
pkt.accept()
async with harness.capture_packets_to(2, cb):
t0 = time.time()
await harness.send(2, b"testing")
await harness.expect(2, b"testing")
t1 = time.time()
assert counter == 5
# All iterations of the packet have the same timestamps
assert all(t == timestamps[0] for t in timestamps[1:])
assert t0 < timestamps[0] < t1
async def test_hwaddr(harness):
hwaddrs = []
def cb(pkt):
hwaddrs.append((pkt.get_hw(), pkt.hook, pkt.get_payload()[28:]))
pkt.accept()
queue_num, nfq = harness.bind_queue(cb)
try:
async with trio.open_nursery() as nursery:
@nursery.start_soon
async def listen_for_packets():
while True:
await trio.lowlevel.wait_readable(nfq.get_fd())
nfq.run(block=False)
async with harness.enqueue_packets_to(2, queue_num, forwarded=True):
await harness.send(2, b"one", b"two")
await harness.expect(2, b"one", b"two")
async with harness.enqueue_packets_to(2, queue_num, forwarded=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for payload in (b"three", b"four"):
sock.sendto(payload, harness.dest_addr[2])
with trio.fail_after(1):
while len(hwaddrs) < 4:
await trio.sleep(0.1)
nursery.cancel_scope.cancel()
finally:
nfq.unbind()
# Forwarded packets capture a hwaddr, but OUTPUT don't
FORWARD = 2
OUTPUT = 3
mac1 = hwaddrs[0][0]
assert mac1 is not None
assert hwaddrs == [
(mac1, FORWARD, b"one"),
(mac1, FORWARD, b"two"),
(None, OUTPUT, b"three"),
(None, OUTPUT, b"four")
]
async def test_errors(harness):
with pytest.warns(RuntimeWarning, match="rcvbuf limit is") as record:
async with harness.capture_packets_to(2, sock_len=2 ** 30):
@ -95,17 +170,28 @@ async def test_errors(harness):
async with harness.capture_packets_to(2, queue_num=0):
pass
nfq = NetfilterQueue()
nfq.bind(1, lambda p: None, sock_len=131072)
_, nfq = harness.bind_queue(lambda: None, queue_num=1)
with pytest.raises(RuntimeError, match="A queue is already bound"):
nfq.bind(2, lambda p: None, sock_len=131072)
nfq.bind(2, lambda p: None)
# Test unbinding via __del__
nfq = weakref.ref(nfq)
for _ in range(4):
gc.collect()
if nfq() is None:
break
else:
raise RuntimeError("Couldn't trigger garbage collection of NFQ")
async def test_unretained(harness):
# Capture packets without retaining -> can't access payload
async with harness.capture_packets_to(
2, trio.MemorySendChannel.send_nowait
) as chan:
def cb(chan, pkt):
# Can access payload within callback
assert pkt.get_payload()[-3:] in (b"one", b"two")
chan.send_nowait(pkt)
# Capture packets without retaining -> can't access payload after cb returns
async with harness.capture_packets_to(2, cb) as chan:
await harness.send(2, b"one", b"two")
accept = True
async for p in chan:
@ -155,53 +241,10 @@ async def test_cb_exception(harness):
pkt.drop()
async def test_cb_exception_during_unbind(harness, capsys):
pkt = None
def cb(channel, p):
nonlocal pkt
pkt = p
raise ValueError("test")
if sys.version_info >= (3, 8):
from _pytest.unraisableexception import catch_unraisable_exception
else:
from contextlib import contextmanager
@contextmanager
def catch_unraisable_exception():
yield
with catch_unraisable_exception() as unraise, trio.CancelScope() as cscope:
async with harness.capture_packets_to(2, cb):
# Cancel the task that reads from netfilter:
cscope.cancel()
with trio.CancelScope(shield=True):
await trio.testing.wait_all_tasks_blocked()
# Now actually send the packet and wait for the report to appear
# (hopefully)
await harness.send(2, b"boom boom")
await trio.sleep(0.5)
# Exiting the block calls unbind() and raises the exception in the cb.
# It gets caught and discarded as unraisable.
if unraise:
assert unraise.unraisable
assert unraise.unraisable.object == "netfilterqueue callback during unbind"
assert unraise.unraisable.exc_type is ValueError
assert str(unraise.unraisable.exc_value) == "test"
if not unraise:
assert (
"Exception ignored in: 'netfilterqueue callback" in capsys.readouterr().err
)
with pytest.raises(RuntimeError, match="Payload data is no longer available"):
pkt.get_payload()
with pytest.raises(RuntimeError, match="Parent queue has already been unbound"):
pkt.drop()
@pytest.mark.skipif(
sys.implementation.name == "pypy",
reason="pypy does not support PyErr_CheckSignals",
)
def test_signal():
nfq = NetfilterQueue()
nfq.bind(1, lambda p: None, sock_len=131072)