1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

posix_sockets: count available received messages asynchronously

Utilizing `sock_async`
This commit is contained in:
Martine S. Lenders 2019-12-13 12:58:35 +01:00
parent 791069acf1
commit af24c539d0
No known key found for this signature in database
GPG Key ID: CCD317364F63286F

View File

@ -20,6 +20,7 @@
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <string.h>
@ -37,6 +38,10 @@
#include "net/sock/udp.h"
#include "net/sock/tcp.h"
#if IS_USED(MODULE_SOCK_ASYNC)
#include "net/sock/async.h"
#endif
/* enough to create sockets both with socket() and accept() */
#define _ACTUAL_SOCKET_POOL_SIZE (SOCKET_POOL_SIZE + \
(SOCKET_POOL_SIZE * SOCKET_TCP_QUEUE_SIZE))
@ -78,6 +83,9 @@ typedef struct {
#ifdef MODULE_SOCK_TCP
sock_tcp_t *queue_array;
unsigned queue_array_len;
#endif
#if IS_USED(MODULE_SOCK_ASYNC)
atomic_uint available;
#endif
sock_tcp_ep_t local; /* to store bind before connect/listen */
} socket_t;
@ -105,6 +113,9 @@ static socket_t *_get_free_socket(void)
{
for (int i = 0; i < _ACTUAL_SOCKET_POOL_SIZE; i++) {
if (_socket_pool[i].domain == AF_UNSPEC) {
#if IS_USED(MODULE_SOCK_ASYNC)
atomic_init(&_socket_pool[i].available, 0U);
#endif
return &_socket_pool[i];
}
}
@ -338,6 +349,68 @@ static const vfs_file_ops_t socket_ops = {
.write = socket_write,
};
#if IS_USED(MODULE_SOCK_ASYNC)
static void _async_cb(void *sock, sock_async_flags_t type,
void *arg)
{
socket_t *socket = arg;
(void)sock;
if (type & SOCK_ASYNC_MSG_RECV) {
atomic_fetch_add(&socket->available, 1);
#if IS_USED(MODULE_POSIX_SELECT)
thread_flags_set(sock->socket->selecting_thread,
POSIX_SELECT_THREAD_FLAG);
#endif
}
}
static void _sock_set_cb(socket_t *socket)
{
union {
void (*sock_pool)(void *, sock_async_flags_t, void *);
#ifdef MODULE_SOCK_IP
sock_ip_cb_t ip;
#endif
#ifdef MODULE_SOCK_TCP
sock_tcp_cb_t tcp;
sock_tcp_queue_cb_t tcp_queue;
#endif
#ifdef MODULE_SOCK_UDP
sock_udp_cb_t udp;
#endif
} callback = { .sock_pool = _async_cb };
switch (socket->type) {
#ifdef MODULE_SOCK_IP
case SOCK_RAW:
sock_ip_set_cb(&socket->sock.ip, callback.ip, socket);
break;
#endif
#ifdef MODULE_SOCK_TCP
case SOCK_STREAM:
/* is a TCP client socket */
if (socket->queue_array == NULL) {
sock_tcp_set_cb(&socket->sock.tcp.sock, callback.tcp, socket);
}
/* is a TCP listening socket */
else {
sock_tcp_queue_set_cb(&socket->sock.tcp.queue,
callback.tcp_queue, socket);
}
break;
#endif
#ifdef MODULE_SOCK_UDP
case SOCK_DGRAM:
sock_udp_set_cb(&socket->sock->udp, callback.udp, socket);
break;
#endif
default:
break;
}
}
#endif
int socket(int domain, int type, int protocol)
{
int res = 0;
@ -472,6 +545,9 @@ int accept(int socket, struct sockaddr *restrict address,
new_s->queue_array = NULL;
new_s->queue_array_len = 0;
new_s->sock = (socket_sock_t *)sock;
#if IS_USED(MODULE_SOCK_ASYNC)
_sock_set_cb(new_s);
#endif
memset(&s->local, 0, sizeof(sock_tcp_ep_t));
}
break;
@ -605,6 +681,10 @@ static int _bind_connect(socket_t *s, const struct sockaddr *address,
return -1;
}
s->sock = sock;
#if IS_USED(MODULE_SOCK_ASYNC)
_sock_set_cb(s);
#endif
return 0;
}
@ -804,6 +884,9 @@ int listen(int socket, int backlog)
}
if (res == 0) {
s->sock = sock;
#if IS_USED(MODULE_SOCK_ASYNC)
_sock_set_cb(s);
#endif
}
else {
errno = -res;
@ -878,6 +961,9 @@ static ssize_t socket_recvfrom(socket_t *s, void *restrict buffer,
break;
}
if ((res >= 0) && (address != NULL) && (address_len != NULL)) {
#ifdef MODULE_SOCK_ASYNC
atomic_fetch_sub(&s->available, 1);
#endif
switch (s->type) {
#ifdef MODULE_SOCK_TCP
case SOCK_STREAM:
@ -1071,6 +1157,18 @@ bool posix_socket_is(int fd)
return IS_USED(MODULE_SOCK_ASYNC) && (_get_socket(fd) != NULL);
}
unsigned posix_socket_avail(int fd)
{
#if IS_USED(MODULE_SOCK_ASYNC)
socket_t *socket = _get_socket(fd);
return (socket != NULL) ? atomic_load(&socket->available) : 0U;
#else
(void)fd;
return 0U;
#endif
}
/**
* @}
*/