Futexes at Home

Writing a toy Java Virtual Machine, as one does, I’ve come across the need to lock objects when a synchronized method or block is run. When the first version of Java was designed, multi-threading was the future, and as such not really figured out yet: Posix’s multi-threading standard, pthreads was released in 1995 – the same year as HotJava, and it took until C++11 for C/C++ to gain their memory model, which was inspired by Java’s revamped one from 2004. So Java went with the idea that maybe each object could carry an intrinsic monitor (a lock and thread wait group) that you could grab when accessing the object from multiple threads.

I don’t believe this feature, which offers no options such as optimistic locking and makes reasoning about locking patterns harder, sees particularly widespread use today. But we still need to implement it. Ideally we’d only allocate the monitor when an object is actually getting locked, but the simple solution is to store a full monitor in every object. Maybe we can just use a std::sync::Mutex?

Unfortunately in Java these locks are reëntrant – that is, you can take the same lock multiple times from the same thread – while Rust’s Mutex isn’t. That makes sense, as locking a Mutex twice would bestow upon you the forbidden power of multiple exclusive references to the same memory, anathema under the sacred tenet of shared ⊕ mutable, while Java shares no such beliefs.

The parking_lot crate provides various synchronization primitives, ReentrantMutex among them. Just what we need! However:

To cook our own, we need two ingredients:

Baby’s first mutex

atomic_wait::wait takes a reference to a 32 bit number as well as the value we think is currently stored there. If they match, it parks the thread so it can be woken up again when progress is possible. Why isn’t supplying just the address (or some other unique identifier) enough? Well, dealing with concurrency is the whole point, and if the lock unlocked after we checked but before the syscall, we may rest in Hypnos’ arms evermore if not for this safeguard.

So we’ll need a 32 bit number:

struct Monitor {
    lock: AtomicU32,
}

Let’s define what its values mean while we’re at it:

const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;

And now we can lock it:

fn enter(monitor: &Monitor) {
    loop {
        match monitor
            .lock
            .compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed)
        {
            Ok(_) => return,
            Err(current) => {
                atomic_wait::wait(&monitor.lock, current);
            }
        }
    }
}

We try to put the lock into the locked state, and if it was already locked we wait and try again when woken. Unlocking is simple too: Put the lock into the unlocked state, then wake up a thread waiting on it (if any). Because we pair a load operation (part of the compare_exchange) which has an Acquire ordering with a Release store on the same address, everything that happened before the store in the thread that released the lock is now ordered before everything after the load in the thread that just acquired the lock.

fn exit(monitor: &Monitor) {
    monitor.lock.store(UNLOCKED, Release);
    atomic_wait::wake_one(&monitor.lock);
}

Reëntrancy

Yay, we’ve got a (barebones) mutex! But right now, if we call monitor.enter() twice on the same thread, we deadlock. Let’s fix that.

We’ll need to store which thread owns the monitor so we can continue if it’s locked by the current thread. We’ll also need to keep count of how many times we’ve entered the monitor on this thread so we know when we can unlock it:

struct Monitor {
    owner: AtomicUsize,
    count: AtomicU32,
}

count has also taken over the function of the lock field we previously had: When count is 0, the monitor is unlocked.

We’ve encountered a hiccup: We need a way to identify the current thread. The obvious one is to use std::thread::ThreadId, but the method to turn that into an u64 we can use is unstable. So we’ll make our own thread id:

fn thread_id() -> usize {
    thread_local!(static KEY: u8 = const {0});
    KEY.with(|x| x as *const _ as usize)
}

We take the address of a thread local variable! We use a variable of size greater than zero to ensure the address is actually unique. As a bonus, this only takes half the space on a 32 bit platform. The advantage of the standard library version is that it doesn’t get reused after the thread gets killed, but that’s not a property we care about.

Since the address will never be zero we can use that value to represent the lack of an owning thread.

const NO_THREAD: usize = 0;

When locking the monitor, we’ll now note down the thread id:

// in Monitor::enter
Ok(_) => {
    monitor.owner.store(thread_id(), Release);
    return;
}

and if the monitor is already locked, we check it:

// in Monitor::enter
Err(current) => {
    if monitor.owner.load(Relaxed) == thread_id() {
        monitor.count.store(current+1, Relaxed);
        return;
    } else {
        atomic_wait::wait(&monitor.count, current);
    }
}

Then when we exit the monitor:

// in Monitor::exit
let old_count = monitor.count.load(Relaxed);
if old_count > 1 {
    monitor.count.store(old_count - 1, Release);
} else {
    monitor.owner.store(NO_THREAD, Release);
    monitor.count.store(0, Release);
    atomic_wait::wake_one(&monitor.count);
}

The order of operation is important here. We need to clear the owner before we release the lock, because otherwise we might overwrite it after another thread has already taken the lock; and we need to release the lock before we wake another thread so that that thread can proceed instead of waiting again, potentially forever.

We also need to deal with the count overflowing. We could either panic on overflow, or we can refuse to unlock the object if the counter has overflown:

// in Monitor::enter
monitor.count.store(current.saturating_add(1), Relaxed);
// in Monitor::exit
if old_count == u32::MAX {
    return;
}

Tada!

Futex, but like for real this time

On Linux, the name of the syscall behind atomic-wait is SYS_futex. To my immense disappointment it turns out this isn’t a portmanteau of “futanari” and “latex”. Rather it stands for “fast user-space mutex”: The idea is to do what you can in user-space and only call the kernel when the lock is contended. Right now we don’t do that – we make a syscall every time we unlock the monitor, even if there’s no thread to wake up!

Well afaik, depending on the platform, the underlying API may already do this tracking for us. But for those that don’t, let’s keep track of whether there’s another thread waiting on the monitor. count can spare a bit for that:

const CONTENDED: u32 = 1;
const INCREMENT: u32 = 2;

Buckle up everypony, this is going be a bit trickier. The first step of entering a monitor hasn’t changed much:

// in Monitor::enter
// Try to aquire the lock
let current = match monitor.count.compare_exchange(0, INCREMENT, Acquire, Relaxed) {
    Ok(_) => {
        monitor.owner.store(thread_id(), Release);
        return;
    }
    Err(loaded) => loaded,
};
// Already locked, check if by the same thread
if monitor.owner.load(Acquire) == thread_id() {
    monitor.count.store(loaded.saturating_add(INCREMENT), Relaxed);
    return;
}

But now, if that fails, we set the CONTENDED flag both when we want to wait and when acquiring the lock. If setting the flag fails, the kernel immediately returns from the wait call, preventing us from deadlocking if we fail to set the flag. Before trying to acquire the lock we do a load so the core our thread is running on doesn’t have to take ownership of the cache line because of the compare_exchange if the monitor is still locked.

loop {
    // Try to set flag
    if (current & CONTENDED == 0) {
        let _ = monitor
          .count
          .compare_exchange_weak(current, current | CONTENDED, Release, Relaxed);
    }

    // Wait if flag successfully set
    atomic_wait::wait(&self.count, current | CONTENDED);

    // Try to aquire the lock
    current = self.count.load(Relaxed);
    if (current & !CONTENDED) == 0 {
        match self
            .count
            .compare_exchange(current, INCREMENT | CONTENDED, Acquire, Relaxed)
        {
            Ok(_) => {
                self.locked_by.store(thread_id(), Release);
                return;
            }
            Err(value) => {
                current = value;
            }
        }
    }
}

When we decrement the count and the thread is still holding the lock, we need to be careful not to clear the CONTENDED flag. But when unlocking the monitor we clear the CONTENDED flag, and only notify a thread when it was set – which was the entire reason we’re doing this. There may have been more than one thread waiting, which is why the woken up thread sets the flag again even if it successfully takes the lock.

// in Monitor::exit

let old_count = monitor.count.load(Acquire);
if old_count > INCREMENT | CONTENDED {
    // Lock still held by this thread
    let loaded = count(monitor).swap(current - INCREMENT, AcqRel);
    if (current & CONTENDED == 0) & (loaded & CONTENDED == CONTENDED) {
        count(monitor).store((current - INCREMENT) | CONTENDED, Relaxed);
    }
} else {
    // Unlock the monitor
    monitor.owner.store(NO_THREAD, Release);
    if monitor.count.swap(0, AcqRel) == INCREMENT | CONTENDED {
        atomic_wait::wake_one(&monitor.count);
    }
}

Let’s try out our monitor by writing a test that spawns a bunch of threads that each enter the monitor twice (testing reëntrancy), increase a shared variable by one, then exit the monitor twice; we then check whether the variable has reached the value we expect (testing mutual exclusion). Let’s run our test and… it hangs?

running 1 test
test monitor::test::monitor_contention has been running for over 60 seconds

Miri to the rescue

Usually Rust code gets compiled to native code – but not necessarily. rustc actually comes with an interpreter for its mid-level intermediate representation (Mir), aptly named Miri. Every time rustc needs to evaluate const expression, miri is at work. But it truly shines as a tool for debugging undefined behavior, or in our case deadlocks.

Invoking it is as easy as cargo miri test. And sure, it detects the deadlock:

[snip]
error: deadlock: the evaluated program deadlocked
   --> /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/linux.rs:12:9
    |
12  |         );
    |         ^ the evaluated program deadlocked
    |
    = note: BACKTRACE on thread `unnamed-3`:
    = note: inside `atomic_wait::platform::wait` at /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/linux.rs:12:9: 12:10
    = note: inside `atomic_wait::wait` at /home/vj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/atomic-wait-1.1.0/src/lib.rs:28:5: 28:34
note: inside `monitor::enter_locked`
   --> src/monitor.rs:121:17
    |
121 |                 atomic_wait::wait(&monitor.count, loaded | CONTENDED);
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[snip]

If we run Miri again, we’ll get the exact same error again, because Miri is deterministic (unless we disable its sandbox, which we don’t need to do here). If we want different executions, we can run it with --many-seeds; if we want to take a closer look at one of them we can instead specify it with MIRIFLAGS="-Zmiri-seed=<seed>". The seed affects e.g. when different threads get preëmpted or what value (from the set of possibly observable values) an atomic load reads. Because yeah, Miri can emulate weak memory too!

The error message tells us that a thread is stuck at the atomic_wait::wait call. No surprise there. To find out why, let’s sprinkle in a few logging statements to see what’s happening. We can’t use println!(), as that locks stdout, which synchronizes the threads and makes the race condition vanish, but luckily Miri is nice enough to provide a function we can link to:

#[cfg(miri)]
extern "Rust" {
    fn miri_write_to_stdout(bytes: &[u8]);
}

macro_rules! miri_println {
    ($fmt:literal $(, $exprs:expr)*) => {
        #[cfg(miri)]
        unsafe {
            let message = format!($fmt, $($exprs),*) + "\n";
            miri_write_to_stdout(message.as_bytes());
        }
    };
}

Add in some prints, and behold:

[snip]
Thread 2536232: enter called, count == 2
Thread 2483964: enter called, count == 2
Thread 2483964: lock owned by different thread, set contended flag (count = 3)
Thread 2483964: try to sleep on count == 3
Thread 2536232: lock owned by same thread, set count to 4
[snip]

So what happened here? Well the first thread already owns the lock, reads the current count and increments it. Before it can write it back, the other thread fails to acquire the lock, sets the CONTENDED flag, and goes to sleep. When the first thread finally issues the store, it overwrites the flag:

monitor.count.store(loaded.saturating_add(INCREMENT), Relaxed);

The fix is to set the flag again if that happens:

let new_count = loaded.saturating_add(INCREMENT);
if monitor.count.swap(new_count, Relaxed) & CONTENDED != 0 {
    monitor.count.store(new_count | CONTENDED, Relaxed);
}

Whew!

I heard spinning is a good trick

Immediately giving up and parking the thread when we can’t acquire the lock is a bit pessimistic. A trick many locks use is to keep checking the lock in a hot loop for a little bit in case it’s about to get unlocked. There’s even a kind of mutex, the spin lock, that works entirely on this basis. Whether this makes sense depends on how long you expect the lock to be held for, as well as how you value latency and throughput as opposed to efficiency.

Unfortunately, the toy JVM this is going into is purely an interpreter, and thus not particularly fast, so critical sections written in Java are going to take a lot longer than in Rust (though whether it’s worth it can only be really decided with realistic benchmarks, of course). I’ll revisit this point when I’ve got good benchmarks, but writing concurrency primitives is already hard enough; benchmarking them across platforms is not quite my idea of a fun time.

Honey, I’ve shrunk the monitor

Right now, on a 64 bit system our monitor looks like this:

┌───────────────┬───────┬───────┐
│     owner     │ count │       │
└───────────────┴───────┴───────┘
 0             7 8     B       F

The order of fields can differ, but in any case we’re wasting 4 bytes on padding because the 8 byte atomic will need to be 8 byte aligned. Wait a moment, we’re going to embed this in Java heap object whose layout me have to manually (and dynamically) manage anyways… let’s just ignore padding! This means we’re not going to use references, since they have to point at a valid structure. Instead we can use pointers:

fn enter(monitor: *const Monitor) {

But let’s hold off on that optimization for a moment. Can we reduce the space the thread id takes up? Instead of taking the address of a thread-local, let’s just store our id in it:

thread_local!(static THREAD_ID: u32 = const { Cell::new(NO_THREAD) });

Every time we create a thread, we’ll allocate a thread id:

static PREVIOUS_THREAD_ID: Mutex<u32> = Mutex::new(NO_THREAD);

fn init_thread_id() {
    let alloc = PREVIOUS_THREAD_ID.lock().unwrap();
    let id = alloc.checked_add(1).expect("Too many threads");
    THREAD_ID.set(id);
    *alloc = id;
}

That works, but should a program keep spawning (and killing) enough threads long enough, we would eventually run out of ids, so let’s reclaim them when a thread gets killed:

struct ThreadIdAlloc {
    previous: u32,
    freelist: Vec<u32>
}

static THREAD_ID_ALLOC: Mutex<ThreadIdAlloc> = Mutex::new(ThreadIdAlloc { 
    previous: NO_THREAD,
    freelist: Vec::new()
});

thread_local!(static THREAD_ID: SmallThreadId = const { ThreadId(Cell::new(NO_THREAD)) });

struct SmallThreadId(Cell<u32>);

impl Drop for SmallThreadId {
    fn drop(&mut self) {
        let id = self.0.get();
        if id != NO_THREAD {
            THREAD_ID_ALLOC.lock().unwrap().freelist.push(id);
        }
    }
}

fn thread_id() -> u32 {
    THREAD_ID.with(|id| id.0.get())
}

fn init_thread_id() {
    let mut alloc = THREAD_ID_ALLOC.lock().unwrap();
    let id = alloc.freelist.pop().unwrap_or_else(|| {
        let id = alloc.previous.checked_add(1).expect("Too many threads");
        alloc.previous = id;
        id
    });
    THREAD_ID.with(|key| key.0.set(id))
}

According to the docs, destructors of thread-locals aren’t guaranteed to run, but we don’t care about the cases they don’t (and a few ids leaking isn’t a problem).

┌───────┬───────┐
│ owner │ count │
└───────┴───────┘
 0     3 4     7

A further step could be reducing the size of count depending on the platform. While we’re stuck with 32 bit on Linux, we can specify the size used on other operating system, e.g. with the AddressSize parameter for WaitOnAddress on Windows. That comes with the obvious downside of reducing the maximum count, so let’s table (in American parlance) it for now. We’ve shrunk our monitor down to 8 bytes with the option to shrink it further by compromising on threat count or recursion limits.

Wake me up inside

Besides entering and exiting the monitor, we need to support letting a thread wait on an object until it’s notified. To do that, we add another field:

pub(crate) struct Monitor {
    owner: AtomicU32,
    count: AtomicU32,
    condvar: AtomicU32
}

I’ll call it condvar here because this feature is primarily used as a condition variable: A queue of threads waiting for the data in the object this monitor is attached to to conform to a condition. Java defines three operations, all of which require the caller to hold the lock:

Our lock is reëntrant, so we can’t just release it by calling exit, as we may have entered the monitor multiple times, but releasing the lock is simple. We don’t even need to clear the owner field.

pub fn wait(monitor: &Monitor) {
    let condvar_state = monitor.condvar.load(Relaxed);
    let count = monitor.count.swap(0, AcqRel);
    if (count & CONTENDED) == CONTENDED {
        atomic_wait::wake_one(&monitor.count);
    }

Then we wait. What if another thread has taken the lock and send a notification before we can wait? Because of that case we change the value of the condvar when we do a notification, which will cause the wait to fail:

    atomic_wait::wait(&monitor.condvar, condvar_state);

Now we can require the lock and restore the lock count. Careful to not accidentally clear the CONTENDED flag!

    enter(monitor);
    if monitor.count.swap(count, Relaxed) & CONTENDED != 0 {
        monitor.count.store(count | CONTENDED, Relaxed);
    }
}

In order to notify a thread (or all waiting threads in notify_all) we change the value stored in the condvar, then wake the thread(s).

fn notify_one(monitor: &Monitor) {
    monitor.condvar.fetch_add(1, Relaxed);
    atomic_wait::wake_one(&monitor.condvar);
}

Do we really need to use an extra field? We already have a field that’s currently not getting waited on: owner. Good thing we reduced it to 32 bit, as mixed-size atomic accesses to the same address are undefined behavior in Rust (and even if they weren’t, CPUs aren’t necessarily happy about them).

We don’t need to change much, just don’t do any mutations in notify_one, wait on owner and skip checking which thread owns the lock when trying to reäcquiring it after the wait. This works because to notify the object a thread will have to lock it first, which overwrites the owner and thereby prevent the wait. It does however mean that even when not sending a notification, this causes a spurious wakeup, which is… not great. However, it’s semantically correct, so I’ll take it.

A possible fix could be to reserve a bit in the thread id and set it when sending a notification. If the wait fails and the bit isn’t set we can try to sleep again; if it is, we clear it and continue. That also means we have to be careful not to accidentally clear the bit when acquiring a lock.

Herds a-thundering

When we notify_all a monitor, all threads waiting on it will wake up… and try to acquire the same lock, even though at most one of them can succeed. On Linux, we can choose to wake up a single thread and move the rest over to waiting on count instead of on locked_by. atomic-wait, focussing a minimal, portable api doesn’t cover this, so we’ll do the syscall ourselves:

unsafe {
    libc::syscall(
        libc::SYS_futex,
        // uaddr: Wait queue to wake threads up from
        &self.locked_by,
        // futex_op: Operation to perform;
        // futex is not shared between processes
        libc::FUTEX_REQUEUE | libc::FUTEX_PRIVATE_FLAG,
        // val: Number of threads to wake
        1,
        // val2: Max number of threads to requeue
        i32::MAX,
        // uaddr2: Wait queue to move threads to
        &self.count,
    );
};

As the woken-up thread will set the CONTENDED flag, threads will get woken up on lock release till the wait queue is empty.

Need for Speed: Also Desired

Let’s do a quick sanity check regarding performance. Our lock is probably going to lag behind a more refined implementation by someone who actually knows what they’re doing, but hopefully it won’t be too painful. Remember not to put too much stock in these numbers! This is only going to be a very limited microbenchmark, run on a single OS (Linux) on a single architecture (amd64), and we’re only taking a look at total execution time, not at the cpu time – when two implementations run in the same amount of time, the version that spent the least energy and cpu share is obviously preferable.

Here are the results:

Our monitor             time:   [6.1984 ms 6.3496 ms 6.4977 ms]

Other implementation    time:   [5.6999 ms 5.8481 ms 5.9992 ms]
                        change: [-10.958% -7.8976% -4.9026%] (p = 0.00 < 0.05)
                        Performance has improved.

Just kidding, I lied. Both versions are our implementation, I just forgot to turn off cpu frequency scaling. Without it, the results are very consistent.

sudo cpupower frequency-set --governor performance

We’ll be running the following function on the same object with iterations split among a different number of threads:

class Monitor {
    void stress(int iterations) {
        for(int i = 0; i<iterations; i++) {
            synchronized (this) {
                for(int delay=0; delay<10; delay++) {}
            }
        }
    }
}

I’ll use Criterion to run the benches as it takes care of warming up our benchmark, gathering statistics, and comparing them.

cores: 6 physical, 12 logical

[parking_lot]
with_1_thread       time:   [17.250 ms 17.296 ms 17.349 ms]
with_4_threads      time:   [27.576 ms 27.619 ms 27.660 ms]
with_32_threads     time:   [141.38 ms 142.43 ms 143.39 ms]

[our version]
with_1_thread       time:   [16.805 ms 16.879 ms 16.963 ms]
                    change: [-2.8942% -2.4093% -1.8563%] (p = 0.00 < 0.05)
                    Performance has improved.

with_4_threads      time:   [31.395 ms 31.535 ms 31.670 ms]
                    change: [+13.628% +14.181% +14.770%] (p = 0.00 < 0.05)
                    Performance has regressed.

with_32_threads     time:   [38.617 ms 38.697 ms 38.788 ms]
                    change: [-73.022% -72.830% -72.619%] (p = 0.00 < 0.05)
                    Performance has improved.

Huh, we’re actually mostly keeping up. Interestingly, with a lot of threads we come out significantly on top, but if you’ve got that many threads vying for the same lock in a hot loop, there’s something seriously wrong with your architecture.

Circling back

In the beginning, I mentioned that ideally you’d only allocate the lock under contention. That’s exactly what parking_lot’s core idea is: Its RawMutex consists of a single byte storing whether the mutex is locked and whether it’s contended. If a thread needs to wait, a structure containing the lock whose address is passed to SYS_futex/WaitOnAddress/etc as well as any necessary metadata gets allocated in the parking lot, a global HashMap keyed by the RawMutex’s address. To support reëntrancy and Object::wait without making the per-object overhead bigger, we could put the additional metadata there. I’d expect taking the lock reëntrantly might have a small performance penality, but that doesn’t matter much.

Still, for now I’m happy with what we’ve got. Although there are some caveats regarding spurious wakeups, we’ve focused mostly on Linux, and it isn’t as small as it could be, we’ve managed to reduce the size of the monitor down to a quarter of the (ReentrantMutex<()>, Condvar) version. Not half bad, given that this was my first time implementing a concurrency primitive or messing with Miri. Here’s the full code if you want to check it out. Toodles!