Compare commits

..

No commits in common. "master" and "25-Crash-in-iptables-OUTPUT-chain" have entirely different histories.

18 changed files with 9133 additions and 1396 deletions

View File

@ -1,44 +0,0 @@
name: CI
on:
push:
branches:
- master
pull_request:
jobs:
Ubuntu:
name: 'Ubuntu (${{ matrix.python }}${{ matrix.extra_name }})'
timeout-minutes: 10
runs-on: 'ubuntu-latest'
strategy:
fail-fast: false
matrix:
python:
- '3.7'
- '3.8'
- '3.9'
- '3.10'
- '3.11'
- 'pypy-3.7'
- 'pypy-3.8'
- 'pypy-3.9'
check_lint: ['0']
extra_name: ['']
include:
- python: '3.9'
check_lint: '1'
extra_name: ', check lint'
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: ${{ fromJSON(format('["{0}", "{1}"]', format('{0}.0-alpha - {0}.X', matrix.python), matrix.python))[startsWith(matrix.python, 'pypy')] }}
- name: Run tests
run: ./ci.sh
env:
CHECK_LINT: '${{ matrix.check_lint }}'
# Should match 'name:' up above
JOB_NAME: 'Ubuntu (${{ matrix.python }}${{ matrix.extra_name }})'

View File

@ -1,29 +1,3 @@
v1.1.0, 1 Mar 2023
Add Packet accessors for {indev, outdev, physindev, physoutdev} interface indices
v1.0.0, 14 Jan 2022
Propagate exceptions raised by the user's packet callback
Avoid calls to the packet callback during queue unbinding
Raise an error if a packet verdict is set after its parent queue is closed
set_payload() now affects the result of later get_payload()
Handle signals received when run() is blocked in recv()
Accept packets in COPY_META mode, only failing on an attempt to access the payload
Add a parameter NetfilterQueue(sockfd=N) that uses an already-opened Netlink socket
Add type hints
Remove the Packet.payload attribute; it was never safe (treated as a char* but not NUL-terminated) nor documented, but was exposed in the API (perhaps inadvertently).
v0.9.0, 12 Jan 2022
Improve usability when Packet objects are retained past the callback
Add Packet.retain() to save the packet contents in such cases
Eliminate warnings during build on py3
Add CI and basic test suite
Raise a warning, not an error, if we don't get the bufsize we want
Don't allow bind() more than once on the same NetfilterQueue, since that would leak the old queue handle
** This will be the last version with support for Python 2.7. **
v0.8.1, 30 Jan 2017
Fix bug #25- crashing when used in OUTPUT or POSTROUTING chains
v0.8, 15 Dec 2016
Add get_hw()
Fix byte order bug in set_mark

View File

@ -1,3 +1,5 @@
include LICENSE.txt README.rst CHANGES.txt
recursive-include netfilterqueue *.py *.pyx *.pxd *.c *.pyi py.typed
recursive-include tests *.py
include *.txt
include *.rst
include *.c
include *.pyx
include *.pxd

View File

@ -1,25 +1,13 @@
.. image:: https://img.shields.io/pypi/v/netfilterqueue.svg
:target: https://pypi.org/project/netfilterqueue
:alt: Latest PyPI version
.. image:: https://github.com/oremanj/python-netfilterqueue/actions/workflows/ci.yml/badge.svg?branch=master
:target: https://github.com/oremanj/python-netfilterqueue/actions?query=branch%3Amaster
:alt: Automated test status
==============
NetfilterQueue
==============
NetfilterQueue provides access to packets matched by an iptables rule in
Linux. Packets so matched can be accepted, dropped, altered, reordered,
or given a mark.
Linux. Packets so matched can be accepted, dropped, altered, or given a mark.
libnetfilter_queue (the netfilter library, not this module) is part of the
Libnetfilter_queue (the netfilter library, not this module) is part of the
`Netfilter project <http://netfilter.org/projects/libnetfilter_queue/>`_.
The current version of NetfilterQueue requires Python 3.6 or later.
The last version with support for Python 2.7 was 0.9.0.
Example
=======
@ -27,18 +15,18 @@ The following script prints a short description of each packet before accepting
it. ::
from netfilterqueue import NetfilterQueue
def print_and_accept(pkt):
print(pkt)
pkt.accept()
nfqueue = NetfilterQueue()
nfqueue.bind(1, print_and_accept)
try:
nfqueue.run()
except KeyboardInterrupt:
print('')
nfqueue.unbind()
You can also make your own socket so that it can be used with gevent, for example. ::
@ -68,7 +56,7 @@ To send packets destined for your LAN to the script, type something like::
Installation
============
NetfilterQueue is a C extention module that links against libnetfilter_queue.
NetfilterQueue is a C extention module that links against libnetfilter_queue.
Before installing, ensure you have:
1. A C compiler
@ -79,7 +67,7 @@ Before installing, ensure you have:
On Debian or Ubuntu, install these files with::
apt-get install build-essential python3-dev libnetfilter-queue-dev
apt-get install build-essential python-dev libnetfilter-queue-dev
From PyPI
---------
@ -93,17 +81,22 @@ From source
To install from source::
pip install cython
git clone https://github.com/oremanj/python-netfilterqueue
git clone git@github.com:kti/python-netfilterqueue.git
cd python-netfilterqueue
pip install .
python setup.py install
If Cython is installed, Distutils will use it to regenerate the .c source from the .pyx. It will then compile the .c into a .so.
API
===
``NetfilterQueue.COPY_NONE``, ``NetfilterQueue.COPY_META``, ``NetfilterQueue.COPY_PACKET``
``NetfilterQueue.COPY_NONE``
``NetfilterQueue.COPY_META``
``NetfilterQueue.COPY_PACKET``
These constants specify how much of the packet should be given to the
script: nothing, metadata, or the whole packet.
script- nothing, metadata, or the whole packet.
NetfilterQueue objects
----------------------
@ -111,12 +104,9 @@ NetfilterQueue objects
A NetfilterQueue object represents a single queue. Configure your queue with
a call to ``bind``, then start receiving packets with a call to ``run``.
``NetfilterQueue.bind(queue_num, callback, max_len=1024, mode=COPY_PACKET, range=65535, sock_len=...)``
Create and bind to the queue. ``queue_num`` uniquely identifies this
queue for the kernel. It must match the ``--queue-num`` in your iptables
rule, but there is no ordering requirement: it's fine to either ``bind()``
first or set up the iptables rule first.
``callback`` is a function or method that takes one
``QueueHandler.bind(queue_num, callback[, max_len[, mode[, range, [sock_len]]]])``
Create and bind to the queue. ``queue_num`` must match the number in your
iptables rule. ``callback`` is a function or method that takes one
argument, a Packet object (see below). ``max_len`` sets the largest number
of packets that can be in the queue; new packets are dropped if the size of
the queue reaches this number. ``mode`` determines how much of the packet
@ -125,26 +115,21 @@ a call to ``bind``, then start receiving packets with a call to ``run``.
the source and destination IPs of a IPv4 packet, ``range`` could be 20.
``sock_len`` sets the receive socket buffer size.
``NetfilterQueue.unbind()``
``QueueHandler.unbind()``
Remove the queue. Packets matched by your iptables rule will be dropped.
``NetfilterQueue.get_fd()``
Get the file descriptor of the socket used to receive queued
packets and send verdicts. If you're using an async event loop,
you can poll this FD for readability and call ``run(False)`` every
time data appears on it.
``QueueHandler.get_fd()``
Get the file descriptor of the queue handler.
``NetfilterQueue.run(block=True)``
Send packets to your callback. By default, this method blocks, running
until an exception is raised (such as by Ctrl+C). Set
``block=False`` to process the pending messages without waiting for more;
in conjunction with the ``get_fd`` method, you can use this to integrate
with async event loops.
``QueueHandler.run([block])``
Send packets to your callback. By default, this method blocks. Set
block=False to let your thread continue. You can get the file descriptor
of the socket with the ``get_fd`` method.
``NetfilterQueue.run_socket(socket)``
``QueueHandler.run_socket(socket)``
Send packets to your callback, but use the supplied socket instead of
recv, so that, for example, gevent can monkeypatch it. You can make a
socket with ``socket.fromfd(nfqueue.get_fd(), socket.AF_NETLINK, socket.SOCK_RAW)``
socket with ``socket.fromfd(nfqueue.get_fd(), socket.AF_UNIX, socket.SOCK_STREAM)``
and optionally make it non-blocking with ``socket.setblocking(False)``.
Packet objects
@ -153,138 +138,42 @@ Packet objects
Objects of this type are passed to your callback.
``Packet.get_payload()``
Return the packet's payload as a bytes object. The returned value
starts with the IP header. You must call ``retain()`` if you want
to be able to ``get_payload()`` after your callback has returned.
If you have already called ``set_payload()``, then ``get_payload()``
returns what you passed to ``set_payload()``.
Return the packet's payload as a string (Python 2) or bytes (Python 3).
``Packet.set_payload(payload)``
Set the packet payload. Call this before ``accept()`` if you want to
change the contents of the packet before allowing it to be released.
Don't forget to update the transport-layer checksum (or clear it,
if you're using UDP), or else the recipient is likely to drop the
packet. If you're changing the length of the packet, you'll also need
to update the IP length, IP header checksum, and probably some
transport-level fields (such as UDP length for UDP).
Set the packet payload. ``payload`` is a bytes.
``Packet.get_payload_len()``
Return the size of the payload.
``Packet.set_mark(mark)``
Give the packet a kernel mark, which can be used in future iptables
rules. ``mark`` is a 32-bit number.
Give the packet a kernel mark. ``mark`` is a 32-bit number.
``Packet.get_mark()``
Get the mark on the packet (either the one you set using
``set_mark()``, or the one it arrived with if you haven't called
``set_mark()``).
Get the mark already on the packet.
``Packet.get_hw()``
Return the source hardware address of the packet as a Python
bytestring, or ``None`` if the source hardware address was not
captured (packets captured by the ``OUTPUT`` or ``PREROUTING``
hooks). For example, on Ethernet the result will be a six-byte
MAC address. The destination hardware address is not available
because it is determined in the kernel only after packet filtering
is complete.
``Packet.get_timestamp()``
Return the time at which this packet was received by the kernel,
as a floating-point Unix timestamp with microsecond precision
(comparable to the result of ``time.time()``, for example).
Packets captured by the ``OUTPUT`` or ``POSTROUTING`` hooks
do not have a timestamp, and ``get_timestamp()`` will return 0.0
for them.
``Packet.id``
The identifier assigned to this packet by the kernel. Typically
the first packet received by your queue starts at 1 and later ones
count up from there.
``Packet.hw_protocol``
The link-layer protocol for this packet. For example, IPv4 packets
on Ethernet would have this set to the EtherType for IPv4, which is
``0x0800``.
``Packet.mark``
The mark that had been assigned to this packet when it was enqueued.
Unlike the result of ``get_mark()``, this does not change if you call
``set_mark()``.
``Packet.hook``
The netfilter hook (iptables chain, roughly) that diverted this packet
into our queue. Values 0 through 4 correspond to PREROUTING, INPUT,
FORWARD, OUTPUT, and POSTROUTING respectively.
``Packet.indev``, ``Packet.outdev``, ``Packet.physindev``, ``Packet.physoutdev``
The interface indices on which the packet arrived (``indev``) or is slated
to depart (``outdev``). These are integers, which can be converted to
names like "eth0" by using ``socket.if_indextoname()``. Zero means
no interface is applicable, either because the packet was locally generated
or locally received, or because the interface information wasn't available
when the packet was queued (for example, ``PREROUTING`` rules don't yet
know the ``outdev``). If the ``indev`` or ``outdev`` refers to a bridge
device, then the corresponding ``physindev`` or ``physoutdev`` will name
the bridge member on which the actual traffic occurred; otherwise
``physindev`` and ``physoutdev`` will be zero.
``Packet.retain()``
Allocate a copy of the packet payload for use after the callback
has returned. ``get_payload()`` will raise an exception at that
point if you didn't call ``retain()``.
Return the hardware address as a Python string.
``Packet.accept()``
Accept the packet. You can reorder packets by accepting them
in a different order than the order in which they were passed
to your callback.
Accept the packet.
``Packet.drop()``
Drop the packet.
``Packet.repeat()``
Restart processing of this packet from the beginning of its
Netfilter hook (iptables chain, roughly). Any changes made
using ``set_payload()`` or ``set_mark()`` are preserved; in the
absence of such changes, the packet will probably come right
back to the same queue.
Iterate the same cycle once more.
Callback objects
----------------
Your callback can be any one-argument callable and will be invoked with
a ``Packet`` object as argument. You must call ``retain()`` within the
callback if you want to be able to ``get_payload()`` after the callback
has returned. You can hang onto ``Packet`` objects and resolve them later,
but note that packets continue to count against the queue size limit
until they've been given a verdict (accept, drop, or repeat). Also, the
kernel stores the enqueued packets in a linked list, so keeping lots of packets
outstanding is likely to adversely impact performance.
Your callback can be function or a method and must accept one argument, a
Packet object. You must call either Packet.accept() or Packet.drop() before
returning.
Monitoring a different network namespace
----------------------------------------
If you are using Linux network namespaces (``man 7
network_namespaces``) in some kind of containerization system, all of
the Netfilter queue state is kept per-namespace; queue 1 in namespace
X is not the same as queue 1 in namespace Y. NetfilterQueue will
ordinarily pass you the traffic for the network namespace you're a
part of. If you want to monitor a different one, you can do so with a
bit of trickery and cooperation from a process in that
namespace; this section describes how.
You'll need to arrange for a process in the network namespace you want
to monitor to call ``socket(AF_NETLINK, SOCK_RAW, 12)`` and pass you
the resulting file descriptor using something like
``socket.send_fds()`` over a Unix domain socket. (12 is
``NETLINK_NETFILTER``, a constant which is not exposed by the Python
``socket`` module.) Once you've received that file descriptor in your
process, you can create a NetfilterQueue object using the special
constructor ``NetfilterQueue(sockfd=N)`` where N is the file
descriptor you received. Because the socket was originally created
in the other network namespace, the kernel treats it as part of that
namespace, and you can use it to access that namespace even though it's
not the namespace you're in yourself.
``callback(packet)`` or ``callback(self, packet)``
Handle a single packet from the queue. You must call either
``packet.accept()`` or ``packet.drop()``.
Usage
=====
@ -292,12 +181,12 @@ Usage
To send packets to the queue::
iptables -I <table or chain> <match specification> -j NFQUEUE --queue-num <queue number>
For example::
iptables -I INPUT -d 192.168.0.0/24 -j NFQUEUE --queue-num 1
The only special part of the rule is the target. Rules can have any match and
The only special part of the rule is the target. Rules can have any match and
can be added to any table or chain.
Valid queue numbers are integers from 0 to 65,535 inclusive.
@ -330,44 +219,25 @@ The fields are:
Limitations
===========
* We use a fixed-size 4096-byte buffer for packets, so you are likely
to see truncation on loopback and on Ethernet with jumbo packets.
If this is a problem, either lower the MTU on your loopback, disable
jumbo packets, or get Cython, change ``DEF BufferSize = 4096`` in
``netfilterqueue.pyx``, and rebuild.
* Not all information available from libnetfilter_queue is exposed:
missing pieces include packet input/output network interface names,
checksum offload flags, UID/GID and security context data
associated with the packet (if any).
* Not all information available from the kernel is even processed by
libnetfilter_queue: missing pieces include additional link-layer
header data for some packets (including VLAN tags), connection-tracking
state, and incoming packet length (if truncated for queueing).
* We do not expose the libnetfilter_queue interface for changing queue flags.
Most of these pertain to other features we don't support (listed above),
but there's one that could set the queue to accept (rather than dropping)
packets received when it's full.
* Compiled with a 4096-byte buffer for packets, so it probably won't work on
loopback or Ethernet with jumbo packets. If this is a problem, either lower
MTU on your loopback, disable jumbo packets, or get Cython,
change ``DEF BufferSize = 4096`` in ``netfilterqueue.pyx``, and rebuild.
* Full libnetfilter_queue API is not yet implemented:
* Omits methods for getting information about the interface a packet has
arrived on or is leaving on
* Probably other stuff is omitted too
Source
======
https://github.com/oremanj/python-netfilterqueue
Authorship
==========
python-netfilterqueue was originally written by Matthew Fox of
Kerkhoff Technologies, Inc. Since 2022 it has been maintained by
Joshua Oreman of Hudson River Trading LLC. Both authors wish to
thank their employers for their support of open source.
https://github.com/kti/python-netfilterqueue
License
=======
Copyright (c) 2011, Kerkhoff Technologies, Inc, and contributors.
Copyright (c) 2011, Kerkhoff Technologies, Inc.
`MIT licensed <https://github.com/kti/python-netfilterqueue/blob/master/LICENSE.txt>`_

53
ci.sh
View File

@ -1,53 +0,0 @@
#!/bin/bash
set -ex -o pipefail
pip install -U pip setuptools wheel
sudo apt-get install libnetfilter-queue-dev
# Cython is required to build the sdist...
pip install cython
python setup.py sdist --formats=zip
# ... but not to install it
pip uninstall -y cython
python setup.py build_ext
pip install dist/*.zip
pip install -Ur test-requirements.txt
if [ "$CHECK_LINT" = "1" ]; then
error=0
black_files="setup.py tests netfilterqueue"
if ! black --check $black_files; then
error=$?
black --diff $black_files
fi
mypy --strict -p netfilterqueue || error=$?
( mkdir empty; cd empty; python -m mypy.stubtest netfilterqueue ) || error=$?
if [ $error -ne 0 ]; then
cat <<EOF
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Problems were found by static analysis (listed above).
To fix formatting and see remaining errors, run:
pip install -r test-requirements.txt
black $black_files
mypy --strict -p netfilterqueue
( mkdir empty; cd empty; python -m mypy.stubtest netfilterqueue )
in your local checkout.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
EOF
exit 1
fi
exit 0
fi
cd tests
pytest -W error -ra -v .

8946
netfilterqueue.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,22 +1,18 @@
cdef extern from "<sys/types.h>":
cdef extern from "sys/types.h":
ctypedef unsigned char u_int8_t
ctypedef unsigned short int u_int16_t
ctypedef unsigned int u_int32_t
cdef extern from "<unistd.h>":
int dup2(int oldfd, int newfd)
cdef extern from "<errno.h>":
int errno
# dummy defines from asm-generic/errno.h:
cdef enum:
EINTR = 4
EAGAIN = 11 # Try again
EWOULDBLOCK = EAGAIN
ENOBUFS = 105 # No buffer space available
cdef extern from "<netinet/ip.h>":
cdef extern from "netinet/ip.h":
struct iphdr:
u_int8_t tos
u_int16_t tot_len
@ -63,7 +59,7 @@ cdef extern from "Python.h":
object PyBytes_FromStringAndSize(char *s, Py_ssize_t len)
object PyString_FromStringAndSize(char *s, Py_ssize_t len)
cdef extern from "<sys/time.h>":
cdef extern from "sys/time.h":
ctypedef long time_t
struct timeval:
time_t tv_sec
@ -71,7 +67,7 @@ cdef extern from "<sys/time.h>":
struct timezone:
pass
cdef extern from "<netinet/in.h>":
cdef extern from "netinet/in.h":
u_int32_t ntohl (u_int32_t __netlong) nogil
u_int16_t ntohs (u_int16_t __netshort) nogil
u_int32_t htonl (u_int32_t __hostlong) nogil
@ -86,9 +82,6 @@ cdef extern from "libnfnetlink/linux_nfnetlink.h":
cdef extern from "libnfnetlink/libnfnetlink.h":
struct nfnl_handle:
pass
nfnl_handle *nfnl_open()
void nfnl_close(nfnl_handle *h)
int nfnl_fd(nfnl_handle *h)
unsigned int nfnl_rcvbufsiz(nfnl_handle *h, unsigned int size)
cdef extern from "libnetfilter_queue/linux_nfnetlink_queue.h":
@ -112,7 +105,6 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
u_int8_t hw_addr[8]
nfq_handle *nfq_open()
nfq_handle *nfq_open_nfnl(nfnl_handle *h)
int nfq_close(nfq_handle *h)
int nfq_bind_pf(nfq_handle *h, u_int16_t pf)
@ -123,17 +115,15 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
u_int16_t num,
nfq_callback *cb,
void *data)
int nfq_destroy_queue(nfq_q_handle *qh)
# Any function that parses Netlink replies might invoke the user
# callback and thus might need to propagate a Python exception.
# This includes nfq_handle_packet but is not limited to that --
# other functions might send a query, read until they get the reply,
# and find a packet notification before the reply which they then
# must deal with.
int nfq_destroy_queue(nfq_q_handle *qh) except? -1
int nfq_handle_packet(nfq_handle *h, char *buf, int len) except? -1
int nfq_set_mode(nfq_q_handle *qh, u_int8_t mode, unsigned int len) except? -1
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen) except? -1
int nfq_handle_packet(nfq_handle *h, char *buf, int len)
int nfq_set_mode(nfq_q_handle *qh,
u_int8_t mode, unsigned int len)
q_set_queue_maxlen(nfq_q_handle *qh,
u_int32_t queuelen)
int nfq_set_verdict(nfq_q_handle *qh,
u_int32_t id,
@ -147,26 +137,22 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
u_int32_t mark,
u_int32_t datalen,
unsigned char *buf) nogil
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen)
int nfq_fd(nfq_handle *h)
nfqnl_msg_packet_hdr *nfq_get_msg_packet_hdr(nfq_data *nfad)
int nfq_get_payload(nfq_data *nfad, unsigned char **data)
int nfq_get_payload(nfq_data *nfad, char **data)
int nfq_get_timestamp(nfq_data *nfad, timeval *tv)
nfqnl_msg_packet_hw *nfq_get_packet_hw(nfq_data *nfad)
int nfq_get_nfmark(nfq_data *nfad)
u_int32_t nfq_get_indev(nfq_data *nfad)
u_int32_t nfq_get_outdev(nfq_data *nfad)
u_int32_t nfq_get_physindev(nfq_data *nfad)
u_int32_t nfq_get_physoutdev(nfq_data *nfad)
int nfq_get_nfmark (nfq_data *nfad)
nfnl_handle *nfq_nfnlh(nfq_handle *h)
# Dummy defines from linux/socket.h:
cdef enum: # Protocol families, same as address families.
PF_INET = 2
PF_INET6 = 10
PF_NETLINK = 16
cdef extern from "<sys/socket.h>":
cdef extern from "sys/socket.h":
ssize_t recv(int __fd, void *__buf, size_t __n, int __flags) nogil
int MSG_DONTWAIT
@ -180,21 +166,16 @@ cdef enum:
NF_STOP
NF_MAX_VERDICT = NF_STOP
cdef class NetfilterQueue:
cdef object __weakref__
cdef object user_callback # User callback
cdef nfq_handle *h # Handle to NFQueue library
cdef nfq_q_handle *qh # A handle to the queue
cdef class Packet:
cdef NetfilterQueue _queue
cdef bint _verdict_is_set # True if verdict has been issued, false otherwise
cdef nfq_q_handle *_qh
cdef nfq_data *_nfa
cdef nfqnl_msg_packet_hdr *_hdr
cdef nfqnl_msg_packet_hw *_hw
cdef bint _verdict_is_set # True if verdict has been issued,
# false otherwise
cdef bint _mark_is_set # True if a mark has been given, false otherwise
cdef bint _hwaddr_is_set
cdef bint _timestamp_is_set
cdef u_int32_t _given_mark # Mark given to packet
cdef bytes _given_payload # New payload of packet, or null
cdef bytes _owned_payload
# From NFQ packet header:
cdef readonly u_int32_t id
@ -204,24 +185,33 @@ cdef class Packet:
# Packet details:
cdef Py_ssize_t payload_len
cdef unsigned char *payload
cdef readonly char *payload
cdef timeval timestamp
cdef u_int8_t hw_addr[8]
cdef readonly u_int32_t indev
cdef readonly u_int32_t physindev
cdef readonly u_int32_t outdev
cdef readonly u_int32_t physoutdev
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa)
cdef drop_refs(self)
cdef int verdict(self, u_int8_t verdict) except -1
# TODO: implement these
#cdef u_int8_t hw_addr[8] # A eui64-formatted address?
#cdef readonly u_int32_t nfmark
#cdef readonly u_int32_t indev
#cdef readonly u_int32_t physindev
#cdef readonly u_int32_t outdev
#cdef readonly u_int32_t physoutdev
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa)
cdef void verdict(self, u_int8_t verdict)
cpdef Py_ssize_t get_payload_len(self)
cpdef double get_timestamp(self)
cpdef bytes get_payload(self)
cpdef set_payload(self, bytes payload)
cpdef set_mark(self, u_int32_t mark)
cpdef get_mark(self)
cpdef retain(self)
cpdef accept(self)
cpdef drop(self)
cpdef repeat(self)
cdef class NetfilterQueue:
cdef object user_callback # User callback
cdef nfq_handle *h # Handle to NFQueue library
cdef nfq_q_handle *qh # A handle to the queue
cdef u_int16_t af # Address family
cdef packet_copy_size # Amount of packet metadata + data copied to buffer

View File

@ -5,6 +5,7 @@ function.
Copyright: (c) 2011, Kerkhoff Technologies Inc.
License: MIT; see LICENSE.txt
"""
VERSION = (0, 8, 1)
# Constants for module users
COPY_NONE = 0
@ -21,32 +22,19 @@ DEF MaxCopySize = BufferSize - MetadataSize
DEF SockOverhead = 760+20
DEF SockCopySize = MaxCopySize + SockOverhead
# Socket queue should hold max number of packets of copysize bytes
DEF SockRcvSize = DEFAULT_MAX_QUEUELEN * SockCopySize // 2
DEF SockRcvSize = DEFAULT_MAX_QUEUELEN * SockCopySize / 2
from cpython.exc cimport PyErr_CheckSignals
import socket
cimport cpython.version
cdef extern from "Python.h":
ctypedef struct PyTypeObject:
const char* tp_name
# A negative return value from this callback will stop processing and
# make nfq_handle_packet return -1, so we use that as the error flag.
cdef int global_callback(nfq_q_handle *qh, nfgenmsg *nfmsg,
nfq_data *nfa, void *data) except -1 with gil:
nfq_data *nfa, void *data) with gil:
"""Create a Packet and pass it to appropriate callback."""
cdef NetfilterQueue nfqueue = <NetfilterQueue>data
cdef object user_callback = <object>nfqueue.user_callback
if user_callback is None:
# Queue is being unbound; we can't send a verdict at this point
# so just ignore the packet. The kernel will drop it once we
# unbind.
return 1
packet = Packet()
packet.set_nfq_data(nfqueue, nfa)
try:
user_callback(packet)
finally:
packet.drop_refs()
packet.set_nfq_data(qh, nfa)
user_callback(packet)
return 1
cdef class Packet:
@ -57,66 +45,34 @@ cdef class Packet:
self._given_payload = None
def __str__(self):
cdef unsigned char *payload = NULL
if self._owned_payload:
payload = self._owned_payload
elif self.payload != NULL:
payload = self.payload
else:
return "%d byte packet, contents unretained" % (self.payload_len,)
cdef iphdr *hdr = <iphdr*>payload
cdef iphdr *hdr = <iphdr*>self.payload
protocol = PROTOCOLS.get(hdr.protocol, "Unknown protocol")
return "%s packet, %s bytes" % (protocol, self.payload_len)
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa):
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa):
"""
Assign a packet from NFQ to this object. Parse the header and load
local values.
"""
cdef nfqnl_msg_packet_hw *hw
cdef nfqnl_msg_packet_hdr *hdr
self._qh = qh
self._nfa = nfa
self._hdr = nfq_get_msg_packet_hdr(nfa)
hdr = nfq_get_msg_packet_hdr(nfa)
self._queue = queue
self.id = ntohl(hdr.packet_id)
self.hw_protocol = ntohs(hdr.hw_protocol)
self.hook = hdr.hook
self.id = ntohl(self._hdr.packet_id)
self.hw_protocol = ntohs(self._hdr.hw_protocol)
self.hook = self._hdr.hook
hw = nfq_get_packet_hw(nfa)
if hw == NULL:
# nfq_get_packet_hw doesn't work on OUTPUT and PREROUTING chains
self._hwaddr_is_set = False
else:
self.hw_addr = hw.hw_addr
self._hwaddr_is_set = True
self.payload_len = nfq_get_payload(nfa, &self.payload)
self.payload_len = nfq_get_payload(self._nfa, &self.payload)
if self.payload_len < 0:
# Probably using a mode that doesn't provide the payload
self.payload = NULL
self.payload_len = 0
raise OSError("Failed to get payload of packet.")
nfq_get_timestamp(nfa, &self.timestamp)
nfq_get_timestamp(self._nfa, &self.timestamp)
self.mark = nfq_get_nfmark(nfa)
self.indev = nfq_get_indev(nfa)
self.outdev = nfq_get_outdev(nfa)
self.physindev = nfq_get_physindev(nfa)
self.physoutdev = nfq_get_physoutdev(nfa)
cdef drop_refs(self):
"""
Called at the end of the user_callback, when the storage passed to
set_nfq_data() is about to be reused.
"""
self.payload = NULL
cdef int verdict(self, u_int8_t verdict) except -1:
cdef void verdict(self, u_int8_t verdict):
"""Call appropriate set_verdict... function on packet."""
if self._verdict_is_set:
raise RuntimeError("Verdict already given for this packet")
if self._queue.qh == NULL:
raise RuntimeError("Parent queue has already been unbound")
raise RuntimeWarning("Verdict already given for this packet.")
cdef u_int32_t modified_payload_len = 0
cdef unsigned char *modified_payload = NULL
@ -125,7 +81,7 @@ cdef class Packet:
modified_payload = self._given_payload
if self._mark_is_set:
nfq_set_verdict2(
self._queue.qh,
self._qh,
self.id,
verdict,
self._given_mark,
@ -133,7 +89,7 @@ cdef class Packet:
modified_payload)
else:
nfq_set_verdict(
self._queue.qh,
self._qh,
self.id,
verdict,
modified_payload_len,
@ -142,34 +98,24 @@ cdef class Packet:
self._verdict_is_set = True
def get_hw(self):
"""Return the packet's source MAC address as a Python bytestring, or
None if it's not available.
"""
if not self._hwaddr_is_set:
"""Return the hardware address as Python string."""
self._hw = nfq_get_packet_hw(self._nfa)
if self._hw == NULL:
# nfq_get_packet_hw doesn't work on OUTPUT and PREROUTING chains
return None
self.hw_addr = self._hw.hw_addr
cdef object py_string
py_string = PyBytes_FromStringAndSize(<char*>self.hw_addr, 8)
if cpython.version.PY_MAJOR_VERSION >= 3:
py_string = PyBytes_FromStringAndSize(<char*>self.hw_addr, 8)
else:
py_string = PyString_FromStringAndSize(<char*>self.hw_addr, 8)
return py_string
cpdef bytes get_payload(self):
def get_payload(self):
"""Return payload as Python string."""
if self._given_payload:
return self._given_payload
elif self._owned_payload:
return self._owned_payload
elif self.payload != NULL:
return self.payload[:self.payload_len]
elif self.payload_len == 0:
raise RuntimeError(
"Packet has no payload -- perhaps you're using COPY_META mode?"
)
else:
raise RuntimeError(
"Payload data is no longer available. You must call "
"retain() within the user_callback in order to copy "
"the payload if you need to expect it after your "
"callback has returned."
)
cdef object py_string
py_string = self.payload[:self.payload_len]
return py_string
cpdef Py_ssize_t get_payload_len(self):
return self.payload_len
@ -190,9 +136,6 @@ cdef class Packet:
return self._given_mark
return self.mark
cpdef retain(self):
self._owned_payload = self.get_payload()
cpdef accept(self):
"""Accept the packet."""
self.verdict(NF_ACCEPT)
@ -205,65 +148,25 @@ cdef class Packet:
"""Repeat the packet."""
self.verdict(NF_REPEAT)
cdef class NetfilterQueue:
"""Handle a single numbered queue."""
def __cinit__(self, *, u_int16_t af = PF_INET, int sockfd = -1):
cdef nfnl_handle *nlh = NULL
try:
if sockfd >= 0:
# This is a hack to use the given Netlink socket instead
# of the one allocated by nfq_open(). Intended use case:
# the given socket was opened in a different network
# namespace, and you want to monitor traffic in that
# namespace from this process running outside of it.
# Call socket(AF_NETLINK, SOCK_RAW, /*NETLINK_NETFILTER*/ 12)
# in the other namespace and pass that fd here (via Unix
# domain socket or similar).
nlh = nfnl_open()
if nlh == NULL:
raise OSError(errno, "Failed to open nfnetlink handle")
def __cinit__(self, *args, **kwargs):
self.af = kwargs.get("af", PF_INET)
# At this point nfnl_get_fd(nlh) is a new netlink socket
# and has been bound to an automatically chosen port id.
# This dup2 will close it, freeing up that address.
if dup2(sockfd, nfnl_fd(nlh)) < 0:
raise OSError(errno, "dup2 failed")
# Opening the netfilterqueue subsystem will rebind
# the socket, using the same portid from the old socket,
# which is hopefully now free. An alternative approach,
# theoretically more robust against concurrent binds,
# would be to autobind the new socket and write the chosen
# address to nlh->local. nlh is an opaque type so this
# would need to be done using memcpy (local starts
# 4 bytes into the structure); let's avoid that unless
# we really need it.
self.h = nfq_open_nfnl(nlh)
else:
self.h = nfq_open()
if self.h == NULL:
raise OSError(errno, "Failed to open NFQueue.")
except:
if nlh != NULL:
nfnl_close(nlh)
raise
nfq_unbind_pf(self.h, af) # This does NOT kick out previous queues
if nfq_bind_pf(self.h, af) < 0:
raise OSError("Failed to bind family %s. Are you root?" % af)
def __del__(self):
# unbind() can result in invocations of global_callback, so we
# must do it in __del__ (when this is still a valid
# NetfilterQueue object) rather than __dealloc__
self.unbind()
self.h = nfq_open()
if self.h == NULL:
raise OSError("Failed to open NFQueue.")
nfq_unbind_pf(self.h, self.af) # This does NOT kick out previous
# running queues
if nfq_bind_pf(self.h, self.af) < 0:
raise OSError("Failed to bind family %s. Are you root?" % self.af)
def __dealloc__(self):
if self.qh != NULL:
nfq_destroy_queue(self.qh)
# Don't call nfq_unbind_pf unless you want to disconnect any other
# processes using this libnetfilter_queue on this protocol family!
if self.h != NULL:
nfq_close(self.h)
nfq_close(self.h)
def bind(self, int queue_num, object user_callback,
u_int32_t max_len=DEFAULT_MAX_QUEUELEN,
@ -271,9 +174,6 @@ cdef class NetfilterQueue:
u_int32_t range=MaxPacketSize,
u_int32_t sock_len=SockRcvSize):
"""Create and bind to a new queue."""
if self.qh != NULL:
raise RuntimeError("A queue is already bound; use unbind() first")
cdef unsigned int newsiz
self.user_callback = user_callback
self.qh = nfq_create_queue(self.h, queue_num,
@ -284,27 +184,16 @@ cdef class NetfilterQueue:
if range > MaxCopySize:
range = MaxCopySize
if nfq_set_mode(self.qh, mode, range) < 0:
self.unbind()
raise OSError("Failed to set packet copy mode.")
nfq_set_queue_maxlen(self.qh, max_len)
newsiz = nfnl_rcvbufsiz(nfq_nfnlh(self.h), sock_len)
if newsiz != sock_len * 2:
try:
import warnings
warnings.warn(
"Socket rcvbuf limit is now %d, requested %d." % (newsiz, sock_len),
category=RuntimeWarning,
)
except: # if warnings are being treated as errors
self.unbind()
raise
newsiz = nfnl_rcvbufsiz(nfq_nfnlh(self.h),sock_len)
if newsiz != sock_len*2:
raise RuntimeWarning("Socket rcvbuf limit is now %d, requested %d." % (newsiz,sock_len))
def unbind(self):
"""Destroy the queue."""
self.user_callback = None
if self.qh != NULL:
nfq_destroy_queue(self.qh)
self.qh = NULL
@ -325,25 +214,22 @@ cdef class NetfilterQueue:
while True:
with nogil:
rv = recv(fd, buf, sizeof(buf), recv_flags)
if rv < 0:
if errno == EAGAIN:
if (rv >= 0):
nfq_handle_packet(self.h, buf, rv)
else:
if errno != ENOBUFS:
break
if errno == ENOBUFS:
# Kernel is letting us know we dropped a packet
continue
if errno == EINTR:
PyErr_CheckSignals()
continue
raise OSError(errno, "recv failed")
nfq_handle_packet(self.h, buf, rv)
def run_socket(self, s):
"""Accept packets using socket.recv so that, for example, gevent can monkeypatch it."""
import socket
while True:
try:
buf = s.recv(BufferSize)
rv = len(buf)
if rv >= 0:
nfq_handle_packet(self.h, buf, rv)
else:
break
except socket.error as e:
err = e.args[0]
if err == ENOBUFS:
@ -355,19 +241,6 @@ cdef class NetfilterQueue:
else:
# This is bad. Let the caller handle it.
raise e
else:
nfq_handle_packet(self.h, buf, len(buf))
cdef void _fix_names():
# Avoid ._impl showing up in reprs. This doesn't work on PyPy; there we would
# need to modify the name before PyType_Ready(), but I can't find any way to
# write Cython code that would execute at that time.
cdef PyTypeObject* tp = <PyTypeObject*>Packet
tp.tp_name = "netfilterqueue.Packet"
tp = <PyTypeObject*>NetfilterQueue
tp.tp_name = "netfilterqueue.NetfilterQueue"
_fix_names()
PROTOCOLS = {
0: "HOPOPT",

View File

@ -1,12 +0,0 @@
from ._impl import (
COPY_NONE as COPY_NONE,
COPY_META as COPY_META,
COPY_PACKET as COPY_PACKET,
Packet as Packet,
NetfilterQueue as NetfilterQueue,
PROTOCOLS as PROTOCOLS,
)
from ._version import (
VERSION as VERSION,
__version__ as __version__,
)

View File

@ -1,47 +0,0 @@
import socket
from enum import IntEnum
from typing import Callable, Dict, Optional, Tuple
COPY_NONE: int
COPY_META: int
COPY_PACKET: int
class Packet:
hook: int
hw_protocol: int
id: int
mark: int
# These are ifindexes, pass to socket.if_indextoname() to get names:
indev: int
outdev: int
physindev: int
physoutdev: int
def get_hw(self) -> Optional[bytes]: ...
def get_payload(self) -> bytes: ...
def get_payload_len(self) -> int: ...
def get_timestamp(self) -> float: ...
def get_mark(self) -> int: ...
def set_payload(self, payload: bytes) -> None: ...
def set_mark(self, mark: int) -> None: ...
def retain(self) -> None: ...
def accept(self) -> None: ...
def drop(self) -> None: ...
def repeat(self) -> None: ...
class NetfilterQueue:
def __new__(self, *, af: int = ..., sockfd: int = ...) -> NetfilterQueue: ...
def bind(
self,
queue_num: int,
user_callback: Callable[[Packet], None],
max_len: int = ...,
mode: int = COPY_PACKET,
range: int = ...,
sock_len: int = ...,
) -> None: ...
def unbind(self) -> None: ...
def get_fd(self) -> int: ...
def run(self, block: bool = ...) -> None: ...
def run_socket(self, s: socket.socket) -> None: ...
PROTOCOLS: Dict[int, str]

View File

@ -1,4 +0,0 @@
# This file is imported from __init__.py and exec'd from setup.py
__version__ = "1.1.0+dev"
VERSION = (1, 1, 0)

View File

@ -1,3 +0,0 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

View File

@ -1,61 +1,38 @@
import os, sys
from setuptools import setup, Extension
from distutils.core import setup, Extension
exec(open("netfilterqueue/_version.py", encoding="utf-8").read())
VERSION = "0.8.1" # Remember to change netfilterqueue.pyx when version changes.
setup_requires = ["wheel"]
try:
# Use Cython
from Cython.Build import cythonize
ext_modules = cythonize(
Extension(
"netfilterqueue._impl",
["netfilterqueue/_impl.pyx"],
from Cython.Distutils import build_ext
cmd = {"build_ext": build_ext}
ext = Extension(
"netfilterqueue",
sources=["netfilterqueue.pyx",],
libraries=["netfilter_queue"],
),
compiler_directives={"language_level": "3str"},
)
)
except ImportError:
# No Cython
if "egg_info" in sys.argv:
# We're being run by pip to figure out what we need. Request cython in
# setup_requires below.
setup_requires += ["cython"]
elif not os.path.exists(
os.path.join(os.path.dirname(__file__), "netfilterqueue/_impl.c")
):
sys.stderr.write(
"You must have Cython installed (`pip install cython`) to build this "
"package from source.\nIf you're receiving this error when installing from "
"PyPI, please file a bug report at "
"https://github.com/oremanj/python-netfilterqueue/issues/new\n"
)
sys.exit(1)
ext_modules = [
Extension(
"netfilterqueue._impl",
["netfilterqueue/_impl.c"],
cmd = {}
ext = Extension(
"netfilterqueue",
sources = ["netfilterqueue.c"],
libraries=["netfilter_queue"],
)
]
setup(
cmdclass = cmd,
ext_modules = [ext],
name="NetfilterQueue",
version=__version__,
version=VERSION,
license="MIT",
author="Matthew Fox <matt@tansen.ca>, Joshua Oreman <oremanj@gmail.com>",
author_email="oremanj@gmail.com",
url="https://github.com/oremanj/python-netfilterqueue",
author="Matthew Fox",
author_email="matt@tansen.ca",
url="https://github.com/kti/python-netfilterqueue",
description="Python bindings for libnetfilter_queue",
long_description=open("README.rst", encoding="utf-8").read(),
packages=["netfilterqueue"],
ext_modules=ext_modules,
include_package_data=True,
exclude_package_data={"netfilterqueue": ["*.c"]},
setup_requires=setup_requires,
python_requires=">=3.6",
classifiers=[
long_description=open("README.rst").read(),
download_url="http://pypi.python.org/packages/source/N/NetfilterQueue/NetfilterQueue-%s.tar.gz" % VERSION,
classifiers = [
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
@ -66,5 +43,5 @@ setup(
"Programming Language :: Cython",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3",
],
]
)

View File

@ -1,8 +0,0 @@
git+https://github.com/NightTsarina/python-unshare.git@4e98c177bdeb24c5dcfcd66c457845a776bbb75c
pytest
trio
pytest-trio
async_generator
black
platformdirs <= 2.4.0 # needed by black; 2.4.1+ don't support py3.6
mypy; implementation_name == "cpython"

View File

@ -1,74 +0,0 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
#
# pip-compile test-requirements.in
#
async-generator==1.10
# via
# -r test-requirements.in
# pytest-trio
# trio
attrs==21.4.0
# via
# outcome
# pytest
# trio
black==21.12b0
# via -r test-requirements.in
click==8.0.3
# via black
idna==3.3
# via trio
iniconfig==1.1.1
# via pytest
mypy==0.931 ; implementation_name == "cpython"
# via -r test-requirements.in
mypy-extensions==0.4.3
# via
# black
# mypy
outcome==1.1.0
# via
# pytest-trio
# trio
packaging==21.3
# via pytest
pathspec==0.9.0
# via black
platformdirs==2.4.0
# via
# -r test-requirements.in
# black
pluggy==1.0.0
# via pytest
py==1.11.0
# via pytest
pyparsing==3.0.6
# via packaging
pytest==6.2.5
# via
# -r test-requirements.in
# pytest-trio
pytest-trio==0.7.0
# via -r test-requirements.in
python-unshare @ git+https://github.com/NightTsarina/python-unshare.git@4e98c177bdeb24c5dcfcd66c457845a776bbb75c
# via -r test-requirements.in
sniffio==1.2.0
# via trio
sortedcontainers==2.4.0
# via trio
toml==0.10.2
# via pytest
tomli==1.2.3
# via
# black
# mypy
trio==0.19.0
# via
# -r test-requirements.in
# pytest-trio
typing-extensions==4.0.1
# via
# black
# mypy

View File

@ -1,309 +0,0 @@
import math
import os
import pytest
import socket
import subprocess
import sys
import trio
import unshare # type: ignore
import netfilterqueue
from functools import partial
from typing import Any, AsyncIterator, Callable, Dict, Optional, Tuple
from async_generator import asynccontextmanager
from pytest_trio.enable_trio_mode import * # type: ignore
# We'll create three network namespaces, representing a router (which
# has interfaces on ROUTER_IP[1, 2]) and two hosts connected to it
# (PEER_IP[1, 2] respectively). The router (in the parent pytest
# process) will configure netfilterqueue iptables rules and use them
# to intercept and modify traffic between the two hosts (each of which
# is implemented in a subprocess).
#
# The 'peer' subprocesses communicate with each other over UDP, and
# with the router parent over a UNIX domain SOCK_SEQPACKET socketpair.
# Each packet sent from the parent to one peer over the UNIX domain
# socket will be forwarded to the other peer over UDP. Each packet
# received over UDP by either of the peers will be forwarded to its
# parent.
ROUTER_IP = {1: "172.16.101.1", 2: "172.16.102.1"}
PEER_IP = {1: "172.16.101.2", 2: "172.16.102.2"}
def enter_netns() -> None:
# Create new namespaces of the other types we need
unshare.unshare(unshare.CLONE_NEWNS | unshare.CLONE_NEWNET)
# Mount /sys so network tools work
subprocess.run("/bin/mount -t sysfs sys /sys".split(), check=True)
# Bind-mount /run so iptables can get its lock
subprocess.run("/bin/mount -t tmpfs tmpfs /run".split(), check=True)
# Set up loopback interface
subprocess.run("/sbin/ip link set lo up".split(), check=True)
@pytest.hookimpl(tryfirst=True) # type: ignore
def pytest_runtestloop() -> None:
if os.getuid() != 0:
# Create a new user namespace for the whole test session
outer = {"uid": os.getuid(), "gid": os.getgid()}
unshare.unshare(unshare.CLONE_NEWUSER)
with open("/proc/self/setgroups", "wb") as fp:
# This is required since we're unprivileged outside the namespace
fp.write(b"deny")
for idtype in ("uid", "gid"):
with open(f"/proc/self/{idtype}_map", "wb") as fp:
fp.write(b"0 %d 1" % (outer[idtype],))
assert os.getuid() == os.getgid() == 0
# Create a new network namespace for this pytest process
enter_netns()
with open("/proc/sys/net/ipv4/ip_forward", "wb") as fp:
fp.write(b"1\n")
async def peer_main(idx: int, parent_fd: int) -> None:
parent = trio.socket.fromfd(parent_fd, socket.AF_UNIX, socket.SOCK_SEQPACKET)
# Tell parent we've set up our netns, wait for it to confirm it's
# created our veth interface
await parent.send(b"ok")
assert b"ok" == await parent.recv(4096)
my_ip = PEER_IP[idx]
router_ip = ROUTER_IP[idx]
peer_ip = PEER_IP[3 - idx]
for cmd in (
f"ip link set veth0 up",
f"ip addr add {my_ip}/24 dev veth0",
f"ip route add default via {router_ip} dev veth0",
):
await trio.run_process(cmd.split(), capture_stdout=True, capture_stderr=True)
peer = trio.socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
await peer.bind((my_ip, 0))
# Tell the parent our port and get our peer's port
await parent.send(b"%d" % peer.getsockname()[1])
peer_port = int(await parent.recv(4096))
await peer.connect((peer_ip, peer_port))
# Enter the message-forwarding loop
async def proxy_one_way(
src: trio.socket.SocketType, dest: trio.socket.SocketType
) -> None:
while src.fileno() >= 0:
try:
msg = await src.recv(4096)
except trio.ClosedResourceError:
return
if not msg:
dest.close()
return
try:
await dest.send(msg)
except BrokenPipeError:
return
async with trio.open_nursery() as nursery:
nursery.start_soon(proxy_one_way, parent, peer)
nursery.start_soon(proxy_one_way, peer, parent)
def _default_capture_cb(
target: "trio.MemorySendChannel[netfilterqueue.Packet]",
packet: netfilterqueue.Packet,
) -> None:
packet.retain()
target.send_nowait(packet)
class Harness:
def __init__(self) -> None:
self._received: Dict[int, trio.MemoryReceiveChannel[bytes]] = {}
self._conn: Dict[int, trio.socket.SocketType] = {}
self.dest_addr: Dict[int, Tuple[str, int]] = {}
self.failed = False
async def _run_peer(self, idx: int, *, task_status: Any) -> None:
their_ip = PEER_IP[idx]
my_ip = ROUTER_IP[idx]
conn, child_conn = trio.socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
with conn:
try:
process = await trio.open_process(
[sys.executable, __file__, str(idx), str(child_conn.fileno())],
stdin=subprocess.DEVNULL,
pass_fds=[child_conn.fileno()],
preexec_fn=enter_netns,
)
finally:
child_conn.close()
assert b"ok" == await conn.recv(4096)
for cmd in (
f"ip link add veth{idx} type veth peer netns {process.pid} name veth0",
f"ip link set veth{idx} up",
f"ip addr add {my_ip}/24 dev veth{idx}",
):
await trio.run_process(cmd.split())
try:
await conn.send(b"ok")
self._conn[idx] = conn
task_status.started()
retval = await process.wait()
except BaseException:
process.kill()
with trio.CancelScope(shield=True):
await process.wait()
raise
else:
if retval != 0:
raise RuntimeError(
"peer subprocess exited with code {}".format(retval)
)
finally:
# On some kernels the veth device is removed when the subprocess exits
# and its netns goes away. check=False to suppress that error.
await trio.run_process(f"ip link delete veth{idx}".split(), check=False)
async def _manage_peer(self, idx: int, *, task_status: Any) -> None:
async with trio.open_nursery() as nursery:
await nursery.start(self._run_peer, idx)
packets_w, packets_r = trio.open_memory_channel[bytes](math.inf)
self._received[idx] = packets_r
task_status.started()
async with packets_w:
while True:
msg = await self._conn[idx].recv(4096)
if not msg:
break
await packets_w.send(msg)
@asynccontextmanager
async def run(self) -> AsyncIterator[None]:
async with trio.open_nursery() as nursery:
async with trio.open_nursery() as start_nursery:
start_nursery.start_soon(nursery.start, self._manage_peer, 1)
start_nursery.start_soon(nursery.start, self._manage_peer, 2)
# Tell each peer about the other one's port
for idx in (1, 2):
self.dest_addr[idx] = (
PEER_IP[idx],
int(await self._received[idx].receive()),
)
await self._conn[3 - idx].send(b"%d" % self.dest_addr[idx][1])
yield
self._conn[1].shutdown(socket.SHUT_WR)
self._conn[2].shutdown(socket.SHUT_WR)
if not self.failed:
for idx in (1, 2):
async for remainder in self._received[idx]:
raise AssertionError(
f"Peer {idx} received unexepcted packet {remainder!r}"
)
def bind_queue(
self,
cb: Callable[[netfilterqueue.Packet], None],
*,
queue_num: int = -1,
**options: int,
) -> Tuple[int, netfilterqueue.NetfilterQueue]:
nfq = netfilterqueue.NetfilterQueue()
# Use a smaller socket buffer to avoid a warning in CI
options.setdefault("sock_len", 131072)
if queue_num >= 0:
nfq.bind(queue_num, cb, **options)
else:
for queue_num in range(16):
try:
nfq.bind(queue_num, cb, **options)
break
except Exception as ex:
last_error = ex
else:
raise RuntimeError(
"Couldn't bind any netfilter queue number between 0-15"
) from last_error
return queue_num, nfq
@asynccontextmanager
async def enqueue_packets_to(
self, idx: int, queue_num: int, *, forwarded: bool = True
) -> AsyncIterator[None]:
if forwarded:
chain = "FORWARD"
else:
chain = "OUTPUT"
rule = f"{chain} -d {PEER_IP[idx]} -j NFQUEUE --queue-num {queue_num}"
await trio.run_process(f"/sbin/iptables -A {rule}".split())
try:
yield
finally:
await trio.run_process(f"/sbin/iptables -D {rule}".split())
@asynccontextmanager
async def capture_packets_to(
self,
idx: int,
cb: Callable[
["trio.MemorySendChannel[netfilterqueue.Packet]", netfilterqueue.Packet],
None,
] = _default_capture_cb,
**options: int,
) -> AsyncIterator["trio.MemoryReceiveChannel[netfilterqueue.Packet]"]:
packets_w, packets_r = trio.open_memory_channel[netfilterqueue.Packet](math.inf)
queue_num, nfq = self.bind_queue(partial(cb, packets_w), **options)
try:
async with self.enqueue_packets_to(idx, queue_num):
async with packets_w, trio.open_nursery() as nursery:
@nursery.start_soon
async def listen_for_packets() -> None:
while True:
await trio.lowlevel.wait_readable(nfq.get_fd())
nfq.run(block=False)
yield packets_r
nursery.cancel_scope.cancel()
finally:
nfq.unbind()
async def expect(self, idx: int, *packets: bytes) -> None:
for expected in packets:
with trio.move_on_after(5) as scope:
received = await self._received[idx].receive()
if scope.cancelled_caught:
self.failed = True
raise AssertionError(
f"Timeout waiting for peer {idx} to receive {expected!r}"
)
if received != expected:
self.failed = True
raise AssertionError(
f"Expected peer {idx} to receive {expected!r} but it "
f"received {received!r}"
)
async def send(self, idx: int, *packets: bytes) -> None:
for packet in packets:
await self._conn[3 - idx].send(packet)
@pytest.fixture
async def harness() -> AsyncIterator[Harness]:
h = Harness()
async with h.run():
yield h
if __name__ == "__main__":
trio.run(peer_main, int(sys.argv[1]), int(sys.argv[2]))

View File

@ -1,341 +0,0 @@
import gc
import struct
import os
import pytest
import signal
import socket
import sys
import time
import trio
import trio.testing
import weakref
from netfilterqueue import NetfilterQueue, COPY_META
async def test_comms_without_queue(harness):
await harness.send(2, b"hello", b"world")
await harness.expect(2, b"hello", b"world")
await harness.send(1, b"it works?")
await harness.expect(1, b"it works?")
async def test_queue_dropping(harness):
async def drop(packets, msg):
async for packet in packets:
assert "UDP packet" in str(packet)
if packet.get_payload()[28:] == msg:
packet.drop()
else:
packet.accept()
async with trio.open_nursery() as nursery:
async with harness.capture_packets_to(2) as p2, harness.capture_packets_to(
1
) as p1:
nursery.start_soon(drop, p2, b"two")
nursery.start_soon(drop, p1, b"one")
await harness.send(2, b"one", b"two", b"three")
await harness.send(1, b"one", b"two", b"three")
await harness.expect(2, b"one", b"three")
await harness.expect(1, b"two", b"three")
# Once we stop capturing, everything gets through again:
await harness.send(2, b"one", b"two", b"three")
await harness.send(1, b"one", b"two", b"three")
await harness.expect(2, b"one", b"two", b"three")
await harness.expect(1, b"one", b"two", b"three")
async def test_rewrite_reorder(harness):
async def munge(packets):
def set_udp_payload(p, msg):
data = bytearray(p.get_payload())
old_len = len(data) - 28
if len(msg) != old_len:
data[2:4] = struct.pack(">H", len(msg) + 28)
data[24:26] = struct.pack(">H", len(msg) + 8)
# Recompute checksum too
data[10:12] = b"\x00\x00"
words = struct.unpack(">10H", data[:20])
cksum = sum(words)
while cksum >> 16:
cksum = (cksum & 0xFFFF) + (cksum >> 16)
data[10:12] = struct.pack(">H", cksum ^ 0xFFFF)
# Clear UDP checksum and set payload
data[28:] = msg
data[26:28] = b"\x00\x00"
p.set_payload(bytes(data))
async for packet in packets:
payload = packet.get_payload()[28:]
if payload == b"one":
set_udp_payload(packet, b"numero uno")
assert b"numero uno" == packet.get_payload()[28:]
packet.accept()
elif payload == b"two":
two = packet
elif payload == b"three":
set_udp_payload(two, b"TWO")
packet.accept()
two.accept()
else:
packet.accept()
async with trio.open_nursery() as nursery:
async with harness.capture_packets_to(2) as p2:
nursery.start_soon(munge, p2)
await harness.send(2, b"one", b"two", b"three", b"four")
await harness.expect(2, b"numero uno", b"three", b"TWO", b"four")
async def test_mark_repeat(harness):
counter = 0
timestamps = []
def cb(chan, pkt):
nonlocal counter
with pytest.raises(RuntimeError, match="Packet has no payload"):
pkt.get_payload()
assert pkt.get_mark() == counter
timestamps.append(pkt.get_timestamp())
if counter < 5:
counter += 1
pkt.set_mark(counter)
pkt.repeat()
assert pkt.get_mark() == counter
else:
pkt.accept()
async with harness.capture_packets_to(2, cb, mode=COPY_META):
t0 = time.time()
await harness.send(2, b"testing")
await harness.expect(2, b"testing")
t1 = time.time()
assert counter == 5
# All iterations of the packet have the same timestamps
assert all(t == timestamps[0] for t in timestamps[1:])
assert t0 < timestamps[0] < t1
async def test_hwaddr_and_inoutdev(harness):
hwaddrs = []
inoutdevs = []
def cb(pkt):
hwaddrs.append((pkt.get_hw(), pkt.hook, pkt.get_payload()[28:]))
inoutdevs.append((pkt.indev, pkt.outdev))
pkt.accept()
queue_num, nfq = harness.bind_queue(cb)
try:
async with trio.open_nursery() as nursery:
@nursery.start_soon
async def listen_for_packets():
while True:
await trio.lowlevel.wait_readable(nfq.get_fd())
nfq.run(block=False)
async with harness.enqueue_packets_to(2, queue_num, forwarded=True):
await harness.send(2, b"one", b"two")
await harness.expect(2, b"one", b"two")
async with harness.enqueue_packets_to(2, queue_num, forwarded=False):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
for payload in (b"three", b"four"):
sock.sendto(payload, harness.dest_addr[2])
with trio.fail_after(1):
while len(hwaddrs) < 4:
await trio.sleep(0.1)
nursery.cancel_scope.cancel()
finally:
nfq.unbind()
# Forwarded packets capture a hwaddr, but OUTPUT don't
FORWARD = 2
OUTPUT = 3
mac1 = hwaddrs[0][0]
assert mac1 is not None
assert hwaddrs == [
(mac1, FORWARD, b"one"),
(mac1, FORWARD, b"two"),
(None, OUTPUT, b"three"),
(None, OUTPUT, b"four"),
]
if sys.implementation.name != "pypy":
# pypy doesn't appear to provide if_nametoindex()
iface1 = socket.if_nametoindex("veth1")
iface2 = socket.if_nametoindex("veth2")
else:
iface1, iface2 = inoutdevs[0]
assert 0 != iface1 != iface2 != 0
assert inoutdevs == [
(iface1, iface2),
(iface1, iface2),
(0, iface2),
(0, iface2),
]
async def test_errors(harness):
with pytest.warns(RuntimeWarning, match="rcvbuf limit is") as record:
async with harness.capture_packets_to(2, sock_len=2 ** 30):
pass
assert record[0].filename.endswith("conftest.py")
async with harness.capture_packets_to(2, queue_num=0):
with pytest.raises(OSError, match="Failed to create queue"):
async with harness.capture_packets_to(2, queue_num=0):
pass
_, nfq = harness.bind_queue(lambda: None, queue_num=1)
with pytest.raises(RuntimeError, match="A queue is already bound"):
nfq.bind(2, lambda p: None)
# Test unbinding via __del__
nfq = weakref.ref(nfq)
for _ in range(4):
gc.collect()
if nfq() is None:
break
else:
raise RuntimeError("Couldn't trigger garbage collection of NFQ")
async def test_unretained(harness):
def cb(chan, pkt):
# Can access payload within callback
assert "UDP packet" in str(pkt)
assert pkt.get_payload()[-3:] in (b"one", b"two")
chan.send_nowait(pkt)
# Capture packets without retaining -> can't access payload after cb returns
async with harness.capture_packets_to(2, cb) as chan:
await harness.send(2, b"one", b"two")
accept = True
async for p in chan:
with pytest.raises(
RuntimeError, match="Payload data is no longer available"
):
p.get_payload()
assert "contents unretained" in str(p)
# Can still issue verdicts though
if accept:
p.accept()
accept = False
else:
break
with pytest.raises(RuntimeError, match="Parent queue has already been unbound"):
p.drop()
await harness.expect(2, b"one")
async def test_cb_exception(harness):
pkt = None
def cb(channel, p):
nonlocal pkt
pkt = p
raise ValueError("test")
# Error raised within run():
with pytest.raises(ValueError, match="test"):
async with harness.capture_packets_to(2, cb):
await harness.send(2, b"boom")
with trio.fail_after(1):
try:
await trio.sleep_forever()
finally:
# At this point the error has been raised (since we were
# cancelled) but the queue is still open. We shouldn't
# be able to access the payload, since we didn't retain(),
# but verdicts should otherwise work.
with pytest.raises(RuntimeError, match="Payload data is no longer"):
pkt.get_payload()
pkt.accept()
await harness.expect(2, b"boom")
with pytest.raises(RuntimeError, match="Verdict already given for this packet"):
pkt.drop()
@pytest.mark.skipif(
sys.implementation.name == "pypy",
reason="pypy does not support PyErr_CheckSignals",
)
def test_signal():
nfq = NetfilterQueue()
nfq.bind(1, lambda p: None, sock_len=131072)
def raise_alarm(sig, frame):
raise KeyboardInterrupt("brrrrrring!")
old_handler = signal.signal(signal.SIGALRM, raise_alarm)
old_timer = signal.setitimer(signal.ITIMER_REAL, 0.5, 0)
try:
with pytest.raises(KeyboardInterrupt, match="brrrrrring!") as exc_info:
nfq.run()
assert any("NetfilterQueue.run" in line.name for line in exc_info.traceback)
finally:
nfq.unbind()
signal.setitimer(signal.ITIMER_REAL, *old_timer)
signal.signal(signal.SIGALRM, old_handler)
async def test_external_fd(harness):
child_prog = """
import os, sys, unshare
from netfilterqueue import NetfilterQueue
unshare.unshare(unshare.CLONE_NEWNET)
nfq = NetfilterQueue(sockfd=int(sys.argv[1]))
def cb(pkt):
pkt.accept()
sys.exit(pkt.get_payload()[28:].decode("ascii"))
nfq.bind(1, cb, sock_len=131072)
os.write(1, b"ok\\n")
try:
nfq.run()
finally:
nfq.unbind()
"""
async with trio.open_nursery() as nursery:
async def monitor_in_child(task_status):
with trio.fail_after(5):
r, w = os.pipe()
# 12 is NETLINK_NETFILTER family
nlsock = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, 12)
@nursery.start_soon
async def wait_started():
await trio.lowlevel.wait_readable(r)
assert b"ok\n" == os.read(r, 16)
nlsock.close()
os.close(w)
os.close(r)
task_status.started()
result = await trio.run_process(
[sys.executable, "-c", child_prog, str(nlsock.fileno())],
stdout=w,
capture_stderr=True,
check=False,
pass_fds=(nlsock.fileno(),),
)
assert result.stderr == b"this is a test\n"
await nursery.start(monitor_in_child)
async with harness.enqueue_packets_to(2, queue_num=1):
await harness.send(2, b"this is a test")
await harness.expect(2, b"this is a test")
with pytest.raises(OSError, match="dup2 failed"):
NetfilterQueue(sockfd=1000)
with pytest.raises(OSError, match="Failed to open NFQueue"):
with open("/dev/null") as fp:
NetfilterQueue(sockfd=fp.fileno())