mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2024-12-29 04:50:03 +01:00
Merge pull request #8333 from samkumar/feature-condition-variable
core: condition variable implementation
This commit is contained in:
commit
43d39b7a33
87
core/cond.c
Normal file
87
core/cond.c
Normal file
@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Copyright (C) 2016 Sam Kumar <samkumar@berkeley.edu>
|
||||
* 2016 University of California, Berkeley
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @ingroup core_sync
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief Kernel condition variable implementation
|
||||
*
|
||||
* @author Sam Kumar <samkumar@berkeley.edu>
|
||||
*
|
||||
* @}
|
||||
*/
|
||||
|
||||
#include "cond.h"
|
||||
#include "irq.h"
|
||||
#include "mutex.h"
|
||||
#include "thread.h"
|
||||
|
||||
#define ENABLE_DEBUG (0)
|
||||
#include "debug.h"
|
||||
|
||||
void cond_init(cond_t *cond)
|
||||
{
|
||||
cond->queue.next = NULL;
|
||||
}
|
||||
|
||||
void cond_wait(cond_t *cond, mutex_t *mutex)
|
||||
{
|
||||
unsigned irqstate = irq_disable();
|
||||
thread_t *me = (thread_t *)sched_active_thread;
|
||||
|
||||
mutex_unlock(mutex);
|
||||
sched_set_status(me, STATUS_COND_BLOCKED);
|
||||
thread_add_to_list(&cond->queue, me);
|
||||
irq_restore(irqstate);
|
||||
thread_yield_higher();
|
||||
|
||||
/*
|
||||
* Once we reach this point, the condition variable was signalled,
|
||||
* and we are free to continue.
|
||||
*/
|
||||
mutex_lock(mutex);
|
||||
}
|
||||
|
||||
static void _cond_signal(cond_t *cond, bool broadcast)
|
||||
{
|
||||
unsigned irqstate = irq_disable();
|
||||
list_node_t *next;
|
||||
|
||||
uint16_t min_prio = THREAD_PRIORITY_MIN + 1;
|
||||
|
||||
while ((next = list_remove_head(&cond->queue)) != NULL) {
|
||||
thread_t *process = container_of((clist_node_t *)next, thread_t, rq_entry);
|
||||
sched_set_status(process, STATUS_PENDING);
|
||||
uint16_t process_priority = process->priority;
|
||||
if (process_priority < min_prio) {
|
||||
min_prio = process_priority;
|
||||
}
|
||||
|
||||
if (!broadcast) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
irq_restore(irqstate);
|
||||
if (min_prio <= THREAD_PRIORITY_MIN) {
|
||||
sched_switch(min_prio);
|
||||
}
|
||||
}
|
||||
|
||||
void cond_signal(cond_t *cond)
|
||||
{
|
||||
_cond_signal(cond, false);
|
||||
}
|
||||
|
||||
void cond_broadcast(cond_t *cond)
|
||||
{
|
||||
_cond_signal(cond, true);
|
||||
}
|
214
core/include/cond.h
Normal file
214
core/include/cond.h
Normal file
@ -0,0 +1,214 @@
|
||||
/*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Condition variable for thread synchronization
|
||||
* @ingroup core, core_sync
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief RIOT synchronization API
|
||||
*
|
||||
* This file contains a condition variable with Mesa-style semantics.
|
||||
*
|
||||
* Condition variable solve the following problem. Suppose that a thread should
|
||||
* sleep until a certain condition comes true. Condition variables provide a
|
||||
* primitive whereby a thread can go to sleep by calling cond_wait(). Then,
|
||||
* when the condition comes true in a thread or interrupt context, cond_signal()
|
||||
* can be called, to wake up the thread.
|
||||
*
|
||||
* "Mesa-style semantics" means that, when cond_signal() is called, the
|
||||
* sleeping thread becomes runnable, but may not be scheduled immediately. In
|
||||
* contrast, "Hoare-style semantics" means that when cond_signal() is called,
|
||||
* the sleeping thread is awakened and immediately scheduled. The condition
|
||||
* variable in this file implements Mesa-style semantics, as is used by other
|
||||
* standard implementations, such as pthreads.
|
||||
*
|
||||
* To avoid races, condition variables are used with mutexes. When a thread is
|
||||
* put to sleep with cond_wait, it atomically unlocks the provided mutex and
|
||||
* then goes to sleep. When it is awakened with cond_signal, it reacquires the
|
||||
* mutex.
|
||||
*
|
||||
* As a rule of thumb, every condition variable should have a corresponding
|
||||
* mutex, and that mutex should be held whenever performing any operation with
|
||||
* the condition variable. There are exceptions to this rule, where it is
|
||||
* appropriate to call cond_signal or cond_broadcast without the mutex held
|
||||
* (for example, if you know that no thread will call cond_wait concurrently).
|
||||
* It is safe to call cond_signal or cond_broadcast in interrupt context.
|
||||
*
|
||||
* However, the programmer should be aware of the following situation that
|
||||
* could arise with Mesa-style condition variables: the condition may become
|
||||
* true, making the sleeping thread runnable, but the condition may become
|
||||
* false again before the thread is scheduled. To handle this case, the
|
||||
* condition variable should be used in a while loop as follows:
|
||||
*
|
||||
* ```
|
||||
* mutex_lock(&lock);
|
||||
* while (condition_is_not_true) {
|
||||
* cond_wait(&cond, &lock);
|
||||
* }
|
||||
* // do work while condition is true.
|
||||
* mutex_unlock(&lock);
|
||||
* ```
|
||||
*
|
||||
* When used in this way, the thread checks, once it has has awakened, whether
|
||||
* the condition is actually true, and goes to sleep again if it is not. This
|
||||
* is the standard way to use Mesa-style condition variables.
|
||||
*
|
||||
* Example: Suppose we want to implement a bounded queue, such as a Unix-style
|
||||
* pipe between two threads. When data is written to the pipe, it is appended
|
||||
* to a queue, and the writing thread blocks if the queue is full. When data
|
||||
* is read from the pipe, it is removed from the queue; if the queue is empty,
|
||||
* the reading thread blocks until it is not empty. If the pipe is closed by
|
||||
* the sender, waiting reading threads wake up.
|
||||
*
|
||||
* Here is a sketch of how to implement such a structure with condition
|
||||
* variables. For simplicity, messages are single bytes. We assume a FIFO data
|
||||
* structure queue_t. We assume it is unsafe to add to the queue if it is full,
|
||||
* or remove from the queue if it is empty.
|
||||
*
|
||||
* ```
|
||||
* typedef struct pipe {
|
||||
* queue_t queue;
|
||||
* cond_t read_cond;
|
||||
* cond_t write_cond;
|
||||
* mutex_t lock;
|
||||
* bool closed;
|
||||
* } pipe_t;
|
||||
*
|
||||
* void pipe_init(pipe_t* pipe) {
|
||||
* queue_init(&pipe->queue);
|
||||
* cond_init(&pipe->read_cond);
|
||||
* cond_init(&pipe->write_cond);
|
||||
* mutex_init(&pipe->lock);
|
||||
* pipe->closed = false;
|
||||
* }
|
||||
*
|
||||
* void pipe_write(pipe_t* pipe, char c) {
|
||||
* mutex_lock(&pipe->lock);
|
||||
* while (queue_length(&pipe->queue) == MAX_QUEUE_LENGTH && !pipe->closed) {
|
||||
* cond_wait(&pipe->write_cond, &pipe->lock);
|
||||
* }
|
||||
* if (pipe->closed) {
|
||||
* mutex_unlock(&pipe->lock);
|
||||
* return 0;
|
||||
* }
|
||||
* add_to_queue(&pipe->queue, c);
|
||||
* cond_signal(&pipe->read_cond);
|
||||
* mutex_unlock(&pipe->lock);
|
||||
* return 1;
|
||||
* }
|
||||
*
|
||||
* void pipe_close(pipe_t* pipe) {
|
||||
* mutex_lock(&pipe->lock);
|
||||
* pipe->closed = true;
|
||||
* cond_broadcast(&pipe->read_cond);
|
||||
* cond_broadcast(&pipe->write_cond);
|
||||
* mutex_unlock(&pipe->lock);
|
||||
* }
|
||||
*
|
||||
* int pipe_read(pipe_t* pipe, char* buf) {
|
||||
* mutex_lock(&pipe->lock);
|
||||
* while (queue_length(&pipe->queue) == 0 && !pipe->closed) {
|
||||
* cond_wait(&pipe->read_cond, &pipe->lock);
|
||||
* }
|
||||
* if (pipe->closed) {
|
||||
* mutex_unlock(&pipe->lock);
|
||||
* return 0;
|
||||
* }
|
||||
* *buf = remove_from_queue(&pipe->queue);
|
||||
* cond_signal(&pipe->write_cond);
|
||||
* mutex_unlock(&pipe->lock);
|
||||
* return 1;
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* Note that this could actually be written with a single condition variable.
|
||||
* However, the example includes two for didactic reasons.
|
||||
*
|
||||
* @author Sam Kumar <samkumar@berkeley.edu>
|
||||
*/
|
||||
|
||||
#ifndef COND_H
|
||||
#define COND_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "list.h"
|
||||
#include "mutex.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Condition variable structure. Must never be modified by the user.
|
||||
*/
|
||||
typedef struct {
|
||||
/**
|
||||
* @brief The process waiting queue of the condition variable.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
list_node_t queue;
|
||||
} cond_t;
|
||||
|
||||
/**
|
||||
* @brief Static initializer for cond_t.
|
||||
*
|
||||
* @note This initializer is preferable to cond_init().
|
||||
*/
|
||||
#define COND_INIT { { NULL } }
|
||||
|
||||
/**
|
||||
* @brief Initializes a condition variable.
|
||||
*
|
||||
* @details For initialization of variables use COND_INIT instead.
|
||||
* Only use the function call for dynamically allocated condition
|
||||
* variables.
|
||||
*
|
||||
* @param[in] cond Pre-allocated condition structure. Must not be NULL.
|
||||
*/
|
||||
void cond_init(cond_t *cond);
|
||||
|
||||
/**
|
||||
* @brief Waits on a condition.
|
||||
*
|
||||
* @param[in] cond Condition variable to wait on.
|
||||
* @param[in] mutex Mutex object held by the current thread.
|
||||
*/
|
||||
void cond_wait(cond_t *cond, mutex_t *mutex);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Wakes up one thread waiting on the condition variable.
|
||||
*
|
||||
* @details The thread is marked as runnable and will only be scheduled later
|
||||
* at the scheduler's whim, so the thread should re-check the condition and wait
|
||||
* again if it is not fulfilled.
|
||||
*
|
||||
* @param[in] cond Condition variable to signal.
|
||||
*/
|
||||
void cond_signal(cond_t *cond);
|
||||
|
||||
/**
|
||||
* @brief Wakes up all threads waiting on the condition variable.
|
||||
*
|
||||
* @details The threads are marked as runnable and will only be scheduled later
|
||||
* at the scheduler's whim, so they should re-check the condition and wait again
|
||||
* if it is not fulfilled.
|
||||
*
|
||||
* @param[in] cond Condition variable to broadcast.
|
||||
*/
|
||||
void cond_broadcast(cond_t *cond);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* COND_H */
|
||||
/** @} */
|
@ -154,6 +154,7 @@
|
||||
#define STATUS_FLAG_BLOCKED_ANY 6 /**< waiting for any flag from flag_mask*/
|
||||
#define STATUS_FLAG_BLOCKED_ALL 7 /**< waiting for all flags in flag_mask */
|
||||
#define STATUS_MBOX_BLOCKED 8 /**< waiting for get/put on mbox */
|
||||
#define STATUS_COND_BLOCKED 9 /**< waiting for a condition variable */
|
||||
/** @} */
|
||||
|
||||
/**
|
||||
@ -162,8 +163,8 @@
|
||||
*/
|
||||
#define STATUS_ON_RUNQUEUE STATUS_RUNNING /**< to check if on run queue:
|
||||
`st >= STATUS_ON_RUNQUEUE` */
|
||||
#define STATUS_RUNNING 9 /**< currently running */
|
||||
#define STATUS_PENDING 10 /**< waiting to be scheduled to run */
|
||||
#define STATUS_RUNNING 10 /**< currently running */
|
||||
#define STATUS_PENDING 11 /**< waiting to be scheduled to run */
|
||||
/** @} */
|
||||
|
||||
/**
|
||||
|
11
tests/cond_order/Makefile
Normal file
11
tests/cond_order/Makefile
Normal file
@ -0,0 +1,11 @@
|
||||
include ../Makefile.tests_common
|
||||
|
||||
BOARD_INSUFFICIENT_MEMORY := nucleo32-f031 nucleo32-f042 nucleo32-l031 nucleo-f030 \
|
||||
nucleo-l053 stm32f0discovery arduino-duemilanove \
|
||||
arduino-uno nucleo-f030r8 nucleo-f031k6 nucleo-f042k6 \
|
||||
nucleo-l031k6 nucleo-l053r8
|
||||
|
||||
include $(RIOTBASE)/Makefile.include
|
||||
|
||||
test:
|
||||
tests/01-run.py
|
35
tests/cond_order/README.md
Normal file
35
tests/cond_order/README.md
Normal file
@ -0,0 +1,35 @@
|
||||
Expected result
|
||||
===============
|
||||
When successful, you should see 5 different threads printing their PID and
|
||||
priority. The thread with the lowest priority should be able to signaled first,
|
||||
followed by the other threads in the order of their priority (highest next). If
|
||||
the main thread holds the lock, however, none of the other threads should be
|
||||
able to make progress. The output should look like the following:
|
||||
|
||||
```
|
||||
main(): This is RIOT! (Version: 2018.01-devel-1120-g811de-starbeam-feature-condition-variable)
|
||||
Condition variable order test
|
||||
Please refer to the README.md for more information
|
||||
|
||||
T3 (prio 6): waiting on condition variable now
|
||||
T4 (prio 4): waiting on condition variable now
|
||||
T5 (prio 0): waiting on condition variable now
|
||||
T6 (prio 2): waiting on condition variable now
|
||||
T7 (prio 1): waiting on condition variable now
|
||||
First batch was signaled
|
||||
T5 (prio 0): condition variable was signaled now
|
||||
T7 (prio 1): condition variable was signaled now
|
||||
T6 (prio 2): condition variable was signaled now
|
||||
First batch has woken up
|
||||
Second batch was signaled
|
||||
T4 (prio 4): condition variable was signaled now
|
||||
T3 (prio 6): condition variable was signaled now
|
||||
Second batch has woken up
|
||||
|
||||
Test END, check the order of priorities above.
|
||||
```
|
||||
|
||||
Background
|
||||
==========
|
||||
This test application stresses a condition variable with a number of threads
|
||||
waiting on it. The threads are signaled (awakened) in two batches.
|
97
tests/cond_order/main.c
Normal file
97
tests/cond_order/main.c
Normal file
@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright (C) 2018 University of California, Berkeley
|
||||
* Copyright (C) 2016 Freie Universität Berlin
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @ingroup tests
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief Test application for testing mutexes
|
||||
*
|
||||
* @author Sam Kumar <samkumar@cs.berkeley.edu>,
|
||||
* Hauke Petersen <hauke.petersen@fu-berlin.de>
|
||||
* @}
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "cond.h"
|
||||
#include "mutex.h"
|
||||
#include "thread.h"
|
||||
|
||||
#define THREAD_NUMOF (5U)
|
||||
#define THREAD_FIRSTGROUP_NUMOF (3U)
|
||||
|
||||
extern volatile thread_t *sched_active_thread;
|
||||
|
||||
static char stacks[THREAD_NUMOF][THREAD_STACKSIZE_MAIN];
|
||||
|
||||
static const char prios[THREAD_NUMOF] = {THREAD_PRIORITY_MAIN - 1, 4, 0, 2, 1};
|
||||
|
||||
static mutex_t testlock;
|
||||
static cond_t testcond;
|
||||
|
||||
static void *lockme(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
volatile thread_t *t = sched_active_thread;
|
||||
|
||||
mutex_lock(&testlock);
|
||||
printf("T%i (prio %i): waiting on condition variable now\n",
|
||||
(int)t->pid, (int)t->priority);
|
||||
cond_wait(&testcond, &testlock);
|
||||
printf("T%i (prio %i): condition variable was signaled now\n",
|
||||
(int)t->pid, (int)t->priority);
|
||||
mutex_unlock(&testlock);
|
||||
|
||||
thread_yield();
|
||||
|
||||
mutex_unlock(&testlock);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
puts("Condition variable order test");
|
||||
puts("Please refer to the README.md for more information\n");
|
||||
|
||||
mutex_init(&testlock);
|
||||
cond_init(&testcond);
|
||||
|
||||
/* create threads */
|
||||
for (unsigned i = 0; i < THREAD_NUMOF; i++) {
|
||||
thread_create(stacks[i], sizeof(stacks[i]), prios[i], 0,
|
||||
lockme, NULL, "t");
|
||||
}
|
||||
/* allow threads to lock the mutex and wait on the condition variable */
|
||||
|
||||
/* signal the first few threads, in a group */
|
||||
mutex_lock(&testlock);
|
||||
for (unsigned i = 0; i < THREAD_FIRSTGROUP_NUMOF; i++) {
|
||||
cond_signal(&testcond);
|
||||
}
|
||||
printf("First batch was signaled\n");
|
||||
mutex_unlock(&testlock);
|
||||
/* allow the first THREAD_FIRSTGROUP_NUMOF threads to wake up */
|
||||
|
||||
printf("First batch has woken up\n");
|
||||
|
||||
mutex_lock(&testlock);
|
||||
cond_broadcast(&testcond);
|
||||
printf("Second batch was signaled\n");
|
||||
mutex_unlock(&testlock);
|
||||
/* allow the remaining threads to wake up */
|
||||
printf("Second batch has woken up\n");
|
||||
|
||||
mutex_lock(&testlock);
|
||||
puts("\nTest END, check the order of priorities above.");
|
||||
|
||||
return 0;
|
||||
}
|
44
tests/cond_order/tests/01-run.py
Executable file
44
tests/cond_order/tests/01-run.py
Executable file
@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
|
||||
# Copyright (C) 2016 Oliver Hahm <oliver.hahm@inria.fr>
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
thread_prio = {
|
||||
3: 6,
|
||||
4: 4,
|
||||
5: 0,
|
||||
6: 2,
|
||||
7: 1
|
||||
}
|
||||
first_group_size = 3
|
||||
|
||||
|
||||
def testfunc(child):
|
||||
for k in thread_prio.keys():
|
||||
child.expect(u"T%i \(prio %i\): waiting on condition variable now" % (k, thread_prio[k]))
|
||||
|
||||
count = 0
|
||||
last = -1
|
||||
child.expect(u"First batch was signaled")
|
||||
for _ in range(len(thread_prio)):
|
||||
child.expect(u"T\d+ \(prio (\d+)\): condition variable was signaled now")
|
||||
assert(int(child.match.group(1)) > last)
|
||||
last = int(child.match.group(1))
|
||||
count += 1
|
||||
if count == 3:
|
||||
child.expect(u"First batch has woken up")
|
||||
child.expect(u"Second batch was signaled")
|
||||
child.expect(u"Second batch has woken up")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.path.append(os.path.join(os.environ['RIOTBASE'], 'dist/tools/testrunner'))
|
||||
from testrunner import run
|
||||
sys.exit(run(testfunc))
|
Loading…
Reference in New Issue
Block a user