/* * Copyright (C) 2019 Javier FILEIV * * This file is subject to the terms and conditions of the GNU Lesser * General Public License v2.1. See the file LICENSE in the top level * directory for more details. */ /** * @brief MQTT common RIOT interface functions * * @author Javier FILEIV */ #include #include #include #ifdef MODULE_IPV6_ADDR #include "net/ipv6/addr.h" #endif #ifdef MODULE_IPV4_ADDR #include "net/ipv4/addr.h" #endif #include "net/dns.h" #include "net/sock/tcp.h" #include "paho_mqtt.h" #include "MQTTClient.h" #include "timex.h" #include "ztimer.h" #include "tsrb.h" #include "log.h" #define ENABLE_DEBUG 0 #include "debug.h" #define IP_MAX_LEN_ADDRESS (39) /*IPv6 max length */ #ifndef TSRB_MAX_SIZE #define TSRB_MAX_SIZE (1024) #endif static uint8_t buffer[TSRB_MAX_SIZE]; static uint8_t _temp_buf[TSRB_MAX_SIZE]; static tsrb_t tsrb_lwip_tcp; #ifndef PAHO_MQTT_YIELD_MS #define PAHO_MQTT_YIELD_MS (10) #endif static int mqtt_read(struct Network *n, unsigned char *buf, int len, int timeout_ms) { int _timeout; int _len; void *_buf; int rc = -1; if (IS_USED(MODULE_LWIP)) { /* As LWIP doesn't support packet reading byte per byte and * PAHO MQTT reads like that to decode it on the fly, * we read TSRB_MAX_SIZE at once and keep them in a ring buffer. */ _buf = _temp_buf; _len = TSRB_MAX_SIZE; _timeout = 0; } else { _buf = buf; _len = len; _timeout = timeout_ms; } uint32_t send_time = ztimer_now(ZTIMER_MSEC) + timeout_ms; do { rc = sock_tcp_read(&n->sock, _buf, _len, _timeout); if ((rc == -EAGAIN) || (rc == -ETIMEDOUT)) { rc = 0; } if (IS_USED(MODULE_LWIP)) { if (rc > 0) { tsrb_add(&tsrb_lwip_tcp, _temp_buf, rc); } rc = tsrb_get(&tsrb_lwip_tcp, buf, len); } } while (rc < len && ztimer_now(ZTIMER_MSEC) < send_time && rc >= 0); if (IS_ACTIVE(ENABLE_DEBUG) && IS_USED(MODULE_LWIP) && rc > 0) { DEBUG("MQTT buf asked for %d, available to read %d\n", rc, tsrb_avail(&tsrb_lwip_tcp)); for (int i = 0; i < rc; i++) { DEBUG("0x%02X ", buf[i]); } DEBUG("\n"); } return rc; } static int mqtt_write(struct Network *n, unsigned char *buf, int len, int timeout_ms) { /* timeout is controlled by upper layer in PAHO */ (void) timeout_ms; return sock_tcp_write(&n->sock, buf, len); } void NetworkInit(Network *n) { if (IS_USED(MODULE_LWIP)) { tsrb_init(&tsrb_lwip_tcp, buffer, TSRB_MAX_SIZE); } n->mqttread = mqtt_read; n->mqttwrite = mqtt_write; } int NetworkConnect(Network *n, char *addr_ip, int port) { int ret =-1; sock_tcp_ep_t remote = SOCK_IPV4_EP_ANY; ret = dns_query(addr_ip, &remote.addr, AF_UNSPEC); if (ret > 0) { remote.port = port; remote.family = ret == 4 ? AF_INET : AF_INET6; } if (IS_USED(MODULE_IPV4_ADDR) && (remote.port == 0) && ipv4_addr_from_str((ipv4_addr_t *)&remote.addr, addr_ip)) { remote.port = port; } if (IS_USED(MODULE_IPV6_ADDR) && (remote.port == 0) && ipv6_addr_from_str((ipv6_addr_t *)&remote.addr, addr_ip)) { remote.port = port; remote.family = AF_INET6; } if (remote.port == 0) { LOG_ERROR("Error: unable to parse destination address\n"); return ret; } ret = sock_tcp_connect(&n->sock, &remote, 0, 0); return ret; } void NetworkDisconnect(Network *n) { sock_tcp_disconnect(&n->sock); } void TimerInit(Timer *timer) { timer->timeout = 0; timer->time_set = 0; } char TimerIsExpired(Timer *timer) { return (TimerLeftMS(timer) == 0); } void TimerCountdownMS(Timer *timer, unsigned int timeout_ms) { timer->time_set = ztimer_now(ZTIMER_MSEC); timer->timeout = timeout_ms; } void TimerCountdown(Timer *timer, unsigned int timeout_s) { TimerCountdownMS(timer, timeout_s * MS_PER_SEC); } int TimerLeftMS(Timer *timer) { uint32_t left_time = ztimer_now(ZTIMER_MSEC) - timer->time_set; /* should be always greater than 0 */ if (left_time < timer->timeout) { left_time = timer->timeout - left_time; return left_time; } return 0; } void MutexInit(Mutex *mutex) { mutex_init(&mutex->lock); } int MutexLock(Mutex *mutex) { mutex_lock(&mutex->lock); return 0; } int MutexUnlock(Mutex *mutex) { mutex_unlock(&mutex->lock); return 0; } void *mqtt_riot_run(void *arg) { MQTTClient *client = (MQTTClient *)arg; assert(client); while (1) { int rc; MutexLock(&client->mutex); if ((rc = MQTTYield(client, PAHO_MQTT_YIELD_MS)) != 0) { LOG_DEBUG("riot_iface: error while MQTTYield()(%d)\n", rc); } MutexUnlock(&client->mutex); /* let other threads do their work */ ztimer_sleep(ZTIMER_MSEC, MQTT_YIELD_POLLING_MS); } return NULL; } int ThreadStart(Thread *thread, void (*fn)(void *), void *arg) { (void) fn; thread->pid = thread_create(thread->stack, sizeof(thread->stack), MQTT_THREAD_PRIORITY, 0, mqtt_riot_run, arg, "paho_mqtt_riot"); return thread->pid; }