Monday, May 8, 2017

Thread Collisions detector - Fake Mutex

Some years ago I found myself wondering if a not supposed thread safe class was being used by multiple threads without being synchronized. In that occasion I wrote about it here Threading mess! and here Threading mess (2)!.
At that time (9 years ago) we had no threads neither atomic in the c++ standard and the solution proposed was based on pthreads and on gcc atomic builtins. I think it's time to refresh the implementation using some C++11 features.
The idea is very simple, upon entering a critical section (part of code that should not be executed concurrently) we should save the current thread id resetting the stored value as soon the thread leaves the critical section. If a thread tries to enter a critical section but we already have a thread id saved then we have detected the collision.
The technique is very effective and at that time I wrote for the Chromium project the class ThreadCollisionWarner thread_collision_warner.h and thread_collision_warner.cc using the described technique.
Basically what you need to do is to add to your classes a "FakeMutex" and then "locking" it where it's needed as you would do with a real mutex. It's called Fake Mutex because it will not suspend a thread if another one is active but it will assert(false) instead. If you want to use this technique in your project I suggest to use the implementation done in Chromium.

Examples of uses:

// Example: Queue implementation non thread-safe but still usable if clients
// are synchronized somehow.
//
// In this case the macro DFAKE_SCOPED_LOCK has to be
// used, it checks that if a thread is inside the push/pop then
// noone else is still inside the pop/push
class NonThreadSafeQueue {
public:
...
void push(int) { DFAKE_SCOPED_LOCK(push_pop_); ... }
int pop() { DFAKE_SCOPED_LOCK(push_pop_); ... }
...
private:
DFAKE_MUTEX(push_pop_);
};
// Example: Queue implementation non thread-safe but still usable if clients
// are synchronized somehow, it calls a method to "protect" from
// a "protected" method
//
// In this case the macro DFAKE_SCOPED_RECURSIVE_LOCK
// has to be used, it checks that if a thread is inside the push/pop
// then noone else is still inside the pop/push
class NonThreadSafeQueue {
public:
void push(int) {
DFAKE_SCOPED_LOCK(push_pop_);
...
}
int pop() {
DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_);
bar();
...
}
void bar() { DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); ... }
...
private:
DFAKE_MUTEX(push_pop_);
};
// Example: Queue implementation not usable even if clients are synchronized,
// so only one thread in the class life cycle can use the two members
// push/pop.
//
// In this case the macro DFAKE_SCOPED_LOCK_THREAD_LOCKED pins the
// specified
// critical section the first time a thread enters push or pop, from
// that time on only that thread is allowed to execute push or pop.
class NonThreadSafeQueue {
public:
...
void push(int) { DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); ... }
int pop() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); ... }
...
private:
DFAKE_MUTEX(push_pop_);
};
// Example: Class that has to be contructed/destroyed on same thread, it has
// a "shareable" method (with external synchronization) and a not
// shareable method (even with external synchronization).
//
// In this case 3 Critical sections have to be defined
class ExoticClass {
public:
ExoticClass() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... }
~ExoticClass() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... }
void Shareable() { DFAKE_SCOPED_LOCK(shareable_section_); ... }
void NotShareable() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... }
...
private:
DFAKE_MUTEX(ctor_dtor_);
DFAKE_MUTEX(shareable_section_);
};
view raw fakemutex hosted with ❤ by GitHub
the macros DFAKE_MUTEX, DFAKE_SCOPED_LOCK, DFAKE_SCOPED_RECURSIVE_LOCK and DFAKE_SCOPED_LOCK_THREAD_LOCKED are defined only if compiled in DEBUG mode removing from your production code the atomic overhead.

The modern simplified version of Chromium ThreadCollisionWarner proposed in Threading mess (2)! is reported here.

#pragma once
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <thread>
#ifdef NDEBUG
#define THREAD_WATCH(obj)
#define SCOPED_WATCH(obj)
#define WATCH(obj)
#else
#define THREAD_WATCH(obj) ThreadCollisionWarning _##obj;
#define SCOPED_WATCH(obj) ThreadCollisionWarning::ScopedWatch sw_##obj(_##obj);
#define WATCH(obj) ThreadCollisionWarning::Watch w_##obj(_##obj);
#endif
class ThreadCollisionWarning
{
public:
ThreadCollisionWarning()
: theActiveThread() {
assert(theActiveThread.is_lock_free());
}
~ThreadCollisionWarning() {
}
class Watch
{
public:
Watch(ThreadCollisionWarning& aTCW)
: theWarner(aTCW) {
theWarner.enter_self();
}
~Watch() {
}
private:
ThreadCollisionWarning& theWarner;
};
class ScopedWatch
{
public:
ScopedWatch(ThreadCollisionWarning& aTCW)
: theWarner(aTCW) {
theWarner.enter();
}
~ScopedWatch() {
theWarner.leave();
}
private:
ThreadCollisionWarning& theWarner;
};
private:
void enter_self() {
auto myExpectedId = std::thread::id();
if (!theActiveThread.compare_exchange_strong(myExpectedId,
std::this_thread::get_id())) {
// Last chance! may be is the thread itself calling within a critical
// section another critical section
if (theActiveThread.load() != std::thread::id()) {
throw std::runtime_error("Thread Collision");
}
}
}
void enter() {
auto myExpectedId = std::thread::id();
if (!theActiveThread.compare_exchange_strong(myExpectedId,
std::this_thread::get_id())) {
// gotcha! another thread is trying to use the same class
throw std::runtime_error("Thread Collision");
}
}
void leave() {
theActiveThread.store(std::thread::id());
}
std::atomic<std::thread::id> theActiveThread;
};