1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

Merge pull request #15952 from miri64/congure/feat/congure_quic

congure_quic: initial import of QUIC congestion control
This commit is contained in:
Martine Lenders 2022-10-17 14:35:18 +02:00 committed by GitHub
commit 68b9637295
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1825 additions and 0 deletions

View File

@ -37,6 +37,10 @@ ifneq (,$(filter congure_%,$(USEMODULE)))
USEMODULE += congure
endif
ifneq (,$(filter congure_quic,$(USEMODULE)))
USEMODULE += ztimer_msec
endif
ifneq (,$(filter congure_test,$(USEMODULE)))
USEMODULE += fmt
endif

View File

@ -9,6 +9,7 @@ menu "CongURE congestion control abstraction"
depends on USEMODULE_CONGURE
rsource "mock/Kconfig"
rsource "quic/Kconfig"
rsource "reno/Kconfig"
rsource "test/Kconfig"
@ -23,6 +24,7 @@ menuconfig MODULE_CONGURE
if MODULE_CONGURE
rsource "mock/Kconfig"
rsource "quic/Kconfig"
rsource "reno/Kconfig"
rsource "test/Kconfig"

View File

@ -1,3 +1,6 @@
ifneq (,$(filter congure_quic,$(USEMODULE)))
DIRS += quic
endif
ifneq (,$(filter congure_mock,$(USEMODULE)))
DIRS += mock
endif

4
sys/congure/quic/Kconfig Normal file
View File

@ -0,0 +1,4 @@
config MODULE_CONGURE_QUIC
bool "CongURE implementation of QUIC's congestion control"
depends on MODULE_CONGURE
depends on MODULE_ZTIMER

View File

@ -0,0 +1,3 @@
MODULE := congure_quic
include $(RIOTBASE)/Makefile.base

View File

@ -0,0 +1,296 @@
/*
* Copyright (C) 2021 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @{
*
* @file
* @author Martine Lenders <m.lenders@fu-berlin.de>
*
* See [RFC 9002, Appendix B](https://tools.ietf.org/html/rfc9002#appendix-B)
* and parts of [RFC 9002, Appendix A](https://tools.ietf.org/html/rfc9002#appendix-A)
* (for pacing calculation) as basis for this implementation.
*/
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
#include "clist.h"
#include "timex.h"
#include "ztimer.h"
#include "congure/quic.h"
static void _snd_init(congure_snd_t *cong, void *ctx);
static int32_t _snd_inter_msg_interval(congure_snd_t *cong, unsigned msg_size);
static void _snd_report_msg_sent(congure_snd_t *cong, unsigned sent_size);
static void _snd_report_msg_discarded(congure_snd_t *cong, unsigned msg_size);
static void _snd_report_msgs_lost(congure_snd_t *cong, congure_snd_msg_t *msgs);
static void _snd_report_msg_acked(congure_snd_t *cong, congure_snd_msg_t *msg,
congure_snd_ack_t *ack);
static void _snd_report_ecn_ce(congure_snd_t *cong, ztimer_now_t time);
static const congure_snd_driver_t _driver = {
.init = _snd_init,
.inter_msg_interval = _snd_inter_msg_interval,
.report_msg_sent = _snd_report_msg_sent,
.report_msg_discarded = _snd_report_msg_discarded,
.report_msgs_timeout = _snd_report_msgs_lost,
.report_msgs_lost = _snd_report_msgs_lost,
.report_msg_acked = _snd_report_msg_acked,
.report_ecn_ce = _snd_report_ecn_ce,
};
static inline bool _in_recov(congure_quic_snd_t *c, ztimer_now_t sent_time)
{
return sent_time <= c->recovery_start;
}
static void _on_congestion_event(congure_quic_snd_t *c, ztimer_now_t sent_time)
{
if (_in_recov(c, sent_time)) {
return;
}
/* enter congestion recovery period */
c->recovery_start = ztimer_now(ZTIMER_MSEC);
c->ssthresh = (c->super.cwnd * c->consts->loss_reduction_numerator)
/ c->consts->loss_reduction_denominator;
c->super.cwnd = (c->ssthresh > c->consts->min_wnd)
? c->ssthresh : c->consts->min_wnd;
if (c->consts->cong_event_cb) {
c->consts->cong_event_cb(c->super.ctx);
}
}
static void _update_rtts(congure_quic_snd_t *c, ztimer_now_t msg_send_time,
ztimer_now_t ack_recv_time, uint16_t ack_delay)
{
uint16_t latest_rtt;
assert((ack_recv_time - msg_send_time) <= UINT16_MAX);
/* we assume that is in the uint16_t range, but just in case NDEBUG
* is set, let's cap it at UINT16_MAX */
if ((ack_recv_time - msg_send_time) > UINT16_MAX) {
latest_rtt = UINT16_MAX;
}
else {
latest_rtt = ack_recv_time - msg_send_time;
}
if (c->first_rtt_sample > 0) { /* an RTT sample was taken */
c->min_rtt = (c->min_rtt > latest_rtt) ? latest_rtt : c->min_rtt;
/* adjust latest_rtt for ack_delay if plausible */
if (latest_rtt > (c->min_rtt + ack_delay)) {
latest_rtt -= ack_delay;
}
c->rtt_var = ((3U * c->rtt_var) / 4U)
+ (abs((int)c->smoothed_rtt - (int)latest_rtt) / 4U);
c->smoothed_rtt = ((7U * c->smoothed_rtt) / 8U) + (latest_rtt / 8U);
}
else {
c->min_rtt = latest_rtt;
c->smoothed_rtt = latest_rtt;
c->rtt_var = latest_rtt / 2;
c->first_rtt_sample = ztimer_now(ZTIMER_MSEC);
}
}
static void _reset_cwnd_in_pc(congure_quic_snd_t *c)
{
c->super.cwnd = c->consts->min_wnd;
if (c->ssthresh < c->consts->min_wnd) {
/* See https://github.com/quicwg/base-drafts/issues/4826#issuecomment-776305871
* XXX: this differs from the pseudo-code in
* Appendix B.8, where when `ssthresh` is lower than
* `cwnd` (e.g. because )
*/
c->ssthresh = c->consts->min_wnd;
}
c->recovery_start = 0;
}
static void _reset_cwnd(congure_quic_snd_t *c, congure_snd_msg_t *msgs)
{
/* Reset the congestion window if the loss of these packets indicates
* persistent congestion. Only consider packets sent after getting an RTT
* sample */
if (c->first_rtt_sample > 0U) {
/* XXX need to untangle clist_foreach() to add to lost and remove
* elements from `msgs` in-place (using prev and next) */
congure_snd_msg_t *ptr = (congure_snd_msg_t *)msgs->super.next;
/* untangle clist_foreach, since there is no easy
* way to provide both `lost` and `c` to the handler function */
if (ptr) {
ztimer_now_t latest = 0U;
ztimer_now_t earliest =
((congure_snd_msg_t *)ptr->super.next)->send_time;
uint32_t pc_duration; /* use uint32_t here to prevent overflows */
uint16_t rtt_var = (4 * c->rtt_var);
if (rtt_var > c->consts->granularity) {
rtt_var = c->consts->granularity;
}
pc_duration = (c->smoothed_rtt + rtt_var + c->max_ack_delay) *
c->consts->pc_thresh;
do {
ptr = (congure_snd_msg_t *)ptr->super.next;
if (ptr->send_time > c->first_rtt_sample) {
/* consider for persistent congestion */
if (latest < ptr->send_time) {
latest = ptr->send_time;
}
if (earliest > ptr->send_time) {
earliest = ptr->send_time;
}
if ((latest - earliest) > pc_duration) {
/* in persistent congestion */
_reset_cwnd_in_pc(c);
}
}
} while ((&ptr->super) != msgs->super.next);
}
}
}
static void _dec_flight_size(congure_quic_snd_t *c, unsigned msg_size)
{
/* check for integer underflow */
if ((c->in_flight_size - msg_size) > c->in_flight_size) {
c->in_flight_size = 0U;
}
else {
c->in_flight_size -= msg_size;
}
}
static void _snd_init(congure_snd_t *cong, void *ctx)
{
congure_quic_snd_t *c = (congure_quic_snd_t *)cong;
c->super.ctx = ctx;
c->first_rtt_sample = 0;
c->super.cwnd = c->consts->init_wnd;
c->in_flight_size = 0U;
c->recovery_start = 0U;
c->ssthresh = CONGURE_WND_SIZE_MAX;
c->limited = 0U;
c->max_ack_delay = 0U;
c->smoothed_rtt = c->consts->init_rtt;
c->rtt_var = c->consts->init_rtt / 2U;
c->min_rtt = 0U;
}
static int32_t _snd_inter_msg_interval(congure_snd_t *cong, unsigned msg_size)
{
congure_quic_snd_t *c = container_of(cong, congure_quic_snd_t, super);
/* interval in QUIC spec is a divisor, so flip denominator and numerator;
* smoothed_rtt is in ms, but expected result is in us */
return (c->consts->inter_msg_interval_denominator * c->smoothed_rtt *
msg_size * US_PER_MS) /
(c->consts->inter_msg_interval_numerator * c->super.cwnd);
}
static void _snd_report_msg_sent(congure_snd_t *cong, unsigned sent_size)
{
congure_quic_snd_t *c = (congure_quic_snd_t *)cong;
if ((c->in_flight_size + sent_size) < c->super.cwnd) {
c->in_flight_size += sent_size;
}
else {
/* state machine is dependent on flight size being smaller or equal
* to cwnd as such cap cwnd here, in case caller reports a message in
* flight that was marked as lost, but the caller is using a later
* message to send another ACK. */
c->in_flight_size = c->super.cwnd;
}
}
static void _snd_report_msg_discarded(congure_snd_t *cong, unsigned msg_size)
{
congure_quic_snd_t *c = (congure_quic_snd_t *)cong;
assert(msg_size <= c->in_flight_size);
_dec_flight_size(c, msg_size);
}
static void _snd_report_msgs_lost(congure_snd_t *cong, congure_snd_msg_t *msgs)
{
congure_quic_snd_t *c = (congure_quic_snd_t *)cong;
/* XXX need to untangle clist_foreach() to record last_lost_sent */
congure_snd_msg_t *ptr = (congure_snd_msg_t *)msgs->super.next;
ztimer_now_t last_lost_sent = 0U;
if (ptr) {
do {
ptr = (congure_snd_msg_t *)ptr->super.next;
_dec_flight_size(c, ptr->size);
if (last_lost_sent < ptr->send_time) {
last_lost_sent = ptr->send_time;
}
} while ((&ptr->super) != msgs->super.next);
}
if (last_lost_sent) {
_on_congestion_event(c, last_lost_sent);
}
_reset_cwnd(c, msgs);
}
static void _snd_report_msg_acked(congure_snd_t *cong, congure_snd_msg_t *msg,
congure_snd_ack_t *ack)
{
congure_quic_snd_t *c = (congure_quic_snd_t *)cong;
_dec_flight_size(c, msg->size);
/* https://tools.ietf.org/html/rfc9002#appendix-A.7 */
if ((msg->size > 0) && (ack->recv_time > 0)) {
_update_rtts(c, msg->send_time, ack->recv_time, ack->delay);
}
/* Do not increase congestion_window if application limited or flow control
* limited. */
if (c->limited) {
return;
}
/* do not change congestion window in recovery period */
if (_in_recov(c, msg->send_time)) {
return;
}
if (c->super.cwnd < c->ssthresh) {
/* in slow start mode */
c->super.cwnd += msg->size;
}
else {
/* congestion avoidance */
c->super.cwnd += (c->consts->max_msg_size * msg->size) / c->super.cwnd;
}
}
static void _snd_report_ecn_ce(congure_snd_t *cong, ztimer_now_t time)
{
_on_congestion_event((congure_quic_snd_t *)cong, time);
}
void congure_quic_snd_setup(congure_quic_snd_t *c,
const congure_quic_snd_consts_t *consts)
{
assert(consts->inter_msg_interval_numerator >=
consts->inter_msg_interval_denominator);
c->super.driver = &_driver;
c->consts = consts;
}
/** @} */

233
sys/include/congure/quic.h Normal file
View File

@ -0,0 +1,233 @@
/*
* Copyright (C) 2021 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup sys_congure_quic CongURE implementation of QUIC's CC
* @ingroup sys_congure
* @brief Implementation of QUIC's congestion control algorithm for the
* CongURE framework.
* @{
*
* @file
*
* @author Martine S. Lenders <m.lenders@fu-berlin.de>
*/
#ifndef CONGURE_QUIC_H
#define CONGURE_QUIC_H
#include "ztimer.h"
#include "congure.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Constants for the congestion control.
*
* Example usage (to use the same values as specified in
* [RFC 9002](https://tools.ietf.org/html/rfc9002#section-7.6)):
*
* ~~~~~~~~~~~~~~~~ {.c}
* static const congure_quic_snd_consts_t consts = {
* .cong_event_cb = _maybe_send_one_pkt,
* .init_wnd = 12000, // 10 * max_datagram_size
* .min_wnd = 2400, // 2 * max_datagram_size
* .init_rtt = 333, // kInitialRtt = 333ms
* .max_msg_size = 1200, // max_datagram_size
* .pc_thresh = 3000, // kPersistentCongestionThreshold = 3s
* .granularity = 1, // kGranularity = 1ms
* .loss_reduction_numerator = 1, // kLossReductionFactor = .5
* .loss_reduction_denominator = 2,
* .inter_msg_interval_numerator = 5, // Pacing factor N = 1.25
* .inter_msg_interval_denominator = 4,
* }
* static congure_quic_snd_t cong;
*
* // ...
* congure_quic_snd_setup(&cong, &const);
* ~~~~~~~~~~~~~~~~
*/
typedef struct {
/**
* @brief congestion event callback
*
* This callback is called when congestion event is detected by
* message loss or a CE notification. QUIC typically uses this to send
* a packet to speed up loss
* recovery.
*
* @param[in] ctx callback context
*/
void (*cong_event_cb)(void *ctx);
/**
* @brief Initial congestion window size in initiator-defined units.
*/
congure_wnd_size_t init_wnd;
/**
* @brief minimum congestion window size in initiator-defined units.
*/
congure_wnd_size_t min_wnd;
/**
* @brief The assumed RTT in milliseconds before an RTT sample is taken
*/
uint16_t init_rtt;
/**
* @brief maximum message size in initiator-defined units.
*/
uint16_t max_msg_size;
/**
* @brief period of time in milliseconds for persistent congestion
* to be establisched
* @see [RFC 9002, section 7.6](https://tools.ietf.org/html/rfc9002#section-7.6)
*/
uint16_t pc_thresh;
/**
* @brief system timer granularity in milliseconds (typically 1)
*/
uint16_t granularity;
/**
* @brief numerator for the factor the congestion window should be
* reduced by when a new loss event is detected
*/
uint8_t loss_reduction_numerator;
/**
* @brief denominator for the factor the congestion window should be
* reduced by when a new loss event is detected
*/
uint8_t loss_reduction_denominator;
/**
* @brief numerator for the factor N used to adapt the message interval
*
* @see [RFC 9002, section 7.7](https://tools.ietf.org/html/rfc9002#section-7.7)
*/
uint8_t inter_msg_interval_numerator;
/**
* @brief denominator for the factor N used to adapt the message interval
*
* @see [RFC 9002, section 7.7](https://tools.ietf.org/html/rfc9002#section-7.7)
*/
uint8_t inter_msg_interval_denominator;
} congure_quic_snd_consts_t;
/**
* @brief State object for CongURE QUIC
*
* @extends congure_snd_t
*/
typedef struct {
congure_snd_t super; /**< see @ref congure_snd_t */
/**
* @brief Constants
*/
const congure_quic_snd_consts_t *consts;
/**
* @brief Timestamp in milliseconds of when the first RTT sample was
* obtained
*/
ztimer_now_t first_rtt_sample;
/**
* @brief Sum of caller-defined units of message sizes of all messages
* that are yet not ack'd or declared lost
*/
unsigned in_flight_size;
/**
* @brief Timestamp in milliseconds of when congestion was first detected.
*
* This is the time when congestion recovery mode is entered.
*/
ztimer_now_t recovery_start;
/**
* @brief Slow start threshold in caller-defined units.
*
* When congure_quic_snd_t::cwnd is below congure_quic_snd_t::ssthresh the
* algorithm is in slow start mode and congure_quic_snd_t::cwnd grows in
* number of caller-defined units of acknowledged messages sizes
*/
congure_wnd_size_t ssthresh;
/**
* @brief The smoothed RTT of a connection between peers in milliseconds
*/
uint16_t smoothed_rtt;
/**
* @brief The RTT variation
*/
uint16_t rtt_var;
/**
* @brief The minimum RTT seen over a period of time
*/
uint16_t min_rtt;
/**
* @brief Set to one if congestion control should is limited by the
* application or flow control
*
* Should be supplied and may be changed by user before calling a @ref
* sys_congure function.
*
* @see [RFC 9002, Appendix B.5](https://tools.ietf.org/html/rfc9002#appendix-B.5)
*/
uint16_t limited;
/**
* @brief Advertised maximum amount of time in milliseconds a receiver
* intends to delay its acknowledgements
*
* Used to establish persistent congestion.
*
* Should be supplied and may be changed by user before calling a @ref
* sys_congure function. If this value is not provided by the * protocol,
* leave it at 0.
*/
uint16_t max_ack_delay;
} congure_quic_snd_t;
/**
* @brief Set's up the driver for a CongURE QUIC object
*
* @pre inter_msg_interval_numerator of `consts` must be greater than or equal
* to its inter_msg_interval_denominator.
* See [RFC 9002, section 7.7](https://tools.ietf.org/html/rfc9002#section-7.7):
* > Using a value for "N" that is small, but at least 1 (for
* > example, 1.25) ensures that variations in round-trip time do not
* > result in under-utilization of the congestion window.
*
* @param[in] c A CongURE QUIC object.
* @param[in] consts The constants to use for @p c.
* congure_quic_snd_consts_t::inter_msg_interval_numerator
* must be greater than or equal to
* congure_quic_snd_consts_t::inter_msg_interval_denominator
*/
void congure_quic_snd_setup(congure_quic_snd_t *c,
const congure_quic_snd_consts_t *consts);
#ifdef __cplusplus
}
#endif
#endif /* CONGURE_QUIC_H */
/** @} */

View File

@ -0,0 +1,18 @@
include ../Makefile.tests_common
USEMODULE += congure_quic
USEMODULE += congure_test
USEMODULE += fmt
USEMODULE += shell
INCLUDES += -I$(CURDIR)
include $(RIOTBASE)/Makefile.include
ifndef CONFIG_SHELL_NO_ECHO
CFLAGS += -DCONFIG_SHELL_NO_ECHO=1
endif
ifndef CONFIG_CONGURE_TEST_LOST_MSG_POOL_SIZE
CFLAGS += -DCONFIG_CONGURE_TEST_LOST_MSG_POOL_SIZE=6
endif

View File

@ -0,0 +1,11 @@
BOARD_INSUFFICIENT_MEMORY := \
arduino-duemilanove \
arduino-leonardo \
arduino-nano \
arduino-uno \
atmega328p \
atmega328p-xplained-mini \
nucleo-l011k4 \
samd10-xmini \
stm32f030f4-demo \
#

View File

@ -0,0 +1,30 @@
Tests for the CongURE QUIC implementation
=========================================
This test tests the `congure_quic` implementation.
Usage
-----
The test requires an up-to-date version of `riotctrl` with `rapidjson` support:
```console
$ pip install --upgrade riotctrl[rapidjson]
```
Then simply run the application using:
```console
$ BOARD="<board>" make flash test
```
It can also executed with pytest:
```console
$ pytest tests/01-run.py
```
Expected result
---------------
The application's test script passes without error code.

View File

@ -0,0 +1,4 @@
CONFIG_KCONFIG_USEMODULE_CONGURE_TEST=y
CONFIG_KCONFIG_USEMODULE_SHELL=y
CONFIG_CONGURE_TEST_LOST_MSG_POOL_SIZE=6
CONFIG_SHELL_NO_ECHO=y

View File

@ -0,0 +1,10 @@
CONFIG_MODULE_CONGURE=y
CONFIG_MODULE_CONGURE_QUIC=y
CONFIG_MODULE_CONGURE_TEST=y
CONFIG_MODULE_FMT=y
CONFIG_MODULE_SHELL=y
CONFIG_MODULE_ZTIMER=y
CONFIG_MODULE_ZTIMER_MSEC=y
CONFIG_CONGURE_TEST_LOST_MSG_POOL_SIZE=6
CONFIG_SHELL_NO_ECHO=y

View File

@ -0,0 +1,82 @@
/*
* Copyright (C) 2021 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @{
*
* @file
* @author Martine Lenders <m.lenders@fu-berlin.de>
*/
#include <stdbool.h>
#include "kernel_defines.h"
#include "congure_impl.h"
static unsigned _event_cb_calls;
static void *_event_cb_arg;
static void _event_cb(void *);
static const congure_quic_snd_consts_t _consts[] = {
{
.cong_event_cb = NULL,
.init_wnd = 12000,
.min_wnd = 2400,
.init_rtt = 333,
.max_msg_size = 1200,
.pc_thresh = 3000,
.granularity = 1,
.loss_reduction_numerator = 1,
.loss_reduction_denominator = 2,
.inter_msg_interval_numerator = 5,
.inter_msg_interval_denominator = 4,
},
{
.cong_event_cb = _event_cb,
.init_wnd = 12000,
.min_wnd = 2400,
.init_rtt = 333,
.max_msg_size = 1200,
.pc_thresh = 3000,
.granularity = 1,
.loss_reduction_numerator = 1,
.loss_reduction_denominator = 2,
.inter_msg_interval_numerator = 5,
.inter_msg_interval_denominator = 4,
},
};
int congure_test_snd_setup(congure_test_snd_t *c, unsigned id)
{
if (id >= ARRAY_SIZE(_consts)) {
return -1;
}
_event_cb_calls = 0;
_event_cb_arg = NULL;
congure_quic_snd_setup(c, &_consts[id]);
return 0;
}
unsigned congure_quic_test_get_event_cb_calls(void)
{
return _event_cb_calls;
}
void *congure_quic_test_get_event_cb_arg(void)
{
return _event_cb_arg;
}
static void _event_cb(void *ctx)
{
_event_cb_calls++;
_event_cb_arg = ctx;
}
/** @} */

View File

@ -0,0 +1,36 @@
/*
* Copyright (C) 2021 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @{
*
* @file
*
* @author Martine Lenders <m.lenders@fu-berlin.de>
*/
#ifndef CONGURE_IMPL_H
#define CONGURE_IMPL_H
#include "congure/quic.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef congure_quic_snd_t congure_test_snd_t;
int congure_test_snd_setup(congure_test_snd_t *c, unsigned id);
unsigned congure_quic_test_get_event_cb_calls(void);
void *congure_quic_test_get_event_cb_arg(void);
#ifdef __cplusplus
}
#endif
#endif /* CONGURE_IMPL_H */
/** @} */

218
tests/congure_quic/main.c Normal file
View File

@ -0,0 +1,218 @@
/*
* Copyright (C) 2021 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @{
*
* @file
* @author Martine S. Lenders <m.lenders@fu-berlin.de>
*/
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include "clist.h"
#include "congure/test.h"
#include "fmt.h"
#include "shell.h"
#include "congure_impl.h"
static int _json_statham(int argc, char **argv);
static int _set_cwnd(int argc, char **argv);
static int _set_ssthresh(int argc, char **argv);
static int _set_limited(int argc, char **argv);
static int _set_max_ack_delay(int argc, char **argv);
static int _set_recovery_start(int argc, char **argv);
static int _get_event_cb(int argc, char **argv);
static congure_quic_snd_t _congure_state;
static const shell_command_t shell_commands[] = {
{ "state", "Prints current CongURE state object as JSON", _json_statham },
{ "set_cwnd", "Set cwnd member for CongURE state object", _set_cwnd },
{ "set_ssthresh", "Set ssthresh member for CongURE state object",
_set_ssthresh },
{ "set_limited", "Set limited member for CongURE state object",
_set_limited },
{ "set_max_ack_delay", "Set max_ack_delay member for CongURE state object",
_set_max_ack_delay },
{ "set_recovery_start", "Set recovery_start member for CongURE state object",
_set_recovery_start },
{ "get_event_cb",
"Get state of cong_event_cb mock of CongURE state object",
_get_event_cb },
{ NULL, NULL, NULL }
};
int main(void)
{
char line_buf[SHELL_DEFAULT_BUFSIZE];
shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE);
return 0;
}
congure_test_snd_t *congure_test_get_state(void)
{
return &_congure_state;
}
#define PRINT_FIELD_PTR(obj_ptr, field) \
print_str("\"" #field "\":\"0x"); \
print_u32_hex((intptr_t)((obj_ptr).field)); \
print_str("\",")
#define PRINT_FIELD_UINT(obj, field) \
print_str("\"" #field "\":"); \
print_u32_dec((obj).field); \
print_str(",")
#define PRINT_FIELD_BOOL(obj, field) \
print_str("\"" #field "\":"); \
print_str(((obj).field) ? "true" : "false"); \
print_str(",")
static void _print_congure_quic_consts(const congure_quic_snd_consts_t *consts)
{
print_str("\"consts\":");
if (consts) {
print_str("{");
PRINT_FIELD_PTR(*consts, cong_event_cb);
PRINT_FIELD_UINT(*consts, init_wnd);
PRINT_FIELD_UINT(*consts, min_wnd);
PRINT_FIELD_UINT(*consts, init_rtt);
PRINT_FIELD_UINT(*consts, max_msg_size);
PRINT_FIELD_UINT(*consts, pc_thresh);
PRINT_FIELD_UINT(*consts, granularity);
PRINT_FIELD_UINT(*consts, loss_reduction_numerator);
PRINT_FIELD_UINT(*consts, loss_reduction_denominator);
PRINT_FIELD_UINT(*consts, inter_msg_interval_numerator);
PRINT_FIELD_UINT(*consts, inter_msg_interval_denominator);
print_str("},");
}
else {
print_str("null,");
}
}
static int _json_statham(int argc, char **argv)
{
(void)argc;
(void)argv;
print_str("{");
PRINT_FIELD_PTR(_congure_state.super, ctx);
PRINT_FIELD_UINT(_congure_state.super, cwnd);
_print_congure_quic_consts(_congure_state.consts);
PRINT_FIELD_UINT(_congure_state, first_rtt_sample);
PRINT_FIELD_UINT(_congure_state, in_flight_size);
PRINT_FIELD_UINT(_congure_state, recovery_start);
PRINT_FIELD_UINT(_congure_state, ssthresh);
PRINT_FIELD_UINT(_congure_state, smoothed_rtt);
PRINT_FIELD_UINT(_congure_state, rtt_var);
PRINT_FIELD_UINT(_congure_state, min_rtt);
PRINT_FIELD_BOOL(_congure_state, limited);
PRINT_FIELD_UINT(_congure_state, max_ack_delay);
print_str("}\n");
return 0;
}
static int _set_cwnd(int argc, char **argv)
{
uint32_t tmp;
if (argc < 2) {
print_str("{\"error\":\"`cwnd` argument expected\"}");
return 1;
}
tmp = scn_u32_dec(argv[1], strlen(argv[1]));
if (tmp > CONGURE_WND_SIZE_MAX) {
print_str("{\"error\":\"`ssthresh` not 16 bit wide\"}\n");
}
_congure_state.super.cwnd = (congure_wnd_size_t)tmp;
return 0;
}
static int _set_ssthresh(int argc, char **argv)
{
uint32_t tmp;
if (argc < 2) {
print_str("{\"error\":\"`ssthresh` argument expected\"}");
return 1;
}
tmp = scn_u32_dec(argv[1], strlen(argv[1]));
if (tmp > CONGURE_WND_SIZE_MAX) {
print_str("{\"error\":\"`ssthresh` not 16 bit wide\"}\n");
}
_congure_state.ssthresh = (congure_wnd_size_t)tmp;
return 0;
}
static int _set_limited(int argc, char **argv)
{
uint32_t tmp;
if (argc < 2) {
print_str("{\"error\":\"`limited` argument expected\"}");
return 1;
}
tmp = scn_u32_dec(argv[1], strlen(argv[1]));
if (tmp > UINT16_MAX) {
print_str("{\"error\":\"`limited` not 16 bit wide\"}\n");
}
_congure_state.limited = (uint16_t)tmp;
return 0;
}
static int _set_max_ack_delay(int argc, char **argv)
{
uint32_t tmp;
if (argc < 2) {
print_str("{\"error\":\"`max_ack_delay` argument expected\"}");
return 1;
}
tmp = scn_u32_dec(argv[1], strlen(argv[1]));
if (tmp > UINT16_MAX) {
print_str("{\"error\":\"`max_ack_delay` not 16 bit wide\"}\n");
}
_congure_state.max_ack_delay = (uint16_t)tmp;
return 0;
}
static int _set_recovery_start(int argc, char **argv)
{
if (argc < 2) {
print_str("{\"error\":\"`recovery_start` argument expected\"}");
return 1;
}
_congure_state.recovery_start = scn_u32_dec(argv[1], strlen(argv[1]));
return 0;
}
static int _get_event_cb(int argc, char **argv)
{
(void)argc;
(void)argv;
print_str("{\"event_cb\":");
print_str("{\"calls\":");
print_u32_dec(congure_quic_test_get_event_cb_calls());
print_str(",");
print_str("\"last_args\":{\"ctx\":\"0x");
print_u32_hex((intptr_t)congure_quic_test_get_event_cb_arg());
print_str("\"}");
print_str("}");
print_str("}\n");
return 0;
}
/** @} */

View File

@ -0,0 +1,871 @@
#! /usr/bin/env python3
# Copyright (C) 2021 Freie Universität Berlin
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import logging
import sys
import unittest
from riotctrl.ctrl import RIOTCtrl
from riotctrl.shell.json import RapidJSONShellInteractionParser, rapidjson
from riotctrl_shell.congure_test import CongureTest
class TestCongUREBase(unittest.TestCase):
DEBUG = False
# pylint: disable=too-many-public-methods
# it's just one more ...
@classmethod
def setUpClass(cls):
cls.ctrl = RIOTCtrl()
cls.ctrl.reset()
cls.ctrl.start_term()
if cls.DEBUG:
cls.ctrl.term.logfile = sys.stdout
cls.shell = CongureTest(cls.ctrl)
cls.json_parser = RapidJSONShellInteractionParser()
cls.json_parser.set_parser_args(
parse_mode=rapidjson.PM_TRAILING_COMMAS
)
cls.logger = logging.getLogger(cls.__name__)
if cls.DEBUG:
cls.logger.setLevel(logging.DEBUG)
@classmethod
def tearDownClass(cls):
cls.ctrl.stop_term()
def setUp(self):
self.shell.clear()
def tearDown(self):
self.shell.msgs_reset()
def _parse(self, res):
self.logger.debug(res)
if res.strip():
return self.json_parser.parse(res)
return None
def exec_cmd(self, cmd, timeout=-1, async_=False):
res = self.shell.cmd(cmd, timeout, async_)
return self._parse(res)
def assertSlowStart(self, state):
# pylint: disable=invalid-name
# trying to be in line with `unittest`
"""
https://tools.ietf.org/html/rfc9002#section-7.3.1
> A NewReno sender is in slow start any time the congestion window is
> below the slow start threshold. A sender begins in slow start because
> the slow start threshold is initialized to an infinite value.
"""
self.assertLess(state['cwnd'], state['ssthresh'])
def assertCongestionAvoidance(self, state):
# pylint: disable=invalid-name
# trying to be in line with `unittest`
"""
https://tools.ietf.org/html/rfc9002#section-7.3.3
> A NewReno sender is in congestion avoidance any time the congestion
> window is at or above the slow start threshold and not in a recovery
> period.
"""
self.assertGreaterEqual(state['cwnd'], state['ssthresh'])
def assertInRecovery(self, state):
# pylint: disable=invalid-name
# trying to be in line with `unittest`
"""recovery_start is set to current system time when entering recovery
period"""
self.assertGreater(state['recovery_start'], 0)
def assertNotInRecovery(self, state):
# pylint: disable=invalid-name
# trying to be in line with `unittest`
"""recovery_start is reset to 0 when leaving recovery period"""
self.assertEqual(state['recovery_start'], 0)
def get_event_cb(self):
res = self.exec_cmd('get_event_cb')
return res['event_cb']
def set_same_wnd_adv(self, value):
self.exec_cmd('set_same_wnd_adv {value:d}'.format(value=value))
def set_cwnd(self, cwnd):
self.exec_cmd('set_cwnd {cwnd}'.format(cwnd=cwnd))
def set_ssthresh(self, ssthresh):
self.exec_cmd('set_ssthresh {ssthresh}'.format(ssthresh=ssthresh))
def set_limited(self, limited):
self.exec_cmd('set_limited {limited:d}'.format(limited=limited))
def set_max_ack_delay(self, max_ack_delay):
self.exec_cmd('set_max_ack_delay {max_ack_delay:d}'
.format(max_ack_delay=max_ack_delay))
def set_recovery_start(self, recovery_start):
self.exec_cmd('set_recovery_start {recovery_start}'
.format(recovery_start=recovery_start))
def cong_state(self):
return self.exec_cmd('state')
def cong_init(self, ctx=0):
res = self.shell.init(ctx)
return self._parse(res)
def cong_inter_msg_interval(self, msg_size):
res = self.shell.inter_msg_interval(msg_size)
return self._parse(res)['success']
def cong_report_msg_sent(self, msg_size):
res = self.shell.report_msg_sent(msg_size)
return self._parse(res)
def cong_report_msg_discarded(self, msg_size):
res = self.shell.report_msg_discarded(msg_size)
return self._parse(res)
def cong_report_msgs_timeout(self, msgs):
res = self.shell.report_msgs_timeout(msgs)
return self._parse(res)
def cong_report_msgs_lost(self, msgs):
res = self.shell.report_msgs_lost(msgs)
return self._parse(res)
def cong_report_msg_acked(self, msg, ack):
res = self.shell.report_msg_acked(msg, ack)
return self._parse(res)
def cong_report_ecn_ce(self, time):
res = self.shell.report_ecn_ce(time)
return self._parse(res)
class TestCongUREQUICWithoutSetup(TestCongUREBase):
def test_no_setup(self):
state = self.cong_state()
self.assertEqual(state, {
'ctx': '0x00000000',
'cwnd': 0,
'consts': None,
'first_rtt_sample': 0,
'in_flight_size': 0,
'recovery_start': 0,
'ssthresh': 0,
'smoothed_rtt': 0,
'rtt_var': 0,
'min_rtt': 0,
'limited': False,
'max_ack_delay': 0,
})
class TestCongUREQUICDefaultInitTests(TestCongUREBase):
"""
The implementation is based on
https://tools.ietf.org/html/rfc9002#appendix-B
so we can check the viability based on that.
"""
def setUp(self):
super().setUp()
res = self._parse(self.shell.setup(0))
self.assertIn('success', res)
def test_setup(self):
state = self.cong_state()
self.assertIsNotNone(state['consts'])
self.assertEqual(int(state['consts']['cong_event_cb'], base=16), 0)
self.assertEqual(state['consts']['init_wnd'], 12000)
self.assertEqual(state['consts']['min_wnd'], 2400)
self.assertEqual(state['consts']['init_rtt'], 333)
self.assertEqual(state['consts']['max_msg_size'], 1200)
self.assertEqual(state['consts']['pc_thresh'], 3000)
self.assertEqual(state['consts']['granularity'], 1)
self.assertEqual(state['consts']['loss_reduction_numerator'], 1)
self.assertEqual(state['consts']['loss_reduction_denominator'], 2)
self.assertEqual(state['consts']['inter_msg_interval_numerator'], 5)
self.assertEqual(state['consts']['inter_msg_interval_denominator'], 4)
def test_init(self):
"""
> B.3. Initialization
>
> At the beginning of the connection, initialize the congestion
> control variables as follows:
>
> congestion_window = kInitialWindow
> bytes_in_flight = 0
> congestion_recovery_start_time = 0
> ssthresh = infinite
Packet number spaces are QUIC protocol-specific and thus not
implemented
Some recovery variables also apply for congestion, thus we also need to
check if https://tools.ietf.org/html/rfc9002#appendix-A.4
upholds:
> A.4. Initialization
>
> At the beginning of the connection, initialize the loss detection
> variables as follows:
>
> (loss_detection_timer.reset())
> (pto_count = 0)
> (latest_rtt = 0)
> smoothed_rtt = kInitialRtt
> rttvar = kInitialRtt / 2
> min_rtt = 0
> (first_rtt_sample = 0)
Again, packet number spaces are QUIC protocol-specific and thus not
implemented
"""
res = self.cong_init()
self.assertIn('success', res)
state = self.cong_state()
# congestion_window = kInitialWindow
self.assertEqual(state['consts']['init_wnd'], state['cwnd'])
# bytes_in_flight = 0
self.assertEqual(state['in_flight_size'], 0)
# recovery_start = 0
self.assertNotInRecovery(state)
# ssthresh = infinite
self.assertEqual(state['ssthresh'], 0xffff)
# smoothed_rtt = kInitialRtt
self.assertEqual(state['smoothed_rtt'], state['consts']['init_rtt'])
# rttvar = kInitialRtt / 2
self.assertEqual(state['rtt_var'], state['consts']['init_rtt'] // 2)
# min_rtt = 0
self.assertEqual(state['min_rtt'], 0)
class TestCongUREQUICDefault(TestCongUREBase):
"""
The implementation is based on
https://tools.ietf.org/html/rfc9002#appendix-B
so we can check the viability based on that.
"""
def setUp(self):
super().setUp()
res = self._parse(self.shell.setup(0))
self.assertIn('success', res)
res = self.cong_init()
self.assertIn('success', res)
self.sent_bytes = 42
self.ack_delay = 10
def test_on_packet_sent(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.4
"""
state = self.cong_state()
init_in_flight_size = state['in_flight_size']
self.cong_report_msg_sent(self.sent_bytes)
state = self.cong_state()
self.assertEqual(state['in_flight_size'],
init_in_flight_size + self.sent_bytes)
def _send_msg_recv_ack(self, send_time=1000, recv_time=1120):
# put ACK'd packet in flight
self.test_on_packet_sent()
msg = {'send_time': send_time, 'size': self.sent_bytes, 'resends': 0}
ack = {'recv_time': recv_time, 'id': 1337, 'size': 0, 'clean': True,
'wnd': 1234, 'delay': self.ack_delay}
self.cong_report_msg_acked(msg=msg, ack=ack)
# this method is reused a lot, so reset internal message buffer of
# `congure_test`
res = self._parse(self.shell.msgs_reset())
self.assertIn('success', res)
return msg, ack
def test_on_packet_ack_no_rtt_sample(self, state=None):
"""
See https://tools.ietf.org/html/rfc9002#appendix-A.7
"""
if state is None:
state = self.cong_state()
# From A.7, UpdateRtt
# if (first_rtt_sample == 0):
self.assertEqual(state['first_rtt_sample'], 0)
msg, ack = self._send_msg_recv_ack()
state = self.cong_state()
# min_rtt = latest_rtt
self.assertEqual(state['min_rtt'], ack['recv_time'] - msg['send_time'])
# smoothed_rtt = latest_rtt
self.assertEqual(state['smoothed_rtt'],
ack['recv_time'] - msg['send_time'])
# rttvar = latest_rtt / 2
self.assertEqual(state['rtt_var'],
(ack['recv_time'] - msg['send_time']) // 2)
# first_rtt_sample = now()
self.assertGreater(state['first_rtt_sample'], 0)
return msg, ack
def assertRTTSampleCorrect(self, state, init_min_rtt, init_smoothed_rtt,
init_rtt_var, adjusted_rtt):
# rttvar = 3/4 * rttvar + 1/4 * abs(smoothed_rtt - adjusted_rtt)
self.assertEqual(
state['rtt_var'],
((3 * init_rtt_var) // 4) +
abs(init_smoothed_rtt - adjusted_rtt) // 4
)
# smoothed_rtt = 7/8 * smoothed_rtt + 1/8 * adjusted_rtt
self.assertEqual(
state['smoothed_rtt'],
((7 * init_smoothed_rtt) // 8) + (adjusted_rtt // 8)
)
# first_rtt_sample = now()
self.assertGreater(state['first_rtt_sample'], 0)
def test_on_packet_ack_rtt_sample_greater_rtt(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-A.7
"""
# set first RTT sample
self.test_on_packet_ack_no_rtt_sample()
state = self.cong_state()
init_min_rtt = state['min_rtt']
init_smoothed_rtt = state['smoothed_rtt']
init_rtt_var = state['rtt_var']
# From A.7, UpdateRtt
# if (first_rtt_sample == 0):
self.assertNotEqual(state['first_rtt_sample'], 0)
# another message ACK pair, but with larger RTT
msg, ack = self._send_msg_recv_ack(send_time=1140, recv_time=1300)
state = self.cong_state()
# min_rtt = min(min_rtt, latest_rtt)
self.assertGreater(ack['recv_time'] - msg['send_time'], init_min_rtt)
# latest_rtt is greater, so it should stay the same
self.assertEqual(state['min_rtt'], init_min_rtt)
# as min_rtt == 120, ack_delay == 10, latest_rtt = 160
# => adjusted_rtt = latest_rtt - ack_delay
adjusted_rtt = ack['recv_time'] - msg['send_time'] - ack['delay']
self.assertRTTSampleCorrect(state, init_min_rtt, init_smoothed_rtt,
init_rtt_var, adjusted_rtt)
def test_on_packet_ack_rtt_sample_less_rtt(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-A.7
"""
# set first RTT sample
self.test_on_packet_ack_no_rtt_sample()
state = self.cong_state()
init_min_rtt = state['min_rtt']
init_smoothed_rtt = state['smoothed_rtt']
init_rtt_var = state['rtt_var']
# From A.7, UpdateRtt
# if (first_rtt_sample == 0):
self.assertNotEqual(state['first_rtt_sample'], 0)
# another message ACK pair, but with larger RTT
msg, ack = self._send_msg_recv_ack(send_time=1140, recv_time=1250)
state = self.cong_state()
# min_rtt = min(min_rtt, latest_rtt)
self.assertLess(ack['recv_time'] - msg['send_time'], init_min_rtt)
# latest_rtt is less, so it should become latest_rtt
self.assertEqual(state['min_rtt'], ack['recv_time'] - msg['send_time'])
# as min_rtt == 110, ack_delay == 10, latest_rtt = 110
# => adjusted_rtt = latest_rtt (so unadjusted)
adjusted_rtt = ack['recv_time'] - msg['send_time']
self.assertRTTSampleCorrect(state, init_min_rtt, init_smoothed_rtt,
init_rtt_var, adjusted_rtt)
def test_on_packet_ack_not_limited_slow_start(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.5
"""
state = self.cong_state()
self.assertSlowStart(state)
self.assertNotInRecovery(state)
self.assertFalse(state['limited'])
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly
msg, ack = self.test_on_packet_ack_no_rtt_sample(state)
state = self.cong_state()
# bytes_in_flight -= acked_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
# // Slow start.
# congestion_window += acked_packet.sent_bytes
self.assertEqual(state['cwnd'], init_cwnd + msg['size'])
def test_on_packet_ack_not_limited_congestion_avoidance(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.5
"""
state = self.cong_state()
# enforce congestion avoidance
self.set_ssthresh(state['cwnd'] - 100)
state = self.cong_state()
self.assertCongestionAvoidance(state)
self.assertNotInRecovery(state)
self.assertFalse(state['limited'])
init_cwnd = state['cwnd']
init_in_flight_size = state['in_flight_size']
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly
msg, ack = self.test_on_packet_ack_no_rtt_sample(state)
state = self.cong_state()
# bytes_in_flight -= acked_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
# // Congestion avoidance.
# congestion_window +=
# max_datagram_size * acked_packet.sent_bytes
# / congestion_window
self.assertEqual(
state['cwnd'],
init_cwnd +
((state['consts']['max_msg_size'] * msg['size']) // init_cwnd)
)
def test_on_packet_ack_not_limited_recovery(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.5
"""
# enforce recovery mode for given send time 1000 and ack recv time 1120
self.set_recovery_start(1130)
state = self.cong_state()
self.assertInRecovery(state)
self.assertFalse(state['limited'])
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly
msg, ack = self.test_on_packet_ack_no_rtt_sample(state)
state = self.cong_state()
# bytes_in_flight -= acked_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
# // Do not increase congestion window in recovery period.
# if (InCongestionRecovery(acked_packet.time_sent)):
# return
self.assertEqual(state['cwnd'], init_cwnd)
def test_on_packet_ack_limited(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.5
"""
# set to limited by application or flow control
self.set_limited(True)
state = self.cong_state()
self.assertSlowStart(state)
self.assertNotInRecovery(state)
self.assertTrue(state['limited'])
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly
msg, ack = self.test_on_packet_ack_no_rtt_sample(state)
state = self.cong_state()
# bytes_in_flight -= acked_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
# // Do not increase congestion_window if application
# // limited or flow control limited.
# if (IsAppOrFlowControlLimited())
# return
self.assertEqual(state['cwnd'], init_cwnd)
def assertMaybeSendOnePacket(self, state):
"""
Maybe send one packet is modeled as cong_event_cb in congure_quic.
It is not assigned in this test case so it should not have been called.
"""
event_cb = self.get_event_cb()
self.assertEqual(event_cb['calls'], 0)
def assertOnNewCongestionEvent(self, state, init_cwnd, init_ssthresh,
in_recovery=False,
persistent_congestion=False):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.6
"""
if persistent_congestion:
# persistent congestion triggers return to slow start; see
# https://tools.ietf.org/html/rfc9002#section-7.3
self.assertNotInRecovery(state)
# See https://tools.ietf.org/html/rfc9002#appendix-B.8
# if (InPersistentCongestion(pc_lost)):
# congestion_window = kMinimumWindow
# congestion_recovery_start_time = 0
self.assertEqual(state['cwnd'], state['consts']['min_wnd'])
# persistent congestion triggers return to slow start; see
# https://tools.ietf.org/html/rfc9002#section-7.3
# or at least congestion avoidance with ssthresh being at least
# kMinimumWindow
# see https://github.com/quicwg/base-drafts/issues/4826#issuecomment-776316765
self.assertLessEqual(state['cwnd'], state['ssthresh'])
elif in_recovery:
self.assertInRecovery(state)
# // No reaction if already in a recovery period.
# if (InCongestionRecovery(sent_time)):
# return
# => ssthresh remains unchanged
self.assertEqual(state['ssthresh'], init_ssthresh)
self.assertEqual(state['cwnd'], init_cwnd)
else:
# persistent congestion triggers return to slow start; see
# https://tools.ietf.org/html/rfc9002#section-7.3
# Enter recovery period.
# congestion_recovery_start_time = now()
self.assertInRecovery(state)
# ssthresh = congestion_window * kLossReductionFactor
exp_ssthresh = (
init_cwnd * state['consts']['loss_reduction_numerator']
) // state['consts']['loss_reduction_denominator']
self.assertEqual(state['ssthresh'], exp_ssthresh)
# congestion_window = max(ssthresh, kMinimumWindow)
if exp_ssthresh < state['consts']['min_wnd']:
self.assertEqual(state['cwnd'], state['consts']['min_wnd'])
else:
self.assertEqual(state['cwnd'], exp_ssthresh)
self.assertMaybeSendOnePacket(state)
def test_on_ecn_information_not_recovery(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.7
"""
state = self.cong_state()
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.cong_report_ecn_ce(1000)
state = self.cong_state()
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh)
# just making sure we took the right path in that method
self.assertNotEqual(state['cwnd'], state['consts']['min_wnd'])
def test_on_ecn_information_not_recovery_low_ssthresh(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.7
"""
state = self.cong_state()
# enforce new cwnd to be smaller than `min_wnd`
init_cwnd = state['consts']['min_wnd']
self.set_cwnd(init_cwnd)
init_ssthresh = state['ssthresh']
self.cong_report_ecn_ce(1000)
state = self.cong_state()
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh)
# just making sure we took the right path in that method
self.assertEqual(state['cwnd'], state['consts']['min_wnd'])
def test_on_ecn_information_recovery(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.7
"""
# enforce recovery mode for given send time = 1000
self.set_recovery_start(2000)
state = self.cong_state()
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.cong_report_ecn_ce(1000)
state = self.cong_state()
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh,
in_recovery=True)
# just making sure we took the right path in that method
self.assertNotEqual(state['cwnd'], state['consts']['min_wnd'])
def _send_msgs(self, msgs):
for msg in msgs:
self.cong_report_msg_sent(msg['size'])
def _send_msgs_and_report_lost(self, msgs, init_in_flight_size):
self._send_msgs(msgs)
state = self.cong_state()
self.assertEqual(state['in_flight_size'],
init_in_flight_size + sum(m['size'] for m in msgs))
self.cong_report_msgs_lost(msgs)
def test_on_packets_lost_not_recovery_no_pc(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
msgs = [{'send_time': 1000, 'size': 124, 'resends': 1},
{'send_time': 1050, 'size': 643, 'resends': 0},
{'send_time': 1100, 'size': 134, 'resends': 0}]
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
state = self.cong_state()
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertNotInRecovery(state)
self._send_msgs_and_report_lost(msgs, init_in_flight_size)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh)
def test_on_packets_lost_not_recovery_no_pc_low_cwnd(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
msgs = [{'send_time': 1000, 'size': 124, 'resends': 1},
{'send_time': 1050, 'size': 643, 'resends': 0},
{'send_time': 1100, 'size': 134, 'resends': 0}]
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
# enforce new cwnd to be smaller than `min_wnd`
state = self.cong_state()
init_cwnd = state['consts']['min_wnd']
self.set_cwnd(init_cwnd)
state = self.cong_state()
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertNotInRecovery(state)
self._send_msgs_and_report_lost(msgs, init_in_flight_size)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh)
def test_on_packets_lost_recovery_no_pc(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
msgs = [{'send_time': 1000, 'size': 124, 'resends': 1},
{'send_time': 1050, 'size': 643, 'resends': 0},
{'send_time': 1100, 'size': 134, 'resends': 0}]
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
# enforce recovery mode for given send time = 1100
self.set_recovery_start(2000)
state = self.cong_state()
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertInRecovery(state)
self._send_msgs_and_report_lost(msgs, init_in_flight_size)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh,
in_recovery=True)
def test_on_packets_lost_not_recovery_pc(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
max_ack_delay = 120
self.set_max_ack_delay(max_ack_delay)
state = self.cong_state()
# only messages sent after the first RTT sample are considered for
# persistent congestion, so add that time to the message's send_time
offset = state['first_rtt_sample']
# See https://tools.ietf.org/html/rfc9002#section-7.6.1
# (smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay) *
# kPersistentCongestionThreshold
pc_duration = (
state['smoothed_rtt'] +
max(4 * state['rtt_var'], state['consts']['granularity']) +
max_ack_delay
) * state['consts']['pc_thresh']
# for lost in lost_packets:
# if lost.time_sent > first_rtt_sample:
# pc_lost.insert(lost)
msgs = [{'send_time': 1000 + offset, 'size': 124, 'resends': 1},
{'send_time': 1050 + offset, 'size': 643, 'resends': 0},
{'send_time': 1100 + offset + pc_duration,
'size': 134, 'resends': 0}]
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertNotInRecovery(state)
self._send_msgs_and_report_lost(msgs, init_in_flight_size)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh,
persistent_congestion=True)
def test_on_packets_lost_not_recovery_pc_low_cwnd(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
# enforce new cwnd to be smaller than `min_wnd`
state = self.cong_state()
init_cwnd = state['consts']['min_wnd']
self.set_cwnd(init_cwnd)
state = self.cong_state()
max_ack_delay = 120
self.set_max_ack_delay(max_ack_delay)
state = self.cong_state()
# only messages sent after the first RTT sample are considered for
# persistent congestion, so add that time to the message's send_time
offset = state['first_rtt_sample']
# See https://tools.ietf.org/html/rfc9002#section-7.6.1
# (smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay) *
# kPersistentCongestionThreshold
pc_duration = (
state['smoothed_rtt'] +
max(4 * state['rtt_var'], state['consts']['granularity']) +
max_ack_delay
) * state['consts']['pc_thresh']
# for lost in lost_packets:
# if lost.time_sent > first_rtt_sample:
# pc_lost.insert(lost)
msgs = [{'send_time': 1000 + offset, 'size': 124, 'resends': 1},
{'send_time': 1050 + offset, 'size': 643, 'resends': 0},
{'send_time': 1100 + offset + pc_duration,
'size': 134, 'resends': 0}]
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertNotInRecovery(state)
self._send_msgs(msgs)
self.cong_report_msgs_lost(msgs)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh,
persistent_congestion=True)
def test_on_packets_lost_recovery_pc(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.8
"""
# trigger message + ACK and check if UpdateRTT (see A.7) was called
# correctly to have an RTT sample
msg, ack = self.test_on_packet_ack_no_rtt_sample(self.cong_state())
# enforce recovery mode for given send time = 1100
self.set_recovery_start(2000)
state = self.cong_state()
max_ack_delay = 120
self.set_max_ack_delay(max_ack_delay)
state = self.cong_state()
# only messages sent after the first RTT sample are considered for
# persistent congestion, so add that time to the message's send_time
offset = state['first_rtt_sample']
# See https://tools.ietf.org/html/rfc9002#section-7.6.1
# (smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay) *
# kPersistentCongestionThreshold
pc_duration = (
state['smoothed_rtt'] +
max(4 * state['rtt_var'], state['consts']['granularity']) +
max_ack_delay
) * state['consts']['pc_thresh']
# for lost in lost_packets:
# if lost.time_sent > first_rtt_sample:
# pc_lost.insert(lost)
msgs = [{'send_time': 1000 + offset, 'size': 124, 'resends': 1},
{'send_time': 1050 + offset, 'size': 643, 'resends': 0},
{'send_time': 1100 + offset + pc_duration,
'size': 134, 'resends': 0}]
init_in_flight_size = state['in_flight_size']
init_cwnd = state['cwnd']
init_ssthresh = state['ssthresh']
self.assertInRecovery(state)
self._send_msgs(msgs)
self.cong_report_msgs_lost(msgs)
state = self.cong_state()
# for lost_packet in lost_packets:
# if lost_packet.in_flight:
# bytes_in_flight -= lost_packet.sent_bytes
self.assertEqual(state['in_flight_size'], init_in_flight_size)
self.assertOnNewCongestionEvent(state, init_cwnd, init_ssthresh,
in_recovery=True,
persistent_congestion=True)
# persistent congestion triggers return to slow start; see
# https://tools.ietf.org/html/rfc9002#section-7.3
self.assertSlowStart(state)
self.assertNotInRecovery(state)
def test_removing_discarded_packets_from_bytes_in_flight(self):
"""
See https://tools.ietf.org/html/rfc9002#appendix-B.9
"""
state = self.cong_state()
init_in_flight_size = state['in_flight_size']
self.cong_report_msg_sent(1337)
self.cong_report_msg_discarded(1337)
state = self.cong_state()
self.assertEqual(state['in_flight_size'], init_in_flight_size)
def test_pacing(self):
"""
See https://tools.ietf.org/html/rfc9002#section-7.7
"""
state = self.cong_state()
self.assertGreater(state["smoothed_rtt"], 0)
self.assertGreater(state["cwnd"], 0)
# pylint: disable=invalid-name
# name chosen to be in line with draft
N = state["consts"]["inter_msg_interval_numerator"] / \
state["consts"]["inter_msg_interval_denominator"]
# Using a value for "N" that is small, but at least 1 (for example,
# 1.25) ensures that variations in round-trip time do not result in
# under-utilization of the congestion window.
self.assertGreaterEqual(N, 1)
msg_size = 760
# interval = ( smoothed_rtt * packet_size / congestion_window ) / N
# packet_size is expected to be multiplied in by the user.
self.assertAlmostEqual(
# smoothed_rtt is in milliseconds, expected return value of
# inter_msg_interval is microseconds.
int((state["smoothed_rtt"] * msg_size * 1000) / state["cwnd"] / N),
self.cong_inter_msg_interval(msg_size)
)
class TestCongUREQUICEventCb(TestCongUREQUICDefault):
def setUp(self):
super().setUp()
res = self._parse(self.shell.setup(1))
self.assertIn('success', res)
res = self.cong_init(0xdead)
self.assertIn('success', res)
def test_setup_and_init(self):
state = self.cong_state()
self.assertNotEqual(int(state['consts']['cong_event_cb'], base=16), 0)
self.assertEqual(int(state['ctx'], base=16), 0xdead)
def assertMaybeSendOnePacket(self, state):
"""
Maybe send one packet is modeled as cong_event_cb in congure_quic.
It is assigned in this test case so it should have been called.
"""
event_cb = self.get_event_cb()
self.assertEqual(event_cb['calls'], 1)
self.assertEqual(int(event_cb['last_args']['ctx'], base=16),
int(state['ctx'], base=16))
if __name__ == '__main__':
unittest.main()