1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

net: added emCute - introducing MQTT-SN support

This commit is contained in:
Hauke Petersen 2017-02-21 00:01:59 +01:00
parent 289febc1dd
commit bb71986ecf
7 changed files with 1089 additions and 0 deletions

View File

@ -591,6 +591,12 @@ ifneq (,$(filter random,$(USEMODULE)))
endif
endif
ifneq (,$(filter emcute,$(USEMODULE)))
USEMODULE += core_thread_flags
USEMODULE += sock_udp
USEMODULE += xtimer
endif
# include package dependencies
-include $(USEPKG:%=$(RIOTPKG)/%/Makefile.dep)

View File

@ -108,6 +108,9 @@ endif
ifneq (,$(filter gcoap,$(USEMODULE)))
DIRS += net/application_layer/coap
endif
ifneq (,$(filter emcute,$(USEMODULE)))
DIRS += net/application_layer/emcute
endif
DIRS += $(dir $(wildcard $(addsuffix /Makefile, ${USEMODULE})))

381
sys/include/net/emcute.h Normal file
View File

@ -0,0 +1,381 @@
/*
* Copyright (C) 2017 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup net_emcute MQTT-SN Client (emCute)
* @ingroup net
* @brief emCute, the MQTT-SN implementation for RIOT
*
* # About
* emCute is the implementation of the OASIS MQTT-SN protocol for RIOT. It is
* designed with a focus on small memory footprint and usability.
*
*
* # Design Decisions and Restrictions
* * emCute is designed to run on top of UDP only, making use of
* @ref net_sock_udp. The design is not intended to be used with any other
* transport.
*
* The implementation is based on a 2-thread model: emCute needs one thread of
* its own, in which receiving of packets and sending of ping messages are
* handled. All 'user space functions' have to run from (a) different (i.e.
* user) thread(s). emCute uses thread flags to synchronize between threads.
*
* Further know restrictions are:
* - ASCII topic names only (no support for UTF8 names, yet)
* - topic length is restricted to fit in a single length byte (248 byte max)
* - no support for wildcards in topic names. This feature requires more
* elaborate internal memory management, supposedly at the cost of quite
* increased ROM and RAM usage
* - no retransmit when receiving a REJ_CONG (reject, reason congestion). when
* getting a REJ_CONG (reject, reason congestion), the spec tells us to resend
* the original message after T_WAIT (default: >5min). This is not supported,
* as this would require to block to calling thread (or keep state) for long
* periods of time and is (in Hauke's opinion) not feasible for constrained
* nodes.
*
*
* # Error Handling
* This implementation tries minimize parameter checks to a minimum, checking as
* many parameters as feasible using assertions. For the sake of run-time
* stability and usability, typical overflow checks are always done during run-
* time and explicit error values returned in case of errors.
*
*
* # Implementation state
* In the current state, emCute supports:
* - connecting to a gateway
* - disconnecting from gateway
* - registering a last will topic and message during connection setup
* - registering topic names with the gateway (obtaining topic IDs)
* - subscribing to topics
* - unsubscribing from topics
* - updating will topic
* - updating will message
* - sending out periodic PINGREQ messages
* - handling re-transmits
*
* The following features are however still missing (but planned):
* @todo Gateway discovery (so far there is no support for handling
* ADVERTISE, GWINFO, and SEARCHGW). Open question to answer here:
* how to put / how to encode the IPv(4/6) address AND the port of
* a gateway in the GwAdd field of the GWINFO message
* @todo QOS level 2
* @todo put the node to sleep (send DISCONNECT with duration field set)
* @todo handle DISCONNECT messages initiated by the broker/gateway
* @todo support for pre-defined and short topic IDs
* @todo handle (previously) active subscriptions on reconnect/disconnect
* @todo handle re-connect/disconnect from unresponsive gateway (in case
* a number of ping requests are unanswered)
* @todo react only to incoming ping requests that are actually send by
* the gateway we are connected to
*
* @{
* @file
* @brief emCute MQTT-SN interface definition
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef MQTTSN_H
#define MQTTSN_H
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "net/sock/udp.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifndef EMCUTE_DEFAULT_PORT
/**
* @brief Default UDP port to listen on (also used as SRC port)
*/
#define EMCUTE_DEFAULT_PORT (1883U)
#endif
#ifndef EMCUTE_BUFSIZE
/**
* @brief Buffer size used for emCute's transmit and receive buffers
*
* The overall buffer size used by emCute is this value time two (Rx + Tx).
*/
#define EMCUTE_BUFSIZE (512U)
#endif
#ifndef EMCUTE_ID_MAXLEN
/**
* @brief Maximum client ID length
*
* @note **Must** be less than (256 - 6) AND less than
* (@ref EMCUTE_BUFSIZE - 6).
*/
#define EMCUTE_ID_MAXLEN (196U)
#endif
#ifndef EMCUTE_TOPIC_MAXLEN
/**
* @brief Maximum topic length
*
* @note **Must** be less than (256 - 6) AND less than
* (@ref EMCUTE_BUFSIZE - 6).
*/
#define EMCUTE_TOPIC_MAXLEN (196U)
#endif
#ifndef EMCUTE_KEEPALIVE
/**
* @brief Keep-alive interval [in s]
*
* The node will communicate this interval to the gateway send a ping message
* every time when this amount of time has passed.
*
* For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min
*/
#define EMCUTE_KEEPALIVE (360) /* -> 6 min*/
#endif
#ifndef EMCUTE_T_RETRY
/**
* @brief Re-send interval [in seconds]
*
* For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec
*/
#define EMCUTE_T_RETRY (15U) /* -> 15 sec */
#endif
#ifndef EMCUTE_N_RETRY
/**
* @brief Number of retries when sending packets
*
* For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5
*/
#define EMCUTE_N_RETRY (3U)
#endif
/**
* @brief MQTT-SN flags
*
* All MQTT-SN functions only support a sub-set of the available flags. It is up
* to the user to only supply valid/supported flags to a function. emCute will
* trigger assertion fails on the use of unsupported flags (if compiled with
* DEVELHELP).
*
* Refer to the MQTT-SN spec section 5.3.4 for further information.
*/
enum {
EMCUTE_DUP = 0x80, /**< duplicate flag */
EMCUTE_QOS_MASK = 0x60, /**< QoS level mask */
EMCUTE_QOS_2 = 0x40, /**< QoS level 2 */
EMCUTE_QOS_1 = 0x20, /**< QoS level 1 */
EMCUTE_QOS_0 = 0x00, /**< QoS level 0 */
EMCUTE_RETAIN = 0x10, /**< retain flag */
EMCUTE_WILL = 0x08, /**< will flag, used during CONNECT */
EMCUTE_CS = 0x04, /**< clean session flag */
EMCUTE_TIT_MASK = 0x03, /**< topic ID type mask */
EMCUTE_TIT_SHORT = 0x02, /**< topic ID: short */
EMCUTE_TIT_PREDEF = 0x01, /**< topic ID: pre-defined */
EMCUTE_TIT_NORMAL = 0x00 /**< topic ID: normal */
};
/**
* @brief Possible emCute return values
*/
enum {
EMCUTE_OK = 0, /**< everything went as expect */
EMCUTE_NOGW = -1, /**< error: not connected to a gateway */
EMCUTE_REJECT = -2, /**< error: operation was rejected by broker */
EMCUTE_OVERFLOW = -3, /**< error: ran out of buffer space */
EMCUTE_TIMEOUT = -4, /**< error: timeout */
EMCUTE_NOTSUP = -5 /**< error: feature not supported */
};
/**
* @brief MQTT-SN topic
*/
typedef struct {
const char *name; /**< topic string (currently ACSII only) */
uint16_t id; /**< topic id, as assigned by the gateway */
} emcute_topic_t;
/**
* @brief Signature for callbacks fired when publish messages are received
*
* @param[in] topic topic the received data was published on
* @param[in] data published data
* @param[in] len length of @p data in bytes
*/
typedef void(*emcute_cb_t)(const emcute_topic_t *topic, void *data, size_t len);
/**
* @brief Data-structure for keeping track of topics we register to
*/
typedef struct emcute_sub {
struct emcute_sub *next; /**< next subscription (saved in a list) */
emcute_topic_t topic; /**< topic we subscribe to */
emcute_cb_t cb; /**< function called when receiving messages */
void *arg; /**< optional custom argument */
} emcute_sub_t;
/**
* @brief Connect to a given MQTT-SN gateway (CONNECT)
*
* When called while already connected to a gateway, call emcute_discon() first
* to disconnect from the current gateway.
*
* @param[in] remote address and port of the target MQTT-SN gateway
* @param[in] clean set to true to start a clean session
* @param[in] will_topic last will topic name, no last will will be
* configured if set to NULL
* @param[in] will_msg last will message content, will be ignored if
* @p will_topic is set to NULL
* @param[in] will_msg_len length of @p will_msg in byte
* @param[in] flags flags used for the last will, allowed are retain and
* QoS
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if already connected to a gateway
* @return EMCUTE_REJECT on connection refused by gateway
* @return EMCUTE_TIMEOUT on connection timeout
*/
int emcute_con(sock_udp_ep_t *remote, bool clean, const char *will_topic,
const void *will_msg, size_t will_msg_len, unsigned flags);
/**
* @brief Disconnect from the gateway we are currently connected to
*
* @return EMCUTE_OK on success
* @return EMCUTE_GW if not connected to a gateway
* @return EMCUTE_TIMEOUT on response timeout
*/
int emcute_discon(void);
/**
* @brief Get a topic ID for the given topic name from the gateway
*
* @param[in,out] topic topic to register, topic.name **must not** be NULL
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_OVERFLOW if length of topic name exceeds
* @ref EMCUTE_TOPIC_MAXLEN
* @return EMCUTE_TIMEOUT on connection timeout
*/
int emcute_reg(emcute_topic_t *topic);
/**
* @brief Publish data on the given topic
*
* @param[in] topic topic to send data to, topic **must** be registered
* (topic.id **must** populated).
* @param[in] buf data to publish
* @param[in] len length of @p data in bytes
* @param[in] flags flags used for publication, allowed are QoS and retain
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_REJECT if publish message was rejected (QoS > 0 only)
* @return EMCUTE_OVERFLOW if length of data exceeds @ref EMCUTE_BUFSIZE
* @return EMCUTE_TIMEOUT on connection timeout (QoS > 0 only)
* @return EMCUTE_NOTSUP on unsupported flag values
*/
int emcute_pub(emcute_topic_t *topic, const void *buf, size_t len,
unsigned flags);
/**
* @brief Subscribe to the given topic
*
* When calling this function, @p sub->topic.name and @p sub->cb **must** be
* set.
*
* @param[in,out] sub subscription context, @p sub->topic.name and @p sub->cb
* **must** not be NULL.
* @param[in] flags flags used when subscribing, allowed are QoS, DUP, and
* topic ID type
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_OVERFLOW if length of topic name exceeds
* @ref EMCUTE_TOPIC_MAXLEN
* @return EMCUTE_TIMEOUT on connection timeout
*/
int emcute_sub(emcute_sub_t *sub, unsigned flags);
/**
* @brief Unsubscripbe the given topic
*
* @param[in] sub subscription context
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_TIMEOUT on connection timeout
*/
int emcute_unsub(emcute_sub_t *sub);
/**
* @brief Update the last will topic
*
* @param[in] topic new last will topic
* @param[in] flags flags used for the topic, allowed are QoS and retain
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_OVERFLOW if length of topic name exceeds
* @ref EMCUTE_TOPIC_MAXLEN
* @return EMCUTE_REJECT on rejection by the gateway
* @return EMCUTE_TIMEOUT on response timeout
*/
int emcute_willupd_topic(const char *topic, unsigned flags);
/**
* @brief Update the last will message
*
* @param[in] data new message to send on last will
* @param[in] len length of @p data in bytes
*
* @return EMCUTE_OK on success
* @return EMCUTE_NOGW if not connected to a gateway
* @return EMCUTE_OVERFLOW if length of the given message exceeds
* @ref EMCUTE_BUFSIZE
* @return EMCUTE_REJECT on rejection by the gateway
* @return EMCUTE_TIMEOUT on response timeout
*/
int emcute_willupd_msg(const void *data, size_t len);
/**
* @brief Run emCute, will 'occupy' the calling thread
*
* This function will run the emCute message receiver. It will block the thread
* it is running in.
*
* @param[in] port UDP port used for listening (default: 1883)
* @param[in] id client ID (should be unique)
*/
void emcute_run(uint16_t port, const char *id);
/**
* @brief Return the string representation of the given type value
*
* This function is for debugging purposes.
*
* @param[in] type MQTT-SN message type
*
* @return string representation of the given type
* @return 'UNKNOWN' on invalid type value
*/
const char *emcute_type_str(uint8_t type);
#ifdef __cplusplus
}
#endif
#endif /* MQTTSN_H */
/** @} */

View File

@ -0,0 +1 @@
include $(RIOTBASE)/Makefile.base

View File

@ -0,0 +1,568 @@
/*
* Copyright (C) 2017 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup net_emcute
* @{
*
* @file
* @brief MQTT-SN implementation
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*
* @}
*/
#include <string.h>
#include "log.h"
#include "mutex.h"
#include "sched.h"
#include "xtimer.h"
#include "thread_flags.h"
#include "net/emcute.h"
#include "emcute_internal.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
#define PROTOCOL_VERSION (0x01)
#define PUB_FLAGS (EMCUTE_QOS_MASK | EMCUTE_RETAIN)
#define SUB_FLAGS (EMCUTE_DUP | EMCUTE_QOS_MASK | EMCUTE_TIT_MASK)
#define TFLAGS_RESP (0x0001)
#define TFLAGS_TIMEOUT (0x0002)
#define TFLAGS_ANY (TFLAGS_RESP | TFLAGS_TIMEOUT)
static const char *cli_id;
static sock_udp_t sock;
static sock_udp_ep_t gateway;
static uint8_t rbuf[EMCUTE_BUFSIZE];
static uint8_t tbuf[EMCUTE_BUFSIZE];
static emcute_sub_t *subs = NULL;
static mutex_t txlock;
static xtimer_t timer;
static uint16_t id_next = 0x1234;
static volatile uint8_t waiton = 0xff;
static volatile uint16_t waitonid = 0;
static volatile int result;
static inline uint16_t get_u16(const uint8_t *buf)
{
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return (uint16_t)((buf[0] << 8) | buf[1]);
#else
return (uint16_t)((buf[1] << 8) | buf[0]);
#endif
}
static inline void set_u16(uint8_t *buf, uint16_t val)
{
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
buf[0] = (uint8_t)(val >> 8);
buf[1] = (uint8_t)(val & 0xff);
#else
buf[0] = (uint8_t)(val & 0xff);
buf[1] = (uint8_t)(val >> 8);
#endif
}
static int set_len(uint8_t *buf, size_t len)
{
if (len < (0xff - 7)) {
buf[0] = len + 1;
return 1;
}
else {
buf[0] = 0x01;
set_u16(&tbuf[1], (uint16_t)(len + 3));
return 3;
}
}
static int get_len(uint8_t *buf, uint16_t *len)
{
if (buf[0] != 0x01) {
*len = (uint16_t)buf[0];
return 1;
}
else {
*len = get_u16(&buf[1]);
return 3;
}
}
static void time_evt(void *arg)
{
thread_flags_set((thread_t *)arg, TFLAGS_TIMEOUT);
}
static int syncsend(uint8_t resp, size_t len, bool unlock)
{
int res = EMCUTE_TIMEOUT;
waiton = resp;
timer.arg = (void *)sched_active_thread;
/* clear flags, in case the timer was triggered last time right before the
* remove was called */
thread_flags_clear(TFLAGS_ANY);
for (unsigned retries = 0; retries < EMCUTE_N_RETRY; retries++) {
DEBUG("[emcute] syncsend: sending round %i\n", retries);
sock_udp_send(&sock, tbuf, len, &gateway);
xtimer_set(&timer, (EMCUTE_T_RETRY * US_PER_SEC));
thread_flags_t flags = thread_flags_wait_any(TFLAGS_ANY);
if (flags & TFLAGS_RESP) {
DEBUG("[emcute] syncsend: got response [%i]\n", result);
xtimer_remove(&timer);
res = result;
retries = EMCUTE_N_RETRY;
}
}
/* cleanup sync state */
waiton = 0xff;
if (unlock) {
mutex_unlock(&txlock);
}
return res;
}
static void on_disconnect(void)
{
if (waiton == DISCONNECT) {
gateway.port = 0;
result = EMCUTE_OK;
thread_flags_set((thread_t *)timer.arg, TFLAGS_RESP);
}
}
static void on_ack(uint8_t type, int id_pos, int ret_pos, int res_pos)
{
if ((waiton == type) && (!id_pos || (waitonid == get_u16(&rbuf[id_pos])))) {
if (!ret_pos || (rbuf[ret_pos] == ACCEPT)) {
if (res_pos == 0) {
result = EMCUTE_OK;
} else {
result = (int)get_u16(&rbuf[res_pos]);
}
} else {
result = EMCUTE_REJECT;
}
thread_flags_set((thread_t *)timer.arg, TFLAGS_RESP);
}
}
static void on_publish(void)
{
emcute_sub_t *sub;
uint16_t len;
int pos = get_len(rbuf, &len);
uint16_t tid = get_u16(&rbuf[pos + 2]);
/* allocate a response packet */
uint8_t buf[7] = { 7, PUBACK, 0, 0, 0, 0, ACCEPT };
/* and populate message ID and topic ID fields */
memcpy(&buf[2], &rbuf[3], 4);
/* return error code in case we don't support/understand active flags. So
* far we only understand QoS 1... */
if (rbuf[2] & ~(EMCUTE_QOS_1 | EMCUTE_TIT_SHORT)) {
buf[6] = REJ_NOTSUP;
sock_udp_send(&sock, &buf, 7, &gateway);
return;
}
/* find the registered topic */
for (sub = subs; sub && (sub->topic.id != tid); sub = sub->next) {}
if (sub == NULL) {
buf[6] = REJ_INVTID;
sock_udp_send(&sock, &buf, 7, &gateway);
DEBUG("[emcute] on pub: no subscription found\n");
}
else {
if (rbuf[2] & EMCUTE_QOS_1) {
sock_udp_send(&sock, &buf, 7, &gateway);
}
DEBUG("[emcute] on pub: got %i bytes of data\n", (int)(len - pos - 6));
sub->cb(&sub->topic, &rbuf[pos + 6], (size_t)(len - pos - 6));
}
}
static void on_pingreq(sock_udp_ep_t *remote)
{
/* @todo respond with a PINGRESP only if the PINGREQ came from the
* connected gateway -> see spec v1.2, section 6.11 */
uint8_t buf[2] = { 2, PINGRESP };
sock_udp_send(&sock, &buf, 2, remote);
}
static void on_pingresp(void)
{
/** @todo: trigger update something like a 'last seen' value */
}
static void send_ping(void)
{
if (gateway.port != 0) {
uint8_t buf[2] = { 2, PINGREQ };
sock_udp_send(&sock, &buf, 2, &gateway);
}
}
int emcute_con(sock_udp_ep_t *remote, bool clean, const char *will_topic,
const void *will_msg, size_t will_msg_len, unsigned will_flags)
{
int res;
size_t len;
assert(!will_topic || (will_topic && will_msg && !(will_flags & ~PUB_FLAGS)));
mutex_lock(&txlock);
/* check for existing connections and copy given UDP endpoint */
if (gateway.port != 0) {
return EMCUTE_NOGW;
}
memcpy(&gateway, remote, sizeof(sock_udp_ep_t));
/* figure out which flags to set */
uint8_t flags = (clean) ? EMCUTE_CS : 0;
if (will_topic) {
flags |= EMCUTE_WILL;
}
/* compute packet size */
len = (strlen(cli_id) + 6);
tbuf[0] = (uint8_t)len;
tbuf[1] = CONNECT;
tbuf[2] = flags;
tbuf[3] = PROTOCOL_VERSION;
set_u16(&tbuf[4], EMCUTE_KEEPALIVE);
memcpy(&tbuf[6], cli_id, strlen(cli_id));
/* configure 'state machine' and send the connection request */
if (will_topic) {
size_t topic_len = strlen(will_topic);
if ((topic_len > EMCUTE_TOPIC_MAXLEN) ||
((will_msg_len + 4) > EMCUTE_BUFSIZE)) {
gateway.port = 0;
return EMCUTE_OVERFLOW;
}
res = syncsend(WILLTOPICREQ, len, false);
if (res != EMCUTE_OK) {
gateway.port = 0;
return res;
}
/* now send WILLTOPIC */
int pos = set_len(tbuf, (topic_len + 2));
len = (pos + topic_len + 2);
tbuf[pos++] = WILLTOPIC;
tbuf[pos++] = will_flags;
memcpy(&tbuf[pos], will_topic, strlen(will_topic));
res = syncsend(WILLMSGREQ, len, false);
if (res != EMCUTE_OK) {
gateway.port = 0;
return res;
}
/* and WILLMSG afterwards */
pos = set_len(tbuf, (will_msg_len + 1));
len = (pos + will_msg_len + 1);
tbuf[pos++] = WILLMSG;
memcpy(&tbuf[pos], will_msg, will_msg_len);
}
res = syncsend(CONNACK, len, true);
if (res != EMCUTE_OK) {
gateway.port = 0;
}
return res;
}
int emcute_discon(void)
{
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
mutex_lock(&txlock);
tbuf[0] = 2;
tbuf[1] = DISCONNECT;
return syncsend(DISCONNECT, 2, true);
}
int emcute_reg(emcute_topic_t *topic)
{
assert(topic && topic->name);
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
if (strlen(topic->name) > EMCUTE_TOPIC_MAXLEN) {
return EMCUTE_OVERFLOW;
}
mutex_lock(&txlock);
tbuf[0] = (strlen(topic->name) + 6);
tbuf[1] = REGISTER;
set_u16(&tbuf[2], 0);
set_u16(&tbuf[4], id_next);
waitonid = id_next++;
memcpy(&tbuf[6], topic->name, strlen(topic->name));
int res = syncsend(REGACK, (size_t)tbuf[0], true);
if (res > 0) {
topic->id = (uint16_t)res;
res = EMCUTE_OK;
}
return res;
}
int emcute_pub(emcute_topic_t *topic, const void *data, size_t len,
unsigned flags)
{
int res = EMCUTE_OK;
assert((topic->id != 0) && data && (len > 0) && !(flags & ~PUB_FLAGS));
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
if (len >= (EMCUTE_BUFSIZE - 9)) {
return EMCUTE_OVERFLOW;
}
if (flags & EMCUTE_QOS_2) {
return EMCUTE_NOTSUP;
}
mutex_lock(&txlock);
int pos = set_len(tbuf, (len + 6));
len += (pos + 6);
tbuf[pos++] = PUBLISH;
tbuf[pos++] = flags;
set_u16(&tbuf[pos], topic->id);
pos += 2;
set_u16(&tbuf[pos], id_next);
waitonid = id_next++;
pos += 2;
memcpy(&tbuf[pos], data, len);
if (flags & EMCUTE_QOS_1) {
res = syncsend(PUBACK, len, true);
}
else {
sock_udp_send(&sock, tbuf, len, &gateway);
mutex_unlock(&txlock);
}
return res;
}
int emcute_sub(emcute_sub_t *sub, unsigned flags)
{
assert(sub && (sub->cb) && (sub->topic.name) && !(flags & ~SUB_FLAGS));
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
if (strlen(sub->topic.name) > EMCUTE_TOPIC_MAXLEN) {
return EMCUTE_OVERFLOW;
}
mutex_lock(&txlock);
tbuf[0] = (strlen(sub->topic.name) + 5);
tbuf[1] = SUBSCRIBE;
tbuf[2] = flags;
set_u16(&tbuf[3], id_next);
waitonid = id_next++;
memcpy(&tbuf[5], sub->topic.name, strlen(sub->topic.name));
int res = syncsend(SUBACK, (size_t)tbuf[0], false);
if (res > 0) {
DEBUG("[emcute] sub: success, topic id is %i\n", res);
sub->topic.id = res;
/* check if subscription is already in the list, only insert if not*/
emcute_sub_t *s;
for (s = subs; s && (s != sub); s = s->next) {}
if (!s) {
sub->next = subs;
subs = sub;
res = EMCUTE_OK;
}
}
mutex_unlock(&txlock);
return res;
}
int emcute_unsub(emcute_sub_t *sub)
{
assert(sub && sub->topic.name);
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
mutex_lock(&txlock);
tbuf[0] = (strlen(sub->topic.name) + 5);
tbuf[1] = UNSUBSCRIBE;
tbuf[2] = 0;
set_u16(&tbuf[3], id_next);
waitonid = id_next++;
memcpy(&tbuf[5], sub->topic.name, strlen(sub->topic.name));
int res = syncsend(UNSUBACK, (size_t)tbuf[0], false);
if (res == EMCUTE_OK) {
if (subs == sub) {
subs = sub->next;
}
else {
emcute_sub_t *s;
for (s = subs; s; s = s->next) {
if (s->next == sub) {
s->next = sub->next;
break;
}
}
}
}
mutex_unlock(&txlock);
return res;
}
int emcute_willupd_topic(const char *topic, unsigned flags)
{
assert(!(flags & ~PUB_FLAGS));
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
if (topic && (strlen(topic) > EMCUTE_TOPIC_MAXLEN)) {
return EMCUTE_OVERFLOW;
}
mutex_lock(&txlock);
tbuf[1] = WILLTOPICUPD;
if (!topic) {
tbuf[0] = 2;
}
else {
tbuf[0] = (strlen(topic) + 3);
tbuf[2] = flags;
memcpy(&tbuf[3], topic, strlen(topic));
}
return syncsend(WILLTOPICRESP, (size_t)tbuf[0], true);
}
int emcute_willupd_msg(const void *data, size_t len)
{
assert(data && (len > 0));
if (gateway.port == 0) {
return EMCUTE_NOGW;
}
if (len > (EMCUTE_BUFSIZE - 4)) {
return EMCUTE_OVERFLOW;
}
mutex_lock(&txlock);
int pos = set_len(tbuf, (len + 1));
len += (pos + 1);
tbuf[pos++] = WILLMSGUPD;
memcpy(&tbuf[pos], data, len);
return syncsend(WILLMSGRESP, len, true);
}
void emcute_run(uint16_t port, const char *id)
{
assert(strlen(id) < EMCUTE_ID_MAXLEN);
sock_udp_ep_t local = SOCK_IPV6_EP_ANY;
sock_udp_ep_t remote;
local.port = port;
cli_id = id;
timer.callback = time_evt;
timer.arg = NULL;
mutex_init(&txlock);
if (sock_udp_create(&sock, &local, NULL, 0) < 0) {
LOG_ERROR("[emcute] unable to open UDP socket on port %i\n", (int)port);
return;
}
uint32_t start = xtimer_now_usec();
uint32_t t_out = (EMCUTE_KEEPALIVE * US_PER_SEC);
while (1) {
ssize_t len = sock_udp_recv(&sock, rbuf, sizeof(rbuf), t_out, &remote);
if ((len < 0) && (len != -ETIMEDOUT)) {
LOG_ERROR("[emcute] error while receiving UDP packet\n");
return;
}
if (len >= 2) {
/* handle the packet */
uint16_t pkt_len;
int pos = get_len(rbuf, &pkt_len);
uint8_t type = rbuf[pos];
switch (type) {
case CONNACK: on_ack(type, 0, 2, 0); break;
case WILLTOPICREQ: on_ack(type, 0, 0, 0); break;
case WILLMSGREQ: on_ack(type, 0, 0, 0); break;
case REGACK: on_ack(type, 4, 6, 2); break;
case PUBLISH: on_publish(); break;
case PUBACK: on_ack(type, 4, 6, 0); break;
case SUBACK: on_ack(type, 5, 7, 3); break;
case UNSUBACK: on_ack(type, 2, 0, 0); break;
case PINGREQ: on_pingreq(&remote); break;
case PINGRESP: on_pingresp(); break;
case DISCONNECT: on_disconnect(); break;
case WILLTOPICRESP: on_ack(type, 0, 0, 0); break;
case WILLMSGRESP: on_ack(type, 0, 0, 0); break;
default:
LOG_DEBUG("[emcute] received unexpected type [%s]\n",
emcute_type_str(type));
}
}
uint32_t now = xtimer_now_usec();
if ((now - start) >= (EMCUTE_KEEPALIVE * US_PER_SEC)) {
send_ping();
start = now;
t_out = (EMCUTE_KEEPALIVE * US_PER_SEC);
}
else {
t_out = (EMCUTE_KEEPALIVE * US_PER_SEC) - (now - start);
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright (C) 2017 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup net_emcute
* @{
*
* @file
* @brief emCute internals
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef EMCUTE_INTERNAL_H
#define EMCUTE_INTERNAL_H
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief MQTT-SN message types
*/
enum {
ADVERTISE = 0x00, /**< advertise message */
SEARCHGW = 0x01, /**< search gateway message */
GWINFO = 0x02, /**< gateway info message */
CONNECT = 0x04, /**< connect message */
CONNACK = 0x05, /**< connection acknowledgment message */
WILLTOPICREQ = 0x06, /**< will topic request */
WILLTOPIC = 0x07, /**< will topic */
WILLMSGREQ = 0x08, /**< will message request */
WILLMSG = 0x09, /**< will message */
REGISTER = 0x0a, /**< topic registration request */
REGACK = 0x0b, /**< topic registration acknowledgment */
PUBLISH = 0x0c, /**< publish message */
PUBACK = 0x0d, /**< publish acknowledgment */
PUBCOMP = 0x0e, /**< publish received (QoS 2) */
PUBREC = 0x0f, /**< publish complete (QoS 2) */
PUBREL = 0x10, /**< publish release (QoS 2) */
SUBSCRIBE = 0x12, /**< subscribe message */
SUBACK = 0x13, /**< subscription acknowledgment */
UNSUBSCRIBE = 0x14, /**< unsubscribe message */
UNSUBACK = 0x15, /**< unsubscription acknowledgment */
PINGREQ = 0x16, /**< ping request */
PINGRESP = 0x17, /**< ping response */
DISCONNECT = 0x18, /**< disconnect message */
WILLTOPICUPD = 0x1a, /**< will topic update request */
WILLTOPICRESP = 0x1b, /**< will topic update response */
WILLMSGUPD = 0x1c, /**< will message update request */
WILLMSGRESP = 0x1d /**< will topic update response */
};
/**
* @brief MQTT-SN return codes
*/
enum {
ACCEPT = 0x00, /**< all good */
REJ_CONG = 0x01, /**< reject, reason: congestions */
REJ_INVTID = 0x02, /**< reject, reason: invalid topic ID */
REJ_NOTSUP = 0x03 /**< reject, reason: operation not supported */
};
#ifdef __cplusplus
}
#endif
#endif /* EMCUTE_INTERNAL_H */
/** @} */

View File

@ -0,0 +1,56 @@
/*
* Copyright (C) 2017 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup net_emcute
* @{
*
* @file
* @brief emCute string functions (for debugging purposes)
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*
* @}
*/
#include "net/emcute.h"
#include "emcute_internal.h"
const char *emcute_type_str(uint8_t type)
{
switch (type) {
case ADVERTISE: return "ADVERTISE";
case SEARCHGW: return "SEARCHGW";
case GWINFO: return "GWINFO";
case CONNECT: return "CONNECT";
case CONNACK: return "CONNACK";
case WILLTOPICREQ: return "WILLTOPICREQ";
case WILLTOPIC: return "WILLTOPIC";
case WILLMSGREQ: return "WILLMSGREQ";
case WILLMSG: return "WILLMSG";
case REGISTER: return "REGISTER";
case REGACK: return "REGACK";
case PUBLISH: return "PUBLISH";
case PUBACK: return "PUBACK";
case PUBCOMP: return "PUBCOMP";
case PUBREC: return "PUBREC";
case PUBREL: return "PUBREL";
case SUBSCRIBE: return "SUBSCRIBE";
case SUBACK: return "SUBACK";
case UNSUBSCRIBE: return "UNSUBSCRIBE";
case UNSUBACK: return "UNSUBACK";
case PINGREQ: return "PINGREQ";
case PINGRESP: return "PINGRESP";
case DISCONNECT: return "DISCONNECT";
case WILLTOPICUPD: return "WILLTOPICUPD";
case WILLTOPICRESP: return "WILLTOPICRESP";
case WILLMSGUPD: return "WILLMSGUPD";
case WILLMSGRESP: return "WILLMSGRESP";
default: return "UNKNOWN";
}
}