229 lines
6.4 KiB
C
229 lines
6.4 KiB
C
|
// SPDX-License-Identifier: GPL-2.0-or-later
|
||
|
/*
|
||
|
* sch_plug.c Queue traffic until an explicit release command
|
||
|
*
|
||
|
* There are two ways to use this qdisc:
|
||
|
* 1. A simple "instantaneous" plug/unplug operation, by issuing an alternating
|
||
|
* sequence of TCQ_PLUG_BUFFER & TCQ_PLUG_RELEASE_INDEFINITE commands.
|
||
|
*
|
||
|
* 2. For network output buffering (a.k.a output commit) functionality.
|
||
|
* Output commit property is commonly used by applications using checkpoint
|
||
|
* based fault-tolerance to ensure that the checkpoint from which a system
|
||
|
* is being restored is consistent w.r.t outside world.
|
||
|
*
|
||
|
* Consider for e.g. Remus - a Virtual Machine checkpointing system,
|
||
|
* wherein a VM is checkpointed, say every 50ms. The checkpoint is replicated
|
||
|
* asynchronously to the backup host, while the VM continues executing the
|
||
|
* next epoch speculatively.
|
||
|
*
|
||
|
* The following is a typical sequence of output buffer operations:
|
||
|
* 1.At epoch i, start_buffer(i)
|
||
|
* 2. At end of epoch i (i.e. after 50ms):
|
||
|
* 2.1 Stop VM and take checkpoint(i).
|
||
|
* 2.2 start_buffer(i+1) and Resume VM
|
||
|
* 3. While speculatively executing epoch(i+1), asynchronously replicate
|
||
|
* checkpoint(i) to backup host.
|
||
|
* 4. When checkpoint_ack(i) is received from backup, release_buffer(i)
|
||
|
* Thus, this Qdisc would receive the following sequence of commands:
|
||
|
* TCQ_PLUG_BUFFER (epoch i)
|
||
|
* .. TCQ_PLUG_BUFFER (epoch i+1)
|
||
|
* ....TCQ_PLUG_RELEASE_ONE (epoch i)
|
||
|
* ......TCQ_PLUG_BUFFER (epoch i+2)
|
||
|
* ........
|
||
|
*/
|
||
|
|
||
|
#include <linux/module.h>
|
||
|
#include <linux/types.h>
|
||
|
#include <linux/kernel.h>
|
||
|
#include <linux/errno.h>
|
||
|
#include <linux/netdevice.h>
|
||
|
#include <linux/skbuff.h>
|
||
|
#include <net/pkt_sched.h>
|
||
|
|
||
|
/*
|
||
|
* State of the queue, when used for network output buffering:
|
||
|
*
|
||
|
* plug(i+1) plug(i) head
|
||
|
* ------------------+--------------------+---------------->
|
||
|
* | |
|
||
|
* | |
|
||
|
* pkts_current_epoch| pkts_last_epoch |pkts_to_release
|
||
|
* ----------------->|<--------+--------->|+--------------->
|
||
|
* v v
|
||
|
*
|
||
|
*/
|
||
|
|
||
|
struct plug_sched_data {
|
||
|
/* If true, the dequeue function releases all packets
|
||
|
* from head to end of the queue. The queue turns into
|
||
|
* a pass-through queue for newly arriving packets.
|
||
|
*/
|
||
|
bool unplug_indefinite;
|
||
|
|
||
|
bool throttled;
|
||
|
|
||
|
/* Queue Limit in bytes */
|
||
|
u32 limit;
|
||
|
|
||
|
/* Number of packets (output) from the current speculatively
|
||
|
* executing epoch.
|
||
|
*/
|
||
|
u32 pkts_current_epoch;
|
||
|
|
||
|
/* Number of packets corresponding to the recently finished
|
||
|
* epoch. These will be released when we receive a
|
||
|
* TCQ_PLUG_RELEASE_ONE command. This command is typically
|
||
|
* issued after committing a checkpoint at the target.
|
||
|
*/
|
||
|
u32 pkts_last_epoch;
|
||
|
|
||
|
/*
|
||
|
* Number of packets from the head of the queue, that can
|
||
|
* be released (committed checkpoint).
|
||
|
*/
|
||
|
u32 pkts_to_release;
|
||
|
};
|
||
|
|
||
|
static int plug_enqueue(struct sk_buff *skb, struct Qdisc *sch,
|
||
|
struct sk_buff **to_free)
|
||
|
{
|
||
|
struct plug_sched_data *q = qdisc_priv(sch);
|
||
|
|
||
|
if (likely(sch->qstats.backlog + skb->len <= q->limit)) {
|
||
|
if (!q->unplug_indefinite)
|
||
|
q->pkts_current_epoch++;
|
||
|
return qdisc_enqueue_tail(skb, sch);
|
||
|
}
|
||
|
|
||
|
return qdisc_drop(skb, sch, to_free);
|
||
|
}
|
||
|
|
||
|
static struct sk_buff *plug_dequeue(struct Qdisc *sch)
|
||
|
{
|
||
|
struct plug_sched_data *q = qdisc_priv(sch);
|
||
|
|
||
|
if (q->throttled)
|
||
|
return NULL;
|
||
|
|
||
|
if (!q->unplug_indefinite) {
|
||
|
if (!q->pkts_to_release) {
|
||
|
/* No more packets to dequeue. Block the queue
|
||
|
* and wait for the next release command.
|
||
|
*/
|
||
|
q->throttled = true;
|
||
|
return NULL;
|
||
|
}
|
||
|
q->pkts_to_release--;
|
||
|
}
|
||
|
|
||
|
return qdisc_dequeue_head(sch);
|
||
|
}
|
||
|
|
||
|
static int plug_init(struct Qdisc *sch, struct nlattr *opt,
|
||
|
struct netlink_ext_ack *extack)
|
||
|
{
|
||
|
struct plug_sched_data *q = qdisc_priv(sch);
|
||
|
|
||
|
q->pkts_current_epoch = 0;
|
||
|
q->pkts_last_epoch = 0;
|
||
|
q->pkts_to_release = 0;
|
||
|
q->unplug_indefinite = false;
|
||
|
|
||
|
if (opt == NULL) {
|
||
|
q->limit = qdisc_dev(sch)->tx_queue_len
|
||
|
* psched_mtu(qdisc_dev(sch));
|
||
|
} else {
|
||
|
struct tc_plug_qopt *ctl = nla_data(opt);
|
||
|
|
||
|
if (nla_len(opt) < sizeof(*ctl))
|
||
|
return -EINVAL;
|
||
|
|
||
|
q->limit = ctl->limit;
|
||
|
}
|
||
|
|
||
|
q->throttled = true;
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
/* Receives 4 types of messages:
|
||
|
* TCQ_PLUG_BUFFER: Inset a plug into the queue and
|
||
|
* buffer any incoming packets
|
||
|
* TCQ_PLUG_RELEASE_ONE: Dequeue packets from queue head
|
||
|
* to beginning of the next plug.
|
||
|
* TCQ_PLUG_RELEASE_INDEFINITE: Dequeue all packets from queue.
|
||
|
* Stop buffering packets until the next TCQ_PLUG_BUFFER
|
||
|
* command is received (just act as a pass-thru queue).
|
||
|
* TCQ_PLUG_LIMIT: Increase/decrease queue size
|
||
|
*/
|
||
|
static int plug_change(struct Qdisc *sch, struct nlattr *opt,
|
||
|
struct netlink_ext_ack *extack)
|
||
|
{
|
||
|
struct plug_sched_data *q = qdisc_priv(sch);
|
||
|
struct tc_plug_qopt *msg;
|
||
|
|
||
|
msg = nla_data(opt);
|
||
|
if (nla_len(opt) < sizeof(*msg))
|
||
|
return -EINVAL;
|
||
|
|
||
|
switch (msg->action) {
|
||
|
case TCQ_PLUG_BUFFER:
|
||
|
/* Save size of the current buffer */
|
||
|
q->pkts_last_epoch = q->pkts_current_epoch;
|
||
|
q->pkts_current_epoch = 0;
|
||
|
if (q->unplug_indefinite)
|
||
|
q->throttled = true;
|
||
|
q->unplug_indefinite = false;
|
||
|
break;
|
||
|
case TCQ_PLUG_RELEASE_ONE:
|
||
|
/* Add packets from the last complete buffer to the
|
||
|
* packets to be released set.
|
||
|
*/
|
||
|
q->pkts_to_release += q->pkts_last_epoch;
|
||
|
q->pkts_last_epoch = 0;
|
||
|
q->throttled = false;
|
||
|
netif_schedule_queue(sch->dev_queue);
|
||
|
break;
|
||
|
case TCQ_PLUG_RELEASE_INDEFINITE:
|
||
|
q->unplug_indefinite = true;
|
||
|
q->pkts_to_release = 0;
|
||
|
q->pkts_last_epoch = 0;
|
||
|
q->pkts_current_epoch = 0;
|
||
|
q->throttled = false;
|
||
|
netif_schedule_queue(sch->dev_queue);
|
||
|
break;
|
||
|
case TCQ_PLUG_LIMIT:
|
||
|
/* Limit is supplied in bytes */
|
||
|
q->limit = msg->limit;
|
||
|
break;
|
||
|
default:
|
||
|
return -EINVAL;
|
||
|
}
|
||
|
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
static struct Qdisc_ops plug_qdisc_ops __read_mostly = {
|
||
|
.id = "plug",
|
||
|
.priv_size = sizeof(struct plug_sched_data),
|
||
|
.enqueue = plug_enqueue,
|
||
|
.dequeue = plug_dequeue,
|
||
|
.peek = qdisc_peek_dequeued,
|
||
|
.init = plug_init,
|
||
|
.change = plug_change,
|
||
|
.reset = qdisc_reset_queue,
|
||
|
.owner = THIS_MODULE,
|
||
|
};
|
||
|
|
||
|
static int __init plug_module_init(void)
|
||
|
{
|
||
|
return register_qdisc(&plug_qdisc_ops);
|
||
|
}
|
||
|
|
||
|
static void __exit plug_module_exit(void)
|
||
|
{
|
||
|
unregister_qdisc(&plug_qdisc_ops);
|
||
|
}
|
||
|
module_init(plug_module_init)
|
||
|
module_exit(plug_module_exit)
|
||
|
MODULE_LICENSE("GPL");
|