1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-17 05:12:57 +01:00

net: add Asymcute (asynchronous MQTT-SN client)

This commit is contained in:
Hauke Petersen 2018-06-29 17:25:55 +02:00
parent ed01bf0e1d
commit ad20c4f1cc
5 changed files with 1548 additions and 0 deletions

View File

@ -659,6 +659,14 @@ ifneq (,$(filter openthread_contrib,$(USEMODULE)))
FEATURES_REQUIRED += cpp
endif
ifneq (,$(filter asymcute,$(USEMODULE)))
USEMODULE += sock_udp
USEMODULE += sock_util
USEMODULE += random
USEMODULE += event_timeout
USEMODULE += event_callback
endif
ifneq (,$(filter emcute,$(USEMODULE)))
USEMODULE += core_thread_flags
USEMODULE += sock_udp

View File

@ -103,6 +103,9 @@ endif
ifneq (,$(filter gcoap,$(USEMODULE)))
DIRS += net/application_layer/gcoap
endif
ifneq (,$(filter asymcute,$(USEMODULE)))
DIRS += net/application_layer/asymcute
endif
ifneq (,$(filter emcute,$(USEMODULE)))
DIRS += net/application_layer/emcute
endif

561
sys/include/net/asymcute.h Normal file
View File

@ -0,0 +1,561 @@
/*
* Copyright (C) 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup net_asymcute MQTT-SN Client (Asymcute)
* @ingroup net
* @brief Asymcute is an asynchronous MQTT-SN implementation
*
* # About
* `Asymcute` is a asynchronous MQTT-SN client implementation, aiming at
* providing the user a high degree of flexibility. It provides a flexible
* interface that allows users to issue any number of concurrent requests to
* one or more different gateways simultaneously.
*
* # Implementation state
*
* Implemented features:
* - Connecting to multiple gateways simultaneously
* - Registration of topic names
* - Publishing of data (QoS 0 and QoS 1)
* - Subscription to topics
* - Pre-defined topic IDs as well as short and normal topic names
*
* Missing features:
* - Gateway discovery process not implemented
* - Last will feature not implemented
* - No support for QoS level 2
* - No support for wildcard characters in topic names when subscribing
* - Actual granted QoS level on subscription is ignored
*
* @{
* @file
* @brief Asymcute MQTT-SN interface definition
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef NET_ASYMCUTE_H
#define NET_ASYMCUTE_H
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "assert.h"
#include "event/timeout.h"
#include "event/callback.h"
#include "net/mqttsn.h"
#include "net/sock/udp.h"
#include "net/sock/util.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifndef ASYMCUTE_BUFSIZE
/**
* @brief Default buffer size used for receive and request buffers
*/
#define ASYMCUTE_BUFSIZE (128U)
#endif
#ifndef ASYMCUTE_HANDLER_PRIO
/**
* @brief Default priority for Asymcute's handler thread
*/
#define ASYMCUTE_HANDLER_PRIO (THREAD_PRIORITY_MAIN - 2)
#endif
#ifndef ASYMCUTE_HANDLER_STACKSIZE
/**
* @brief Default stack size for Asymcute's handler thread
*/
#define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif
#ifndef ASYMCUTE_LISTENER_PRIO
/**
* @brief Default priority for an Asymcute listener thread
*
* @note Must be of higher priority than @ref ASYMCUTE_HANDLER_PRIO
*/
#define ASYMCUTE_LISTENER_PRIO (THREAD_PRIORITY_MAIN - 3)
#endif
#ifndef ASYMCUTE_LISTENER_STACKSIZE
/**
* @brief Default stack size for an Asymcute listener thread
*/
#define ASYMCUTE_LISTENER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif
#ifndef ASYMCUTE_ID_MAXLEN
/**
* @brief Maximum client ID length
*
* @note Must be less than (256 - 8) and less than (ASYMCUTE_BUFSIZE - 8)
*/
#define ASYMCUTE_ID_MAXLEN (32U)
#endif
#ifndef ASYMCUTE_TOPIC_MAXLEN
/**
* @brief Maximum topic length
*
* @note Must be less than (256 - 8) AND less than (ASYMCUTE_BUFSIZE - 8).
*/
#define ASYMCUTE_TOPIC_MAXLEN (32U)
#endif
#ifndef ASYMCUTE_KEEPALIVE
/**
* @brief Keep alive interval [in s] communicated to the gateway
*
* For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min
*/
#define ASYMCUTE_KEEPALIVE (360) /* -> 6 min*/
#endif
#ifndef ASYMCUTE_KEEPALIVE_PING
/**
* @brief Interval to use for sending periodic ping messages
*
* The default behavior of this implementation is to send ping messages as soon
* as three quarters of the keep alive interval have passed.
*
* @note Must be less than ASYMCUTE_KEEPALIVE
*/
#define ASYMCUTE_KEEPALIVE_PING ((ASYMCUTE_KEEPALIVE / 4) * 3)
#endif
#ifndef ASYMCUTE_T_RETRY
/**
* @brief Resend interval [in seconds]
*
* For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec
*/
#define ASYMCUTE_T_RETRY (10U) /* -> 10 sec */
#endif
#ifndef ASYMCUTE_N_RETRY
/**
* @brief Number of retransmissions until requests time out
*
* For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5
*/
#define ASYMCUTE_N_RETRY (3U)
#endif
/**
* @brief Return values used by public Asymcute functions
*/
enum {
ASYMCUTE_OK = 0, /**< all is good */
ASYMCUTE_OVERFLOW = -1, /**< error: insufficient buffer space */
ASYMCUTE_GWERR = -2, /**< error: bad gateway connection state */
ASYMCUTE_NOTSUP = -3, /**< error: feature not supported */
ASYMCUTE_BUSY = -4, /**< error: context already in use */
ASYMCUTE_REGERR = -5, /**< error: registration invalid */
ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */
};
/**
* @brief Possible event types passed to the event callback
*/
enum {
ASYMCUTE_TIMEOUT, /**< request timed out */
ASYMCUTE_CANCELED, /**< request was canceled */
ASYMCUTE_REJECTED, /**< request was rejected */
ASYMCUTE_CONNECTED, /**< connected to gateway */
ASYMCUTE_DISCONNECTED, /**< connection got disconnected */
ASYMCUTE_REGISTERED, /**< topic was registered */
ASYMCUTE_PUBLISHED, /**< data was published */
ASYMCUTE_SUBSCRIBED, /**< client was subscribed to topic */
ASYMCUTE_UNSUBSCRIBED, /**< client was unsubscribed from topic */
};
/**
* @brief Forward type declaration for connections contexts
*/
typedef struct asymcute_con asymcute_con_t;
/**
* @brief Forward type declaration for request contexts
*/
typedef struct asymcute_req asymcute_req_t;
/**
* @brief Forward type declaration for subscription contexts
*/
typedef struct asymcute_sub asymcute_sub_t;
/**
* @brief Forward type declaration for topic definitions
*/
typedef struct asymcute_topic asymcute_topic_t;
/**
* @brief Forward type declaration for last will definitions
*/
typedef struct asymcute_will asymcute_will_t;
/**
* @brief Event callback used for communicating connection and request related
* events to the user
*
* @param[in] req pointer to the request context that triggered the event,
* may be NULL of unsolicited events
* @param[in] evt_type type of the event
*/
typedef void(*asymcute_evt_cb_t)(asymcute_req_t *req, unsigned evt_type);
/**
* @brief Callback triggered on events for active subscriptions
*
* @param[in] sub pointer to subscription context triggering this event
* @param[in] evt_type type of the event
* @param[in] data incoming data for PUBLISHED events, may be NULL
* @param[in] len length of @p data in bytes
* @param[in] arg user supplied argument
*/
typedef void(*asymcute_sub_cb_t)(const asymcute_sub_t *sub, unsigned evt_type,
const void *data, size_t len, void *arg);
/**
* @brief Context specific timeout callback, only used internally
*
* @internal
*
* @param[in] con connection context for this timeout
* @param[in] req request that timed out
*
* @return Event type to communicate to the user
*/
typedef unsigned(*asymcute_to_cb_t)(asymcute_con_t *con, asymcute_req_t *req);
/**
* @brief Asymcute request context
*/
struct asymcute_req {
mutex_t lock; /**< synchronization lock */
struct asymcute_req *next; /**< the requests list entry */
asymcute_con_t *con; /**< connection the request is using */
asymcute_to_cb_t cb; /**< internally used callback */
void *arg; /**< internally used additional state */
event_callback_t to_evt; /**< timeout event */
event_timeout_t to_timer; /**< timeout timer */
uint8_t data[ASYMCUTE_BUFSIZE]; /**< buffer holding the request's data */
size_t data_len; /**< length of the request packet in byte */
uint16_t msg_id; /**< used message id for this request */
uint8_t retry_cnt; /**< retransmission counter */
};
/**
* @brief Asymcute connection context
*/
struct asymcute_con {
mutex_t lock; /**< synchronization lock */
sock_udp_t sock; /**< socket used by a connections */
sock_udp_ep_t server_ep; /**< the gateway's UDP endpoint */
asymcute_req_t *pending; /**< list holding pending requests */
asymcute_sub_t *subscriptions; /**< list holding active subscriptions */
asymcute_evt_cb_t user_cb; /**< event callback provided by user */
event_callback_t keepalive_evt; /**< keep alive event */
event_timeout_t keepalive_timer; /**< keep alive timer */
uint16_t last_id; /**< last used message ID for this
* connection */
uint8_t keepalive_retry_cnt; /**< keep alive transmission counter */
uint8_t state; /**< connection state */
uint8_t rxbuf[ASYMCUTE_BUFSIZE]; /**< connection specific receive buf */
char cli_id[ASYMCUTE_ID_MAXLEN + 1];/**< buffer to store client ID */
};
/**
* @brief Data-structure for holding topics and their registration status
*/
struct asymcute_topic {
asymcute_con_t *con; /**< connection used for registration */
char name[ASYMCUTE_TOPIC_MAXLEN + 1]; /**< topic string (ACSII only) */
uint8_t flags; /**< normal, short, or pre-defined */
uint16_t id; /**< topic id */
};
/**
* @brief Data-structure holding the state of subscriptions
*/
struct asymcute_sub {
asymcute_sub_t *next; /**< the subscriptions list entry */
asymcute_topic_t *topic; /**< topic we subscribe to */
asymcute_sub_cb_t cb; /**< called on incoming data */
void *arg; /**< user supplied callback argument */
};
/**
* @brief Data structure for defining a last will
*/
struct asymcute_will {
const char *topic; /**< last will topic */
void *msg; /**< last will message content */
size_t msg_len; /**< length of last will message content */
};
/**
* @brief Check if a given request context is currently used
*
* @param[in] req request context to check
*
* @return true if context is currently used
* @return false if context is not used
*/
static inline bool asymcute_req_in_use(const asymcute_req_t *req)
{
assert(req);
return (req->con != NULL);
}
/**
* @brief Check if a given subscription is currently active
*
* @param[in] sub subscription context to check
*
* @return true if subscription is active
* @return false if subscription is not active
*/
static inline bool asymcute_sub_active(const asymcute_sub_t *sub)
{
assert(sub);
return (sub->topic != NULL);
}
/**
* @brief Reset the given topic
*
* @warning Make sure that the given topic is not used by any subscription or
* last will when calling this function
*
* @param[out] topic topic to reset
*/
static inline void asymcute_topic_reset(asymcute_topic_t *topic)
{
assert(topic);
memset(topic, 0, sizeof(asymcute_topic_t));
}
/**
* @brief Check if a given topic is currently registered with a gateway
*
* @param[in] topic topic to check
*
* @return true if topic is registered
* @return false if topic is not registered
*/
static inline bool asymcute_topic_is_reg(const asymcute_topic_t *topic)
{
assert(topic);
return (topic->con != NULL);
}
/**
* @brief Check if a given topic is initialized
*
* @param[in] topic topic to check
*
* @return true if topic is initialized
* @return false if topic is not initialized
*/
static inline bool asymcute_topic_is_init(const asymcute_topic_t *topic)
{
assert(topic);
return (topic->name[0] != '\0');
}
/**
* @brief Compare two given topics and check if they are equal
*
* @param[in] a topic A
* @param[in] b topic B
*
* @return true if both topics are equal
* @return false if topics differ
*/
static inline bool asymcute_topic_equal(const asymcute_topic_t *a,
const asymcute_topic_t *b)
{
assert(a);
assert(b);
return ((a->flags == b->flags) && (a->id == b->id));
}
/**
* @brief Initialize the given topic
*
* @param[out] topic topic to initialize
* @param[in] topic_name topic name (ASCII), may be NULL if topic should use
* a pre-defined topic ID
* @param[in] topic_id pre-defined topic ID, or don't care if @p topic_name
* is given
*
* @return ASYMCUTE_OK on success
* @return ASYMCUTE_REGERR if topic is already registered
* @return ASYMCUTE_OVERFLOW if topic name does not fit into buffer or if pre-
* defined topic ID is invalid
*/
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
uint16_t topic_id);
/**
* @brief Start a listener thread
*
* @note Must have higher priority then the handler thread (defined by
* @ref ASYMCUTE_HANDLER_PRIO)
*
* @param[in] con connection context to use for this connection
* @param[in] stack stack used to run the listener thread
* @param[in] stacksize size of @p stack in bytes
* @param[in] priority priority of the listener thread created by this function
* @param[in] callback user callback for notification about connection related
* events
*
* @return ASYMCUTE_OK on success
* @return ASYMCUTE_BUSY if connection context is already in use
*/
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback);
/**
* @brief Start the global Asymcute handler thread for processing timeouts and
* keep alive events
*
* This function is typically called during system initialization.
*/
void asymcute_handler_run(void);
/**
* @brief Check if the given connection context is connected to a gateway
*
* @param[in] con connection to check
*
* @return true if context is connected
* @return false if not connected
*/
bool asymcute_is_connected(const asymcute_con_t *con);
/**
* @brief Connect to the given MQTT-SN gateway
*
* @param[in,out] con connection to use
* @param[in,out] req request context to use for CONNECT procedure
* @param[in] server UDP endpoint of the target gateway
* @param[in] cli_id client ID to register with the gateway
* @param[in] clean set `true` to start a clean session
* @param[in] will last will (currently not implemented)
*
* @return ASYMCUTE_OK if CONNECT message has been sent
* @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented)
* @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN
* @return ASYMCUTE_GWERR if the connection is not in idle state
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will);
/**
* @brief Close the given connection
*
* @param[in,out] con connection to close
* @param[in,out] req request context to use for DISCONNECT procedure
*
* @return ASYMCUTE_OK if DISCONNECT message has been sent
* @return ASYMCUTE_GWERR if connection context is not connected
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req);
/**
* @brief Register a given topic with the connected gateway
*
* @param[in] con connection to use
* @param[in,out] req request context to use for REGISTER procedure
* @param[in,out] topic topic to register
*
* @return ASYMCUTE_OK if REGISTER message has been sent
* @return ASYMCUTE_REGERR if topic is already registered
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_register(asymcute_con_t *con, asymcute_req_t *req,
asymcute_topic_t *topic);
/**
* @brief Publish the given data to the given topic
*
* @param[in] con connection to use
* @param[in,out] req request context used for PUBLISH procedure
* @param[in] topic publish data to this topic
* @param[in] data actual payload to send
* @param[in] data_len size of @p data in bytes
* @param[in] flags additional flags (QoS level, DUP, and RETAIN)
*
* @return ASYMCUTE_OK if PUBLISH message has been sent
* @return ASYMCUTE_NOTSUP if unsupported flags have been set
* @return ASYMCUTE_OVERFLOW if data does not fit into transmit buffer
* @return ASYMCUTE_REGERR if given topic is not registered
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req,
const asymcute_topic_t *topic,
const void *data, size_t data_len, uint8_t flags);
/**
* @brief Subscribe to a given topic
*
* @param[in] con connection to use
* @param[in,out] req request context used for SUBSCRIBE procedure
* @param[out] sub subscription context to store subscription state
* @param[in,out] topic topic to subscribe to, must be initialized (see
* asymcute_topic_init())
* @param[in] callback user callback triggered on events for this subscription
* @param[in] arg user supplied argument passed to the event callback
* @param[in] flags additional flags (QoS level and DUP)
*
* @return ASYMCUTE_OK if SUBSCRIBE message has been sent
* @return ASYMCUTE_NOTSUP if invalid or unsupported flags have been set
* @return ASYMCUTE_REGERR if topic is not initialized
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_SUBERR if already subscribed to the given topic
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub, asymcute_topic_t *topic,
asymcute_sub_cb_t callback, void *arg, uint8_t flags);
/**
* @brief Cancel an active subscription
*
* @param[in] con connection to use
* @param[in,out] req request context used for UNSUBSCRIBE procedure
* @param[in,out] sub subscription to cancel
*
* @return ASYMCUTE_OK if UNSUBSCRIBE message has been sent
* @return ASYMCUTE_SUBERR if subscription is not currently active
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub);
#ifdef __cplusplus
}
#endif
#endif /* NET_ASYMCUTE_H */
/** @} */

View File

@ -0,0 +1,3 @@
MODULE = asymcute
include $(RIOTBASE)/Makefile.base

View File

@ -0,0 +1,973 @@
/*
* Copyright (C) 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup net_asymcute
* @{
*
* @file
* @brief Asynchronous MQTT-SN implementation
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*
* @}
*/
#include <limits.h>
#include "log.h"
#include "random.h"
#include "byteorder.h"
#include "net/asymcute.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
#define PROTOCOL_VERSION (0x01)
#define RETRY_TO (ASYMCUTE_T_RETRY * US_PER_SEC)
#define KEEPALIVE_TO (ASYMCUTE_KEEPALIVE_PING * US_PER_SEC)
#define VALID_PUBLISH_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP | MQTTSN_RETAIN)
#define VALID_SUBSCRIBE_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP)
#define MINLEN_CONNACK (3U)
#define MINLEN_DISCONNECT (2U)
#define MINLEN_REGACK (7U)
#define MINLEN_PUBACK (7U)
#define MINLEN_SUBACK (8U)
#define MINLEN_UNSUBACK (4U)
#define IDPOS_REGACK (4U)
#define IDPOS_PUBACK (4U)
#define IDPOS_SUBACK (5U)
#define IDPOS_UNSUBACK (2U)
#define LEN_PINGRESP (2U)
/* Internally used connection states */
enum {
UNINITIALIZED = 0, /**< connection context is not initialized */
NOTCON, /**< not connected to any gateway */
CONNECTING, /**< connection is being setup */
CONNECTED, /**< connection is established */
TEARDOWN, /**< connection is being torn down */
};
/* the main handler thread needs a stack and a message queue */
static event_queue_t _queue;
static char _stack[ASYMCUTE_HANDLER_STACKSIZE];
/* necessary forward function declarations */
static void _on_req_timeout(void *arg);
static size_t _len_set(uint8_t *buf, size_t len)
{
if (len < (0xff - 7)) {
buf[0] = len + 1;
return 1;
}
else {
buf[0] = 0x01;
byteorder_htobebufs(&buf[1], (uint16_t)(len + 3));
return 3;
}
}
static size_t _len_get(uint8_t *buf, size_t *len)
{
if (buf[0] != 0x01) {
*len = (uint16_t)buf[0];
return 1;
}
else {
*len = byteorder_bebuftohs(&buf[1]);
return 3;
}
}
/* @pre con is locked */
static uint16_t _msg_id_next(asymcute_con_t *con)
{
if (++con->last_id == 0) {
return ++con->last_id;
}
return con->last_id;
}
/* @pre con is locked */
static asymcute_req_t *_req_preprocess(asymcute_con_t *con,
size_t msg_len, size_t min_len,
const uint8_t *buf, unsigned id_pos)
{
/* verify message length */
if (msg_len < min_len) {
return NULL;
}
uint16_t msg_id = (buf == NULL) ? 0 : byteorder_bebuftohs(&buf[id_pos]);
asymcute_req_t *res = NULL;
asymcute_req_t *iter = con->pending;
if (iter == NULL) {
return NULL;
}
if (iter->msg_id == msg_id) {
res = iter;
con->pending = iter->next;
}
while (iter && !res) {
if (iter->next && (iter->next->msg_id == msg_id)) {
res = iter->next;
iter->next = iter->next->next;
}
iter = iter->next;
}
if (res) {
res->con = NULL;
event_timeout_clear(&res->to_timer);
}
return res;
}
/* @pre con is locked */
static void _req_remove(asymcute_con_t *con, asymcute_req_t *req)
{
if (con->pending == req) {
con->pending = con->pending->next;
}
for (asymcute_req_t *cur = con->pending; cur; cur = cur->next) {
if (cur->next == req) {
cur->next = cur->next->next;
}
}
req->con = NULL;
}
/* @pre con is locked */
static void _compile_sub_unsub(asymcute_req_t *req, asymcute_con_t *con,
asymcute_sub_t *sub, uint8_t type)
{
size_t topic_len = strlen(sub->topic->name);
size_t pos = _len_set(req->data, (topic_len + 4));
req->msg_id = _msg_id_next(con);
req->data[pos] = type;
req->data[pos + 1] = sub->topic->flags;
byteorder_htobebufs(&req->data[pos + 2], req->msg_id);
memcpy(&req->data[pos + 4], sub->topic->name, topic_len);
req->data_len = (pos + 4 + topic_len);
req->arg = (void *)sub;
}
static void _req_resend(asymcute_req_t *req, asymcute_con_t *con)
{
event_timeout_set(&req->to_timer, RETRY_TO);
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep);
}
/* @pre con is locked */
static void _req_send(asymcute_req_t *req, asymcute_con_t *con,
asymcute_to_cb_t cb)
{
/* initialize request */
req->con = con;
req->cb = cb;
req->retry_cnt = ASYMCUTE_N_RETRY;
event_callback_init(&req->to_evt, _on_req_timeout, (void *)req);
event_timeout_init(&req->to_timer, &_queue, &req->to_evt.super);
/* add request to the pending queue (if non-con request) */
req->next = con->pending;
con->pending = req;
/* send request */
_req_resend(req, con);
}
static void _req_send_once(asymcute_req_t *req, asymcute_con_t *con)
{
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep);
mutex_unlock(&req->lock);
}
static void _req_cancel(asymcute_req_t *req)
{
asymcute_con_t *con = req->con;
event_timeout_clear(&req->to_timer);
req->con = NULL;
mutex_unlock(&req->lock);
con->user_cb(req, ASYMCUTE_CANCELED);
}
static void _sub_cancel(asymcute_sub_t *sub)
{
sub->cb(sub, ASYMCUTE_CANCELED, NULL, 0, sub->arg);
sub->topic = NULL;
}
/* @pre con is locked */
static void _disconnect(asymcute_con_t *con, uint8_t state)
{
if (con->state == CONNECTED) {
/* cancel all pending requests */
event_timeout_clear(&con->keepalive_timer);
for (asymcute_req_t *req = con->pending; req; req = req->next) {
_req_cancel(req);
}
con->pending = NULL;
for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) {
_sub_cancel(sub);
}
con->subscriptions = NULL;
}
con->state = state;
}
static void _on_req_timeout(void *arg)
{
asymcute_req_t *req = (asymcute_req_t *)arg;
/* only process the timeout, if the request is still active */
if (req->con == NULL) {
return;
}
if (req->retry_cnt--) {
/* resend the packet */
_req_resend(req, req->con);
return;
}
else {
asymcute_con_t *con = req->con;
mutex_lock(&con->lock);
_req_remove(con, req);
/* communicate timeout to outer world */
unsigned ret = ASYMCUTE_TIMEOUT;
if (req->cb) {
ret = req->cb(con, req);
}
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
}
static unsigned _on_con_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)req;
con->state = NOTCON;
return ASYMCUTE_TIMEOUT;
}
static unsigned _on_discon_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)req;
con->state = NOTCON;
return ASYMCUTE_DISCONNECTED;
}
static unsigned _on_suback_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)con;
/* reset the subscription context */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
sub->topic = NULL;
return ASYMCUTE_TIMEOUT;
}
static void _on_keepalive_evt(void *arg)
{
asymcute_con_t *con = (asymcute_con_t *)arg;
mutex_lock(&con->lock);
if (con->state != CONNECTED) {
mutex_unlock(&con->lock);
return;
}
if (con->keepalive_retry_cnt) {
/* (re)send keep alive ping and set dedicated retransmit timer */
uint8_t ping[2] = { 2, MQTTSN_PINGREQ };
sock_udp_send(&con->sock, ping, sizeof(ping), &con->server_ep);
con->keepalive_retry_cnt--;
event_timeout_set(&con->keepalive_timer, RETRY_TO);
mutex_unlock(&con->lock);
}
else {
_disconnect(con, NOTCON);
mutex_unlock(&con->lock);
con->user_cb(NULL, ASYMCUTE_DISCONNECTED);
}
}
static void _on_connack(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_CONNACK, NULL, 0);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* check return code and mark connection as established */
unsigned ret = ASYMCUTE_REJECTED;
if (data[2] == MQTTSN_ACCEPTED) {
con->state = CONNECTED;
/* start keep alive timer */
event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO);
ret = ASYMCUTE_CONNECTED;
}
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_disconnect(asymcute_con_t *con, size_t len)
{
mutex_lock(&con->lock);
/* we might have triggered the DISCONNECT process ourselves, so make sure
* the pending request is being handled */
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_DISCONNECT, NULL, 0);
/* put the connection back to NOTCON in any case and let the user know */
_disconnect(con, NOTCON);
if (req) {
mutex_unlock(&req->lock);
}
mutex_unlock(&con->lock);
con->user_cb(req, ASYMCUTE_DISCONNECTED);
}
static void _on_pingreq(asymcute_con_t *con)
{
/* simply reply with a PINGRESP message */
mutex_lock(&con->lock);
uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP };
sock_udp_send(&con->sock, resp, sizeof(resp), &con->server_ep);
mutex_unlock(&con->lock);
}
static void _on_pingresp(asymcute_con_t *con)
{
mutex_lock(&con->lock);
/* only handle ping resp message if we are actually waiting for a reply */
if (con->keepalive_retry_cnt < ASYMCUTE_N_RETRY) {
event_timeout_clear(&con->keepalive_timer);
con->keepalive_retry_cnt = ASYMCUTE_N_RETRY;
event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO);
}
mutex_unlock(&con->lock);
}
static void _on_regack(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_REGACK,
data, IDPOS_REGACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* check return code */
unsigned ret = ASYMCUTE_REJECTED;
if (data[6] == MQTTSN_ACCEPTED) {
/* finish the registration by applying the topic id */
asymcute_topic_t *topic = (asymcute_topic_t *)req->arg;
topic->id = byteorder_bebuftohs(&data[2]);
topic->con = con;
ret = ASYMCUTE_REGISTERED;
}
/* finally notify the user and free the request */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_publish(asymcute_con_t *con, uint8_t *data,
size_t pos, size_t len)
{
/* verify message length */
if (len < (pos + 6)) {
return;
}
uint16_t topic_id = byteorder_bebuftohs(&data[pos + 2]);
/* find any subscription for that topic */
mutex_lock(&con->lock);
asymcute_sub_t *sub = NULL;
for (asymcute_sub_t *cur = con->subscriptions; cur; cur = cur->next) {
if (cur->topic->id == topic_id) {
sub = cur;
break;
}
}
/* send PUBACK if needed (QoS > 0 or on invalid topic ID) */
if ((sub == NULL) || (data[pos + 1] & MQTTSN_QOS_1)) {
uint8_t ret = (sub) ? MQTTSN_ACCEPTED : MQTTSN_REJ_INV_TOPIC_ID;
uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret };
/* copy topic and message id */
memcpy(&pkt[2], &data[pos + 2], 4);
sock_udp_send(&con->sock, pkt, 7, &con->server_ep);
}
/* release the context and notify the user (on success) */
mutex_unlock(&con->lock);
if (sub) {
sub->cb(sub, ASYMCUTE_PUBLISHED,
&data[pos + 6], (len - (pos + 6)), sub->arg);
}
}
static void _on_puback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_PUBACK,
data, IDPOS_PUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
unsigned ret = (data[6] == MQTTSN_ACCEPTED) ?
ASYMCUTE_PUBLISHED : ASYMCUTE_REJECTED;
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_suback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_SUBACK,
data, IDPOS_SUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
unsigned ret = ASYMCUTE_REJECTED;
if (data[7] == MQTTSN_ACCEPTED) {
/* parse and apply assigned topic id */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
sub->topic->id = byteorder_bebuftohs(&data[3]);
sub->topic->con = con;
/* insert subscription to connection context */
sub->next = con->subscriptions;
con->subscriptions = sub;
ret = ASYMCUTE_SUBSCRIBED;
}
/* notify the user */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_unsuback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_UNSUBACK,
data, IDPOS_UNSUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* remove subscription from list */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
if (con->subscriptions == sub) {
con->subscriptions = sub->next;
}
else {
for (asymcute_sub_t *e = con->subscriptions; e && e->next; e = e->next) {
if (e->next == sub) {
e->next = e->next->next;
break;
}
}
}
/* reset subscription context */
sub->topic = NULL;
/* notify user */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ASYMCUTE_UNSUBSCRIBED);
}
static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote)
{
size_t len;
size_t pos = _len_get(con->rxbuf, &len);
/* make sure the incoming data was send by 'our' gateway */
if (!sock_udp_ep_equal(&con->server_ep, remote)) {
return;
}
/* validate incoming data: verify message length */
if ((pkt_len < 2) ||
(pkt_len <= pos) || (pkt_len < len)) {
/* length field of MQTT-SN packet seems to be invalid -> drop the pkt */
return;
}
/* figure out required action based on message type */
uint8_t type = con->rxbuf[pos];
switch (type) {
case MQTTSN_CONNACK:
_on_connack(con, con->rxbuf, len);
break;
case MQTTSN_DISCONNECT:
_on_disconnect(con, len);
break;
case MQTTSN_PINGREQ:
_on_pingreq(con);
break;
case MQTTSN_PINGRESP:
_on_pingresp(con);
break;
case MQTTSN_REGACK:
_on_regack(con, con->rxbuf, len);
break;
case MQTTSN_PUBLISH:
_on_publish(con, con->rxbuf, pos, len);
break;
case MQTTSN_PUBACK:
_on_puback(con, con->rxbuf, len);
break;
case MQTTSN_SUBACK:
_on_suback(con, con->rxbuf, len);
break;
case MQTTSN_UNSUBACK:
_on_unsuback(con, con->rxbuf, len);
break;
default:
break;
}
}
void *_listener(void *arg)
{
asymcute_con_t *con = (asymcute_con_t *)arg;
/* create a socket for this listener, using an ephemeral port */
sock_udp_ep_t local = SOCK_IPV6_EP_ANY;
if (sock_udp_create(&con->sock, &local, NULL, 0) != 0) {
LOG_ERROR("[asymcute] error creating listener socket\n");
return NULL;
}
while (1) {
sock_udp_ep_t remote;
int n = sock_udp_recv(&con->sock, con->rxbuf, ASYMCUTE_BUFSIZE,
SOCK_NO_TIMEOUT, &remote);
if (n > 0) {
_on_data(con, (size_t)n, &remote);
}
}
/* should never be reached */
return NULL;
}
void *_handler(void *arg)
{
(void)arg;
event_queue_init(&_queue);
event_loop(&_queue);
/* should never be reached */
return NULL;
}
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback)
{
/* make sure con is not running */
assert(con);
assert((priority > 0) && (priority < THREAD_PRIORITY_IDLE - 1));
assert(callback);
int ret = ASYMCUTE_OK;
/* make sure the connection context is not already used */
mutex_lock(&con->lock);
if (con->state != UNINITIALIZED) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* initialize the connection context */
memset(con, 0, sizeof(asymcute_con_t));
random_bytes((uint8_t *)&con->last_id, 2);
event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con);
event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super);
con->keepalive_retry_cnt = ASYMCUTE_N_RETRY;
con->state = NOTCON;
con->user_cb = callback;
/* start listener thread */
thread_create(stack,
stacksize,
priority,
THREAD_CREATE_WOUT_YIELD,
_listener,
con,
"asymcute_listener");
end:
mutex_unlock(&con->lock);
return ret;
}
void asymcute_handler_run(void)
{
thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO,
0, _handler, NULL, "asymcute_main");
}
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
uint16_t topic_id)
{
assert(topic);
size_t len = 0;
if (asymcute_topic_is_reg(topic)) {
return ASYMCUTE_REGERR;
}
if (topic_name == NULL) {
if ((topic_id == 0) || (topic_id == UINT16_MAX)) {
return ASYMCUTE_OVERFLOW;
}
}
else {
len = strlen(topic_name);
if ((len == 0) || (len > ASYMCUTE_TOPIC_MAXLEN)) {
return ASYMCUTE_OVERFLOW;
}
}
/* reset given topic */
asymcute_topic_reset(topic);
/* pre-defined topic ID? */
if (topic_name == NULL) {
topic->id = topic_id;
topic->flags = MQTTSN_TIT_PREDEF;
memcpy(topic->name, &topic_id, 2);
topic->name[2] = '\0';
}
else {
strncpy(topic->name, topic_name, sizeof(topic->name));
if (len == 2) {
memcpy(&topic->id, topic_name, 2);
topic->flags = MQTTSN_TIT_SHORT;
}
}
return ASYMCUTE_OK;
}
bool asymcute_is_connected(const asymcute_con_t *con)
{
return (con->state == CONNECTED);
}
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will)
{
assert(con);
assert(req);
assert(server);
assert(cli_id);
int ret = ASYMCUTE_OK;
size_t id_len = strlen(cli_id);
/* the will feature is not yet supported */
if (will) {
return ASYMCUTE_NOTSUP;
}
/* make sure the client ID will fit into the dedicated buffer */
if (id_len > ASYMCUTE_ID_MAXLEN) {
return ASYMCUTE_OVERFLOW;
}
/* check if the context is not already connected to any gateway */
mutex_lock(&con->lock);
if (con->state != NOTCON) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare the connection context */
con->state = CONNECTING;
strncpy(con->cli_id, cli_id, sizeof(con->cli_id));
memcpy(&con->server_ep, server, sizeof(con->server_ep));
/* compile and send connect message */
req->msg_id = 0;
req->data[0] = (uint8_t)(id_len + 6);
req->data[1] = MQTTSN_CONNECT;
req->data[2] = ((clean) ? MQTTSN_CS : 0);
req->data[3] = PROTOCOL_VERSION;
byteorder_htobebufs(&req->data[4], ASYMCUTE_KEEPALIVE);
memcpy(&req->data[6], cli_id, id_len);
req->data_len = (size_t)req->data[0];
_req_send(req, con, _on_con_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req)
{
assert(con);
assert(req);
int ret = ASYMCUTE_OK;
/* check if we are actually connected */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* put connection into TEARDOWN state */
_disconnect(con, TEARDOWN);
/* prepare and send disconnect message */
req->msg_id = 0;
req->data[0] = 2;
req->data[1] = MQTTSN_DISCONNECT;
req->data_len = 2;
_req_send(req, con, _on_discon_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_register(asymcute_con_t *con, asymcute_req_t *req,
asymcute_topic_t *topic)
{
assert(con);
assert(req);
assert(topic);
int ret = ASYMCUTE_OK;
/* test if topic is already registered */
if (asymcute_topic_is_reg(topic)) {
return ASYMCUTE_REGERR;
}
/* make sure we are connected */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare topic */
req->arg = (void *)topic;
size_t topic_len = strlen(topic->name);
/* prepare registration request */
req->msg_id = _msg_id_next(con);
size_t pos = _len_set(req->data, (topic_len + 5));
req->data[pos] = MQTTSN_REGISTER;
byteorder_htobebufs(&req->data[pos + 1], 0);
byteorder_htobebufs(&req->data[pos + 3], req->msg_id);
memcpy(&req->data[pos + 5], topic->name, topic_len);
req->data_len = (pos + 5 + topic_len);
/* send the request */
_req_send(req, con, NULL);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req,
const asymcute_topic_t *topic,
const void *data, size_t data_len, uint8_t flags)
{
assert(con);
assert(req);
assert(topic);
assert((data_len == 0) || data);
int ret = ASYMCUTE_OK;
/* check for valid flags */
if ((flags & VALID_PUBLISH_FLAGS) != flags) {
return ASYMCUTE_NOTSUP;
}
/* check for message size */
if ((data_len + 9) > ASYMCUTE_BUFSIZE) {
return ASYMCUTE_OVERFLOW;
}
/* make sure topic is registered */
if (!asymcute_topic_is_reg(topic) || (topic->con != con)) {
return ASYMCUTE_REGERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* get message id */
req->msg_id = _msg_id_next(con);
/* assemble message */
size_t pos = _len_set(req->data, data_len + 6);
req->data[pos] = MQTTSN_PUBLISH;
req->data[pos + 1] = (flags | topic->flags);
byteorder_htobebufs(&req->data[pos + 2], topic->id);
byteorder_htobebufs(&req->data[pos + 4], req->msg_id);
memcpy(&req->data[pos + 6], data, data_len);
req->data_len = (pos + 6 + data_len);
/* publish selected data */
if (flags & MQTTSN_QOS_1) {
_req_send(req, con, NULL);
}
else {
_req_send_once(req, con);
}
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub, asymcute_topic_t *topic,
asymcute_sub_cb_t callback, void *arg, uint8_t flags)
{
assert(con);
assert(req);
assert(sub);
assert(topic);
assert(callback);
int ret = ASYMCUTE_OK;
/* check flags for validity */
if ((flags & VALID_SUBSCRIBE_FLAGS) != flags) {
return ASYMCUTE_NOTSUP;
}
/* is topic initialized? (though it does not need to be registered) */
if (!asymcute_topic_is_init(topic)) {
return ASYMCUTE_REGERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* check if we are already subscribed to the given topic */
for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) {
if (asymcute_topic_equal(topic, sub->topic)) {
ret = ASYMCUTE_SUBERR;
goto end;
}
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare subscription context */
sub->cb = callback;
sub->arg = arg;
sub->topic = topic;
/* send SUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE);
_req_send(req, con, _on_suback_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub)
{
assert(con);
assert(req);
assert(sub);
int ret = ASYMCUTE_OK;
/* make sure the subscription is actually active */
if (!asymcute_sub_active(sub)) {
return ASYMCUTE_SUBERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare and send UNSUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE);
_req_send(req, con, NULL);
end:
mutex_unlock(&con->lock);
return ret;
}