Writing a toy
Java Virtual Machine, as one does, I’ve come across the need to lock
objects when a synchronized method or block is run. When the first
version of Java was designed, multi-threading was the future, and as
such not really figured out yet: Posix’s multi-threading standard,
pthreads
was released in 1995 – the same year as HotJava,
and it took until C++11 for C/C++ to gain their memory model, which was
inspired by Java’s revamped
one from 2004. So Java went with the idea that maybe each object could
carry an intrinsic monitor (a lock and thread wait group) that you could
grab when accessing the object from multiple threads.
I don’t believe this feature, which offers no options such as
optimistic locking and makes reasoning about locking patterns harder,
sees particularly widespread use today. But we still need to implement
it. Ideally we’d only allocate the monitor when an object is actually
getting locked, but the simple solution is to store a full monitor in
every object. Maybe we can just use a std::sync::Mutex
?
Unfortunately in Java these locks are reëntrant – that is, you can
take the same lock multiple times from the same thread – while Rust’s
Mutex
isn’t. That makes sense, as locking a
Mutex
twice would bestow upon you the forbidden power of
multiple exclusive references to the same memory, anathema under the
sacred tenet of shared ⊕ mutable, while Java shares no such beliefs.
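A quick sketch (illustrative, not part of the JVM code) of what goes wrong if we try anyway:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0);
    let _first = m.lock().unwrap();
    // The standard library documents relocking a Mutex you already hold as
    // "will not return" – it may deadlock or panic. A synchronized Java
    // method calling another synchronized method on the same object would
    // hit exactly this.
    let _second = m.lock().unwrap();
}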
The parking_lot
crate provides various synchronization primitives,
ReentrantMutex
among them. Just what we need! However:

- size_of::<(ReentrantMutex<()>, Condvar)>() is 32 bytes (on a 64 bit architecture), quite a bit if we want to embed it in every Java object
- Condvar doesn’t work with ReentrantMutex in the first place

To cook our own, we need two ingredients: wait and wake operations, provided by the operating system and nicely prepackaged for us by the atomic-wait crate.

atomic_wait::wait
takes a reference to a 32 bit number
as well as the value we think is currently stored there. If they match,
it parks the thread so it can be woken up again when progress is
possible. Why isn’t supplying just the address (or some other unique
identifier) enough? Well, dealing with concurrency is the whole point,
and if the lock got unlocked after we checked but before the syscall, we may
rest in Hypnos’ arms evermore if not for this safeguard.
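To make that concrete, here’s a minimal illustration (my own sketch, not the monitor code) of the intended usage pattern: re-check the value in a loop and pass what you saw to wait.

use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

// Blocks until `flag` becomes non-zero.
fn wait_until_nonzero(flag: &AtomicU32) {
    loop {
        let seen = flag.load(Relaxed);
        if seen != 0 {
            return;
        }
        // Only goes to sleep if `flag` still contains `seen`; if another
        // thread changed it (and called wake) in the meantime, this returns
        // immediately instead of sleeping through the wakeup.
        atomic_wait::wait(flag, seen);
    }
}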
So we’ll need a 32 bit number:
struct Monitor {
    lock: AtomicU32,
}
Let’s define what its values mean while we’re at it:
const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;
And now we can lock it:
fn enter(monitor: &Monitor) {
    loop {
        match monitor
            .lock
            .compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed)
        {
            Ok(_) => return,
            Err(current) => {
                atomic_wait::wait(&monitor.lock, current);
            }
        }
    }
}
We try to put the lock into the locked state, and if it was already
locked we wait and try again when woken. Unlocking is simple too: Put
the lock into the unlocked state, then wake up a thread waiting on it
(if any). Because we pair a load operation (part of the
compare_exchange
) which has an Acquire
ordering with a Release
store on the same address, everything that happened before the store in
the thread that released the lock is now ordered before everything after
the load in the thread that just acquired the lock.
fn exit(monitor: &Monitor) {
    monitor.lock.store(UNLOCKED, Release);
    atomic_wait::wake_one(&monitor.lock);
}
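To make the ordering argument concrete, here’s an illustrative sketch (assuming the Monitor, enter and exit from above are in scope; not code from the actual JVM):

use std::cell::UnsafeCell;
use std::sync::atomic::AtomicU32;

struct Shared {
    monitor: Monitor,
    data: UnsafeCell<u64>,
}
// Safety: `data` is only touched while the monitor is held.
unsafe impl Sync for Shared {}

static SHARED: Shared = Shared {
    monitor: Monitor { lock: AtomicU32::new(UNLOCKED) },
    data: UnsafeCell::new(0),
};

fn writer() {
    enter(&SHARED.monitor);
    unsafe { *SHARED.data.get() = 42 }; // ordered before exit's Release store
    exit(&SHARED.monitor);
}

fn reader() {
    enter(&SHARED.monitor); // Acquire load pairs with the writer's Release store
    // If writer() has already run its critical section, this read is
    // guaranteed to see 42.
    let _value = unsafe { *SHARED.data.get() };
    exit(&SHARED.monitor);
}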
Yay, we’ve got a (barebones) mutex! But right now, if we call
monitor.enter()
twice on the same thread, we deadlock.
Let’s fix that.
We’ll need to store which thread owns the monitor so we can continue if it’s locked by the current thread. We’ll also need to keep count of how many times we’ve entered the monitor on this thread so we know when we can unlock it:
struct Monitor {
    owner: AtomicUsize,
    count: AtomicU32,
}
count
has also taken over the function of the
lock
field we previously had: When count is 0, the monitor
is unlocked.
We’ve encountered a hiccup: We need a way to identify the current
thread. The obvious one is to use std::thread::ThreadId
,
but the method to turn that into a u64
we can use is
unstable. So we’ll make our own thread id:
fn thread_id() -> usize {
    thread_local!(static KEY: u8 = const { 0 });
    KEY.with(|x| x as *const _ as usize)
}
We take the address of a thread local variable! We use a variable of size greater than zero to ensure the address is actually unique. As a bonus, this only takes half the space on a 32 bit platform. The advantage of the standard library version is that it doesn’t get reused after the thread gets killed, but that’s not a property we care about.
Since the address will never be zero we can use that value to represent the lack of an owning thread.
const NO_THREAD: usize = 0;
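A quick illustrative check of the scheme (hypothetical test code, not part of the monitor):

fn main() {
    let main_id = thread_id();
    // Stable within a thread...
    assert_eq!(main_id, thread_id());
    // ...and distinct from any other live thread's id.
    let other_id = std::thread::spawn(thread_id).join().unwrap();
    assert_ne!(main_id, other_id);
    // And never zero, so NO_THREAD can't collide with a real thread.
    assert_ne!(main_id, NO_THREAD);
}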
When locking the monitor, we’ll now note down the thread id:
// in Monitor::enter
Ok(_) => {
    monitor.owner.store(thread_id(), Release);
    return;
}
and if the monitor is already locked, we check it:
// in Monitor::enter
Err(current) => {
    if monitor.owner.load(Relaxed) == thread_id() {
        monitor.count.store(current + 1, Relaxed);
        return;
    } else {
        atomic_wait::wait(&monitor.count, current);
    }
}
Then when we exit the monitor:
// in Monitor::exit
let old_count = monitor.count.load(Relaxed);
if old_count > 1 {
    monitor.count.store(old_count - 1, Release);
} else {
    monitor.owner.store(NO_THREAD, Release);
    monitor.count.store(0, Release);
    atomic_wait::wake_one(&monitor.count);
}
The order of operations is important here. We need to clear the owner
before we release the lock, because otherwise we might overwrite it
after another thread has already taken the lock; and we need to release
the lock before we wake another thread so that that thread can proceed
instead of waiting again, potentially forever.
We also need to deal with the count overflowing. We could either panic on overflow, or we can refuse to unlock the object if the counter has overflown:
// in Monitor::enter
monitor.count.store(current.saturating_add(1), Relaxed);

// in Monitor::exit
if old_count == u32::MAX {
    return;
}
Tada!
On Linux, the name of the syscall behind atomic-wait
is
SYS_futex
.
To my immense disappointment it turns out this isn’t a portmanteau of
“futanari” and “latex”. Rather it stands for “fast user-space mutex”:
The idea is to do what you can in user-space and only call the kernel
when the lock is contended. Right now we don’t do that – we make a
syscall every time we unlock the monitor, even if there’s no thread to
wake up!
Well afaik, depending on the platform, the underlying API may already
do this tracking for us. But for those that don’t, let’s keep track of
whether there’s another thread waiting on the monitor.
count
can spare a bit for that:
const CONTENDED: u32 = 1;
const INCREMENT: u32 = 2;
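Spelled out, the values of count now read like this:

// 0                        unlocked
// INCREMENT                entered once, nobody waiting          (2)
// INCREMENT | CONTENDED    entered once, a waiter may be parked  (3)
// 2 * INCREMENT            entered twice (reëntrantly), no waiter (4)
// ...and so on: the low bit is the waiter flag, the rest is the entry count.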
Buckle up everypony, this is going to be a bit trickier. The first step of entering a monitor hasn’t changed much:
// in Monitor::enter
// Try to acquire the lock
let loaded = match monitor.count.compare_exchange(0, INCREMENT, Acquire, Relaxed) {
    Ok(_) => {
        monitor.owner.store(thread_id(), Release);
        return;
    }
    Err(loaded) => loaded,
};
// Already locked, check if by the same thread
if monitor.owner.load(Acquire) == thread_id() {
    monitor.count.store(loaded.saturating_add(INCREMENT), Relaxed);
    return;
}
But now, if that fails, we set the CONTENDED
flag both
when we want to wait and when acquiring the lock. If setting the flag
fails, the kernel immediately returns from the wait call, preventing us
from deadlocking if we fail to set the flag. Before trying to acquire
the lock we do a load so the core our thread is running on doesn’t have
to take ownership of the cache line because of the
compare_exchange
if the monitor is still locked.
// Slow path (enter_locked in the full code), with `current` initialized to
// the `loaded` value from above.
loop {
    // Try to set flag
    if current & CONTENDED == 0 {
        let _ = monitor
            .count
            .compare_exchange_weak(current, current | CONTENDED, Release, Relaxed);
    }
    // Wait if flag successfully set
    atomic_wait::wait(&monitor.count, current | CONTENDED);
    // Try to acquire the lock
    current = monitor.count.load(Relaxed);
    if (current & !CONTENDED) == 0 {
        match monitor
            .count
            .compare_exchange(current, INCREMENT | CONTENDED, Acquire, Relaxed)
        {
            Ok(_) => {
                monitor.owner.store(thread_id(), Release);
                return;
            }
            Err(value) => {
                current = value;
            }
        }
    }
}
When we decrement the count and the thread is still holding the lock,
we need to be careful not to clear the CONTENDED
flag. But
when unlocking the monitor we clear the CONTENDED
flag, and
only notify a thread when it was set – which was the entire reason we’re
doing this. There may have been more than one thread waiting, which is
why the woken up thread sets the flag again even if it successfully
takes the lock.
// in Monitor::exit
let old_count = monitor.count.load(Acquire);
if old_count > INCREMENT | CONTENDED {
    // Lock still held by this thread
    let loaded = monitor.count.swap(old_count - INCREMENT, AcqRel);
    if (old_count & CONTENDED == 0) & (loaded & CONTENDED == CONTENDED) {
        monitor.count.store((old_count - INCREMENT) | CONTENDED, Relaxed);
    }
} else {
    // Unlock the monitor
    monitor.owner.store(NO_THREAD, Release);
    if monitor.count.swap(0, AcqRel) == INCREMENT | CONTENDED {
        atomic_wait::wake_one(&monitor.count);
    }
}
Let’s try out our monitor by writing a test that spawns a bunch of threads that each enter the monitor twice (testing reëntrancy), increase a shared variable by one, then exit the monitor twice; we then check whether the variable has reached the value we expect (testing mutual exclusion). Let’s run our test and… it hangs?
running 1 test
test monitor::test::monitor_contention has been running for over 60 seconds
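For reference, the hanging test looks roughly like this (a sketch against the API above; the actual test in the repository may differ):

#[test]
fn monitor_contention() {
    use std::cell::UnsafeCell;
    use std::sync::atomic::{AtomicU32, AtomicUsize};
    use std::sync::Arc;

    struct Shared {
        monitor: Monitor,
        value: UnsafeCell<u64>,
    }
    // Safety: `value` is only touched while the monitor is held.
    unsafe impl Sync for Shared {}

    const THREADS: u64 = 8;
    const ITERATIONS: u64 = 1_000;

    let shared = Arc::new(Shared {
        monitor: Monitor {
            owner: AtomicUsize::new(NO_THREAD),
            count: AtomicU32::new(0),
        },
        value: UnsafeCell::new(0),
    });

    let handles: Vec<_> = (0..THREADS)
        .map(|_| {
            let shared = Arc::clone(&shared);
            std::thread::spawn(move || {
                for _ in 0..ITERATIONS {
                    // Enter twice to test reëntrancy
                    enter(&shared.monitor);
                    enter(&shared.monitor);
                    unsafe { *shared.value.get() += 1 };
                    exit(&shared.monitor);
                    exit(&shared.monitor);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Mutual exclusion: no increments were lost.
    assert_eq!(unsafe { *shared.value.get() }, THREADS * ITERATIONS);
}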
Usually Rust code gets compiled to native code – but not necessarily.
rustc
actually comes with an interpreter for its mid-level
intermediate representation (Mir), aptly named Miri. Every time rustc
needs to evaluate a const expression, Miri is at work. But it truly shines
as a tool for debugging undefined behavior, or in our case
deadlocks.
Invoking it is as easy as cargo miri test
. And sure, it
detects the deadlock:
[snip]
error: deadlock: the evaluated program deadlocked
--> /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/linux.rs:12:9
|
12 | );
| ^ the evaluated program deadlocked
|
= note: BACKTRACE on thread `unnamed-3`:
= note: inside `atomic_wait::platform::wait` at /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/linux.rs:12:9: 12:10
= note: inside `atomic_wait::wait` at /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/lib.rs:28:5: 28:34
note: inside `monitor::enter_locked`
--> src/monitor.rs:121:17
|
121 | atomic_wait::wait(&monitor.count, loaded | CONTENDED);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[snip]
If we run Miri again, we’ll get the exact same error again, because
Miri is deterministic (unless we disable its sandbox, which we don’t
need to do here). If we want different executions, we can run it with
--many-seeds
; if we want to take a closer look at one of
them we can instead specify it with
MIRIFLAGS="-Zmiri-seed=<seed>"
. The seed affects
e.g. when different threads get preëmpted or what value (from the set of
possibly observable values) an atomic load reads. Because yeah, Miri can
emulate weak memory too!
The error message tells us that a thread is stuck at the
atomic_wait::wait
call. No surprise there. To find out why,
let’s sprinkle in a few logging statements to see what’s happening. We
can’t use println!()
, as that locks stdout, which
synchronizes the threads and makes the race condition vanish, but
luckily Miri is nice enough to provide a function we can link to:
#[cfg(miri)]
extern "Rust" {
    fn miri_write_to_stdout(bytes: &[u8]);
}

macro_rules! miri_println {
    ($fmt:literal $(, $exprs:expr)*) => {
        #[cfg(miri)]
        unsafe {
            let message = format!($fmt, $($exprs),*) + "\n";
            miri_write_to_stdout(message.as_bytes());
        }
    };
}
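For instance, a print at the top of the slow path might look like this (placement illustrative):

miri_println!("Thread {}: enter called, count == {}", thread_id(), current);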
Add in some prints, and behold:
[snip]
Thread 2536232: enter called, count == 2
Thread 2483964: enter called, count == 2
Thread 2483964: lock owned by different thread, set contended flag (count = 3)
Thread 2483964: try to sleep on count == 3
Thread 2536232: lock owned by same thread, set count to 4
[snip]
So what happened here? Well the first thread already owns the lock,
reads the current count and increments it. Before it can write it back,
the other thread fails to acquire the lock, sets the
CONTENDED
flag, and goes to sleep. When the first thread
finally issues the store, it overwrites the flag:
monitor.count.store(loaded.saturating_add(INCREMENT), Relaxed);
The fix is to set the flag again if that happens:
let new_count = loaded.saturating_add(INCREMENT);
if monitor.count.swap(new_count, Relaxed) & CONTENDED != 0 {
    monitor.count.store(new_count | CONTENDED, Relaxed);
}
Whew!
Immediately giving up and parking the thread when we can’t acquire the lock is a bit pessimistic. A trick many locks use is to keep checking the lock in a hot loop for a little bit in case it’s about to get unlocked. There’s even a kind of mutex, the spin lock, that works entirely on this basis. Whether this makes sense depends on how long you expect the lock to be held for, as well as how you value latency and throughput as opposed to efficiency.
Unfortunately, the toy JVM this is going into is purely an interpreter, and thus not particularly fast, so critical sections written in Java are going to take a lot longer than in Rust (though whether it’s worth it can only be really decided with realistic benchmarks, of course). I’ll revisit this point when I’ve got good benchmarks, but writing concurrency primitives is already hard enough; benchmarking them across platforms is not quite my idea of a fun time.
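For the record, a bounded spin before parking could look roughly like the sketch below. It glosses over the reëntrant fast path, and SPIN_LIMIT is a made-up number; picking it sensibly is exactly what needs those benchmarks.

const SPIN_LIMIT: u32 = 100;

fn enter_spinning(monitor: &Monitor) {
    for _ in 0..SPIN_LIMIT {
        // Only attempt the CAS when the monitor looks free, so we don't
        // bounce the cache line around while it's held.
        if monitor.count.load(Relaxed) == 0
            && monitor
                .count
                .compare_exchange_weak(0, INCREMENT, Acquire, Relaxed)
                .is_ok()
        {
            monitor.owner.store(thread_id(), Release);
            return;
        }
        std::hint::spin_loop();
    }
    // Still contended: fall back to parking as before.
    enter(monitor);
}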
Right now, on a 64 bit system our monitor looks like this:
┌───────────────┬───────┬───────┐
│ owner │ count │ │
└───────────────┴───────┴───────┘
0 7 8 B F
The order of fields can differ, but in any case we’re wasting 4 bytes on padding because the 8 byte atomic will need to be 8 byte aligned. Wait a moment, we’re going to embed this in a Java heap object whose layout we have to manually (and dynamically) manage anyway… let’s just ignore padding! This means we’re not going to use references, since they have to point at a valid structure. Instead we can use pointers:
fn enter(monitor: *const Monitor) {…
But let’s hold off on that optimization for a moment. Can we reduce the space the thread id takes up? Instead of taking the address of a thread-local, let’s just store our id in it:
// NO_THREAD and thread ids are now u32 instead of usize.
thread_local!(static THREAD_ID: Cell<u32> = const { Cell::new(NO_THREAD) });
Every time we create a thread, we’ll allocate a thread id:
static PREVIOUS_THREAD_ID: Mutex<u32> = Mutex::new(NO_THREAD);

fn init_thread_id() {
    let mut alloc = PREVIOUS_THREAD_ID.lock().unwrap();
    let id = alloc.checked_add(1).expect("Too many threads");
    THREAD_ID.set(id);
    *alloc = id;
}
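A hypothetical call site: whatever part of the VM spawns a new thread runs this before executing any Java code on it.

std::thread::spawn(|| {
    init_thread_id();
    // ... run the interpreter on this thread ...
});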
That works, but should a program keep spawning (and killing) enough threads long enough, we would eventually run out of ids, so let’s reclaim them when a thread gets killed:
struct ThreadIdAlloc {
    previous: u32,
    freelist: Vec<u32>,
}

static THREAD_ID_ALLOC: Mutex<ThreadIdAlloc> = Mutex::new(ThreadIdAlloc {
    previous: NO_THREAD,
    freelist: Vec::new(),
});

thread_local!(static THREAD_ID: SmallThreadId = const { SmallThreadId(Cell::new(NO_THREAD)) });

struct SmallThreadId(Cell<u32>);

impl Drop for SmallThreadId {
    fn drop(&mut self) {
        let id = self.0.get();
        if id != NO_THREAD {
            THREAD_ID_ALLOC.lock().unwrap().freelist.push(id);
        }
    }
}

fn thread_id() -> u32 {
    THREAD_ID.with(|id| id.0.get())
}

fn init_thread_id() {
    let mut alloc = THREAD_ID_ALLOC.lock().unwrap();
    let id = alloc.freelist.pop().unwrap_or_else(|| {
        let id = alloc.previous.checked_add(1).expect("Too many threads");
        alloc.previous = id;
        id
    });
    THREAD_ID.with(|key| key.0.set(id))
}
According to the docs, destructors of thread-locals aren’t guaranteed to run, but we don’t care about the cases where they don’t (and a few ids leaking isn’t a problem).
┌───────┬───────┐
│ owner │ count │
└───────┴───────┘
0 3 4 7
A further step could be reducing the size of count
depending on the platform. While we’re stuck with 32 bit on Linux, we
can specify the size used on other operating systems, e.g. with the
AddressSize
parameter for WaitOnAddress
on Windows. That comes with the obvious downside of reducing the maximum
count, so let’s table (in American parlance) it for now. We’ve shrunk
our monitor down to 8 bytes with the option to shrink it further by
compromising on thread count or recursion limits.
Besides entering and exiting the monitor, we need to support letting a thread wait on an object until it’s notified. To do that, we add another field:
pub(crate) struct Monitor {
    owner: AtomicU32,
    count: AtomicU32,
    condvar: AtomicU32,
}
I’ll call it condvar
here because this feature is
primarily used as a condition variable: A queue of threads waiting for
the data in the object this monitor is attached to to conform to a
condition. Java defines three operations, all of which require the
caller to hold the lock:
- wait drops the lock and sleeps until it’s woken up, then reäcquires the lock. Importantly, it’s allowed for the thread to wake up spuriously instead of because of a notification.
- notify wakes up one thread waiting on this object.
- notify_all wakes all of them.

Our lock is reëntrant, so we can’t just release it by calling
exit
, as we may have entered the monitor multiple times,
but releasing the lock is simple. We don’t even need to clear the
owner
field.
pub fn wait(monitor: &Monitor) {
    let condvar_state = monitor.condvar.load(Relaxed);
    let count = monitor.count.swap(0, AcqRel);
    if (count & CONTENDED) == CONTENDED {
        atomic_wait::wake_one(&monitor.count);
    }
Then we wait. What if another thread has taken the lock and sent a notification before we could wait? Because of that case we change the value of the condvar when we do a notification, which will cause the wait to fail:
    atomic_wait::wait(&monitor.condvar, condvar_state);
Now we can reäcquire the lock and restore the lock count. Careful not to accidentally clear the CONTENDED flag!
    enter(monitor);
    if monitor.count.swap(count, Relaxed) & CONTENDED != 0 {
        monitor.count.store(count | CONTENDED, Relaxed);
    }
}
In order to notify a thread (or all waiting threads in
notify_all
) we change the value stored in the condvar, then
wake the thread(s).
fn notify_one(monitor: &Monitor) {
    monitor.condvar.fetch_add(1, Relaxed);
    atomic_wait::wake_one(&monitor.condvar);
}
Do we really need to use an extra field? We already have a field
that’s currently not getting waited on: owner
. Good thing
we reduced it to 32 bit, as mixed-size atomic accesses to the same
address are undefined behavior in Rust (and even if they weren’t, CPUs
aren’t necessarily happy about them).
We don’t need to change much: just don’t do any mutations in notify_one, wait on owner, and skip checking which thread owns the lock when trying to reäcquire it after the wait. This works because to notify the object a thread will have to lock it first, which overwrites the owner and thereby prevents the wait. It does however mean that even when not sending a notification, this causes a spurious wakeup, which is… not great. However, it’s semantically correct, so I’ll take it.
A possible fix could be to reserve a bit in the thread id and set it when sending a notification. If the wait fails and the bit isn’t set we can try to sleep again; if it is, we clear it and continue. That also means we have to be careful not to accidentally clear the bit when acquiring a lock.
When we notify_all
a monitor, all threads waiting on it
will wake up… and try to acquire the same lock, even though at most one
of them can succeed. On Linux, we can choose to wake up a single thread
and move the rest over to waiting on count instead of on owner. atomic-wait, focussing on a minimal, portable API, doesn’t cover this, so we’ll do the syscall ourselves:
unsafe {
    libc::syscall(
        libc::SYS_futex,
        // uaddr: Wait queue to wake threads up from
        &monitor.owner,
        // futex_op: Operation to perform;
        // futex is not shared between processes
        libc::FUTEX_REQUEUE | libc::FUTEX_PRIVATE_FLAG,
        // val: Number of threads to wake
        1,
        // val2: Max number of threads to requeue
        i32::MAX,
        // uaddr2: Wait queue to move threads to
        &monitor.count,
    );
}
As the woken-up thread will set the CONTENDED
flag,
threads will get woken up on lock release till the wait queue is
empty.
Let’s do a quick sanity check regarding performance. Our lock is probably going to lag behind a more refined implementation by someone who actually knows what they’re doing, but hopefully it won’t be too painful. Remember not to put too much stock in these numbers! This is only going to be a very limited microbenchmark, run on a single OS (Linux) on a single architecture (amd64), and we’re only taking a look at total execution time, not at the cpu time – when two implementations run in the same amount of time, the version that spent the least energy and cpu share is obviously preferable.
Here are the results:
Our monitor time: [6.1984 ms 6.3496 ms 6.4977 ms]
Other implementation time: [5.6999 ms 5.8481 ms 5.9992 ms]
change: [-10.958% -7.8976% -4.9026%] (p = 0.00 < 0.05) Performance has improved.
Just kidding, I lied. Both versions are our implementation, I just forgot to turn off cpu frequency scaling. Without it, the results are very consistent.
sudo cpupower frequency-set --governor performance
We’ll be running the following function on the same object with iterations split among a different number of threads:
class Monitor {
    void stress(int iterations) {
        for (int i = 0; i < iterations; i++) {
            synchronized (this) {
                for (int delay = 0; delay < 10; delay++) {}
            }
        }
    }
}
I’ll use Criterion to run the benches as it takes care of warming up our benchmark, gathering statistics, and comparing them.
cores: 6 physical, 12 logical
[parking_lot]
with_1_thread time: [17.250 ms 17.296 ms 17.349 ms]
with_4_threads time: [27.576 ms 27.619 ms 27.660 ms]
with_32_threads time: [141.38 ms 142.43 ms 143.39 ms]
[our version]
with_1_thread time: [16.805 ms 16.879 ms 16.963 ms]
change: [-2.8942% -2.4093% -1.8563%] (p = 0.00 < 0.05)
Performance has improved.
with_4_threads time: [31.395 ms 31.535 ms 31.670 ms]
change: [+13.628% +14.181% +14.770%] (p = 0.00 < 0.05)
Performance has regressed.
with_32_threads time: [38.617 ms 38.697 ms 38.788 ms]
change: [-73.022% -72.830% -72.619%] (p = 0.00 < 0.05) Performance has improved.
Huh, we’re actually mostly keeping up. Interestingly, with a lot of threads we come out significantly on top, but if you’ve got that many threads vying for the same lock in a hot loop, there’s something seriously wrong with your architecture.
In the beginning, I mentioned that ideally you’d only allocate the
lock under contention. That’s exactly what parking_lot
’s core idea is: Its
RawMutex
consists of a single byte storing whether the
mutex is locked and whether it’s contended. If a thread needs to wait, a
structure containing the lock whose address is passed to
SYS_futex
/WaitOnAddress
/etc as well as any
necessary metadata gets allocated in the parking lot, a global
HashMap
keyed by the RawMutex
’s address. To
support reëntrancy and Object::wait
without making the
per-object overhead bigger, we could put the additional metadata there.
I’d expect taking the lock reëntrantly might have a small performance
penalty, but that doesn’t matter much.
Still, for now I’m happy with what we’ve got. Although there are some
caveats regarding spurious wakeups, we’ve focused mostly on Linux, and
it isn’t as small as it could be, we’ve managed to reduce the size of
the monitor down to a quarter of the
(ReentrantMutex<()>, Condvar)
version. Not half bad,
given that this was my first time implementing a concurrency primitive
or messing with Miri. Here’s
the full code if you want to check it out. Toodles!