Compare commits
26 Commits
release-v0
...
master
Author | SHA1 | Date |
---|---|---|
Joshua Oreman | 542159a631 | |
Joshua Oreman | e11aaf6b17 | |
Joshua Oreman | e38557217d | |
Joshua Oreman | 84676504de | |
Joshua Oreman | 2b6849b5d2 | |
Joshua Oreman | 49e16812f7 | |
Joshua Oreman | 0bb948d2c1 | |
Joshua Oreman | 9f460d220a | |
Joshua Oreman | a3dedaea57 | |
Joshua Oreman | 69436c1328 | |
Joshua Oreman | c03aec2e88 | |
Joshua Oreman | ebeb8a7337 | |
Joshua Oreman | e1e20d4aba | |
Joshua Oreman | db80c853ba | |
Joshua Oreman | 541c9e7648 | |
Joshua Oreman | 1cbea513e6 | |
Joshua Oreman | a935aadb5f | |
Joshua Oreman | 6faaa7fe35 | |
Joshua Oreman | 53e2db3cd2 | |
Joshua Oreman | 6fb345ee17 | |
Joshua Oreman | c7fd3e5485 | |
Joshua Oreman | 305b258a48 | |
Joshua Oreman | 62da18ee37 | |
Joshua Oreman | 2c782b9619 | |
Joshua Oreman | 0187c89611 | |
Joshua Oreman | ddbc12a6ab |
|
@ -15,18 +15,17 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
python:
|
||||
- '3.6'
|
||||
- '3.7'
|
||||
- '3.8'
|
||||
- '3.9'
|
||||
- '3.10'
|
||||
- '3.11'
|
||||
- 'pypy-3.7'
|
||||
- 'pypy-3.8'
|
||||
- 'pypy-3.9'
|
||||
check_lint: ['0']
|
||||
extra_name: ['']
|
||||
include:
|
||||
- python: '2.7'
|
||||
extra_name: ', build only'
|
||||
- python: '3.9'
|
||||
check_lint: '1'
|
||||
extra_name: ', check lint'
|
||||
|
|
16
CHANGES.txt
16
CHANGES.txt
|
@ -1,4 +1,18 @@
|
|||
v0.9.0, 12 Jan 2021
|
||||
v1.1.0, 1 Mar 2023
|
||||
Add Packet accessors for {indev, outdev, physindev, physoutdev} interface indices
|
||||
|
||||
v1.0.0, 14 Jan 2022
|
||||
Propagate exceptions raised by the user's packet callback
|
||||
Avoid calls to the packet callback during queue unbinding
|
||||
Raise an error if a packet verdict is set after its parent queue is closed
|
||||
set_payload() now affects the result of later get_payload()
|
||||
Handle signals received when run() is blocked in recv()
|
||||
Accept packets in COPY_META mode, only failing on an attempt to access the payload
|
||||
Add a parameter NetfilterQueue(sockfd=N) that uses an already-opened Netlink socket
|
||||
Add type hints
|
||||
Remove the Packet.payload attribute; it was never safe (treated as a char* but not NUL-terminated) nor documented, but was exposed in the API (perhaps inadvertently).
|
||||
|
||||
v0.9.0, 12 Jan 2022
|
||||
Improve usability when Packet objects are retained past the callback
|
||||
Add Packet.retain() to save the packet contents in such cases
|
||||
Eliminate warnings during build on py3
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
include *.txt
|
||||
include *.rst
|
||||
include *.c
|
||||
include *.pyx
|
||||
include *.pxd
|
||||
recursive-include tests/ *.py
|
||||
include LICENSE.txt README.rst CHANGES.txt
|
||||
recursive-include netfilterqueue *.py *.pyx *.pxd *.c *.pyi py.typed
|
||||
recursive-include tests *.py
|
||||
|
|
150
README.rst
150
README.rst
|
@ -1,3 +1,11 @@
|
|||
.. image:: https://img.shields.io/pypi/v/netfilterqueue.svg
|
||||
:target: https://pypi.org/project/netfilterqueue
|
||||
:alt: Latest PyPI version
|
||||
|
||||
.. image:: https://github.com/oremanj/python-netfilterqueue/actions/workflows/ci.yml/badge.svg?branch=master
|
||||
:target: https://github.com/oremanj/python-netfilterqueue/actions?query=branch%3Amaster
|
||||
:alt: Automated test status
|
||||
|
||||
==============
|
||||
NetfilterQueue
|
||||
==============
|
||||
|
@ -9,6 +17,9 @@ or given a mark.
|
|||
libnetfilter_queue (the netfilter library, not this module) is part of the
|
||||
`Netfilter project <http://netfilter.org/projects/libnetfilter_queue/>`_.
|
||||
|
||||
The current version of NetfilterQueue requires Python 3.6 or later.
|
||||
The last version with support for Python 2.7 was 0.9.0.
|
||||
|
||||
Example
|
||||
=======
|
||||
|
||||
|
@ -68,7 +79,7 @@ Before installing, ensure you have:
|
|||
|
||||
On Debian or Ubuntu, install these files with::
|
||||
|
||||
apt-get install build-essential python-dev libnetfilter-queue-dev
|
||||
apt-get install build-essential python3-dev libnetfilter-queue-dev
|
||||
|
||||
From PyPI
|
||||
---------
|
||||
|
@ -90,13 +101,9 @@ To install from source::
|
|||
API
|
||||
===
|
||||
|
||||
``NetfilterQueue.COPY_NONE``
|
||||
|
||||
``NetfilterQueue.COPY_META``
|
||||
|
||||
``NetfilterQueue.COPY_PACKET``
|
||||
``NetfilterQueue.COPY_NONE``, ``NetfilterQueue.COPY_META``, ``NetfilterQueue.COPY_PACKET``
|
||||
These constants specify how much of the packet should be given to the
|
||||
script- nothing, metadata, or the whole packet.
|
||||
script: nothing, metadata, or the whole packet.
|
||||
|
||||
NetfilterQueue objects
|
||||
----------------------
|
||||
|
@ -104,7 +111,7 @@ NetfilterQueue objects
|
|||
A NetfilterQueue object represents a single queue. Configure your queue with
|
||||
a call to ``bind``, then start receiving packets with a call to ``run``.
|
||||
|
||||
``QueueHandler.bind(queue_num, callback[, max_len[, mode[, range[, sock_len]]]])``
|
||||
``NetfilterQueue.bind(queue_num, callback, max_len=1024, mode=COPY_PACKET, range=65535, sock_len=...)``
|
||||
Create and bind to the queue. ``queue_num`` uniquely identifies this
|
||||
queue for the kernel. It must match the ``--queue-num`` in your iptables
|
||||
rule, but there is no ordering requirement: it's fine to either ``bind()``
|
||||
|
@ -118,22 +125,23 @@ a call to ``bind``, then start receiving packets with a call to ``run``.
|
|||
the source and destination IPs of a IPv4 packet, ``range`` could be 20.
|
||||
``sock_len`` sets the receive socket buffer size.
|
||||
|
||||
``QueueHandler.unbind()``
|
||||
``NetfilterQueue.unbind()``
|
||||
Remove the queue. Packets matched by your iptables rule will be dropped.
|
||||
|
||||
``QueueHandler.get_fd()``
|
||||
``NetfilterQueue.get_fd()``
|
||||
Get the file descriptor of the socket used to receive queued
|
||||
packets and send verdicts. If you're using an async event loop,
|
||||
you can poll this FD for readability and call ``run(False)`` every
|
||||
time data appears on it.
|
||||
|
||||
``QueueHandler.run([block])``
|
||||
``NetfilterQueue.run(block=True)``
|
||||
Send packets to your callback. By default, this method blocks, running
|
||||
until an exception is raised (such as by Ctrl+C). Set
|
||||
block=False to process the pending messages without waiting for more.
|
||||
You can get the file descriptor of the socket with the ``get_fd`` method.
|
||||
``block=False`` to process the pending messages without waiting for more;
|
||||
in conjunction with the ``get_fd`` method, you can use this to integrate
|
||||
with async event loops.
|
||||
|
||||
``QueueHandler.run_socket(socket)``
|
||||
``NetfilterQueue.run_socket(socket)``
|
||||
Send packets to your callback, but use the supplied socket instead of
|
||||
recv, so that, for example, gevent can monkeypatch it. You can make a
|
||||
socket with ``socket.fromfd(nfqueue.get_fd(), socket.AF_NETLINK, socket.SOCK_RAW)``
|
||||
|
@ -148,6 +156,8 @@ Objects of this type are passed to your callback.
|
|||
Return the packet's payload as a bytes object. The returned value
|
||||
starts with the IP header. You must call ``retain()`` if you want
|
||||
to be able to ``get_payload()`` after your callback has returned.
|
||||
If you have already called ``set_payload()``, then ``get_payload()``
|
||||
returns what you passed to ``set_payload()``.
|
||||
|
||||
``Packet.set_payload(payload)``
|
||||
Set the packet payload. Call this before ``accept()`` if you want to
|
||||
|
@ -166,12 +176,58 @@ Objects of this type are passed to your callback.
|
|||
rules. ``mark`` is a 32-bit number.
|
||||
|
||||
``Packet.get_mark()``
|
||||
Get the mark already on the packet (either the one you set using
|
||||
Get the mark on the packet (either the one you set using
|
||||
``set_mark()``, or the one it arrived with if you haven't called
|
||||
``set_mark()``).
|
||||
|
||||
``Packet.get_hw()``
|
||||
Return the hardware address as a Python string.
|
||||
Return the source hardware address of the packet as a Python
|
||||
bytestring, or ``None`` if the source hardware address was not
|
||||
captured (packets captured by the ``OUTPUT`` or ``PREROUTING``
|
||||
hooks). For example, on Ethernet the result will be a six-byte
|
||||
MAC address. The destination hardware address is not available
|
||||
because it is determined in the kernel only after packet filtering
|
||||
is complete.
|
||||
|
||||
``Packet.get_timestamp()``
|
||||
Return the time at which this packet was received by the kernel,
|
||||
as a floating-point Unix timestamp with microsecond precision
|
||||
(comparable to the result of ``time.time()``, for example).
|
||||
Packets captured by the ``OUTPUT`` or ``POSTROUTING`` hooks
|
||||
do not have a timestamp, and ``get_timestamp()`` will return 0.0
|
||||
for them.
|
||||
|
||||
``Packet.id``
|
||||
The identifier assigned to this packet by the kernel. Typically
|
||||
the first packet received by your queue starts at 1 and later ones
|
||||
count up from there.
|
||||
|
||||
``Packet.hw_protocol``
|
||||
The link-layer protocol for this packet. For example, IPv4 packets
|
||||
on Ethernet would have this set to the EtherType for IPv4, which is
|
||||
``0x0800``.
|
||||
|
||||
``Packet.mark``
|
||||
The mark that had been assigned to this packet when it was enqueued.
|
||||
Unlike the result of ``get_mark()``, this does not change if you call
|
||||
``set_mark()``.
|
||||
|
||||
``Packet.hook``
|
||||
The netfilter hook (iptables chain, roughly) that diverted this packet
|
||||
into our queue. Values 0 through 4 correspond to PREROUTING, INPUT,
|
||||
FORWARD, OUTPUT, and POSTROUTING respectively.
|
||||
|
||||
``Packet.indev``, ``Packet.outdev``, ``Packet.physindev``, ``Packet.physoutdev``
|
||||
The interface indices on which the packet arrived (``indev``) or is slated
|
||||
to depart (``outdev``). These are integers, which can be converted to
|
||||
names like "eth0" by using ``socket.if_indextoname()``. Zero means
|
||||
no interface is applicable, either because the packet was locally generated
|
||||
or locally received, or because the interface information wasn't available
|
||||
when the packet was queued (for example, ``PREROUTING`` rules don't yet
|
||||
know the ``outdev``). If the ``indev`` or ``outdev`` refers to a bridge
|
||||
device, then the corresponding ``physindev`` or ``physoutdev`` will name
|
||||
the bridge member on which the actual traffic occurred; otherwise
|
||||
``physindev`` and ``physoutdev`` will be zero.
|
||||
|
||||
``Packet.retain()``
|
||||
Allocate a copy of the packet payload for use after the callback
|
||||
|
@ -205,6 +261,31 @@ until they've been given a verdict (accept, drop, or repeat). Also, the
|
|||
kernel stores the enqueued packets in a linked list, so keeping lots of packets
|
||||
outstanding is likely to adversely impact performance.
|
||||
|
||||
Monitoring a different network namespace
|
||||
----------------------------------------
|
||||
|
||||
If you are using Linux network namespaces (``man 7
|
||||
network_namespaces``) in some kind of containerization system, all of
|
||||
the Netfilter queue state is kept per-namespace; queue 1 in namespace
|
||||
X is not the same as queue 1 in namespace Y. NetfilterQueue will
|
||||
ordinarily pass you the traffic for the network namespace you're a
|
||||
part of. If you want to monitor a different one, you can do so with a
|
||||
bit of trickery and cooperation from a process in that
|
||||
namespace; this section describes how.
|
||||
|
||||
You'll need to arrange for a process in the network namespace you want
|
||||
to monitor to call ``socket(AF_NETLINK, SOCK_RAW, 12)`` and pass you
|
||||
the resulting file descriptor using something like
|
||||
``socket.send_fds()`` over a Unix domain socket. (12 is
|
||||
``NETLINK_NETFILTER``, a constant which is not exposed by the Python
|
||||
``socket`` module.) Once you've received that file descriptor in your
|
||||
process, you can create a NetfilterQueue object using the special
|
||||
constructor ``NetfilterQueue(sockfd=N)`` where N is the file
|
||||
descriptor you received. Because the socket was originally created
|
||||
in the other network namespace, the kernel treats it as part of that
|
||||
namespace, and you can use it to access that namespace even though it's
|
||||
not the namespace you're in yourself.
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
||||
|
@ -249,20 +330,39 @@ The fields are:
|
|||
Limitations
|
||||
===========
|
||||
|
||||
* Compiled with a 4096-byte buffer for packets, so it probably won't work on
|
||||
loopback or Ethernet with jumbo packets. If this is a problem, either lower
|
||||
MTU on your loopback, disable jumbo packets, or get Cython,
|
||||
change ``DEF BufferSize = 4096`` in ``netfilterqueue.pyx``, and rebuild.
|
||||
* Full libnetfilter_queue API is not yet implemented:
|
||||
* We use a fixed-size 4096-byte buffer for packets, so you are likely
|
||||
to see truncation on loopback and on Ethernet with jumbo packets.
|
||||
If this is a problem, either lower the MTU on your loopback, disable
|
||||
jumbo packets, or get Cython, change ``DEF BufferSize = 4096`` in
|
||||
``netfilterqueue.pyx``, and rebuild.
|
||||
|
||||
* Omits methods for getting information about the interface a packet has
|
||||
arrived on or is leaving on
|
||||
* Probably other stuff is omitted too
|
||||
* Not all information available from libnetfilter_queue is exposed:
|
||||
missing pieces include packet input/output network interface names,
|
||||
checksum offload flags, UID/GID and security context data
|
||||
associated with the packet (if any).
|
||||
|
||||
* Not all information available from the kernel is even processed by
|
||||
libnetfilter_queue: missing pieces include additional link-layer
|
||||
header data for some packets (including VLAN tags), connection-tracking
|
||||
state, and incoming packet length (if truncated for queueing).
|
||||
|
||||
* We do not expose the libnetfilter_queue interface for changing queue flags.
|
||||
Most of these pertain to other features we don't support (listed above),
|
||||
but there's one that could set the queue to accept (rather than dropping)
|
||||
packets received when it's full.
|
||||
|
||||
Source
|
||||
======
|
||||
|
||||
https://github.com/kti/python-netfilterqueue
|
||||
https://github.com/oremanj/python-netfilterqueue
|
||||
|
||||
Authorship
|
||||
==========
|
||||
|
||||
python-netfilterqueue was originally written by Matthew Fox of
|
||||
Kerkhoff Technologies, Inc. Since 2022 it has been maintained by
|
||||
Joshua Oreman of Hudson River Trading LLC. Both authors wish to
|
||||
thank their employers for their support of open source.
|
||||
|
||||
License
|
||||
=======
|
||||
|
|
32
ci.sh
32
ci.sh
|
@ -11,40 +11,42 @@ python setup.py sdist --formats=zip
|
|||
|
||||
# ... but not to install it
|
||||
pip uninstall -y cython
|
||||
python setup.py build_ext
|
||||
pip install dist/*.zip
|
||||
|
||||
if python --version 2>&1 | fgrep -q "Python 2.7"; then
|
||||
# The testsuite doesn't run on 2.7, so do just a basic smoke test.
|
||||
unshare -Urn python -c "from netfilterqueue import NetfilterQueue as NFQ; NFQ()"
|
||||
exit $?
|
||||
fi
|
||||
|
||||
pip install -Ur test-requirements.txt
|
||||
|
||||
if [ "$CHECK_LINT" = "1" ]; then
|
||||
error=0
|
||||
if ! black --check setup.py tests; then
|
||||
black_files="setup.py tests netfilterqueue"
|
||||
if ! black --check $black_files; then
|
||||
error=$?
|
||||
black --diff $black_files
|
||||
fi
|
||||
mypy --strict -p netfilterqueue || error=$?
|
||||
( mkdir empty; cd empty; python -m mypy.stubtest netfilterqueue ) || error=$?
|
||||
|
||||
if [ $error -ne 0 ]; then
|
||||
cat <<EOF
|
||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
|
||||
Formatting problems were found (listed above). To fix them, run
|
||||
Problems were found by static analysis (listed above).
|
||||
To fix formatting and see remaining errors, run:
|
||||
|
||||
pip install -r test-requirements.txt
|
||||
black setup.py tests
|
||||
black $black_files
|
||||
mypy --strict -p netfilterqueue
|
||||
( mkdir empty; cd empty; python -m mypy.stubtest netfilterqueue )
|
||||
|
||||
in your local checkout.
|
||||
|
||||
EOF
|
||||
error=1
|
||||
fi
|
||||
if [ "$error" = "1" ]; then
|
||||
cat <<EOF
|
||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
EOF
|
||||
exit 1
|
||||
fi
|
||||
exit $error
|
||||
exit 0
|
||||
fi
|
||||
|
||||
cd tests
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
from ._impl import (
|
||||
COPY_NONE as COPY_NONE,
|
||||
COPY_META as COPY_META,
|
||||
COPY_PACKET as COPY_PACKET,
|
||||
Packet as Packet,
|
||||
NetfilterQueue as NetfilterQueue,
|
||||
PROTOCOLS as PROTOCOLS,
|
||||
)
|
||||
from ._version import (
|
||||
VERSION as VERSION,
|
||||
__version__ as __version__,
|
||||
)
|
|
@ -1,18 +1,22 @@
|
|||
cdef extern from "sys/types.h":
|
||||
cdef extern from "<sys/types.h>":
|
||||
ctypedef unsigned char u_int8_t
|
||||
ctypedef unsigned short int u_int16_t
|
||||
ctypedef unsigned int u_int32_t
|
||||
|
||||
cdef extern from "<unistd.h>":
|
||||
int dup2(int oldfd, int newfd)
|
||||
|
||||
cdef extern from "<errno.h>":
|
||||
int errno
|
||||
|
||||
# dummy defines from asm-generic/errno.h:
|
||||
cdef enum:
|
||||
EINTR = 4
|
||||
EAGAIN = 11 # Try again
|
||||
EWOULDBLOCK = EAGAIN
|
||||
ENOBUFS = 105 # No buffer space available
|
||||
|
||||
cdef extern from "netinet/ip.h":
|
||||
cdef extern from "<netinet/ip.h>":
|
||||
struct iphdr:
|
||||
u_int8_t tos
|
||||
u_int16_t tot_len
|
||||
|
@ -59,7 +63,7 @@ cdef extern from "Python.h":
|
|||
object PyBytes_FromStringAndSize(char *s, Py_ssize_t len)
|
||||
object PyString_FromStringAndSize(char *s, Py_ssize_t len)
|
||||
|
||||
cdef extern from "sys/time.h":
|
||||
cdef extern from "<sys/time.h>":
|
||||
ctypedef long time_t
|
||||
struct timeval:
|
||||
time_t tv_sec
|
||||
|
@ -67,7 +71,7 @@ cdef extern from "sys/time.h":
|
|||
struct timezone:
|
||||
pass
|
||||
|
||||
cdef extern from "netinet/in.h":
|
||||
cdef extern from "<netinet/in.h>":
|
||||
u_int32_t ntohl (u_int32_t __netlong) nogil
|
||||
u_int16_t ntohs (u_int16_t __netshort) nogil
|
||||
u_int32_t htonl (u_int32_t __hostlong) nogil
|
||||
|
@ -82,6 +86,9 @@ cdef extern from "libnfnetlink/linux_nfnetlink.h":
|
|||
cdef extern from "libnfnetlink/libnfnetlink.h":
|
||||
struct nfnl_handle:
|
||||
pass
|
||||
nfnl_handle *nfnl_open()
|
||||
void nfnl_close(nfnl_handle *h)
|
||||
int nfnl_fd(nfnl_handle *h)
|
||||
unsigned int nfnl_rcvbufsiz(nfnl_handle *h, unsigned int size)
|
||||
|
||||
cdef extern from "libnetfilter_queue/linux_nfnetlink_queue.h":
|
||||
|
@ -105,6 +112,7 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
|
|||
u_int8_t hw_addr[8]
|
||||
|
||||
nfq_handle *nfq_open()
|
||||
nfq_handle *nfq_open_nfnl(nfnl_handle *h)
|
||||
int nfq_close(nfq_handle *h)
|
||||
|
||||
int nfq_bind_pf(nfq_handle *h, u_int16_t pf)
|
||||
|
@ -115,15 +123,17 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
|
|||
u_int16_t num,
|
||||
nfq_callback *cb,
|
||||
void *data)
|
||||
int nfq_destroy_queue(nfq_q_handle *qh)
|
||||
|
||||
int nfq_handle_packet(nfq_handle *h, char *buf, int len)
|
||||
|
||||
int nfq_set_mode(nfq_q_handle *qh,
|
||||
u_int8_t mode, unsigned int len)
|
||||
|
||||
q_set_queue_maxlen(nfq_q_handle *qh,
|
||||
u_int32_t queuelen)
|
||||
# Any function that parses Netlink replies might invoke the user
|
||||
# callback and thus might need to propagate a Python exception.
|
||||
# This includes nfq_handle_packet but is not limited to that --
|
||||
# other functions might send a query, read until they get the reply,
|
||||
# and find a packet notification before the reply which they then
|
||||
# must deal with.
|
||||
int nfq_destroy_queue(nfq_q_handle *qh) except? -1
|
||||
int nfq_handle_packet(nfq_handle *h, char *buf, int len) except? -1
|
||||
int nfq_set_mode(nfq_q_handle *qh, u_int8_t mode, unsigned int len) except? -1
|
||||
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen) except? -1
|
||||
|
||||
int nfq_set_verdict(nfq_q_handle *qh,
|
||||
u_int32_t id,
|
||||
|
@ -137,22 +147,26 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
|
|||
u_int32_t mark,
|
||||
u_int32_t datalen,
|
||||
unsigned char *buf) nogil
|
||||
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen)
|
||||
|
||||
int nfq_fd(nfq_handle *h)
|
||||
nfqnl_msg_packet_hdr *nfq_get_msg_packet_hdr(nfq_data *nfad)
|
||||
int nfq_get_payload(nfq_data *nfad, unsigned char **data)
|
||||
int nfq_get_timestamp(nfq_data *nfad, timeval *tv)
|
||||
nfqnl_msg_packet_hw *nfq_get_packet_hw(nfq_data *nfad)
|
||||
int nfq_get_nfmark (nfq_data *nfad)
|
||||
int nfq_get_nfmark(nfq_data *nfad)
|
||||
u_int32_t nfq_get_indev(nfq_data *nfad)
|
||||
u_int32_t nfq_get_outdev(nfq_data *nfad)
|
||||
u_int32_t nfq_get_physindev(nfq_data *nfad)
|
||||
u_int32_t nfq_get_physoutdev(nfq_data *nfad)
|
||||
nfnl_handle *nfq_nfnlh(nfq_handle *h)
|
||||
|
||||
|
||||
# Dummy defines from linux/socket.h:
|
||||
cdef enum: # Protocol families, same as address families.
|
||||
PF_INET = 2
|
||||
PF_INET6 = 10
|
||||
PF_NETLINK = 16
|
||||
|
||||
cdef extern from "sys/socket.h":
|
||||
cdef extern from "<sys/socket.h>":
|
||||
ssize_t recv(int __fd, void *__buf, size_t __n, int __flags) nogil
|
||||
int MSG_DONTWAIT
|
||||
|
||||
|
@ -166,12 +180,18 @@ cdef enum:
|
|||
NF_STOP
|
||||
NF_MAX_VERDICT = NF_STOP
|
||||
|
||||
cdef class NetfilterQueue:
|
||||
cdef object __weakref__
|
||||
cdef object user_callback # User callback
|
||||
cdef nfq_handle *h # Handle to NFQueue library
|
||||
cdef nfq_q_handle *qh # A handle to the queue
|
||||
|
||||
cdef class Packet:
|
||||
cdef nfq_q_handle *_qh
|
||||
cdef bint _verdict_is_set # True if verdict has been issued,
|
||||
# false otherwise
|
||||
cdef NetfilterQueue _queue
|
||||
cdef bint _verdict_is_set # True if verdict has been issued, false otherwise
|
||||
cdef bint _mark_is_set # True if a mark has been given, false otherwise
|
||||
cdef bint _hwaddr_is_set
|
||||
cdef bint _timestamp_is_set
|
||||
cdef u_int32_t _given_mark # Mark given to packet
|
||||
cdef bytes _given_payload # New payload of packet, or null
|
||||
cdef bytes _owned_payload
|
||||
|
@ -184,21 +204,17 @@ cdef class Packet:
|
|||
|
||||
# Packet details:
|
||||
cdef Py_ssize_t payload_len
|
||||
cdef readonly unsigned char *payload
|
||||
cdef unsigned char *payload
|
||||
cdef timeval timestamp
|
||||
cdef u_int8_t hw_addr[8]
|
||||
cdef readonly u_int32_t indev
|
||||
cdef readonly u_int32_t physindev
|
||||
cdef readonly u_int32_t outdev
|
||||
cdef readonly u_int32_t physoutdev
|
||||
|
||||
# TODO: implement these
|
||||
#cdef u_int8_t hw_addr[8] # A eui64-formatted address?
|
||||
#cdef readonly u_int32_t nfmark
|
||||
#cdef readonly u_int32_t indev
|
||||
#cdef readonly u_int32_t physindev
|
||||
#cdef readonly u_int32_t outdev
|
||||
#cdef readonly u_int32_t physoutdev
|
||||
|
||||
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa)
|
||||
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa)
|
||||
cdef drop_refs(self)
|
||||
cdef void verdict(self, u_int8_t verdict)
|
||||
cdef int verdict(self, u_int8_t verdict) except -1
|
||||
cpdef Py_ssize_t get_payload_len(self)
|
||||
cpdef double get_timestamp(self)
|
||||
cpdef bytes get_payload(self)
|
||||
|
@ -209,11 +225,3 @@ cdef class Packet:
|
|||
cpdef accept(self)
|
||||
cpdef drop(self)
|
||||
cpdef repeat(self)
|
||||
|
||||
cdef class NetfilterQueue:
|
||||
cdef object user_callback # User callback
|
||||
cdef nfq_handle *h # Handle to NFQueue library
|
||||
cdef nfq_q_handle *qh # A handle to the queue
|
||||
cdef u_int16_t af # Address family
|
||||
cdef packet_copy_size # Amount of packet metadata + data copied to buffer
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
import socket
|
||||
from enum import IntEnum
|
||||
from typing import Callable, Dict, Optional, Tuple
|
||||
|
||||
COPY_NONE: int
|
||||
COPY_META: int
|
||||
COPY_PACKET: int
|
||||
|
||||
class Packet:
|
||||
hook: int
|
||||
hw_protocol: int
|
||||
id: int
|
||||
mark: int
|
||||
# These are ifindexes, pass to socket.if_indextoname() to get names:
|
||||
indev: int
|
||||
outdev: int
|
||||
physindev: int
|
||||
physoutdev: int
|
||||
def get_hw(self) -> Optional[bytes]: ...
|
||||
def get_payload(self) -> bytes: ...
|
||||
def get_payload_len(self) -> int: ...
|
||||
def get_timestamp(self) -> float: ...
|
||||
def get_mark(self) -> int: ...
|
||||
def set_payload(self, payload: bytes) -> None: ...
|
||||
def set_mark(self, mark: int) -> None: ...
|
||||
def retain(self) -> None: ...
|
||||
def accept(self) -> None: ...
|
||||
def drop(self) -> None: ...
|
||||
def repeat(self) -> None: ...
|
||||
|
||||
class NetfilterQueue:
|
||||
def __new__(self, *, af: int = ..., sockfd: int = ...) -> NetfilterQueue: ...
|
||||
def bind(
|
||||
self,
|
||||
queue_num: int,
|
||||
user_callback: Callable[[Packet], None],
|
||||
max_len: int = ...,
|
||||
mode: int = COPY_PACKET,
|
||||
range: int = ...,
|
||||
sock_len: int = ...,
|
||||
) -> None: ...
|
||||
def unbind(self) -> None: ...
|
||||
def get_fd(self) -> int: ...
|
||||
def run(self, block: bool = ...) -> None: ...
|
||||
def run_socket(self, s: socket.socket) -> None: ...
|
||||
|
||||
PROTOCOLS: Dict[int, str]
|
|
@ -5,7 +5,6 @@ function.
|
|||
Copyright: (c) 2011, Kerkhoff Technologies Inc.
|
||||
License: MIT; see LICENSE.txt
|
||||
"""
|
||||
VERSION = (0, 9, 0)
|
||||
|
||||
# Constants for module users
|
||||
COPY_NONE = 0
|
||||
|
@ -24,22 +23,30 @@ DEF SockCopySize = MaxCopySize + SockOverhead
|
|||
# Socket queue should hold max number of packets of copysize bytes
|
||||
DEF SockRcvSize = DEFAULT_MAX_QUEUELEN * SockCopySize // 2
|
||||
|
||||
cdef extern from *:
|
||||
"""
|
||||
#if PY_MAJOR_VERSION < 3
|
||||
#define PyBytes_FromStringAndSize PyString_FromStringAndSize
|
||||
#endif
|
||||
"""
|
||||
from cpython.exc cimport PyErr_CheckSignals
|
||||
|
||||
cdef extern from "Python.h":
|
||||
ctypedef struct PyTypeObject:
|
||||
const char* tp_name
|
||||
|
||||
# A negative return value from this callback will stop processing and
|
||||
# make nfq_handle_packet return -1, so we use that as the error flag.
|
||||
cdef int global_callback(nfq_q_handle *qh, nfgenmsg *nfmsg,
|
||||
nfq_data *nfa, void *data) with gil:
|
||||
nfq_data *nfa, void *data) except -1 with gil:
|
||||
"""Create a Packet and pass it to appropriate callback."""
|
||||
cdef NetfilterQueue nfqueue = <NetfilterQueue>data
|
||||
cdef object user_callback = <object>nfqueue.user_callback
|
||||
if user_callback is None:
|
||||
# Queue is being unbound; we can't send a verdict at this point
|
||||
# so just ignore the packet. The kernel will drop it once we
|
||||
# unbind.
|
||||
return 1
|
||||
packet = Packet()
|
||||
packet.set_nfq_data(qh, nfa)
|
||||
user_callback(packet)
|
||||
packet.drop_refs()
|
||||
packet.set_nfq_data(nfqueue, nfa)
|
||||
try:
|
||||
user_callback(packet)
|
||||
finally:
|
||||
packet.drop_refs()
|
||||
return 1
|
||||
|
||||
cdef class Packet:
|
||||
|
@ -50,11 +57,19 @@ cdef class Packet:
|
|||
self._given_payload = None
|
||||
|
||||
def __str__(self):
|
||||
cdef iphdr *hdr = <iphdr*>self.payload
|
||||
cdef unsigned char *payload = NULL
|
||||
if self._owned_payload:
|
||||
payload = self._owned_payload
|
||||
elif self.payload != NULL:
|
||||
payload = self.payload
|
||||
else:
|
||||
return "%d byte packet, contents unretained" % (self.payload_len,)
|
||||
|
||||
cdef iphdr *hdr = <iphdr*>payload
|
||||
protocol = PROTOCOLS.get(hdr.protocol, "Unknown protocol")
|
||||
return "%s packet, %s bytes" % (protocol, self.payload_len)
|
||||
|
||||
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa):
|
||||
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa):
|
||||
"""
|
||||
Assign a packet from NFQ to this object. Parse the header and load
|
||||
local values.
|
||||
|
@ -63,7 +78,7 @@ cdef class Packet:
|
|||
cdef nfqnl_msg_packet_hdr *hdr
|
||||
|
||||
hdr = nfq_get_msg_packet_hdr(nfa)
|
||||
self._qh = qh
|
||||
self._queue = queue
|
||||
self.id = ntohl(hdr.packet_id)
|
||||
self.hw_protocol = ntohs(hdr.hw_protocol)
|
||||
self.hook = hdr.hook
|
||||
|
@ -78,22 +93,30 @@ cdef class Packet:
|
|||
|
||||
self.payload_len = nfq_get_payload(nfa, &self.payload)
|
||||
if self.payload_len < 0:
|
||||
raise OSError("Failed to get payload of packet.")
|
||||
# Probably using a mode that doesn't provide the payload
|
||||
self.payload = NULL
|
||||
self.payload_len = 0
|
||||
|
||||
nfq_get_timestamp(nfa, &self.timestamp)
|
||||
self.mark = nfq_get_nfmark(nfa)
|
||||
self.indev = nfq_get_indev(nfa)
|
||||
self.outdev = nfq_get_outdev(nfa)
|
||||
self.physindev = nfq_get_physindev(nfa)
|
||||
self.physoutdev = nfq_get_physoutdev(nfa)
|
||||
|
||||
cdef drop_refs(self):
|
||||
"""
|
||||
Called at the end of the user_callback, when the storage passed to
|
||||
set_nfq_data() is about to be deallocated.
|
||||
set_nfq_data() is about to be reused.
|
||||
"""
|
||||
self.payload = NULL
|
||||
|
||||
cdef void verdict(self, u_int8_t verdict):
|
||||
cdef int verdict(self, u_int8_t verdict) except -1:
|
||||
"""Call appropriate set_verdict... function on packet."""
|
||||
if self._verdict_is_set:
|
||||
raise RuntimeWarning("Verdict already given for this packet.")
|
||||
raise RuntimeError("Verdict already given for this packet")
|
||||
if self._queue.qh == NULL:
|
||||
raise RuntimeError("Parent queue has already been unbound")
|
||||
|
||||
cdef u_int32_t modified_payload_len = 0
|
||||
cdef unsigned char *modified_payload = NULL
|
||||
|
@ -102,7 +125,7 @@ cdef class Packet:
|
|||
modified_payload = self._given_payload
|
||||
if self._mark_is_set:
|
||||
nfq_set_verdict2(
|
||||
self._qh,
|
||||
self._queue.qh,
|
||||
self.id,
|
||||
verdict,
|
||||
self._given_mark,
|
||||
|
@ -110,7 +133,7 @@ cdef class Packet:
|
|||
modified_payload)
|
||||
else:
|
||||
nfq_set_verdict(
|
||||
self._qh,
|
||||
self._queue.qh,
|
||||
self.id,
|
||||
verdict,
|
||||
modified_payload_len,
|
||||
|
@ -119,17 +142,27 @@ cdef class Packet:
|
|||
self._verdict_is_set = True
|
||||
|
||||
def get_hw(self):
|
||||
"""Return the hardware address as Python string."""
|
||||
"""Return the packet's source MAC address as a Python bytestring, or
|
||||
None if it's not available.
|
||||
"""
|
||||
if not self._hwaddr_is_set:
|
||||
return None
|
||||
cdef object py_string
|
||||
py_string = PyBytes_FromStringAndSize(<char*>self.hw_addr, 8)
|
||||
return py_string
|
||||
|
||||
cpdef bytes get_payload(self):
|
||||
"""Return payload as Python string."""
|
||||
if self._owned_payload:
|
||||
if self._given_payload:
|
||||
return self._given_payload
|
||||
elif self._owned_payload:
|
||||
return self._owned_payload
|
||||
elif self.payload != NULL:
|
||||
return self.payload[:self.payload_len]
|
||||
elif self.payload_len == 0:
|
||||
raise RuntimeError(
|
||||
"Packet has no payload -- perhaps you're using COPY_META mode?"
|
||||
)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
"Payload data is no longer available. You must call "
|
||||
|
@ -172,25 +205,65 @@ cdef class Packet:
|
|||
"""Repeat the packet."""
|
||||
self.verdict(NF_REPEAT)
|
||||
|
||||
|
||||
cdef class NetfilterQueue:
|
||||
"""Handle a single numbered queue."""
|
||||
def __cinit__(self, *args, **kwargs):
|
||||
self.af = kwargs.get("af", PF_INET)
|
||||
def __cinit__(self, *, u_int16_t af = PF_INET, int sockfd = -1):
|
||||
cdef nfnl_handle *nlh = NULL
|
||||
try:
|
||||
if sockfd >= 0:
|
||||
# This is a hack to use the given Netlink socket instead
|
||||
# of the one allocated by nfq_open(). Intended use case:
|
||||
# the given socket was opened in a different network
|
||||
# namespace, and you want to monitor traffic in that
|
||||
# namespace from this process running outside of it.
|
||||
# Call socket(AF_NETLINK, SOCK_RAW, /*NETLINK_NETFILTER*/ 12)
|
||||
# in the other namespace and pass that fd here (via Unix
|
||||
# domain socket or similar).
|
||||
nlh = nfnl_open()
|
||||
if nlh == NULL:
|
||||
raise OSError(errno, "Failed to open nfnetlink handle")
|
||||
|
||||
self.h = nfq_open()
|
||||
if self.h == NULL:
|
||||
raise OSError("Failed to open NFQueue.")
|
||||
nfq_unbind_pf(self.h, self.af) # This does NOT kick out previous
|
||||
# running queues
|
||||
if nfq_bind_pf(self.h, self.af) < 0:
|
||||
raise OSError("Failed to bind family %s. Are you root?" % self.af)
|
||||
# At this point nfnl_get_fd(nlh) is a new netlink socket
|
||||
# and has been bound to an automatically chosen port id.
|
||||
# This dup2 will close it, freeing up that address.
|
||||
if dup2(sockfd, nfnl_fd(nlh)) < 0:
|
||||
raise OSError(errno, "dup2 failed")
|
||||
|
||||
# Opening the netfilterqueue subsystem will rebind
|
||||
# the socket, using the same portid from the old socket,
|
||||
# which is hopefully now free. An alternative approach,
|
||||
# theoretically more robust against concurrent binds,
|
||||
# would be to autobind the new socket and write the chosen
|
||||
# address to nlh->local. nlh is an opaque type so this
|
||||
# would need to be done using memcpy (local starts
|
||||
# 4 bytes into the structure); let's avoid that unless
|
||||
# we really need it.
|
||||
self.h = nfq_open_nfnl(nlh)
|
||||
else:
|
||||
self.h = nfq_open()
|
||||
if self.h == NULL:
|
||||
raise OSError(errno, "Failed to open NFQueue.")
|
||||
except:
|
||||
if nlh != NULL:
|
||||
nfnl_close(nlh)
|
||||
raise
|
||||
|
||||
nfq_unbind_pf(self.h, af) # This does NOT kick out previous queues
|
||||
if nfq_bind_pf(self.h, af) < 0:
|
||||
raise OSError("Failed to bind family %s. Are you root?" % af)
|
||||
|
||||
def __del__(self):
|
||||
# unbind() can result in invocations of global_callback, so we
|
||||
# must do it in __del__ (when this is still a valid
|
||||
# NetfilterQueue object) rather than __dealloc__
|
||||
self.unbind()
|
||||
|
||||
def __dealloc__(self):
|
||||
if self.qh != NULL:
|
||||
nfq_destroy_queue(self.qh)
|
||||
# Don't call nfq_unbind_pf unless you want to disconnect any other
|
||||
# processes using this libnetfilter_queue on this protocol family!
|
||||
nfq_close(self.h)
|
||||
if self.h != NULL:
|
||||
nfq_close(self.h)
|
||||
|
||||
def bind(self, int queue_num, object user_callback,
|
||||
u_int32_t max_len=DEFAULT_MAX_QUEUELEN,
|
||||
|
@ -231,6 +304,7 @@ cdef class NetfilterQueue:
|
|||
|
||||
def unbind(self):
|
||||
"""Destroy the queue."""
|
||||
self.user_callback = None
|
||||
if self.qh != NULL:
|
||||
nfq_destroy_queue(self.qh)
|
||||
self.qh = NULL
|
||||
|
@ -251,11 +325,17 @@ cdef class NetfilterQueue:
|
|||
while True:
|
||||
with nogil:
|
||||
rv = recv(fd, buf, sizeof(buf), recv_flags)
|
||||
if (rv >= 0):
|
||||
nfq_handle_packet(self.h, buf, rv)
|
||||
else:
|
||||
if errno != ENOBUFS:
|
||||
if rv < 0:
|
||||
if errno == EAGAIN:
|
||||
break
|
||||
if errno == ENOBUFS:
|
||||
# Kernel is letting us know we dropped a packet
|
||||
continue
|
||||
if errno == EINTR:
|
||||
PyErr_CheckSignals()
|
||||
continue
|
||||
raise OSError(errno, "recv failed")
|
||||
nfq_handle_packet(self.h, buf, rv)
|
||||
|
||||
def run_socket(self, s):
|
||||
"""Accept packets using socket.recv so that, for example, gevent can monkeypatch it."""
|
||||
|
@ -264,11 +344,6 @@ cdef class NetfilterQueue:
|
|||
while True:
|
||||
try:
|
||||
buf = s.recv(BufferSize)
|
||||
rv = len(buf)
|
||||
if rv >= 0:
|
||||
nfq_handle_packet(self.h, buf, rv)
|
||||
else:
|
||||
break
|
||||
except socket.error as e:
|
||||
err = e.args[0]
|
||||
if err == ENOBUFS:
|
||||
|
@ -280,6 +355,19 @@ cdef class NetfilterQueue:
|
|||
else:
|
||||
# This is bad. Let the caller handle it.
|
||||
raise e
|
||||
else:
|
||||
nfq_handle_packet(self.h, buf, len(buf))
|
||||
|
||||
cdef void _fix_names():
|
||||
# Avoid ._impl showing up in reprs. This doesn't work on PyPy; there we would
|
||||
# need to modify the name before PyType_Ready(), but I can't find any way to
|
||||
# write Cython code that would execute at that time.
|
||||
cdef PyTypeObject* tp = <PyTypeObject*>Packet
|
||||
tp.tp_name = "netfilterqueue.Packet"
|
||||
tp = <PyTypeObject*>NetfilterQueue
|
||||
tp.tp_name = "netfilterqueue.NetfilterQueue"
|
||||
|
||||
_fix_names()
|
||||
|
||||
PROTOCOLS = {
|
||||
0: "HOPOPT",
|
|
@ -0,0 +1,4 @@
|
|||
# This file is imported from __init__.py and exec'd from setup.py
|
||||
|
||||
__version__ = "1.1.0+dev"
|
||||
VERSION = (1, 1, 0)
|
36
setup.py
36
setup.py
|
@ -1,21 +1,30 @@
|
|||
import os, sys
|
||||
from setuptools import setup, Extension
|
||||
|
||||
VERSION = "0.9.0" # Remember to change CHANGES.txt and netfilterqueue.pyx when version changes.
|
||||
exec(open("netfilterqueue/_version.py", encoding="utf-8").read())
|
||||
|
||||
setup_requires = ["wheel"]
|
||||
try:
|
||||
# Use Cython
|
||||
from Cython.Build import cythonize
|
||||
|
||||
ext_modules = cythonize(
|
||||
Extension(
|
||||
"netfilterqueue", ["netfilterqueue.pyx"], libraries=["netfilter_queue"]
|
||||
"netfilterqueue._impl",
|
||||
["netfilterqueue/_impl.pyx"],
|
||||
libraries=["netfilter_queue"],
|
||||
),
|
||||
compiler_directives={"language_level": "3str"},
|
||||
)
|
||||
except ImportError:
|
||||
# No Cython
|
||||
if not os.path.exists(os.path.join(os.path.dirname(__file__), "netfilterqueue.c")):
|
||||
if "egg_info" in sys.argv:
|
||||
# We're being run by pip to figure out what we need. Request cython in
|
||||
# setup_requires below.
|
||||
setup_requires += ["cython"]
|
||||
elif not os.path.exists(
|
||||
os.path.join(os.path.dirname(__file__), "netfilterqueue/_impl.c")
|
||||
):
|
||||
sys.stderr.write(
|
||||
"You must have Cython installed (`pip install cython`) to build this "
|
||||
"package from source.\nIf you're receiving this error when installing from "
|
||||
|
@ -24,19 +33,28 @@ except ImportError:
|
|||
)
|
||||
sys.exit(1)
|
||||
ext_modules = [
|
||||
Extension("netfilterqueue", ["netfilterqueue.c"], libraries=["netfilter_queue"])
|
||||
Extension(
|
||||
"netfilterqueue._impl",
|
||||
["netfilterqueue/_impl.c"],
|
||||
libraries=["netfilter_queue"],
|
||||
)
|
||||
]
|
||||
|
||||
setup(
|
||||
ext_modules=ext_modules,
|
||||
name="NetfilterQueue",
|
||||
version=VERSION,
|
||||
version=__version__,
|
||||
license="MIT",
|
||||
author="Matthew Fox",
|
||||
author_email="matt@tansen.ca",
|
||||
author="Matthew Fox <matt@tansen.ca>, Joshua Oreman <oremanj@gmail.com>",
|
||||
author_email="oremanj@gmail.com",
|
||||
url="https://github.com/oremanj/python-netfilterqueue",
|
||||
description="Python bindings for libnetfilter_queue",
|
||||
long_description=open("README.rst").read(),
|
||||
long_description=open("README.rst", encoding="utf-8").read(),
|
||||
packages=["netfilterqueue"],
|
||||
ext_modules=ext_modules,
|
||||
include_package_data=True,
|
||||
exclude_package_data={"netfilterqueue": ["*.c"]},
|
||||
setup_requires=setup_requires,
|
||||
python_requires=">=3.6",
|
||||
classifiers=[
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
|
|
|
@ -5,3 +5,4 @@ pytest-trio
|
|||
async_generator
|
||||
black
|
||||
platformdirs <= 2.4.0 # needed by black; 2.4.1+ don't support py3.6
|
||||
mypy; implementation_name == "cpython"
|
||||
|
|
|
@ -22,8 +22,12 @@ idna==3.3
|
|||
# via trio
|
||||
iniconfig==1.1.1
|
||||
# via pytest
|
||||
mypy==0.931 ; implementation_name == "cpython"
|
||||
# via -r test-requirements.in
|
||||
mypy-extensions==0.4.3
|
||||
# via black
|
||||
# via
|
||||
# black
|
||||
# mypy
|
||||
outcome==1.1.0
|
||||
# via
|
||||
# pytest-trio
|
||||
|
@ -57,10 +61,14 @@ sortedcontainers==2.4.0
|
|||
toml==0.10.2
|
||||
# via pytest
|
||||
tomli==1.2.3
|
||||
# via black
|
||||
# via
|
||||
# black
|
||||
# mypy
|
||||
trio==0.19.0
|
||||
# via
|
||||
# -r test-requirements.in
|
||||
# pytest-trio
|
||||
typing-extensions==4.0.1
|
||||
# via black
|
||||
# via
|
||||
# black
|
||||
# mypy
|
||||
|
|
|
@ -5,11 +5,12 @@ import socket
|
|||
import subprocess
|
||||
import sys
|
||||
import trio
|
||||
import unshare
|
||||
import unshare # type: ignore
|
||||
import netfilterqueue
|
||||
from typing import AsyncIterator
|
||||
from functools import partial
|
||||
from typing import Any, AsyncIterator, Callable, Dict, Optional, Tuple
|
||||
from async_generator import asynccontextmanager
|
||||
from pytest_trio.enable_trio_mode import *
|
||||
from pytest_trio.enable_trio_mode import * # type: ignore
|
||||
|
||||
|
||||
# We'll create three network namespaces, representing a router (which
|
||||
|
@ -44,8 +45,8 @@ def enter_netns() -> None:
|
|||
subprocess.run("/sbin/ip link set lo up".split(), check=True)
|
||||
|
||||
|
||||
@pytest.hookimpl(tryfirst=True)
|
||||
def pytest_runtestloop():
|
||||
@pytest.hookimpl(tryfirst=True) # type: ignore
|
||||
def pytest_runtestloop() -> None:
|
||||
if os.getuid() != 0:
|
||||
# Create a new user namespace for the whole test session
|
||||
outer = {"uid": os.getuid(), "gid": os.getgid()}
|
||||
|
@ -92,8 +93,10 @@ async def peer_main(idx: int, parent_fd: int) -> None:
|
|||
await peer.connect((peer_ip, peer_port))
|
||||
|
||||
# Enter the message-forwarding loop
|
||||
async def proxy_one_way(src, dest):
|
||||
while True:
|
||||
async def proxy_one_way(
|
||||
src: trio.socket.SocketType, dest: trio.socket.SocketType
|
||||
) -> None:
|
||||
while src.fileno() >= 0:
|
||||
try:
|
||||
msg = await src.recv(4096)
|
||||
except trio.ClosedResourceError:
|
||||
|
@ -111,13 +114,22 @@ async def peer_main(idx: int, parent_fd: int) -> None:
|
|||
nursery.start_soon(proxy_one_way, peer, parent)
|
||||
|
||||
|
||||
def _default_capture_cb(
|
||||
target: "trio.MemorySendChannel[netfilterqueue.Packet]",
|
||||
packet: netfilterqueue.Packet,
|
||||
) -> None:
|
||||
packet.retain()
|
||||
target.send_nowait(packet)
|
||||
|
||||
|
||||
class Harness:
|
||||
def __init__(self):
|
||||
self._received = {}
|
||||
self._conn = {}
|
||||
def __init__(self) -> None:
|
||||
self._received: Dict[int, trio.MemoryReceiveChannel[bytes]] = {}
|
||||
self._conn: Dict[int, trio.socket.SocketType] = {}
|
||||
self.dest_addr: Dict[int, Tuple[str, int]] = {}
|
||||
self.failed = False
|
||||
|
||||
async def _run_peer(self, idx: int, *, task_status):
|
||||
async def _run_peer(self, idx: int, *, task_status: Any) -> None:
|
||||
their_ip = PEER_IP[idx]
|
||||
my_ip = ROUTER_IP[idx]
|
||||
conn, child_conn = trio.socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
||||
|
@ -155,12 +167,14 @@ class Harness:
|
|||
"peer subprocess exited with code {}".format(retval)
|
||||
)
|
||||
finally:
|
||||
await trio.run_process(f"ip link delete veth{idx}".split())
|
||||
# On some kernels the veth device is removed when the subprocess exits
|
||||
# and its netns goes away. check=False to suppress that error.
|
||||
await trio.run_process(f"ip link delete veth{idx}".split(), check=False)
|
||||
|
||||
async def _manage_peer(self, idx: int, *, task_status):
|
||||
async def _manage_peer(self, idx: int, *, task_status: Any) -> None:
|
||||
async with trio.open_nursery() as nursery:
|
||||
await nursery.start(self._run_peer, idx)
|
||||
packets_w, packets_r = trio.open_memory_channel(math.inf)
|
||||
packets_w, packets_r = trio.open_memory_channel[bytes](math.inf)
|
||||
self._received[idx] = packets_r
|
||||
task_status.started()
|
||||
async with packets_w:
|
||||
|
@ -171,14 +185,18 @@ class Harness:
|
|||
await packets_w.send(msg)
|
||||
|
||||
@asynccontextmanager
|
||||
async def run(self):
|
||||
async def run(self) -> AsyncIterator[None]:
|
||||
async with trio.open_nursery() as nursery:
|
||||
async with trio.open_nursery() as start_nursery:
|
||||
start_nursery.start_soon(nursery.start, self._manage_peer, 1)
|
||||
start_nursery.start_soon(nursery.start, self._manage_peer, 2)
|
||||
# Tell each peer about the other one's port
|
||||
await self._conn[2].send(await self._received[1].receive())
|
||||
await self._conn[1].send(await self._received[2].receive())
|
||||
for idx in (1, 2):
|
||||
self.dest_addr[idx] = (
|
||||
PEER_IP[idx],
|
||||
int(await self._received[idx].receive()),
|
||||
)
|
||||
await self._conn[3 - idx].send(b"%d" % self.dest_addr[idx][1])
|
||||
yield
|
||||
self._conn[1].shutdown(socket.SHUT_WR)
|
||||
self._conn[2].shutdown(socket.SHUT_WR)
|
||||
|
@ -190,26 +208,22 @@ class Harness:
|
|||
f"Peer {idx} received unexepcted packet {remainder!r}"
|
||||
)
|
||||
|
||||
@asynccontextmanager
|
||||
async def capture_packets_to(
|
||||
self, idx: int, *, queue_num: int = -1, **options
|
||||
) -> AsyncIterator["trio.MemoryReceiveChannel[netfilterqueue.Packet]"]:
|
||||
|
||||
packets_w, packets_r = trio.open_memory_channel(math.inf)
|
||||
|
||||
def stash_packet(p):
|
||||
p.retain()
|
||||
packets_w.send_nowait(p)
|
||||
|
||||
def bind_queue(
|
||||
self,
|
||||
cb: Callable[[netfilterqueue.Packet], None],
|
||||
*,
|
||||
queue_num: int = -1,
|
||||
**options: int,
|
||||
) -> Tuple[int, netfilterqueue.NetfilterQueue]:
|
||||
nfq = netfilterqueue.NetfilterQueue()
|
||||
# Use a smaller socket buffer to avoid a warning in CI
|
||||
options.setdefault("sock_len", 131072)
|
||||
if queue_num >= 0:
|
||||
nfq.bind(queue_num, stash_packet, **options)
|
||||
nfq.bind(queue_num, cb, **options)
|
||||
else:
|
||||
for queue_num in range(16):
|
||||
try:
|
||||
nfq.bind(queue_num, stash_packet, **options)
|
||||
nfq.bind(queue_num, cb, **options)
|
||||
break
|
||||
except Exception as ex:
|
||||
last_error = ex
|
||||
|
@ -217,26 +231,53 @@ class Harness:
|
|||
raise RuntimeError(
|
||||
"Couldn't bind any netfilter queue number between 0-15"
|
||||
) from last_error
|
||||
return queue_num, nfq
|
||||
|
||||
@asynccontextmanager
|
||||
async def enqueue_packets_to(
|
||||
self, idx: int, queue_num: int, *, forwarded: bool = True
|
||||
) -> AsyncIterator[None]:
|
||||
if forwarded:
|
||||
chain = "FORWARD"
|
||||
else:
|
||||
chain = "OUTPUT"
|
||||
|
||||
rule = f"{chain} -d {PEER_IP[idx]} -j NFQUEUE --queue-num {queue_num}"
|
||||
await trio.run_process(f"/sbin/iptables -A {rule}".split())
|
||||
try:
|
||||
rule = f"-d {PEER_IP[idx]} -j NFQUEUE --queue-num {queue_num}"
|
||||
await trio.run_process(f"/sbin/iptables -A FORWARD {rule}".split())
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await trio.run_process(f"/sbin/iptables -D {rule}".split())
|
||||
|
||||
@asynccontextmanager
|
||||
async def capture_packets_to(
|
||||
self,
|
||||
idx: int,
|
||||
cb: Callable[
|
||||
["trio.MemorySendChannel[netfilterqueue.Packet]", netfilterqueue.Packet],
|
||||
None,
|
||||
] = _default_capture_cb,
|
||||
**options: int,
|
||||
) -> AsyncIterator["trio.MemoryReceiveChannel[netfilterqueue.Packet]"]:
|
||||
|
||||
packets_w, packets_r = trio.open_memory_channel[netfilterqueue.Packet](math.inf)
|
||||
queue_num, nfq = self.bind_queue(partial(cb, packets_w), **options)
|
||||
try:
|
||||
async with self.enqueue_packets_to(idx, queue_num):
|
||||
async with packets_w, trio.open_nursery() as nursery:
|
||||
|
||||
@nursery.start_soon
|
||||
async def listen_for_packets():
|
||||
async def listen_for_packets() -> None:
|
||||
while True:
|
||||
await trio.lowlevel.wait_readable(nfq.get_fd())
|
||||
nfq.run(block=False)
|
||||
|
||||
yield packets_r
|
||||
nursery.cancel_scope.cancel()
|
||||
finally:
|
||||
await trio.run_process(f"/sbin/iptables -D FORWARD {rule}".split())
|
||||
finally:
|
||||
nfq.unbind()
|
||||
|
||||
async def expect(self, idx: int, *packets: bytes):
|
||||
async def expect(self, idx: int, *packets: bytes) -> None:
|
||||
for expected in packets:
|
||||
with trio.move_on_after(5) as scope:
|
||||
received = await self._received[idx].receive()
|
||||
|
@ -252,13 +293,13 @@ class Harness:
|
|||
f"received {received!r}"
|
||||
)
|
||||
|
||||
async def send(self, idx: int, *packets: bytes):
|
||||
async def send(self, idx: int, *packets: bytes) -> None:
|
||||
for packet in packets:
|
||||
await self._conn[3 - idx].send(packet)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def harness() -> Harness:
|
||||
async def harness() -> AsyncIterator[Harness]:
|
||||
h = Harness()
|
||||
async with h.run():
|
||||
yield h
|
||||
|
|
|
@ -1,6 +1,16 @@
|
|||
import gc
|
||||
import struct
|
||||
import trio
|
||||
import os
|
||||
import pytest
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import trio
|
||||
import trio.testing
|
||||
import weakref
|
||||
|
||||
from netfilterqueue import NetfilterQueue, COPY_META
|
||||
|
||||
|
||||
async def test_comms_without_queue(harness):
|
||||
|
@ -13,6 +23,7 @@ async def test_comms_without_queue(harness):
|
|||
async def test_queue_dropping(harness):
|
||||
async def drop(packets, msg):
|
||||
async for packet in packets:
|
||||
assert "UDP packet" in str(packet)
|
||||
if packet.get_payload()[28:] == msg:
|
||||
packet.drop()
|
||||
else:
|
||||
|
@ -61,6 +72,7 @@ async def test_rewrite_reorder(harness):
|
|||
payload = packet.get_payload()[28:]
|
||||
if payload == b"one":
|
||||
set_udp_payload(packet, b"numero uno")
|
||||
assert b"numero uno" == packet.get_payload()[28:]
|
||||
packet.accept()
|
||||
elif payload == b"two":
|
||||
two = packet
|
||||
|
@ -78,11 +90,99 @@ async def test_rewrite_reorder(harness):
|
|||
await harness.expect(2, b"numero uno", b"three", b"TWO", b"four")
|
||||
|
||||
|
||||
async def test_mark_repeat(harness):
|
||||
counter = 0
|
||||
timestamps = []
|
||||
|
||||
def cb(chan, pkt):
|
||||
nonlocal counter
|
||||
with pytest.raises(RuntimeError, match="Packet has no payload"):
|
||||
pkt.get_payload()
|
||||
assert pkt.get_mark() == counter
|
||||
timestamps.append(pkt.get_timestamp())
|
||||
if counter < 5:
|
||||
counter += 1
|
||||
pkt.set_mark(counter)
|
||||
pkt.repeat()
|
||||
assert pkt.get_mark() == counter
|
||||
else:
|
||||
pkt.accept()
|
||||
|
||||
async with harness.capture_packets_to(2, cb, mode=COPY_META):
|
||||
t0 = time.time()
|
||||
await harness.send(2, b"testing")
|
||||
await harness.expect(2, b"testing")
|
||||
t1 = time.time()
|
||||
assert counter == 5
|
||||
# All iterations of the packet have the same timestamps
|
||||
assert all(t == timestamps[0] for t in timestamps[1:])
|
||||
assert t0 < timestamps[0] < t1
|
||||
|
||||
|
||||
async def test_hwaddr_and_inoutdev(harness):
|
||||
hwaddrs = []
|
||||
inoutdevs = []
|
||||
|
||||
def cb(pkt):
|
||||
hwaddrs.append((pkt.get_hw(), pkt.hook, pkt.get_payload()[28:]))
|
||||
inoutdevs.append((pkt.indev, pkt.outdev))
|
||||
pkt.accept()
|
||||
|
||||
queue_num, nfq = harness.bind_queue(cb)
|
||||
try:
|
||||
async with trio.open_nursery() as nursery:
|
||||
|
||||
@nursery.start_soon
|
||||
async def listen_for_packets():
|
||||
while True:
|
||||
await trio.lowlevel.wait_readable(nfq.get_fd())
|
||||
nfq.run(block=False)
|
||||
|
||||
async with harness.enqueue_packets_to(2, queue_num, forwarded=True):
|
||||
await harness.send(2, b"one", b"two")
|
||||
await harness.expect(2, b"one", b"two")
|
||||
async with harness.enqueue_packets_to(2, queue_num, forwarded=False):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
|
||||
for payload in (b"three", b"four"):
|
||||
sock.sendto(payload, harness.dest_addr[2])
|
||||
with trio.fail_after(1):
|
||||
while len(hwaddrs) < 4:
|
||||
await trio.sleep(0.1)
|
||||
nursery.cancel_scope.cancel()
|
||||
finally:
|
||||
nfq.unbind()
|
||||
|
||||
# Forwarded packets capture a hwaddr, but OUTPUT don't
|
||||
FORWARD = 2
|
||||
OUTPUT = 3
|
||||
mac1 = hwaddrs[0][0]
|
||||
assert mac1 is not None
|
||||
assert hwaddrs == [
|
||||
(mac1, FORWARD, b"one"),
|
||||
(mac1, FORWARD, b"two"),
|
||||
(None, OUTPUT, b"three"),
|
||||
(None, OUTPUT, b"four"),
|
||||
]
|
||||
|
||||
if sys.implementation.name != "pypy":
|
||||
# pypy doesn't appear to provide if_nametoindex()
|
||||
iface1 = socket.if_nametoindex("veth1")
|
||||
iface2 = socket.if_nametoindex("veth2")
|
||||
else:
|
||||
iface1, iface2 = inoutdevs[0]
|
||||
assert 0 != iface1 != iface2 != 0
|
||||
assert inoutdevs == [
|
||||
(iface1, iface2),
|
||||
(iface1, iface2),
|
||||
(0, iface2),
|
||||
(0, iface2),
|
||||
]
|
||||
|
||||
|
||||
async def test_errors(harness):
|
||||
with pytest.warns(RuntimeWarning, match="rcvbuf limit is") as record:
|
||||
async with harness.capture_packets_to(2, sock_len=2 ** 30):
|
||||
pass
|
||||
|
||||
assert record[0].filename.endswith("conftest.py")
|
||||
|
||||
async with harness.capture_packets_to(2, queue_num=0):
|
||||
|
@ -90,9 +190,152 @@ async def test_errors(harness):
|
|||
async with harness.capture_packets_to(2, queue_num=0):
|
||||
pass
|
||||
|
||||
from netfilterqueue import NetfilterQueue
|
||||
_, nfq = harness.bind_queue(lambda: None, queue_num=1)
|
||||
with pytest.raises(RuntimeError, match="A queue is already bound"):
|
||||
nfq.bind(2, lambda p: None)
|
||||
|
||||
# Test unbinding via __del__
|
||||
nfq = weakref.ref(nfq)
|
||||
for _ in range(4):
|
||||
gc.collect()
|
||||
if nfq() is None:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Couldn't trigger garbage collection of NFQ")
|
||||
|
||||
|
||||
async def test_unretained(harness):
|
||||
def cb(chan, pkt):
|
||||
# Can access payload within callback
|
||||
assert "UDP packet" in str(pkt)
|
||||
assert pkt.get_payload()[-3:] in (b"one", b"two")
|
||||
chan.send_nowait(pkt)
|
||||
|
||||
# Capture packets without retaining -> can't access payload after cb returns
|
||||
async with harness.capture_packets_to(2, cb) as chan:
|
||||
await harness.send(2, b"one", b"two")
|
||||
accept = True
|
||||
async for p in chan:
|
||||
with pytest.raises(
|
||||
RuntimeError, match="Payload data is no longer available"
|
||||
):
|
||||
p.get_payload()
|
||||
assert "contents unretained" in str(p)
|
||||
# Can still issue verdicts though
|
||||
if accept:
|
||||
p.accept()
|
||||
accept = False
|
||||
else:
|
||||
break
|
||||
|
||||
with pytest.raises(RuntimeError, match="Parent queue has already been unbound"):
|
||||
p.drop()
|
||||
await harness.expect(2, b"one")
|
||||
|
||||
|
||||
async def test_cb_exception(harness):
|
||||
pkt = None
|
||||
|
||||
def cb(channel, p):
|
||||
nonlocal pkt
|
||||
pkt = p
|
||||
raise ValueError("test")
|
||||
|
||||
# Error raised within run():
|
||||
with pytest.raises(ValueError, match="test"):
|
||||
async with harness.capture_packets_to(2, cb):
|
||||
await harness.send(2, b"boom")
|
||||
with trio.fail_after(1):
|
||||
try:
|
||||
await trio.sleep_forever()
|
||||
finally:
|
||||
# At this point the error has been raised (since we were
|
||||
# cancelled) but the queue is still open. We shouldn't
|
||||
# be able to access the payload, since we didn't retain(),
|
||||
# but verdicts should otherwise work.
|
||||
with pytest.raises(RuntimeError, match="Payload data is no longer"):
|
||||
pkt.get_payload()
|
||||
pkt.accept()
|
||||
|
||||
await harness.expect(2, b"boom")
|
||||
|
||||
with pytest.raises(RuntimeError, match="Verdict already given for this packet"):
|
||||
pkt.drop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.implementation.name == "pypy",
|
||||
reason="pypy does not support PyErr_CheckSignals",
|
||||
)
|
||||
def test_signal():
|
||||
nfq = NetfilterQueue()
|
||||
nfq.bind(1, lambda p: None, sock_len=131072)
|
||||
with pytest.raises(RuntimeError, match="A queue is already bound"):
|
||||
nfq.bind(2, lambda p: None, sock_len=131072)
|
||||
|
||||
def raise_alarm(sig, frame):
|
||||
raise KeyboardInterrupt("brrrrrring!")
|
||||
|
||||
old_handler = signal.signal(signal.SIGALRM, raise_alarm)
|
||||
old_timer = signal.setitimer(signal.ITIMER_REAL, 0.5, 0)
|
||||
try:
|
||||
with pytest.raises(KeyboardInterrupt, match="brrrrrring!") as exc_info:
|
||||
nfq.run()
|
||||
assert any("NetfilterQueue.run" in line.name for line in exc_info.traceback)
|
||||
finally:
|
||||
nfq.unbind()
|
||||
signal.setitimer(signal.ITIMER_REAL, *old_timer)
|
||||
signal.signal(signal.SIGALRM, old_handler)
|
||||
|
||||
|
||||
async def test_external_fd(harness):
|
||||
child_prog = """
|
||||
import os, sys, unshare
|
||||
from netfilterqueue import NetfilterQueue
|
||||
unshare.unshare(unshare.CLONE_NEWNET)
|
||||
nfq = NetfilterQueue(sockfd=int(sys.argv[1]))
|
||||
def cb(pkt):
|
||||
pkt.accept()
|
||||
sys.exit(pkt.get_payload()[28:].decode("ascii"))
|
||||
nfq.bind(1, cb, sock_len=131072)
|
||||
os.write(1, b"ok\\n")
|
||||
try:
|
||||
nfq.run()
|
||||
finally:
|
||||
nfq.unbind()
|
||||
"""
|
||||
async with trio.open_nursery() as nursery:
|
||||
|
||||
async def monitor_in_child(task_status):
|
||||
with trio.fail_after(5):
|
||||
r, w = os.pipe()
|
||||
# 12 is NETLINK_NETFILTER family
|
||||
nlsock = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, 12)
|
||||
|
||||
@nursery.start_soon
|
||||
async def wait_started():
|
||||
await trio.lowlevel.wait_readable(r)
|
||||
assert b"ok\n" == os.read(r, 16)
|
||||
nlsock.close()
|
||||
os.close(w)
|
||||
os.close(r)
|
||||
task_status.started()
|
||||
|
||||
result = await trio.run_process(
|
||||
[sys.executable, "-c", child_prog, str(nlsock.fileno())],
|
||||
stdout=w,
|
||||
capture_stderr=True,
|
||||
check=False,
|
||||
pass_fds=(nlsock.fileno(),),
|
||||
)
|
||||
assert result.stderr == b"this is a test\n"
|
||||
|
||||
await nursery.start(monitor_in_child)
|
||||
async with harness.enqueue_packets_to(2, queue_num=1):
|
||||
await harness.send(2, b"this is a test")
|
||||
await harness.expect(2, b"this is a test")
|
||||
|
||||
with pytest.raises(OSError, match="dup2 failed"):
|
||||
NetfilterQueue(sockfd=1000)
|
||||
|
||||
with pytest.raises(OSError, match="Failed to open NFQueue"):
|
||||
with open("/dev/null") as fp:
|
||||
NetfilterQueue(sockfd=fp.fileno())
|
||||
|
|
Loading…
Reference in New Issue