mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2025-01-18 01:52:45 +01:00
232 lines
5.4 KiB
C
232 lines
5.4 KiB
C
/*
|
|
* Copyright (C) 2019 Javier FILEIV <javier.fileiv@gmail.com>
|
|
*
|
|
* This file is subject to the terms and conditions of the GNU Lesser
|
|
* General Public License v2.1. See the file LICENSE in the top level
|
|
* directory for more details.
|
|
*/
|
|
|
|
/**
|
|
* @brief MQTT common RIOT interface functions
|
|
*
|
|
* @author Javier FILEIV <javier.fileiv@gmail.com>
|
|
*/
|
|
|
|
#include <assert.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
|
|
#ifdef MODULE_IPV6_ADDR
|
|
#include "net/ipv6/addr.h"
|
|
#endif
|
|
#ifdef MODULE_IPV4_ADDR
|
|
#include "net/ipv4/addr.h"
|
|
#endif
|
|
#include "net/dns.h"
|
|
#include "net/sock/tcp.h"
|
|
#include "paho_mqtt.h"
|
|
#include "MQTTClient.h"
|
|
#include "timex.h"
|
|
#include "ztimer.h"
|
|
#include "tsrb.h"
|
|
#include "log.h"
|
|
|
|
#define ENABLE_DEBUG 0
|
|
#include "debug.h"
|
|
|
|
#define IP_MAX_LEN_ADDRESS (39) /*IPv6 max length */
|
|
|
|
#ifndef TSRB_MAX_SIZE
|
|
#define TSRB_MAX_SIZE (1024)
|
|
#endif
|
|
|
|
/* Backing storage handed to tsrb_init() for tsrb_lwip_tcp in NetworkInit() */
static uint8_t buffer[TSRB_MAX_SIZE];
|
|
/* Scratch area mqtt_read() reads socket data into when MODULE_LWIP is used */
static uint8_t _temp_buf[TSRB_MAX_SIZE];
|
|
/* Ring buffer that mqtt_read() fills via tsrb_add() and drains via tsrb_get() */
static tsrb_t tsrb_lwip_tcp;
|
|
|
|
#ifndef PAHO_MQTT_YIELD_MS
|
|
#define PAHO_MQTT_YIELD_MS (10)
|
|
#endif
|
|
|
|
static int mqtt_read(struct Network *n, unsigned char *buf, int len,
|
|
int timeout_ms)
|
|
{
|
|
int _timeout;
|
|
int _len;
|
|
void *_buf;
|
|
int rc = -1;
|
|
|
|
if (IS_USED(MODULE_LWIP)) {
|
|
/* As LWIP doesn't support packet reading byte per byte and
|
|
* PAHO MQTT reads like that to decode it on the fly,
|
|
* we read TSRB_MAX_SIZE at once and keep them in a ring buffer.
|
|
*/
|
|
_buf = _temp_buf;
|
|
_len = TSRB_MAX_SIZE;
|
|
_timeout = 0;
|
|
}
|
|
else {
|
|
_buf = buf;
|
|
_len = len;
|
|
_timeout = timeout_ms;
|
|
}
|
|
|
|
uint32_t send_time = ztimer_now(ZTIMER_MSEC) + timeout_ms;
|
|
do {
|
|
rc = sock_tcp_read(&n->sock, _buf, _len, _timeout);
|
|
if ((rc == -EAGAIN) || (rc == -ETIMEDOUT)) {
|
|
rc = 0;
|
|
}
|
|
|
|
if (IS_USED(MODULE_LWIP)) {
|
|
if (rc > 0) {
|
|
tsrb_add(&tsrb_lwip_tcp, _temp_buf, rc);
|
|
}
|
|
|
|
rc = tsrb_get(&tsrb_lwip_tcp, buf, len);
|
|
}
|
|
} while (rc < len && ztimer_now(ZTIMER_MSEC) < send_time && rc >= 0);
|
|
|
|
if (IS_ACTIVE(ENABLE_DEBUG) && IS_USED(MODULE_LWIP) && rc > 0) {
|
|
DEBUG("MQTT buf asked for %d, available to read %d\n",
|
|
rc, tsrb_avail(&tsrb_lwip_tcp));
|
|
for (int i = 0; i < rc; i++) {
|
|
DEBUG("0x%02X ", buf[i]);
|
|
}
|
|
DEBUG("\n");
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
static int mqtt_write(struct Network *n, unsigned char *buf, int len,
|
|
int timeout_ms)
|
|
{
|
|
/* timeout is controlled by upper layer in PAHO */
|
|
(void) timeout_ms;
|
|
|
|
return sock_tcp_write(&n->sock, buf, len);
|
|
}
|
|
|
|
/**
 * @brief   Install the RIOT read/write callbacks into a Paho network context
 *
 * When MODULE_LWIP is used, the ring buffer that mqtt_read() relies on is
 * (re)initialized as well.
 *
 * @param[out] n    network context to set up
 */
void NetworkInit(Network *n)
{
    n->mqttwrite = mqtt_write;
    n->mqttread = mqtt_read;

    if (!IS_USED(MODULE_LWIP)) {
        return;
    }

    tsrb_init(&tsrb_lwip_tcp, buffer, TSRB_MAX_SIZE);
}
|
|
|
|
/**
 * @brief   Resolve @p addr_ip and open a TCP connection to it
 *
 * The address is first passed to dns_query(). If that does not yield a
 * result, it is parsed as an IPv4 literal (MODULE_IPV4_ADDR) and then as an
 * IPv6 literal (MODULE_IPV6_ADDR). A remote port of 0 marks "not resolved
 * yet" between these steps.
 *
 * @param[in,out] n        network context whose sock gets connected
 * @param[in]     addr_ip  host name or IP address string of the broker
 * @param[in]     port     TCP port of the broker
 *
 * @return  result of sock_tcp_connect() if a destination was determined
 * @return  result of dns_query() if no destination could be determined
 */
int NetworkConnect(Network *n, char *addr_ip, int port)
{
    sock_tcp_ep_t remote = SOCK_IPV4_EP_ANY;
    int ret = dns_query(addr_ip, &remote.addr, AF_UNSPEC);

    if (ret > 0) {
        /* a result of 4 is taken as an IPv4 address, anything else as IPv6 */
        remote.family = (ret == 4) ? AF_INET : AF_INET6;
        remote.port = port;
    }

    if (IS_USED(MODULE_IPV4_ADDR) && (remote.port == 0)) {
        if (ipv4_addr_from_str((ipv4_addr_t *)&remote.addr, addr_ip)) {
            remote.port = port;
        }
    }

    if (IS_USED(MODULE_IPV6_ADDR) && (remote.port == 0)) {
        if (ipv6_addr_from_str((ipv6_addr_t *)&remote.addr, addr_ip)) {
            remote.family = AF_INET6;
            remote.port = port;
        }
    }

    if (remote.port != 0) {
        return sock_tcp_connect(&n->sock, &remote, 0, 0);
    }

    LOG_ERROR("Error: unable to parse destination address\n");
    return ret;
}
|
|
|
|
/**
 * @brief   Close the TCP connection of a Paho network context
 *
 * @param[in,out] n    network context whose sock is disconnected
 */
void NetworkDisconnect(Network *n)
{
    sock_tcp_disconnect(&(n->sock));
}
|
|
|
|
/**
 * @brief   Reset a Paho timer: start time and timeout are both set to 0
 *
 * @param[out] timer    timer to reset
 */
void TimerInit(Timer *timer)
{
    timer->time_set = 0;
    timer->timeout = 0;
}
|
|
|
|
/**
 * @brief   Check whether a Paho timer has run out
 *
 * @param[in] timer    timer to check
 *
 * @return  1 if TimerLeftMS() reports no time left, 0 otherwise
 */
char TimerIsExpired(Timer *timer)
{
    int remaining_ms = TimerLeftMS(timer);

    return remaining_ms == 0;
}
|
|
|
|
/**
 * @brief   Arm a Paho timer with a timeout given in milliseconds
 *
 * Stores the timeout and records the current ZTIMER_MSEC time as the start.
 *
 * @param[out] timer       timer to arm
 * @param[in]  timeout_ms  timeout in milliseconds
 */
void TimerCountdownMS(Timer *timer, unsigned int timeout_ms)
{
    timer->timeout = timeout_ms;
    timer->time_set = ztimer_now(ZTIMER_MSEC);
}
|
|
|
|
/**
 * @brief   Arm a Paho timer with a timeout given in seconds
 *
 * @param[out] timer      timer to arm
 * @param[in]  timeout_s  timeout in seconds, converted with MS_PER_SEC
 */
void TimerCountdown(Timer *timer, unsigned int timeout_s)
{
    unsigned int timeout_ms = timeout_s * MS_PER_SEC;

    TimerCountdownMS(timer, timeout_ms);
}
|
|
|
|
/**
 * @brief   Get the time left on a Paho timer
 *
 * @param[in] timer    timer to query
 *
 * @return  milliseconds until the timeout is reached
 * @return  0 if the timeout has already been reached
 */
int TimerLeftMS(Timer *timer)
{
    /* time passed since the timer was armed; unsigned arithmetic keeps the
     * difference meaningful across a counter wrap-around */
    uint32_t elapsed = ztimer_now(ZTIMER_MSEC) - timer->time_set;

    if (elapsed >= timer->timeout) {
        return 0;
    }

    uint32_t remaining = timer->timeout - elapsed;

    return remaining;
}
|
|
|
|
/**
 * @brief   Initialize the lock wrapped by a Paho mutex
 *
 * @param[out] mutex    Paho mutex to initialize
 */
void MutexInit(Mutex *mutex)
{
    mutex_init(&(mutex->lock));
}
|
|
|
|
/**
 * @brief   Acquire the lock wrapped by a Paho mutex
 *
 * @param[in,out] mutex    Paho mutex to lock
 *
 * @return  0 always
 */
int MutexLock(Mutex *mutex)
{
    const int success = 0;

    mutex_lock(&(mutex->lock));

    return success;
}
|
|
|
|
/**
 * @brief   Release the lock wrapped by a Paho mutex
 *
 * @param[in,out] mutex    Paho mutex to unlock
 *
 * @return  0 always
 */
int MutexUnlock(Mutex *mutex)
{
    const int success = 0;

    mutex_unlock(&(mutex->lock));

    return success;
}
|
|
|
|
void *mqtt_riot_run(void *arg)
|
|
{
|
|
MQTTClient *client = (MQTTClient *)arg;
|
|
assert(client);
|
|
|
|
while (1) {
|
|
int rc;
|
|
MutexLock(&client->mutex);
|
|
if ((rc = MQTTYield(client, PAHO_MQTT_YIELD_MS)) != 0) {
|
|
LOG_DEBUG("riot_iface: error while MQTTYield()(%d)\n", rc);
|
|
}
|
|
MutexUnlock(&client->mutex);
|
|
/* let other threads do their work */
|
|
ztimer_sleep(ZTIMER_MSEC, MQTT_YIELD_POLLING_MS);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/**
 * @brief   Start the MQTT background thread
 *
 * The thread always runs mqtt_riot_run(); the function pointer supplied by
 * Paho is ignored. The thread uses the stack embedded in @p thread.
 *
 * @param[in,out] thread  thread descriptor providing the stack; receives the PID
 * @param[in]     fn      unused
 * @param[in]     arg     argument passed on to mqtt_riot_run()
 *
 * @return  value returned by thread_create(), as stored in @p thread->pid
 */
int ThreadStart(Thread *thread, void (*fn)(void *), void *arg)
{
    /* the entry point is fixed to mqtt_riot_run() */
    (void)fn;

    thread->pid = thread_create(
        thread->stack, sizeof(thread->stack),
        MQTT_THREAD_PRIORITY, THREAD_CREATE_STACKTEST,
        mqtt_riot_run, arg, "paho_mqtt_riot");

    return thread->pid;
}
|