diff --git a/sys/Makefile.dep b/sys/Makefile.dep index 66ca09d648..33bee38e2d 100644 --- a/sys/Makefile.dep +++ b/sys/Makefile.dep @@ -802,6 +802,7 @@ endif ifneq (,$(filter asymcute,$(USEMODULE))) USEMODULE += sock_udp USEMODULE += sock_util + USEMODULE += sock_async_event USEMODULE += random USEMODULE += event_timeout USEMODULE += event_callback diff --git a/sys/include/net/asymcute.h b/sys/include/net/asymcute.h index d8939737db..1a6c77eb32 100644 --- a/sys/include/net/asymcute.h +++ b/sys/include/net/asymcute.h @@ -169,22 +169,6 @@ extern "C" { #define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT) #endif -#ifndef ASYMCUTE_LISTENER_PRIO -/** - * @brief Default priority for an Asymcute listener thread - * - * @note Must be of higher priority than @ref ASYMCUTE_HANDLER_PRIO - */ -#define ASYMCUTE_LISTENER_PRIO (THREAD_PRIORITY_MAIN - 3) -#endif - -#ifndef ASYMCUTE_LISTENER_STACKSIZE -/** - * @brief Default stack size for an Asymcute listener thread - */ -#define ASYMCUTE_LISTENER_STACKSIZE (THREAD_STACKSIZE_DEFAULT) -#endif - /** * @brief Return values used by public Asymcute functions */ @@ -196,6 +180,7 @@ enum { ASYMCUTE_BUSY = -4, /**< error: context already in use */ ASYMCUTE_REGERR = -5, /**< error: registration invalid */ ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */ + ASYMCUTE_SENDERR = -7, /**< error: unable to sent packet */ }; /** @@ -295,7 +280,6 @@ struct asymcute_req { struct asymcute_con { mutex_t lock; /**< synchronization lock */ sock_udp_t sock; /**< socket used by a connections */ - sock_udp_ep_t server_ep; /**< the gateway's UDP endpoint */ asymcute_req_t *pending; /**< list holding pending requests */ asymcute_sub_t *subscriptions; /**< list holding active subscriptions */ asymcute_evt_cb_t user_cb; /**< event callback provided by user */ @@ -471,25 +455,6 @@ static inline bool asymcute_topic_equal(const asymcute_topic_t *a, int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, uint16_t topic_id); -/** - * @brief Start a listener thread - * - * @note Must have higher priority then the handler thread (defined by - * @ref ASYMCUTE_HANDLER_PRIO) - * - * @param[in] con connection context to use for this connection - * @param[in] stack stack used to run the listener thread - * @param[in] stacksize size of @p stack in bytes - * @param[in] priority priority of the listener thread created by this function - * @param[in] callback user callback for notification about connection related - * events - * - * @return ASYMCUTE_OK on success - * @return ASYMCUTE_BUSY if connection context is already in use - */ -int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize, - char priority, asymcute_evt_cb_t callback); - /** * @brief Start the global Asymcute handler thread for processing timeouts and * keep alive events @@ -517,16 +482,17 @@ bool asymcute_is_connected(const asymcute_con_t *con); * @param[in] cli_id client ID to register with the gateway * @param[in] clean set `true` to start a clean session * @param[in] will last will (currently not implemented) + * @param[in] callback user callback triggered on defined events * * @return ASYMCUTE_OK if CONNECT message has been sent * @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented) * @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN - * @return ASYMCUTE_GWERR if the connection is not in idle state - * @return ASYMCUTE_BUSY if the given request context is already in use + * @return ASYMCUTE_GWERR if initializing the socket for the connection failed + * @return ASYMCUTE_BUSY if the connection or the request context are in use */ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, sock_udp_ep_t *server, const char *cli_id, bool clean, - asymcute_will_t *will); + asymcute_will_t *will, asymcute_evt_cb_t callback); /** * @brief Close the given connection diff --git a/sys/net/application_layer/asymcute/asymcute.c b/sys/net/application_layer/asymcute/asymcute.c index 0a9b0dad45..b2d9b67b4d 100644 --- a/sys/net/application_layer/asymcute/asymcute.c +++ b/sys/net/application_layer/asymcute/asymcute.c @@ -21,11 +21,11 @@ #include #include -#include "log.h" #include "random.h" #include "byteorder.h" #include "timex.h" +#include "net/sock/async/event.h" #include "net/asymcute.h" #define ENABLE_DEBUG 0 @@ -53,10 +53,11 @@ #define LEN_PINGRESP (2U) +#define MIN_PKT_LEN (2) + /* Internally used connection states */ enum { - UNINITIALIZED = 0, /**< connection context is not initialized */ - NOTCON, /**< not connected to any gateway */ + NOTCON = 0, /**< not connected to any gateway */ CONNECTING, /**< connection is being setup */ CONNECTED, /**< connection is established */ TEARDOWN, /**< connection is being torn down */ @@ -175,14 +176,19 @@ static void _compile_sub_unsub(asymcute_req_t *req, asymcute_con_t *con, req->arg = sub; } -static void _req_resend(asymcute_req_t *req, asymcute_con_t *con) +static ssize_t _req_resend(asymcute_req_t *req, asymcute_con_t *con, int initial) { - event_timeout_set(&req->to_timer, RETRY_TO); - sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); + ssize_t n = sock_udp_send(&con->sock, req->data, req->data_len, NULL); + /* if sending the initial packet fails we do not set the retry timer, as we + * handle the return value directly */ + if (!((initial == 1) && (n < MIN_PKT_LEN))) { + event_timeout_set(&req->to_timer, RETRY_TO); + } + return n; } /* @pre con is locked */ -static void _req_send(asymcute_req_t *req, asymcute_con_t *con, +static int _req_send(asymcute_req_t *req, asymcute_con_t *con, asymcute_to_cb_t cb) { /* initialize request */ @@ -195,13 +201,20 @@ static void _req_send(asymcute_req_t *req, asymcute_con_t *con, req->next = con->pending; con->pending = req; /* send request */ - _req_resend(req, con); + ssize_t n = _req_resend(req, con, 1); + if (n < MIN_PKT_LEN) { + req->con = NULL; + mutex_unlock(&req->lock); + return ASYMCUTE_SENDERR; + } + return ASYMCUTE_OK; } -static void _req_send_once(asymcute_req_t *req, asymcute_con_t *con) +static int _req_send_once(asymcute_req_t *req, asymcute_con_t *con) { - sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); + ssize_t n = sock_udp_send(&con->sock, req->data, req->data_len, NULL); mutex_unlock(&req->lock); + return (n >= MIN_PKT_LEN) ? ASYMCUTE_OK : ASYMCUTE_SENDERR; } static void _req_cancel(asymcute_req_t *req) @@ -234,6 +247,9 @@ static void _disconnect(asymcute_con_t *con, uint8_t state) } con->subscriptions = NULL; } + if (state == NOTCON) { + sock_udp_close(&con->sock); + } con->state = state; } @@ -248,7 +264,7 @@ static void _on_req_timeout(void *arg) if (req->retry_cnt--) { /* resend the packet */ - _req_resend(req, req->con); + _req_resend(req, req->con, 0); return; } else { @@ -271,6 +287,7 @@ static unsigned _on_con_timeout(asymcute_con_t *con, asymcute_req_t *req) (void)req; con->state = NOTCON; + sock_udp_close(&con->sock); return ASYMCUTE_TIMEOUT; } @@ -278,7 +295,7 @@ static unsigned _on_discon_timeout(asymcute_con_t *con, asymcute_req_t *req) { (void)req; - con->state = NOTCON; + _disconnect(con, NOTCON); return ASYMCUTE_DISCONNECTED; } @@ -309,7 +326,7 @@ static void _on_keepalive_evt(void *arg) if (con->keepalive_retry_cnt) { /* (re)send keep alive ping and set dedicated retransmit timer */ uint8_t ping[2] = { 2, MQTTSN_PINGREQ }; - sock_udp_send(&con->sock, ping, sizeof(ping), &con->server_ep); + sock_udp_send(&con->sock, ping, sizeof(ping), NULL); con->keepalive_retry_cnt--; event_timeout_set(&con->keepalive_timer, RETRY_TO); mutex_unlock(&con->lock); @@ -360,7 +377,6 @@ static void _on_disconnect(asymcute_con_t *con, size_t len) } mutex_unlock(&con->lock); con->user_cb(req, ASYMCUTE_DISCONNECTED); - } static void _on_pingreq(asymcute_con_t *con) @@ -368,7 +384,7 @@ static void _on_pingreq(asymcute_con_t *con) /* simply reply with a PINGRESP message */ mutex_lock(&con->lock); uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP }; - sock_udp_send(&con->sock, resp, sizeof(resp), &con->server_ep); + sock_udp_send(&con->sock, resp, sizeof(resp), NULL); mutex_unlock(&con->lock); } @@ -440,7 +456,7 @@ static void _on_publish(asymcute_con_t *con, uint8_t *data, uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret }; /* copy topic and message id */ memcpy(&pkt[2], &data[pos + 2], 4); - sock_udp_send(&con->sock, pkt, 7, &con->server_ep); + sock_udp_send(&con->sock, pkt, 7, NULL); } /* release the context and notify the user (on success) */ @@ -541,85 +557,7 @@ static void _on_unsuback(asymcute_con_t *con, const uint8_t *data, size_t len) con->user_cb(req, ASYMCUTE_UNSUBSCRIBED); } -static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote) -{ - if (pkt_len < 2) { - return; - } - - size_t len; - size_t pos = _len_get(con->rxbuf, &len); - - /* make sure the incoming data was send by 'our' gateway */ - if (!sock_udp_ep_equal(&con->server_ep, remote)) { - return; - } - /* validate incoming data: verify message length */ - if ((pkt_len <= pos) || (pkt_len < len)) { - /* length field of MQTT-SN packet seems to be invalid -> drop the pkt */ - return; - } - - /* figure out required action based on message type */ - uint8_t type = con->rxbuf[pos]; - switch (type) { - case MQTTSN_CONNACK: - _on_connack(con, con->rxbuf, len); - break; - case MQTTSN_DISCONNECT: - _on_disconnect(con, len); - break; - case MQTTSN_PINGREQ: - _on_pingreq(con); - break; - case MQTTSN_PINGRESP: - _on_pingresp(con); - break; - case MQTTSN_REGACK: - _on_regack(con, con->rxbuf, len); - break; - case MQTTSN_PUBLISH: - _on_publish(con, con->rxbuf, pos, len); - break; - case MQTTSN_PUBACK: - _on_puback(con, con->rxbuf, len); - break; - case MQTTSN_SUBACK: - _on_suback(con, con->rxbuf, len); - break; - case MQTTSN_UNSUBACK: - _on_unsuback(con, con->rxbuf, len); - break; - default: - break; - } -} - -void *_listener(void *arg) -{ - asymcute_con_t *con = arg; - - /* create a socket for this listener, using an ephemeral port */ - sock_udp_ep_t local = SOCK_IPV6_EP_ANY; - if (sock_udp_create(&con->sock, &local, NULL, 0) != 0) { - LOG_ERROR("[asymcute] error creating listener socket\n"); - return NULL; - } - - while (1) { - sock_udp_ep_t remote; - int n = sock_udp_recv(&con->sock, con->rxbuf, CONFIG_ASYMCUTE_BUFSIZE, - SOCK_NO_TIMEOUT, &remote); - if (n > 0) { - _on_data(con, (size_t)n, &remote); - } - } - - /* should never be reached */ - return NULL; -} - -void *_handler(void *arg) +void *_eventloop(void *arg) { (void)arg; event_queue_init(&_queue); @@ -628,50 +566,64 @@ void *_handler(void *arg) return NULL; } -int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize, - char priority, asymcute_evt_cb_t callback) +void _on_pkt(sock_udp_t *sock, sock_async_flags_t type, void *arg) { - /* make sure con is not running */ - assert(con); - assert((priority > 0) && (priority < THREAD_PRIORITY_IDLE - 1)); - assert(callback); + asymcute_con_t *con = (asymcute_con_t *)arg; - int ret = ASYMCUTE_OK; + if (type & SOCK_ASYNC_MSG_RECV) { + ssize_t pkt_len = sock_udp_recv(sock, con->rxbuf, + CONFIG_ASYMCUTE_BUFSIZE, 0, NULL); + if (pkt_len >= MIN_PKT_LEN) { + size_t len; + size_t pos = _len_get(con->rxbuf, &len); - /* make sure the connection context is not already used */ - mutex_lock(&con->lock); - if (con->state != UNINITIALIZED) { - ret = ASYMCUTE_BUSY; - goto end; + /* validate incoming data: verify message length */ + if (((size_t)pkt_len <= pos) || ((size_t)pkt_len < len)) { + /* length field of MQTT-SN packet seems to be invalid -> drop the pkt */ + return; + } + + /* figure out required action based on message type */ + uint8_t type = con->rxbuf[pos]; + switch (type) { + case MQTTSN_CONNACK: + _on_connack(con, con->rxbuf, len); + break; + case MQTTSN_DISCONNECT: + _on_disconnect(con, len); + break; + case MQTTSN_PINGREQ: + _on_pingreq(con); + break; + case MQTTSN_PINGRESP: + _on_pingresp(con); + break; + case MQTTSN_REGACK: + _on_regack(con, con->rxbuf, len); + break; + case MQTTSN_PUBLISH: + _on_publish(con, con->rxbuf, pos, len); + break; + case MQTTSN_PUBACK: + _on_puback(con, con->rxbuf, len); + break; + case MQTTSN_SUBACK: + _on_suback(con, con->rxbuf, len); + break; + case MQTTSN_UNSUBACK: + _on_unsuback(con, con->rxbuf, len); + break; + default: + break; + } + } } - - /* initialize the connection context */ - memset(con, 0, sizeof(asymcute_con_t)); - random_bytes((uint8_t *)&con->last_id, 2); - event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con); - event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super); - con->keepalive_retry_cnt = CONFIG_ASYMCUTE_N_RETRY; - con->state = NOTCON; - con->user_cb = callback; - - /* start listener thread */ - thread_create(stack, - stacksize, - priority, - THREAD_CREATE_WOUT_YIELD, - _listener, - con, - "asymcute_listener"); - -end: - mutex_unlock(&con->lock); - return ret; } void asymcute_handler_run(void) { thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO, - 0, _handler, NULL, "asymcute_main"); + THREAD_CREATE_STACKTEST, _eventloop, NULL, "asymcute_main"); } int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, @@ -723,7 +675,7 @@ bool asymcute_is_connected(const asymcute_con_t *con) int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, sock_udp_ep_t *server, const char *cli_id, bool clean, - asymcute_will_t *will) + asymcute_will_t *will, asymcute_evt_cb_t callback) { assert(con); assert(req); @@ -744,7 +696,7 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, /* check if the context is not already connected to any gateway */ mutex_lock(&con->lock); if (con->state != NOTCON) { - ret = ASYMCUTE_GWERR; + ret = ASYMCUTE_BUSY; goto end; } /* get mutual access to the request context */ @@ -753,10 +705,26 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, goto end; } - /* prepare the connection context */ + /* initialize the connection context */ + memset(con, 0, sizeof(asymcute_con_t)); + random_bytes((uint8_t *)&con->last_id, 2); + con->keepalive_retry_cnt = CONFIG_ASYMCUTE_N_RETRY; + event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con); + event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super); + con->user_cb = callback; con->state = CONNECTING; strncpy(con->cli_id, cli_id, sizeof(con->cli_id)); - memcpy(&con->server_ep, server, sizeof(con->server_ep)); + + /* create a socket for this listener, using an ephemeral port */ + sock_udp_ep_t local = SOCK_IPV6_EP_ANY; + local.port = 0; + local.netif = server->netif; + if (sock_udp_create(&con->sock, &local, server, 0) != 0) { + con->state = NOTCON; + ret = ASYMCUTE_GWERR; + goto end; + } + sock_udp_event_init(&con->sock, &_queue, _on_pkt, con); /* compile and send connect message */ req->msg_id = 0; @@ -767,7 +735,10 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, byteorder_htobebufs(&req->data[4], CONFIG_ASYMCUTE_KEEPALIVE); memcpy(&req->data[6], cli_id, id_len); req->data_len = (size_t)req->data[0]; - _req_send(req, con, _on_con_timeout); + ret = _req_send(req, con, _on_con_timeout); + if (ret != ASYMCUTE_OK) { + _disconnect(con, NOTCON); + } end: mutex_unlock(&con->lock); @@ -801,7 +772,7 @@ int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req) req->data[0] = 2; req->data[1] = MQTTSN_DISCONNECT; req->data_len = 2; - _req_send(req, con, _on_discon_timeout); + ret = _req_send(req, con, _on_discon_timeout); end: mutex_unlock(&con->lock); @@ -855,7 +826,7 @@ int asymcute_register(asymcute_con_t *con, asymcute_req_t *req, req->data_len = (pos + 5 + topic_len); /* send the request */ - _req_send(req, con, NULL); + ret = _req_send(req, con, NULL); end: mutex_unlock(&con->lock); @@ -917,10 +888,10 @@ int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req, /* publish selected data */ if (flags & MQTTSN_QOS_1) { - _req_send(req, con, NULL); + ret = _req_send(req, con, NULL); } else { - _req_send_once(req, con); + ret = _req_send_once(req, con); } end: @@ -978,7 +949,7 @@ int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req, /* send SUBSCRIBE message */ _compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE); - _req_send(req, con, _on_suback_timeout); + ret = _req_send(req, con, _on_suback_timeout); end: mutex_unlock(&con->lock); @@ -1012,7 +983,7 @@ int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req, /* prepare and send UNSUBSCRIBE message */ _compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE); - _req_send(req, con, NULL); + ret = _req_send(req, con, NULL); end: mutex_unlock(&con->lock);