mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2025-01-17 05:52:44 +01:00
* first implementation of msg queues
This commit is contained in:
parent
cb1d5c7ab3
commit
cb79a7a237
@ -105,7 +105,13 @@ int msg_send_receive(msg *m, msg *reply, unsigned int target_pid);
|
||||
*/
|
||||
int msg_reply(msg *m, msg *reply);
|
||||
|
||||
uint16_t msg_alloc_event(void);
|
||||
/**
|
||||
* @brief Initialize the current thread's message queue.
|
||||
*
|
||||
* @param array Pointer to preallocated array of msg objects
|
||||
* @param num Number of msg objects in array. MUST BE POWER OF TWO!
|
||||
*/
|
||||
int msg_init_queue(msg* array, int num);
|
||||
|
||||
/** @} */
|
||||
#endif /* __MSG_H */
|
||||
|
@ -19,6 +19,8 @@
|
||||
#include <stdint.h>
|
||||
#include <queue.h>
|
||||
#include <clist.h>
|
||||
#include <cib.h>
|
||||
#include <msg.h>
|
||||
|
||||
/* uneven means has to be on runqueue */
|
||||
#define STATUS_NOT_FOUND (0x0000)
|
||||
@ -40,11 +42,14 @@ typedef struct tcb {
|
||||
uint16_t pid;
|
||||
uint16_t priority;
|
||||
|
||||
void* wait_data;
|
||||
queue_node_t msg_queue;
|
||||
|
||||
clist_node_t rq_entry;
|
||||
|
||||
void* wait_data;
|
||||
queue_node_t msg_waiters;
|
||||
|
||||
cib_t msg_queue;
|
||||
msg* msg_array;
|
||||
|
||||
const char* name;
|
||||
char* stack_start;
|
||||
int stack_size;
|
||||
|
89
core/msg.c
89
core/msg.c
@ -20,32 +20,47 @@
|
||||
#include "tcb.h"
|
||||
#include <stddef.h>
|
||||
#include <irq.h>
|
||||
#include <cib.h>
|
||||
|
||||
#include "flags.h"
|
||||
|
||||
//#define ENABLE_DEBUG
|
||||
#include "debug.h"
|
||||
|
||||
static int queue_msg(tcb *target, msg *m) {
|
||||
int n = cib_put(&(target->msg_queue));
|
||||
|
||||
if (n != -1) {
|
||||
target->msg_array[n] = *m;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int msg_send(msg* m, unsigned int target_pid, bool block) {
|
||||
if (inISR()) {
|
||||
return msg_send_int(m, target_pid);
|
||||
}
|
||||
|
||||
int result = 1;
|
||||
|
||||
tcb *target = (tcb*)sched_threads[target_pid];
|
||||
|
||||
m->sender_pid = thread_pid;
|
||||
if (m->sender_pid == target_pid) return -1;
|
||||
|
||||
dINT();
|
||||
|
||||
if (target == NULL) {
|
||||
eINT();
|
||||
if (m->sender_pid == target_pid) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (target == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
dINT();
|
||||
if (target->status != STATUS_RECEIVE_BLOCKED) {
|
||||
if (queue_msg(target, m)) {
|
||||
eINT();
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (! block ) {
|
||||
DEBUG("%s: receiver not waiting. block=%u\n", active_thread->name, block);
|
||||
eINT();
|
||||
@ -56,9 +71,9 @@ int msg_send(msg* m, unsigned int target_pid, bool block) {
|
||||
queue_node_t n;
|
||||
n.priority = active_thread->priority;
|
||||
n.data = (unsigned int) active_thread;
|
||||
DEBUG("%s: Adding node to msg_queue:\n", active_thread->name);
|
||||
DEBUG("%s: Adding node to msg_waiters:\n", active_thread->name);
|
||||
|
||||
queue_priority_add(&(target->msg_queue), &n);
|
||||
queue_priority_add(&(target->msg_waiters), &n);
|
||||
|
||||
active_thread->wait_data = (void*) m;
|
||||
|
||||
@ -83,7 +98,7 @@ int msg_send(msg* m, unsigned int target_pid, bool block) {
|
||||
eINT();
|
||||
thread_yield();
|
||||
|
||||
return result;
|
||||
return 1;
|
||||
}
|
||||
|
||||
int msg_send_int(msg* m, unsigned int target_pid) {
|
||||
@ -103,9 +118,8 @@ int msg_send_int(msg* m, unsigned int target_pid) {
|
||||
return 1;
|
||||
} else {
|
||||
DEBUG("msg_send_int: receiver not waiting.\n");
|
||||
return 0;
|
||||
return (queue_msg(target, m));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int msg_send_receive(msg *m, msg *reply, unsigned int target_pid) {
|
||||
@ -154,29 +168,44 @@ int msg_reply_int(msg *m, msg *reply) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int msg_receive(msg* m) {
|
||||
dINT();
|
||||
DEBUG("%s: msg_receive.\n", active_thread->name);
|
||||
|
||||
tcb *me = (tcb*) sched_threads[thread_pid];
|
||||
|
||||
me->wait_data = (void*) m;
|
||||
int n = cib_get(&(me->msg_queue));
|
||||
if (n >= 0) {
|
||||
DEBUG("%s: msg_receive(): We've got a queued message.\n", active_thread->name);
|
||||
*m = me->msg_array[n];
|
||||
} else {
|
||||
me->wait_data = (void*) m;
|
||||
}
|
||||
|
||||
queue_node_t *n = queue_remove_head(&(me->msg_queue));
|
||||
queue_node_t *node = queue_remove_head(&(me->msg_waiters));
|
||||
|
||||
if (n == NULL) {
|
||||
DEBUG("%s: msg_receive blocked\n", active_thread->name);
|
||||
sched_set_status(me, STATUS_RECEIVE_BLOCKED);
|
||||
if (node == NULL) {
|
||||
DEBUG("%s: msg_receive(): No thread in waiting list.\n", active_thread->name);
|
||||
if (n < 0) {
|
||||
DEBUG("%s: msg_receive(): No msg in queue. Going blocked.\n", active_thread->name);
|
||||
sched_set_status(me, STATUS_RECEIVE_BLOCKED);
|
||||
|
||||
eINT();
|
||||
thread_yield();
|
||||
eINT();
|
||||
thread_yield();
|
||||
|
||||
/* sender copied message */
|
||||
/* sender copied message */
|
||||
}
|
||||
return 1;
|
||||
} else {
|
||||
DEBUG("%s: msg_receive direct copy.\n", active_thread->name);
|
||||
tcb *sender = (tcb*)n->data;
|
||||
DEBUG("%s: msg_receive(): Wakeing up waiting thread.\n", active_thread->name);
|
||||
tcb *sender = (tcb*)node->data;
|
||||
|
||||
if (n >= 0) {
|
||||
/* we've already got a messgage from the queue. as there is a
|
||||
* waiter, take it's message into the just freed queue space.
|
||||
*/
|
||||
m = &(me->msg_array[cib_put(&(me->msg_queue))]);
|
||||
}
|
||||
|
||||
/* copy msg */
|
||||
msg* sender_msg = (msg*)sender->wait_data;
|
||||
@ -190,3 +219,15 @@ int msg_receive(msg* m) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
int msg_init_queue(msg* array, int num) {
|
||||
/* make sure brainfuck condition is met */
|
||||
if (num && (num & (num - 1)) == 0) {
|
||||
tcb *me = (tcb*)active_thread;
|
||||
me->msg_array = array;
|
||||
cib_init(&(me->msg_queue), num);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
@ -144,17 +144,19 @@ int thread_create(char *stack, int stacksize, char priority, int flags, void (*f
|
||||
cb->priority = priority;
|
||||
cb->status = 0;
|
||||
|
||||
cb->rq_entry.data = (unsigned int) cb;
|
||||
cb->rq_entry.next = NULL;
|
||||
cb->rq_entry.prev = NULL;
|
||||
|
||||
cb->name = name;
|
||||
|
||||
cb->wait_data = NULL;
|
||||
|
||||
cb->msg_queue.data = 0;
|
||||
cb->msg_queue.priority = 0;
|
||||
cb->msg_queue.next = NULL;
|
||||
cb->msg_waiters.data = 0;
|
||||
cb->msg_waiters.priority = 0;
|
||||
cb->msg_waiters.next = NULL;
|
||||
|
||||
cb->rq_entry.data = (unsigned int) cb;
|
||||
cb->rq_entry.next = NULL;
|
||||
cb->rq_entry.prev = NULL;
|
||||
cib_init(&(cb->msg_queue),0);
|
||||
|
||||
num_tasks++;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user