/* * Copyright (C) 2014 René Kijewski * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ /** * @ingroup pthread * @{ * * @file * @brief Implementation of a fair, POSIX conforming reader/writer lock. * * @author René Kijewski * * @} */ #include #include #include "pthread.h" #include "sched.h" #include "ztimer64.h" #include "timex.h" #include "thread.h" #define ENABLE_DEBUG 0 #include "debug.h" int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr) { (void) attr; if (rwlock == NULL) { DEBUG("Thread %" PRIkernel_pid " pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "init"); return EINVAL; } memset(rwlock, 0, sizeof (*rwlock)); return 0; } int pthread_rwlock_destroy(pthread_rwlock_t *rwlock) { if (rwlock == NULL) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "destroy"); return EINVAL; } /* do not unlock the mutex, no need */ if ((mutex_trylock(&rwlock->mutex) == 0) || (rwlock->readers != 0)) { return EBUSY; } return 0; } bool __pthread_rwlock_blocked_readingly(const pthread_rwlock_t *rwlock) { if (rwlock->readers < 0) { /* a writer holds the lock */ return true; } /* Determine if there is a writer waiting to get this lock who has a higher or the same priority: */ if (rwlock->queue.first == NULL) { /* no waiting thread */ return false; } priority_queue_node_t *qnode = rwlock->queue.first; if (qnode->priority > thread_get_active()->priority) { /* the waiting thread has a lower priority */ return false; } /* if the waiting node is a writer, then we cannot enter the critical section (to prevent starving the writer) */ __pthread_rwlock_waiter_node_t *waiting_node = (__pthread_rwlock_waiter_node_t *) qnode->data; return waiting_node->is_writer; } bool __pthread_rwlock_blocked_writingly(const pthread_rwlock_t *rwlock) { /* if any thread holds the lock, then no writer may enter the critical section */ return rwlock->readers != 0; } static int pthread_rwlock_lock(pthread_rwlock_t *rwlock, bool (*is_blocked)(const pthread_rwlock_t *rwlock), bool is_writer, int incr_when_held, bool allow_spurious) { if (rwlock == NULL) { DEBUG("Thread %" PRIkernel_pid": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n", thread_getpid(), "lock", is_writer, allow_spurious, "rwlock=NULL"); return EINVAL; } mutex_lock(&rwlock->mutex); if (!is_blocked(rwlock)) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n", thread_getpid(), "lock", is_writer, allow_spurious, "is open"); rwlock->readers += incr_when_held; } else { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n", thread_getpid(), "lock", is_writer, allow_spurious, "is locked"); /* queue for the lock */ __pthread_rwlock_waiter_node_t waiting_node = { .is_writer = is_writer, .thread = thread_get_active(), .qnode = { .next = NULL, .data = (uintptr_t) &waiting_node, .priority = thread_get_active()->priority, }, .continue_ = false, }; priority_queue_add(&rwlock->queue, &waiting_node.qnode); while (1) { /* wait to be unlocked, so this thread can try to acquire the lock again */ mutex_unlock_and_sleep(&rwlock->mutex); mutex_lock(&rwlock->mutex); if (waiting_node.continue_) { /* pthread_rwlock_unlock() already set rwlock->readers */ DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n", thread_getpid(), "lock", is_writer, allow_spurious, "continued"); break; } else if (allow_spurious) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): is_writer=%u, allow_spurious=%u %s\n", thread_getpid(), "lock", is_writer, allow_spurious, "is timed out"); priority_queue_remove(&rwlock->queue, &waiting_node.qnode); mutex_unlock(&rwlock->mutex); return ETIMEDOUT; } } } mutex_unlock(&rwlock->mutex); return 0; } static int pthread_rwlock_trylock(pthread_rwlock_t *rwlock, bool (*is_blocked)(const pthread_rwlock_t *rwlock), int incr_when_held) { if (rwlock == NULL) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "trylock"); return EINVAL; } else if (mutex_trylock(&rwlock->mutex) == 0) { return EBUSY; } else if (is_blocked(rwlock)) { mutex_unlock(&rwlock->mutex); return EBUSY; } rwlock->readers += incr_when_held; mutex_unlock(&rwlock->mutex); return 0; } static int pthread_rwlock_timedlock(pthread_rwlock_t *rwlock, bool (*is_blocked)(const pthread_rwlock_t *rwlock), bool is_writer, int incr_when_held, const struct timespec *abstime) { uint64_t now = ztimer64_now(ZTIMER64_USEC); uint64_t then = ((uint64_t)abstime->tv_sec * US_PER_SEC) + (abstime->tv_nsec / NS_PER_US); if (now >= then) { return ETIMEDOUT; } else { ztimer64_t timer; ztimer64_set_wakeup(ZTIMER64_USEC, &timer, (then - now), thread_getpid()); int result = pthread_rwlock_lock(rwlock, is_blocked, is_writer, incr_when_held, true); if (result != ETIMEDOUT) { ztimer64_remove(ZTIMER64_USEC, &timer); } return result; } } int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock) { return pthread_rwlock_lock(rwlock, __pthread_rwlock_blocked_readingly, false, +1, false); } int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock) { return pthread_rwlock_lock(rwlock, __pthread_rwlock_blocked_writingly, true, -1, false); } int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock) { return pthread_rwlock_trylock(rwlock, __pthread_rwlock_blocked_readingly, +1); } int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock) { return pthread_rwlock_trylock(rwlock, __pthread_rwlock_blocked_writingly, -1); } int pthread_rwlock_timedrdlock(pthread_rwlock_t *rwlock, const struct timespec *abstime) { return pthread_rwlock_timedlock(rwlock, __pthread_rwlock_blocked_readingly, false, +1, abstime); } int pthread_rwlock_timedwrlock(pthread_rwlock_t *rwlock, const struct timespec *abstime) { return pthread_rwlock_timedlock(rwlock, __pthread_rwlock_blocked_writingly, true, -1, abstime); } int pthread_rwlock_unlock(pthread_rwlock_t *rwlock) { if (rwlock == NULL) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): rwlock=NULL supplied\n", thread_getpid(), "unlock"); return EINVAL; } mutex_lock(&rwlock->mutex); if (rwlock->readers == 0) { /* the lock is open */ DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): lock is open\n", thread_getpid(), "unlock"); mutex_unlock(&rwlock->mutex); return EPERM; } if (rwlock->readers > 0) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): release %s lock\n", thread_getpid(), "unlock", "read"); --rwlock->readers; } else { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): release %s lock\n", thread_getpid(), "unlock", "write"); rwlock->readers = 0; } if (rwlock->readers != 0 || rwlock->queue.first == NULL) { /* this thread was not the last reader, or no one is waiting to acquire the lock */ DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): no one is waiting\n", thread_getpid(), "unlock"); mutex_unlock(&rwlock->mutex); return 0; } /* wake up the next thread */ priority_queue_node_t *qnode = priority_queue_remove_head(&rwlock->queue); __pthread_rwlock_waiter_node_t *waiting_node = (__pthread_rwlock_waiter_node_t *) qnode->data; waiting_node->continue_ = true; uint16_t prio = qnode->priority; sched_set_status(waiting_node->thread, STATUS_PENDING); if (waiting_node->is_writer) { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n", thread_getpid(), "unlock", "writer", waiting_node->thread->pid); --rwlock->readers; } else { DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n", thread_getpid(), "unlock", "reader", waiting_node->thread->pid); ++rwlock->readers; /* wake up further readers */ while (rwlock->queue.first) { waiting_node = (__pthread_rwlock_waiter_node_t *) rwlock->queue.first->data; if (waiting_node->is_writer) { /* Not to be unfair to writers, we don't try to wake up readers that came after the first writer. */ DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continuing readers blocked by writer %" PRIkernel_pid "\n", thread_getpid(), "unlock", waiting_node->thread->pid); break; } waiting_node->continue_ = true; DEBUG("Thread %" PRIkernel_pid ": pthread_rwlock_%s(): continue %s %" PRIkernel_pid "\n", thread_getpid(), "unlock", "reader", waiting_node->thread->pid); /* wake up this reader */ qnode = priority_queue_remove_head(&rwlock->queue); if (qnode->priority < prio) { prio = qnode->priority; } sched_set_status(waiting_node->thread, STATUS_PENDING); ++rwlock->readers; } } mutex_unlock(&rwlock->mutex); /* yield if a woken up thread had a higher priority */ sched_switch(prio); return 0; }