1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-17 05:12:57 +01:00

core: condition variable implementation

This commit is contained in:
Sam Kumar 2018-01-07 20:39:52 -08:00
parent 3ff43439ae
commit fb8edbb610
7 changed files with 491 additions and 2 deletions

87
core/cond.c Normal file
View File

@ -0,0 +1,87 @@
/*
* Copyright (C) 2016 Sam Kumar <samkumar@berkeley.edu>
* 2016 University of California, Berkeley
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup core_sync
* @{
*
* @file
* @brief Kernel condition variable implementation
*
* @author Sam Kumar <samkumar@berkeley.edu>
*
* @}
*/
#include "cond.h"
#include "irq.h"
#include "mutex.h"
#include "thread.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
void cond_init(cond_t *cond)
{
cond->queue.next = NULL;
}
void cond_wait(cond_t *cond, mutex_t *mutex)
{
unsigned irqstate = irq_disable();
thread_t *me = (thread_t *)sched_active_thread;
mutex_unlock(mutex);
sched_set_status(me, STATUS_COND_BLOCKED);
thread_add_to_list(&cond->queue, me);
irq_restore(irqstate);
thread_yield_higher();
/*
* Once we reach this point, the condition variable was signalled,
* and we are free to continue.
*/
mutex_lock(mutex);
}
static void _cond_signal(cond_t *cond, bool broadcast)
{
unsigned irqstate = irq_disable();
list_node_t *next;
uint16_t min_prio = THREAD_PRIORITY_MIN + 1;
while ((next = list_remove_head(&cond->queue)) != NULL) {
thread_t *process = container_of((clist_node_t *)next, thread_t, rq_entry);
sched_set_status(process, STATUS_PENDING);
uint16_t process_priority = process->priority;
if (process_priority < min_prio) {
min_prio = process_priority;
}
if (!broadcast) {
break;
}
}
irq_restore(irqstate);
if (min_prio <= THREAD_PRIORITY_MIN) {
sched_switch(min_prio);
}
}
void cond_signal(cond_t *cond)
{
_cond_signal(cond, false);
}
void cond_broadcast(cond_t *cond)
{
_cond_signal(cond, true);
}

214
core/include/cond.h Normal file
View File

@ -0,0 +1,214 @@
/*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @brief Condition variable for thread synchronization
* @ingroup core, core_sync
* @{
*
* @file
* @brief RIOT synchronization API
*
* This file contains a condition variable with Mesa-style semantics.
*
* Condition variable solve the following problem. Suppose that a thread should
* sleep until a certain condition comes true. Condition variables provide a
* primitive whereby a thread can go to sleep by calling cond_wait(). Then,
* when the condition comes true in a thread or interrupt context, cond_signal()
* can be called, to wake up the thread.
*
* "Mesa-style semantics" means that, when cond_signal() is called, the
* sleeping thread becomes runnable, but may not be scheduled immediately. In
* contrast, "Hoare-style semantics" means that when cond_signal() is called,
* the sleeping thread is awakened and immediately scheduled. The condition
* variable in this file implements Mesa-style semantics, as is used by other
* standard implementations, such as pthreads.
*
* To avoid races, condition variables are used with mutexes. When a thread is
* put to sleep with cond_wait, it atomically unlocks the provided mutex and
* then goes to sleep. When it is awakened with cond_signal, it reacquires the
* mutex.
*
* As a rule of thumb, every condition variable should have a corresponding
* mutex, and that mutex should be held whenever performing any operation with
* the condition variable. There are exceptions to this rule, where it is
* appropriate to call cond_signal or cond_broadcast without the mutex held
* (for example, if you know that no thread will call cond_wait concurrently).
* It is safe to call cond_signal or cond_broadcast in interrupt context.
*
* However, the programmer should be aware of the following situation that
* could arise with Mesa-style condition variables: the condition may become
* true, making the sleeping thread runnable, but the condition may become
* false again before the thread is scheduled. To handle this case, the
* condition variable should be used in a while loop as follows:
*
* ```
* mutex_lock(&lock);
* while (condition_is_not_true) {
* cond_wait(&cond, &lock);
* }
* // do work while condition is true.
* mutex_unlock(&lock);
* ```
*
* When used in this way, the thread checks, once it has has awakened, whether
* the condition is actually true, and goes to sleep again if it is not. This
* is the standard way to use Mesa-style condition variables.
*
* Example: Suppose we want to implement a bounded queue, such as a Unix-style
* pipe between two threads. When data is written to the pipe, it is appended
* to a queue, and the writing thread blocks if the queue is full. When data
* is read from the pipe, it is removed from the queue; if the queue is empty,
* the reading thread blocks until it is not empty. If the pipe is closed by
* the sender, waiting reading threads wake up.
*
* Here is a sketch of how to implement such a structure with condition
* variables. For simplicity, messages are single bytes. We assume a FIFO data
* structure queue_t. We assume it is unsafe to add to the queue if it is full,
* or remove from the queue if it is empty.
*
* ```
* typedef struct pipe {
* queue_t queue;
* cond_t read_cond;
* cond_t write_cond;
* mutex_t lock;
* bool closed;
* } pipe_t;
*
* void pipe_init(pipe_t* pipe) {
* queue_init(&pipe->queue);
* cond_init(&pipe->read_cond);
* cond_init(&pipe->write_cond);
* mutex_init(&pipe->lock);
* pipe->closed = false;
* }
*
* void pipe_write(pipe_t* pipe, char c) {
* mutex_lock(&pipe->lock);
* while (queue_length(&pipe->queue) == MAX_QUEUE_LENGTH && !pipe->closed) {
* cond_wait(&pipe->write_cond, &pipe->lock);
* }
* if (pipe->closed) {
* mutex_unlock(&pipe->lock);
* return 0;
* }
* add_to_queue(&pipe->queue, c);
* cond_signal(&pipe->read_cond);
* mutex_unlock(&pipe->lock);
* return 1;
* }
*
* void pipe_close(pipe_t* pipe) {
* mutex_lock(&pipe->lock);
* pipe->closed = true;
* cond_broadcast(&pipe->read_cond);
* cond_broadcast(&pipe->write_cond);
* mutex_unlock(&pipe->lock);
* }
*
* int pipe_read(pipe_t* pipe, char* buf) {
* mutex_lock(&pipe->lock);
* while (queue_length(&pipe->queue) == 0 && !pipe->closed) {
* cond_wait(&pipe->read_cond, &pipe->lock);
* }
* if (pipe->closed) {
* mutex_unlock(&pipe->lock);
* return 0;
* }
* *buf = remove_from_queue(&pipe->queue);
* cond_signal(&pipe->write_cond);
* mutex_unlock(&pipe->lock);
* return 1;
* }
* ```
*
* Note that this could actually be written with a single condition variable.
* However, the example includes two for didactic reasons.
*
* @author Sam Kumar <samkumar@berkeley.edu>
*/
#ifndef COND_H
#define COND_H
#include <stdbool.h>
#include <stddef.h>
#include "list.h"
#include "mutex.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Condition variable structure. Must never be modified by the user.
*/
typedef struct {
/**
* @brief The process waiting queue of the condition variable.
*
* @internal
*/
list_node_t queue;
} cond_t;
/**
* @brief Static initializer for cond_t.
*
* @note This initializer is preferable to cond_init().
*/
#define COND_INIT { { NULL } }
/**
* @brief Initializes a condition variable.
*
* @details For initialization of variables use COND_INIT instead.
* Only use the function call for dynamically allocated condition
* variables.
*
* @param[in] cond Pre-allocated condition structure. Must not be NULL.
*/
void cond_init(cond_t *cond);
/**
* @brief Waits on a condition.
*
* @param[in] cond Condition variable to wait on.
* @param[in] mutex Mutex object held by the current thread.
*/
void cond_wait(cond_t *cond, mutex_t *mutex);
/**
* @brief Wakes up one thread waiting on the condition variable.
*
* @details The thread is marked as runnable and will only be scheduled later
* at the scheduler's whim, so the thread should re-check the condition and wait
* again if it is not fulfilled.
*
* @param[in] cond Condition variable to signal.
*/
void cond_signal(cond_t *cond);
/**
* @brief Wakes up all threads waiting on the condition variable.
*
* @details The threads are marked as runnable and will only be scheduled later
* at the scheduler's whim, so they should re-check the condition and wait again
* if it is not fulfilled.
*
* @param[in] cond Condition variable to broadcast.
*/
void cond_broadcast(cond_t *cond);
#ifdef __cplusplus
}
#endif
#endif /* COND_H */
/** @} */

View File

@ -154,6 +154,7 @@
#define STATUS_FLAG_BLOCKED_ANY 6 /**< waiting for any flag from flag_mask*/
#define STATUS_FLAG_BLOCKED_ALL 7 /**< waiting for all flags in flag_mask */
#define STATUS_MBOX_BLOCKED 8 /**< waiting for get/put on mbox */
#define STATUS_COND_BLOCKED 9 /**< waiting for a condition variable */
/** @} */
/**
@ -162,8 +163,8 @@
*/
#define STATUS_ON_RUNQUEUE STATUS_RUNNING /**< to check if on run queue:
`st >= STATUS_ON_RUNQUEUE` */
#define STATUS_RUNNING 9 /**< currently running */
#define STATUS_PENDING 10 /**< waiting to be scheduled to run */
#define STATUS_RUNNING 10 /**< currently running */
#define STATUS_PENDING 11 /**< waiting to be scheduled to run */
/** @} */
/**

11
tests/cond_order/Makefile Normal file
View File

@ -0,0 +1,11 @@
include ../Makefile.tests_common
BOARD_INSUFFICIENT_MEMORY := nucleo32-f031 nucleo32-f042 nucleo32-l031 nucleo-f030 \
nucleo-l053 stm32f0discovery arduino-duemilanove \
arduino-uno nucleo-f030r8 nucleo-f031k6 nucleo-f042k6 \
nucleo-l031k6 nucleo-l053r8
include $(RIOTBASE)/Makefile.include
test:
tests/01-run.py

View File

@ -0,0 +1,35 @@
Expected result
===============
When successful, you should see 5 different threads printing their PID and
priority. The thread with the lowest priority should be able to signaled first,
followed by the other threads in the order of their priority (highest next). If
the main thread holds the lock, however, none of the other threads should be
able to make progress. The output should look like the following:
```
main(): This is RIOT! (Version: 2018.01-devel-1120-g811de-starbeam-feature-condition-variable)
Condition variable order test
Please refer to the README.md for more information
T3 (prio 6): waiting on condition variable now
T4 (prio 4): waiting on condition variable now
T5 (prio 0): waiting on condition variable now
T6 (prio 2): waiting on condition variable now
T7 (prio 1): waiting on condition variable now
First batch was signaled
T5 (prio 0): condition variable was signaled now
T7 (prio 1): condition variable was signaled now
T6 (prio 2): condition variable was signaled now
First batch has woken up
Second batch was signaled
T4 (prio 4): condition variable was signaled now
T3 (prio 6): condition variable was signaled now
Second batch has woken up
Test END, check the order of priorities above.
```
Background
==========
This test application stresses a condition variable with a number of threads
waiting on it. The threads are signaled (awakened) in two batches.

97
tests/cond_order/main.c Normal file
View File

@ -0,0 +1,97 @@
/*
* Copyright (C) 2018 University of California, Berkeley
* Copyright (C) 2016 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup tests
* @{
*
* @file
* @brief Test application for testing mutexes
*
* @author Sam Kumar <samkumar@cs.berkeley.edu>,
* Hauke Petersen <hauke.petersen@fu-berlin.de>
* @}
*/
#include <stdio.h>
#include "cond.h"
#include "mutex.h"
#include "thread.h"
#define THREAD_NUMOF (5U)
#define THREAD_FIRSTGROUP_NUMOF (3U)
extern volatile thread_t *sched_active_thread;
static char stacks[THREAD_NUMOF][THREAD_STACKSIZE_MAIN];
static const char prios[THREAD_NUMOF] = {THREAD_PRIORITY_MAIN - 1, 4, 0, 2, 1};
static mutex_t testlock;
static cond_t testcond;
static void *lockme(void *arg)
{
(void)arg;
volatile thread_t *t = sched_active_thread;
mutex_lock(&testlock);
printf("T%i (prio %i): waiting on condition variable now\n",
(int)t->pid, (int)t->priority);
cond_wait(&testcond, &testlock);
printf("T%i (prio %i): condition variable was signaled now\n",
(int)t->pid, (int)t->priority);
mutex_unlock(&testlock);
thread_yield();
mutex_unlock(&testlock);
return NULL;
}
int main(void)
{
puts("Condition variable order test");
puts("Please refer to the README.md for more information\n");
mutex_init(&testlock);
cond_init(&testcond);
/* create threads */
for (unsigned i = 0; i < THREAD_NUMOF; i++) {
thread_create(stacks[i], sizeof(stacks[i]), prios[i], 0,
lockme, NULL, "t");
}
/* allow threads to lock the mutex and wait on the condition variable */
/* signal the first few threads, in a group */
mutex_lock(&testlock);
for (unsigned i = 0; i < THREAD_FIRSTGROUP_NUMOF; i++) {
cond_signal(&testcond);
}
printf("First batch was signaled\n");
mutex_unlock(&testlock);
/* allow the first THREAD_FIRSTGROUP_NUMOF threads to wake up */
printf("First batch has woken up\n");
mutex_lock(&testlock);
cond_broadcast(&testcond);
printf("Second batch was signaled\n");
mutex_unlock(&testlock);
/* allow the remaining threads to wake up */
printf("Second batch has woken up\n");
mutex_lock(&testlock);
puts("\nTest END, check the order of priorities above.");
return 0;
}

View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
# Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
# Copyright (C) 2016 Oliver Hahm <oliver.hahm@inria.fr>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
thread_prio = {
3: 6,
4: 4,
5: 0,
6: 2,
7: 1
}
first_group_size = 3
def testfunc(child):
for k in thread_prio.keys():
child.expect(u"T%i \(prio %i\): waiting on condition variable now" % (k, thread_prio[k]))
count = 0
last = -1
child.expect(u"First batch was signaled")
for _ in range(len(thread_prio)):
child.expect(u"T\d+ \(prio (\d+)\): condition variable was signaled now")
assert(int(child.match.group(1)) > last)
last = int(child.match.group(1))
count += 1
if count == 3:
child.expect(u"First batch has woken up")
child.expect(u"Second batch was signaled")
child.expect(u"Second batch has woken up")
if __name__ == "__main__":
sys.path.append(os.path.join(os.environ['RIOTBASE'], 'dist/tools/testrunner'))
from testrunner import run
sys.exit(run(testfunc))