1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

Merge pull request #6969 from brummer-simon/gnrc_tcp-replace_msg_queue

gnrc_tcp: improvement: Replace tcbs msg queue with mbox
This commit is contained in:
Sebastian Meiling 2017-05-17 15:13:03 +02:00 committed by GitHub
commit f0ae5d2dd8
5 changed files with 158 additions and 112 deletions

View File

@ -327,6 +327,7 @@ ifneq (,$(filter gnrc_tcp,$(USEMODULE)))
USEMODULE += random
USEMODULE += tcp
USEMODULE += xtimer
USEMODULE += core_mbox
endif
ifneq (,$(filter gnrc_nettest,$(USEMODULE)))

View File

@ -28,6 +28,7 @@
#include "xtimer.h"
#include "mutex.h"
#include "msg.h"
#include "mbox.h"
#include "net/gnrc/pkt.h"
#include "config.h"
@ -40,9 +41,9 @@ extern "C" {
#endif
/**
* @brief Size of the TCB message queue
* @brief Size of the TCB mbox
*/
#define GNRC_TCP_TCB_MSG_QUEUE_SIZE (8U)
#define GNRC_TCP_TCB_MBOX_SIZE (8U)
/**
* @brief Transmission control block of GNRC TCP.
@ -75,8 +76,8 @@ typedef struct _transmission_control_block {
xtimer_t tim_tout; /**< Timer struct for timeouts */
msg_t msg_tout; /**< Message, sent on timeouts */
gnrc_pktsnip_t *pkt_retransmit; /**< Pointer to packet in "retransmit queue" */
kernel_pid_t owner; /**< PID of this connection handling thread */
msg_t msg_queue[GNRC_TCP_TCB_MSG_QUEUE_SIZE]; /**< TCB message queue */
msg_t mbox_raw[GNRC_TCP_TCB_MBOX_SIZE]; /**< Msg queue for mbox */
mbox_t mbox; /**< TCB mbox for synchronization */
uint8_t *rcv_buf_raw; /**< Pointer to the receive buffer */
ringbuffer_t rcv_buf; /**< Receive buffer data structure */
mutex_t fsm_lock; /**< Mutex for FSM access synchronization */

View File

@ -60,7 +60,43 @@ gnrc_tcp_tcb_t *_list_tcb_head;
mutex_t _list_tcb_lock;
/**
* @brief Establishes a new TCP connection.
* @brief Helper struct, holding all argument data for_cb_mbox_put_msg.
*/
typedef struct _cb_arg {
uint32_t msg_type; /**< Message Type to Put into mbox behind mbox_ptr */
mbox_t *mbox_ptr; /**< Pointer to mbox */
} cb_arg_t;
/**
* @brief Callback for xtimer, puts a message in a mbox.
*
* @param[in] arg Ptr to cb_arg_t. Must not be NULL or anything else.
*/
static void _cb_mbox_put_msg(void *arg)
{
msg_t msg;
msg.type = ((cb_arg_t *) arg)->msg_type;
mbox_try_put(((cb_arg_t *) arg)->mbox_ptr, &msg);
}
/**
* @brief Setup timer with a callback function.
*
* @param[in/out] timer Ptr to timer, which should be set.
* @param[in] duration Duration after @p timer expires.
* @param[in] cb Function to be called after @p duration.
* @param[in] arg Arguments for @p cb.
*/
static void _setup_timeout(xtimer_t *timer, const uint32_t duration, const xtimer_callback_t cb,
cb_arg_t *arg)
{
timer->callback = cb;
timer->arg = arg;
xtimer_set(timer, duration);
}
/**
* @brief Establishes a new TCP connection
*
* @param[in,out] tcb TCB holding the connection information.
* @param[in] target_addr Target address to connect to, if this is a active connection.
@ -79,10 +115,10 @@ mutex_t _list_tcb_lock;
static int _gnrc_tcp_open(gnrc_tcp_tcb_t *tcb, const uint8_t *target_addr, uint16_t target_port,
const uint8_t *local_addr, uint16_t local_port, uint8_t passive)
{
msg_t msg; /* Message for incomming messages */
msg_t connection_timeout_msg; /* Connection timeout message */
xtimer_t connection_timeout_timer; /* Connection timeout timer */
int8_t ret = 0; /* Return value */
msg_t msg;
xtimer_t connection_timeout;
cb_arg_t connection_timeout_arg = {MSG_TYPE_CONNECTION_TIMEOUT, &(tcb->mbox)};
int8_t ret = 0;
/* Lock the TCB for this function call */
mutex_lock(&(tcb->function_lock));
@ -93,9 +129,12 @@ static int _gnrc_tcp_open(gnrc_tcp_tcb_t *tcb, const uint8_t *target_addr, uint1
return -EISCONN;
}
/* Setup connection (common parts) */
msg_init_queue(tcb->msg_queue, GNRC_TCP_TCB_MSG_QUEUE_SIZE);
tcb->owner = thread_getpid();
/* Mark TCB as waiting for incomming messages */
tcb->status |= STATUS_WAIT_FOR_MSG;
/* 'Flush' mbox */
while (mbox_try_get(&(tcb->mbox), &msg) != 0) {
}
/* Setup passive connection */
if (passive) {
@ -125,11 +164,9 @@ static int _gnrc_tcp_open(gnrc_tcp_tcb_t *tcb, const uint8_t *target_addr, uint1
tcb->local_port = local_port;
tcb->peer_port = target_port;
/* Setup timeout: If connection could not be established before */
/* the timer expired, the connection attempt failed */
connection_timeout_msg.type = MSG_TYPE_CONNECTION_TIMEOUT;
xtimer_set_msg(&connection_timeout_timer, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
&connection_timeout_msg, tcb->owner);
/* Setup connection timeout: Put timeout message in TCBs mbox on expiration */
_setup_timeout(&connection_timeout, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
_cb_mbox_put_msg, &connection_timeout_arg);
}
/* Call FSM with event: CALL_OPEN */
@ -144,7 +181,7 @@ static int _gnrc_tcp_open(gnrc_tcp_tcb_t *tcb, const uint8_t *target_addr, uint1
/* Wait until a connection was established or closed */
while (ret >= 0 && tcb->state != FSM_STATE_CLOSED && tcb->state != FSM_STATE_ESTABLISHED &&
tcb->state != FSM_STATE_CLOSE_WAIT) {
msg_receive(&msg);
mbox_get(&(tcb->mbox), &msg);
switch (msg.type) {
case MSG_TYPE_CONNECTION_TIMEOUT:
DEBUG("gnrc_tcp.c : _gnrc_tcp_open() : CONNECTION_TIMEOUT\n");
@ -162,11 +199,11 @@ static int _gnrc_tcp_open(gnrc_tcp_tcb_t *tcb, const uint8_t *target_addr, uint1
}
/* Cleanup */
xtimer_remove(&connection_timeout_timer);
xtimer_remove(&connection_timeout);
if (tcb->state == FSM_STATE_CLOSED && ret == 0) {
ret = -ECONNREFUSED;
}
tcb->owner = KERNEL_PID_UNDEF;
tcb->status &= ~STATUS_WAIT_FOR_MSG;
mutex_unlock(&(tcb->function_lock));
return ret;
}
@ -184,8 +221,6 @@ int gnrc_tcp_init(void)
/* Initialize TCB list */
_list_tcb_head = NULL;
/* Initialize receive buffers */
_rcvbuf_init();
/* Start TCP processing thread */
@ -205,6 +240,7 @@ void gnrc_tcp_tcb_init(gnrc_tcp_tcb_t *tcb)
tcb->rtt_var = RTO_UNINITIALIZED;
tcb->srtt = RTO_UNINITIALIZED;
tcb->rto = RTO_UNINITIALIZED;
mbox_init(&(tcb->mbox), tcb->mbox_raw, GNRC_TCP_TCB_MBOX_SIZE);
mutex_init(&(tcb->fsm_lock));
mutex_init(&(tcb->function_lock));
}
@ -264,16 +300,16 @@ ssize_t gnrc_tcp_send(gnrc_tcp_tcb_t *tcb, const void *data, const size_t len,
assert(tcb != NULL);
assert(data != NULL);
msg_t msg; /* Message for incomming messages */
msg_t connection_timeout_msg; /* Connection timeout message */
msg_t probe_timeout_msg; /* Probe timeout message */
msg_t user_timeout_msg; /* User specified timeout message */
xtimer_t connection_timeout_timer; /* Connection timeout timer */
xtimer_t probe_timeout_timer; /* Probe timeout timer */
xtimer_t user_timeout_timer; /* User specified timeout timer */
uint32_t probe_timeout_duration_us = 0; /* Probe timeout duration in microseconds */
ssize_t ret = 0; /* Return value */
bool probing = false; /* True if this connection is probing */
msg_t msg;
xtimer_t connection_timeout;
cb_arg_t connection_timeout_arg = {MSG_TYPE_CONNECTION_TIMEOUT, &(tcb->mbox)};
xtimer_t user_timeout;
cb_arg_t user_timeout_arg = {MSG_TYPE_USER_SPEC_TIMEOUT, &(tcb->mbox)};
xtimer_t probe_timeout;
cb_arg_t probe_timeout_arg = {MSG_TYPE_PROBE_TIMEOUT, &(tcb->mbox)};
uint32_t probe_timeout_duration_us = 0;
ssize_t ret = 0;
bool probing_mode = false;
/* Lock the TCB for this function call */
mutex_lock(&(tcb->function_lock));
@ -284,19 +320,20 @@ ssize_t gnrc_tcp_send(gnrc_tcp_tcb_t *tcb, const void *data, const size_t len,
return -ENOTCONN;
}
/* Re-init message queue, take ownership. FSM can send messages to this thread now */
msg_init_queue(tcb->msg_queue, GNRC_TCP_TCB_MSG_QUEUE_SIZE);
tcb->owner = thread_getpid();
/* Mark TCB as waiting for incomming messages */
tcb->status |= STATUS_WAIT_FOR_MSG;
/* Setup connection timeout */
connection_timeout_msg.type = MSG_TYPE_CONNECTION_TIMEOUT;
xtimer_set_msg(&connection_timeout_timer, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
&connection_timeout_msg, tcb->owner);
/* 'Flush' mbox */
while (mbox_try_get(&(tcb->mbox), &msg) != 0) {
}
/* Setup connection timeout: Put timeout message in tcb's mbox on expiration */
_setup_timeout(&connection_timeout, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
_cb_mbox_put_msg, &connection_timeout_arg);
/* Setup user specified timeout if timeout_us is greater than zero */
if (timeout_duration_us > 0) {
user_timeout_msg.type = MSG_TYPE_USER_SPEC_TIMEOUT;
xtimer_set_msg(&user_timeout_timer, timeout_duration_us, &user_timeout_msg, tcb->owner);
_setup_timeout(&user_timeout, timeout_duration_us, _cb_mbox_put_msg, &user_timeout_arg);
}
/* Loop until something was sent and acked */
@ -310,23 +347,22 @@ ssize_t gnrc_tcp_send(gnrc_tcp_tcb_t *tcb, const void *data, const size_t len,
/* If the send window is closed: Setup Probing */
if (tcb->snd_wnd <= 0) {
/* If this is the first probe: Setup probing duration */
if (!probing) {
probing = true;
if (!probing_mode) {
probing_mode = true;
probe_timeout_duration_us = tcb->rto;
}
/* Initialize probe timer */
probe_timeout_msg.type = MSG_TYPE_PROBE_TIMEOUT;
xtimer_set_msg(&probe_timeout_timer, probe_timeout_duration_us, &probe_timeout_msg,
tcb->owner);
/* Setup probe timeout */
_setup_timeout(&probe_timeout, timeout_duration_us, _cb_mbox_put_msg,
&probe_timeout_arg);
}
/* Try to send data if nothing has been sent and we are not probing */
if (ret == 0 && !probing) {
/* Try to send data in case there nothing has been sent and we are not probing */
if (ret == 0 && !probing_mode) {
ret = _fsm(tcb, FSM_EVENT_CALL_SEND, NULL, (void *) data, len);
}
/* Wait for responses */
msg_receive(&msg);
mbox_get(&(tcb->mbox), &msg);
switch (msg.type) {
case MSG_TYPE_CONNECTION_TIMEOUT:
DEBUG("gnrc_tcp.c : gnrc_tcp_send() : CONNECTION_TIMEOUT\n");
@ -357,14 +393,15 @@ ssize_t gnrc_tcp_send(gnrc_tcp_tcb_t *tcb, const void *data, const size_t len,
case MSG_TYPE_NOTIFY_USER:
DEBUG("gnrc_tcp.c : gnrc_tcp_send() : NOTIFY_USER\n");
/* Connection is alive: Reset connection timeout */
xtimer_set_msg(&connection_timeout_timer, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
&connection_timeout_msg, tcb->owner);
/* Connection is alive: Reset Connection Timeout */
_setup_timeout(&connection_timeout, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
_cb_mbox_put_msg, &connection_timeout_arg);
/* If the window re-opened and we are probing: Stop it */
if (tcb->snd_wnd > 0 && probing) {
probing = false;
xtimer_remove(&probe_timeout_timer);
if (tcb->snd_wnd > 0 && probing_mode) {
probing_mode = false;
xtimer_remove(&probe_timeout);
}
break;
@ -374,10 +411,10 @@ ssize_t gnrc_tcp_send(gnrc_tcp_tcb_t *tcb, const void *data, const size_t len,
}
/* Cleanup */
xtimer_remove(&probe_timeout_timer);
xtimer_remove(&connection_timeout_timer);
xtimer_remove(&user_timeout_timer);
tcb->owner = KERNEL_PID_UNDEF;
xtimer_remove(&probe_timeout);
xtimer_remove(&connection_timeout);
xtimer_remove(&user_timeout);
tcb->status &= ~STATUS_WAIT_FOR_MSG;
mutex_unlock(&(tcb->function_lock));
return ret;
}
@ -388,12 +425,12 @@ ssize_t gnrc_tcp_recv(gnrc_tcp_tcb_t *tcb, void *data, const size_t max_len,
assert(tcb != NULL);
assert(data != NULL);
msg_t msg; /* Message for incomming messages */
msg_t connection_timeout_msg; /* Connection timeout message */
msg_t user_timeout_msg; /* User specified timeout message */
xtimer_t connection_timeout_timer; /* Connection timeout timer */
xtimer_t user_timeout_timer; /* User specified timeout timer */
ssize_t ret = 0; /* Return value */
msg_t msg;
xtimer_t connection_timeout;
cb_arg_t connection_timeout_arg = {MSG_TYPE_CONNECTION_TIMEOUT, &(tcb->mbox)};
xtimer_t user_timeout;
cb_arg_t user_timeout_arg = {MSG_TYPE_USER_SPEC_TIMEOUT, &(tcb->mbox)};
ssize_t ret = 0;
/* Lock the TCB for this function call */
mutex_lock(&(tcb->function_lock));
@ -415,18 +452,19 @@ ssize_t gnrc_tcp_recv(gnrc_tcp_tcb_t *tcb, void *data, const size_t max_len,
return ret;
}
/* If this call is blocking, setup messages and timers */
msg_init_queue(tcb->msg_queue, GNRC_TCP_TCB_MSG_QUEUE_SIZE);
tcb->owner = thread_getpid();
/* Mark TCB as waiting for incomming messages */
tcb->status |= STATUS_WAIT_FOR_MSG;
/* Setup connection timeout */
connection_timeout_msg.type = MSG_TYPE_CONNECTION_TIMEOUT;
xtimer_set_msg(&connection_timeout_timer, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
&connection_timeout_msg, tcb->owner);
/* 'Flush' mbox */
while (mbox_try_get(&(tcb->mbox), &msg) != 0) {
}
/* Setup connection timeout: Put timeout message in tcb's mbox on expiration */
_setup_timeout(&connection_timeout, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
_cb_mbox_put_msg, &connection_timeout_arg);
/* Setup user specified timeout */
user_timeout_msg.type = MSG_TYPE_USER_SPEC_TIMEOUT;
xtimer_set_msg(&user_timeout_timer, timeout_duration_us, &user_timeout_msg, tcb->owner);
_setup_timeout(&user_timeout, timeout_duration_us, _cb_mbox_put_msg, &user_timeout_arg);
/* Processing loop */
while (ret == 0) {
@ -441,7 +479,7 @@ ssize_t gnrc_tcp_recv(gnrc_tcp_tcb_t *tcb, void *data, const size_t max_len,
/* If there was no data: Wait for next packet or until the timeout fires */
if (ret <= 0) {
msg_receive(&msg);
mbox_get(&(tcb->mbox), &msg);
switch (msg.type) {
case MSG_TYPE_CONNECTION_TIMEOUT:
DEBUG("gnrc_tcp.c : gnrc_tcp_recv() : CONNECTION_TIMEOUT\n");
@ -466,9 +504,9 @@ ssize_t gnrc_tcp_recv(gnrc_tcp_tcb_t *tcb, void *data, const size_t max_len,
}
/* Cleanup */
xtimer_remove(&connection_timeout_timer);
xtimer_remove(&user_timeout_timer);
tcb->owner = KERNEL_PID_UNDEF;
xtimer_remove(&connection_timeout);
xtimer_remove(&user_timeout);
tcb->status &= ~STATUS_WAIT_FOR_MSG;
mutex_unlock(&(tcb->function_lock));
return ret;
}
@ -477,49 +515,54 @@ void gnrc_tcp_close(gnrc_tcp_tcb_t *tcb)
{
assert(tcb != NULL);
msg_t msg; /* Message for incomming messages */
msg_t connection_timeout_msg; /* Connection timeout message */
xtimer_t connection_timeout_timer; /* Connection timeout timer */
msg_t msg;
xtimer_t connection_timeout;
cb_arg_t connection_timeout_arg = {MSG_TYPE_CONNECTION_TIMEOUT, &(tcb->mbox)};
/* Lock the TCB for this function call */
mutex_lock(&(tcb->function_lock));
/* Start connection teardown if the connection was not closed before */
if (tcb->state != FSM_STATE_CLOSED) {
/* Take ownership */
msg_init_queue(tcb->msg_queue, GNRC_TCP_TCB_MSG_QUEUE_SIZE);
tcb->owner = thread_getpid();
/* Return if connection is closed */
if (tcb->state == FSM_STATE_CLOSED) {
mutex_unlock(&(tcb->function_lock));
return 0;
}
/* Setup connection timeout */
connection_timeout_msg.type = MSG_TYPE_CONNECTION_TIMEOUT;
xtimer_set_msg(&connection_timeout_timer, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
&connection_timeout_msg, tcb->owner);
/* Mark TCB as waiting for incomming messages */
tcb->status |= STATUS_WAIT_FOR_MSG;
/* Start connection teardown sequence */
_fsm(tcb, FSM_EVENT_CALL_CLOSE, NULL, NULL, 0);
/* 'Flush' mbox */
while (mbox_try_get(&(tcb->mbox), &msg) != 0) {
}
/* Loop until the connection has been closed */
while (tcb->state != FSM_STATE_CLOSED) {
msg_receive(&msg);
switch (msg.type) {
case MSG_TYPE_CONNECTION_TIMEOUT:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : CONNECTION_TIMEOUT\n");
_fsm(tcb, FSM_EVENT_TIMEOUT_CONNECTION, NULL, NULL, 0);
break;
/* Setup connection timeout: Put timeout message in tcb's mbox on expiration */
_setup_timeout(&connection_timeout, GNRC_TCP_CONNECTION_TIMEOUT_DURATION,
_cb_mbox_put_msg, &connection_timeout_arg);
case MSG_TYPE_NOTIFY_USER:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : NOTIFY_USER\n");
break;
/* Start connection teardown sequence */
_fsm(tcb, FSM_EVENT_CALL_CLOSE, NULL, NULL, 0);
default:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : other message type\n");
}
/* Loop until the connection has been closed */
while (tcb->state != FSM_STATE_CLOSED) {
mbox_get(&(tcb->mbox), &msg);
switch (msg.type) {
case MSG_TYPE_CONNECTION_TIMEOUT:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : CONNECTION_TIMEOUT\n");
_fsm(tcb, FSM_EVENT_TIMEOUT_CONNECTION, NULL, NULL, 0);
break;
case MSG_TYPE_NOTIFY_USER:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : NOTIFY_USER\n");
break;
default:
DEBUG("gnrc_tcp.c : gnrc_tcp_close() : other message type\n");
}
}
/* Cleanup */
xtimer_remove(&connection_timeout_timer);
tcb->owner = KERNEL_PID_UNDEF;
xtimer_remove(&connection_timeout);
tcb->status &= ~STATUS_WAIT_FOR_MSG;
mutex_unlock(&(tcb->function_lock));
}

View File

@ -875,14 +875,14 @@ int _fsm(gnrc_tcp_tcb_t *tcb, fsm_event_t event, gnrc_pktsnip_t *in_pkt, void *b
mutex_lock(&(tcb->fsm_lock));
/* Call FSM */
tcb->status &= ~(STATUS_NOTIFY_USER);
tcb->status &= ~STATUS_NOTIFY_USER;
int32_t result = _fsm_unprotected(tcb, event, in_pkt, buf, len);
/* Notify blocked thread if something interesting happend */
if ((tcb->status & STATUS_NOTIFY_USER) && (tcb->owner != KERNEL_PID_UNDEF)) {
if ((tcb->status & STATUS_NOTIFY_USER) && (tcb->status & STATUS_WAIT_FOR_MSG)) {
msg_t msg;
msg.type = MSG_TYPE_NOTIFY_USER;
msg_send(&msg, tcb->owner);
mbox_try_put(&(tcb->mbox), &msg);
}
/* Unlock FSM */
mutex_unlock(&(tcb->fsm_lock));

View File

@ -48,6 +48,7 @@ extern "C" {
#define STATUS_PASSIVE (1 << 0)
#define STATUS_ALLOW_ANY_ADDR (1 << 1)
#define STATUS_NOTIFY_USER (1 << 2)
#define STATUS_WAIT_FOR_MSG (1 << 3)
/** @} */
/**