1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-15 17:12:45 +01:00
RIOT/sys/posix/pthread/pthread_rwlock.c

317 lines
11 KiB
C

/*
* Copyright (C) 2014 René Kijewski <rene.kijewski@fu-berlin.de>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* @ingroup pthread
* @{
*
* @file
* @brief Implementation of a fair, POSIX conforming reader/writer lock.
*
* @author René Kijewski <rene.kijewski@fu-berlin.de>
*
* @}
*/
#include <stdint.h>
#include <string.h>
#include "pthread.h"
#include "sched.h"
#include "xtimer.h"
#include "thread.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr)
{
(void) attr;
if (rwlock == NULL) {
DEBUG("Thread %" PRIkernel_pid " pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "init");
return EINVAL;
}
memset(rwlock, 0, sizeof (*rwlock));
return 0;
}
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock)
{
if (rwlock == NULL) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "destroy");
return EINVAL;
}
/* do not unlock the mutex, no need */
if ((mutex_trylock(&rwlock->mutex) == 0) || (rwlock->readers != 0)) {
return EBUSY;
}
return 0;
}
bool __pthread_rwlock_blocked_readingly(const pthread_rwlock_t *rwlock)
{
if (rwlock->readers < 0) {
/* a writer holds the lock */
return true;
}
/* Determine if there is a writer waiting to get this lock who has a higher or the same priority: */
if (rwlock->queue.first == NULL) {
/* no waiting thread */
return false;
}
priority_queue_node_t *qnode = rwlock->queue.first;
if (qnode->priority > sched_active_thread->priority) {
/* the waiting thread has a lower priority */
return false;
}
/* if the waiting node is a writer, then we cannot enter the critical section (to prevent starving the writer) */
__pthread_rwlock_waiter_node_t *waiting_node = (__pthread_rwlock_waiter_node_t *) qnode->data;
return waiting_node->is_writer;
}
bool __pthread_rwlock_blocked_writingly(const pthread_rwlock_t *rwlock)
{
/* if any thread holds the lock, then no writer may enter the critical section */
return rwlock->readers != 0;
}
static int pthread_rwlock_lock(pthread_rwlock_t *rwlock,
bool (*is_blocked)(const pthread_rwlock_t *rwlock),
bool is_writer,
int incr_when_held,
bool allow_spurious)
{
if (rwlock == NULL) {
DEBUG("Thread %" PRIkernel_pid": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n",
thread_getpid(), "lock", is_writer, allow_spurious, "rwlock=NULL");
return EINVAL;
}
mutex_lock(&rwlock->mutex);
if (!is_blocked(rwlock)) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n",
thread_getpid(), "lock", is_writer, allow_spurious, "is open");
rwlock->readers += incr_when_held;
}
else {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n",
thread_getpid(), "lock", is_writer, allow_spurious, "is locked");
/* queue for the lock */
__pthread_rwlock_waiter_node_t waiting_node = {
.is_writer = is_writer,
.thread = (thread_t *) sched_active_thread,
.qnode = {
.next = NULL,
.data = (uintptr_t) &waiting_node,
.priority = sched_active_thread->priority,
},
.continue_ = false,
};
priority_queue_add(&rwlock->queue, &waiting_node.qnode);
while (1) {
/* wait to be unlocked, so this thread can try to acquire the lock again */
mutex_unlock_and_sleep(&rwlock->mutex);
mutex_lock(&rwlock->mutex);
if (waiting_node.continue_) {
/* pthread_rwlock_unlock() already set rwlock->readers */
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n",
thread_getpid(), "lock", is_writer, allow_spurious, "continued");
break;
}
else if (allow_spurious) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n",
thread_getpid(), "lock", is_writer, allow_spurious, "is timed out");
priority_queue_remove(&rwlock->queue, &waiting_node.qnode);
mutex_unlock(&rwlock->mutex);
return ETIMEDOUT;
}
}
}
mutex_unlock(&rwlock->mutex);
return 0;
}
static int pthread_rwlock_trylock(pthread_rwlock_t *rwlock,
bool (*is_blocked)(const pthread_rwlock_t *rwlock),
int incr_when_held)
{
if (rwlock == NULL) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "trylock");
return EINVAL;
}
else if (mutex_trylock(&rwlock->mutex) == 0) {
return EBUSY;
}
else if (is_blocked(rwlock)) {
mutex_unlock(&rwlock->mutex);
return EBUSY;
}
rwlock->readers += incr_when_held;
mutex_unlock(&rwlock->mutex);
return 0;
}
static int pthread_rwlock_timedlock(pthread_rwlock_t *rwlock,
bool (*is_blocked)(const pthread_rwlock_t *rwlock),
bool is_writer,
int incr_when_held,
const struct timespec *abstime)
{
uint64_t now = xtimer_now_usec64();
uint64_t then = ((uint64_t)abstime->tv_sec * US_PER_SEC) +
(abstime->tv_nsec / NS_PER_US);
if (now >= then) {
return ETIMEDOUT;
}
else {
xtimer_t timer;
xtimer_set_wakeup64(&timer, (then - now), sched_active_pid);
int result = pthread_rwlock_lock(rwlock, is_blocked, is_writer, incr_when_held, true);
if (result != ETIMEDOUT) {
xtimer_remove(&timer);
}
return result;
}
}
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock)
{
return pthread_rwlock_lock(rwlock, __pthread_rwlock_blocked_readingly, false, +1, false);
}
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
{
return pthread_rwlock_lock(rwlock, __pthread_rwlock_blocked_writingly, true, -1, false);
}
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock)
{
return pthread_rwlock_trylock(rwlock, __pthread_rwlock_blocked_readingly, +1);
}
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock)
{
return pthread_rwlock_trylock(rwlock, __pthread_rwlock_blocked_writingly, -1);
}
int pthread_rwlock_timedrdlock(pthread_rwlock_t *rwlock, const struct timespec *abstime)
{
return pthread_rwlock_timedlock(rwlock, __pthread_rwlock_blocked_readingly, false, +1, abstime);
}
int pthread_rwlock_timedwrlock(pthread_rwlock_t *rwlock, const struct timespec *abstime)
{
return pthread_rwlock_timedlock(rwlock, __pthread_rwlock_blocked_writingly, true, -1, abstime);
}
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock)
{
if (rwlock == NULL) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "unlock");
return EINVAL;
}
mutex_lock(&rwlock->mutex);
if (rwlock->readers == 0) {
/* the lock is open */
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): lock is open\n", thread_getpid(), "unlock");
mutex_unlock(&rwlock->mutex);
return EPERM;
}
if (rwlock->readers > 0) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): release %s lock\n", thread_getpid(), "unlock", "read");
--rwlock->readers;
}
else {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): release %s lock\n", thread_getpid(), "unlock", "write");
rwlock->readers = 0;
}
if (rwlock->readers != 0 || rwlock->queue.first == NULL) {
/* this thread was not the last reader, or no one is waiting to aquire the lock */
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): no one is waiting\n", thread_getpid(), "unlock");
mutex_unlock(&rwlock->mutex);
return 0;
}
/* wake up the next thread */
priority_queue_node_t *qnode = priority_queue_remove_head(&rwlock->queue);
__pthread_rwlock_waiter_node_t *waiting_node = (__pthread_rwlock_waiter_node_t *) qnode->data;
waiting_node->continue_ = true;
uint16_t prio = qnode->priority;
sched_set_status(waiting_node->thread, STATUS_PENDING);
if (waiting_node->is_writer) {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n",
thread_getpid(), "unlock", "writer", waiting_node->thread->pid);
--rwlock->readers;
}
else {
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n",
thread_getpid(), "unlock", "reader", waiting_node->thread->pid);
++rwlock->readers;
/* wake up further readers */
while (rwlock->queue.first) {
waiting_node = (__pthread_rwlock_waiter_node_t *) rwlock->queue.first->data;
if (waiting_node->is_writer) {
/* Not to be unfair to writers, we don't try to wake up readers that came after the first writer. */
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continuing readers blocked by writer %" PRIkernel_pid "\n",
thread_getpid(), "unlock", waiting_node->thread->pid);
break;
}
waiting_node->continue_ = true;
DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n",
thread_getpid(), "unlock", "reader", waiting_node->thread->pid);
/* wake up this reader */
qnode = priority_queue_remove_head(&rwlock->queue);
if (qnode->priority < prio) {
prio = qnode->priority;
}
sched_set_status(waiting_node->thread, STATUS_PENDING);
++rwlock->readers;
}
}
mutex_unlock(&rwlock->mutex);
/* yield if a woken up thread had a higher priority */
sched_switch(prio);
return 0;
}