# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2025-2026 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2025-2026 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0
import threading
from time import monotonic
def remaining(timeout, start):
    """
    Calculate the amount of *timeout* remaining if the routine started at
    *start* (measured by :func:`time.monotonic`).

    If *timeout* is -1 (meaning infinite timeout), this function always returns
    -1. Otherwise, the time remaining is calculated and clamped to 0.
    """
    if timeout == -1:
        # -1 is the "wait forever" sentinel; pass it through untouched
        return -1
    elapsed = monotonic() - start
    return max(0, timeout - elapsed)
class LightSwitch:
    """
    An auxiliary "light switch"-like object. The first thread to acquire the
    switch also acquires the *lock* associated with the switch. The last thread
    to release the switch also releases the lock associated with the switch.

    If the first and last threads can differ, you must use a primitive
    :class:`threading.Lock` or :class:`threading.Semaphore` (initialized to 1)
    as the value of the *lock* parameter. If the first and last threads will
    always be the same, you *may* use a re-entrant :class:`threading.RLock`.

    This implementation is based on [1]_, sec. 4.2.2 but with additions to
    permit non-blocking execution or timeouts in blocking mode, and operation
    as a context manager.

    .. [1] A.B. Downey: "The little book of semaphores", Version 2.2.1, 2016
       https://greenteapress.com/wp/semaphores/
    """
    def __init__(self, lock):
        # Number of current holders of the switch
        self._counter = 0
        # The lock taken by the first holder and dropped by the last
        self._lock = lock
        # Guards _counter (and serializes acquire/release)
        self._mutex = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire the switch and, if the caller is the first holder, the lock
        associated with the switch.

        *blocking* and *timeout* have the same meaning as in
        :meth:`threading.Lock.acquire`; *timeout* is shared between the wait
        for the internal mutex and the wait for the associated lock. Returns
        :data:`True` if the switch was acquired, or :data:`False` otherwise
        (in which case the count of holders is left unchanged).
        """
        start = monotonic()
        if not self._mutex.acquire(blocking, timeout):
            return False
        try:
            if self._counter == 0:
                # First holder: take the associated lock. The call forms below
                # are accepted by Lock, RLock *and* Semaphore; the latter
                # rejects a timeout combined with blocking=False and does not
                # treat -1 as "wait forever"
                if not blocking:
                    acquired = self._lock.acquire(False)
                elif timeout == -1:
                    acquired = self._lock.acquire()
                else:
                    acquired = self._lock.acquire(
                        True, remaining(timeout, start))
                if not acquired:
                    return False
            # Only count the holder once the lock is definitely held, so a
            # failure (or exception) above never leaves a phantom holder
            self._counter += 1
            return True
        finally:
            self._mutex.release()

    def release(self):
        """
        Release the switch and, if the caller is the last holder, the lock
        associated with the switch.

        Raises :exc:`RuntimeError` if the switch is not currently held.
        """
        with self._mutex:
            if not self._counter:
                raise RuntimeError('Attempt to release an unacquired Switch')
            self._counter -= 1
            if self._counter == 0:
                self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
class RWLockState:
    """
    State of a thread in :class:`RWLock`. This tracks the number of re-entrant
    calls a thread has made.

    Consider a hypothetical thread performing a series of operations. The
    values that its RWLockState move through will be as follows:

    +---------------+------+-------+---------+-----------+
    | Operation     | read | write | ignored | Notes     |
    +===============+======+=======+=========+===========+
    |               | 0    | 0     | 0       | start     |
    +---------------+------+-------+---------+-----------+
    | read.acquire  | 1    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | read.acquire  | 2    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | read.acquire  | 3    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | write.acquire | 3    | 1     | 0       | upgrade   |
    +---------------+------+-------+---------+-----------+
    | write.acquire | 3    | 2     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | read.acquire  | 3    | 2     | 1       | ignore    |
    +---------------+------+-------+---------+-----------+
    | read.acquire  | 3    | 2     | 2       |           |
    +---------------+------+-------+---------+-----------+
    | read.release  | 3    | 2     | 1       |           |
    +---------------+------+-------+---------+-----------+
    | read.release  | 3    | 2     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | write.release | 3    | 1     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | write.release | 3    | 0     | 0       | downgrade |
    +---------------+------+-------+---------+-----------+
    | read.release  | 2    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | read.release  | 1    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+
    | read.release  | 0    | 0     | 0       |           |
    +---------------+------+-------+---------+-----------+

    upgrade
        At this point this thread holds three re-entrant acquisitions of the
        read lock, but has requested a write lock (effectively requesting to
        upgrade its lock). What this does is release all the read locks held
        (actually one lightswitch release), and start a write lock acquisition

    ignore
        If a thread holds any write locks (whether through upgrade, or because
        it originally grabbed a write lock) all attempted acquisitions of a
        read lock are ignored (but counted)

    downgrade
        Here a thread has released its last write lock while in an upgraded
        state. The implementation will release the write lock then re-acquire
        the number of read-locks specified (actually one lightswitch
        acquisition)
    """
    __slots__ = ('read', 'write', 'ignored')

    def __init__(self):
        # All counters start at zero: the thread holds nothing
        self.read = 0
        self.write = 0
        self.ignored = 0

    def __repr__(self):
        return (
            f'{self.__class__.__name__}('
            f'read={self.read}, write={self.write}, ignored={self.ignored})')
class RWLock:
    """
    Synchronization object used in a solution of the so-called second
    readers-writers problem.

    In this problem, many readers can simultaneously access a share, and a
    writer has an exclusive access to this share. Additionally, the following
    constraints should be met:

    1. no reader should be kept waiting if the share is currently opened for
       reading unless a writer is also waiting for the share,

    2. no writer should be kept waiting for the share longer than absolutely
       necessary.

    The implementation is based on [1]_, secs. 4.2.2, 4.2.6 with modifications
    for a more "Pythonic" style, and re-entrancy. The class provides two
    objects, :attr:`read` and :attr:`write` which each support the context
    manager protocol along with the typical ``acquire`` and ``release``
    methods.

    Unlike the implementation in the book, both ``read`` and ``write`` are
    re-entrant (in the book version, ``read`` is re-entrant by virtue of it
    being shared, but ``write`` is not). Furthermore, upgrade is permitted from
    read lock to write lock, provided all locks are subsequently released in
    the order they were obtained. In other words, a thread may obtain a read
    lock, then implicitly "upgrade" to a write lock, provided it subsequently
    releases the write lock, then the original read lock.

    .. attribute:: read

        The sub-object providing access to read locks. This object supports the
        context manager protocol, and the typical ``acquire`` and ``release``
        methods (see :meth:`Lock.acquire` and :meth:`Lock.release`).

        The lock obtained via this object is re-entrant and shared (many
        threads may simultaneously share the read lock).

    .. attribute:: write

        The sub-object providing access to write locks. This object supports
        the context manager protocol, and the typical ``acquire`` and
        ``release`` methods (see :meth:`Lock.acquire` and
        :meth:`Lock.release`).

        The lock obtained via this object is re-entrant but exclusive (no other
        thread may hold a read or write lock while this is held). A thread may
        upgrade from a read lock to a write lock, but the lock is *not* held
        continuously during upgrade: the read lock is released before the
        upgraded write lock is obtained. Bear in mind another write lock *may*
        be obtained by another thread before this thread obtains the upgraded
        write lock.
    """
    def __init__(self):
        # Per-thread re-entrancy counters, shared by both sub-objects
        local = threading.local()
        # Held while any reader (via the light switch) or a writer is active
        block_writers = threading.Lock()
        # Held by a pending or active writer to keep new readers out
        block_readers = threading.Lock()
        read_switch = LightSwitch(block_writers)
        self.read = _ReadLock(local, read_switch, block_readers)
        self.write = _WriteLock(
            local, read_switch, block_readers, block_writers)
class _BaseLock:
    """
    Common base of the read and write sub-objects of :class:`RWLock`. Holds
    the :class:`threading.local` instance in which each thread's
    :class:`RWLockState` is stored.
    """
    def __init__(self, local):
        self._local = local

    def _get_state(self):
        """
        Return the calling thread's :class:`RWLockState`, creating it on
        first use.
        """
        # Retrieve the thread-local RWLockState instance. NOTE: Because this
        # is thread-local we never need to worry about locking it (the instance
        # will be specific to the executing thread and no other thread can
        # modify it)
        try:
            state = self._local.state
        except AttributeError:
            state = self._local.state = RWLockState()
        return state
class _ReadLock(_BaseLock):
    """
    The shared, re-entrant read side of :class:`RWLock`. Supports ``acquire``,
    ``release`` and the context manager protocol.
    """
    def __init__(self, local, read_switch, block_readers):
        super().__init__(local)
        self._read_switch = read_switch
        self._block_readers = block_readers

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire a read lock for the calling thread.

        *blocking* and *timeout* are passed on to the underlying lock
        acquisitions, with *timeout* shared across them. Returns :data:`True`
        on success, :data:`False` if an underlying acquisition failed.
        Re-entrant calls by a thread already holding a read or write lock
        always succeed immediately.
        """
        start = monotonic()
        state = self._get_state()
        if state.write > 0:
            # If this thread already holds a write lock (through upgrade or
            # original acquisition), ignore (but count) the read acquisition
            state.ignored += 1
            return True
        if state.read > 0:
            # If the thread already holds only read locks, ignore (but count)
            # the re-entrant attempt. NOTE: This is required to avoid deadlock
            # when a pending write acquisition is holding _block_readers (to
            # avoid starvation -- see below) but a pre-existing read needs to
            # acquire more read locks to complete its processing and ultimately
            # release its original lock
            state.read += 1
            return True
        # NOTE: The block_readers lock *appears* pointless, but blocks the
        # read lock acquisition when a writer holds it (see _WriteLock.acquire)
        # to prevent writer starvation
        if not self._block_readers.acquire(blocking, timeout):
            return False
        self._block_readers.release()
        if not self._read_switch.acquire(blocking, remaining(timeout, start)):
            return False
        state.read = 1
        return True

    def release(self):
        """
        Release one read lock held by the calling thread. The shared read
        switch is only released when the thread's last read lock is released.

        Misuse (releasing more often than acquired, or out of order with
        respect to a write lock) is detected with ``assert`` statements, which
        Python skips when run with ``-O``.
        """
        state = self._get_state()
        if state.write > 0:
            # Undo a read acquisition that was ignored while holding write
            assert state.ignored > 0, 'released read before releasing write'
            state.ignored -= 1
            return
        assert state.read > 0, 'released read too many times'
        state.read -= 1
        if state.read > 0:
            return
        self._read_switch.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()
class _WriteLock(_BaseLock):
    """
    The exclusive, re-entrant write side of :class:`RWLock`. Supports
    ``acquire``, ``release`` and the context manager protocol, and permits a
    thread holding read locks to upgrade to a write lock.
    """
    def __init__(self, local, read_switch, block_readers, block_writers):
        super().__init__(local)
        self._read_switch = read_switch
        self._block_readers = block_readers
        self._block_writers = block_writers

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire the write lock for the calling thread.

        *blocking* and *timeout* are passed on to the underlying lock
        acquisitions, with *timeout* shared across them. Returns :data:`True`
        on success, :data:`False` if an underlying acquisition failed.
        Re-entrant calls by a thread already holding the write lock always
        succeed immediately.

        If the thread holds read locks, its hold on the read switch is
        released first (an upgrade). Should the write acquisition then fail,
        the read switch is re-acquired with an unbounded blocking wait before
        returning, regardless of *blocking* and *timeout*.
        """
        start = monotonic()
        state = self._get_state()
        if state.write > 0:
            # Ignore (but count) re-entrant write attempts
            state.write += 1
            return True
        if state.read > 0:
            # A reader wishes to upgrade to a write lock; (temporarily) release
            # its hold on _read_switch and begin the wait for a write lock. If
            # the write lock fails, we *must* re-acquire _read_switch before
            # returning.
            # NOTE: _ReadLock.release is not called here because we want to
            # preserve the thread's state counts
            assert state.ignored == 0, 'double upgrade'
            self._read_switch.release()
        # NOTE: The read_switch acquires block_writers. Hence, when 1 or more
        # readers have acquired read_switch, block_writers is acquired also
        if not self._block_readers.acquire(
                blocking, remaining(timeout, start)):
            if state.read > 0:
                self._read_switch.acquire()
            return False
        if not self._block_writers.acquire(
                blocking, remaining(timeout, start)):
            self._block_readers.release()
            if state.read > 0:
                self._read_switch.acquire()
            return False
        state.write = 1
        return True

    def release(self):
        """
        Release one write lock held by the calling thread. The underlying
        locks are only released when the thread's last write lock is released.

        If the thread still holds read locks at that point (it upgraded), it
        is downgraded back to a reader instead of releasing everything.
        """
        state = self._get_state()
        assert state.write > 0, 'released write too many times'
        state.write -= 1
        if state.write > 0:
            return
        if state.read > 0:
            # A reader that upgraded to a writer is now downgrading back to
            # reader. In this case we "hack" the _read_switch to indicate we're
            # still holding it. We can't acquire it "normally" by calling
            # LightSwitch.acquire as we already hold _block_writers (which is
            # not re-entrant), so we hack the internal count to 1 but maintain
            # our hold on _block_writers.
            #
            # NOTE: the read switch count is the number of *distinct threads*
            # that hold the read switch (which must be 1 in the case of a
            # downgraded writer), *not* the number of re-entrant read locks
            # this thread held before upgrade
            assert state.ignored == 0, 'released write before releasing read'
            with self._read_switch._mutex:
                assert self._read_switch._counter == 0, (
                    'upgraders and readers co-existing')
                self._read_switch._counter = 1
            self._block_readers.release()
            return
        self._block_readers.release()
        self._block_writers.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()