Thread-Safe Queue – Two Serious Errors
In my last post “Monitor Object” I implemented a thread-safe queue. I made two serious errors. Sorry. Today, I will fix these issues.
First, I want to show you again the erroneous implementation from my last post to understand the context.
// monitorObject.cpp #include <condition_variable> #include <functional> #include <queue> #include <iostream> #include <mutex> #include <random> #include <thread> class Monitor { public: void lock() const { monitMutex.lock(); } void unlock() const { monitMutex.unlock(); } void notify_one() const noexcept { monitCond.notify_one(); } template <typename Predicate> void wait(Predicate pred) const { // (10) std::unique_lock<std::mutex> monitLock(monitMutex); monitCond.wait(monitLock, pred); } private: mutable std::mutex monitMutex; mutable std::condition_variable monitCond; }; template <typename T> // (1) class ThreadSafeQueue: public Monitor { public: void add(T val){ lock(); myQueue.push(val); // (6) unlock(); notify_one(); } T get(){ wait( [this] { return ! myQueue.empty(); } ); // (2) lock(); auto val = myQueue.front(); // (4) myQueue.pop(); // (5) unlock(); return val; } private: std::queue<T> myQueue; // (3) }; class Dice { public: int operator()(){ return rand(); } private: std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), std::default_random_engine()); }; int main(){ std::cout << '\n'; constexpr auto NumberThreads = 100; ThreadSafeQueue<int> safeQueue; // (7) auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8) std::cout << val << " " << std::this_thread::get_id() << "; "; }; auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9) std::vector<std::thread> addThreads(NumberThreads); Dice dice; for (auto& thr: addThreads) thr = std::thread(addLambda, dice()); std::vector<std::thread> getThreads(NumberThreads); for (auto& thr: getThreads) thr = std::thread(getLambda); for (auto& thr: addThreads) thr.join(); for (auto& thr: getThreads) thr.join(); std::cout << "\n\n"; }
The key idea of the example is that the Monitor Object is encapsulated in a class and can, therefore, be reused. The class Monitor
uses a std::mutex
as monitor lock and a std::condition_variable
as monitor condition. The class Monitor
provides the minimal interface that a Monitor Object should support.
ThreadSafeQueue
in line (1) extends std::queue
in line (3) with a thread-safe interface. ThreadSafeQueue
derives from the class Monitor
and uses its member functions to support the synchronized member functions add
and get. The member functions add
and get
use the monitor lock to protect the Monitor Object, particularly the non-thread-safe myQueue
. add
notifies the waiting thread when a new item was added to myQueue
. This notification is thread-safe. The member function get
(line (3)) deserves more attention. First, the wait
member function of the underlying condition variable is called. This wait
call needs an additional predicate to protect against spurious and lost wakeups (C++ Core Guidelines: Be Aware of the Traps of Condition Variables). The operations modifying myQueue
(lines 4 and 5) must also be protected because they can interleave with the call myQueue.push(val)
(line 6). The Monitor Object safeQueue
line (7) uses the lambda functions in lines (8) and (9) to add or remove a number from the synchronized safeQueue
. ThreadSafeQueue
itself is a class template and can hold values from an arbitrary type. One hundred clients add 100 random numbers between 1 – 6 to safeQueue
(line 7), while hundred clients remove these 100 numbers concurrently from the safeQueue
. The output of the program shows the numbers and the thread ids.
struct Monitor { using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); } template <typename Predicate> void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); } // … }; template <typename T> T ThreadSafeQueue<T>::get() { auto kerberos = receiveGuard(); wait(kerberos, [this]{ return not myQueue.empty(); }); T rc = std::move(myQueue.front()); myqueue.pop(); return rc; }
This version corrects the exception problem for get()
. For add()
you can simply use the monitor object with a lock_guard
:
template <typename T> void add(T val) { { std::lock_guard<Monitor> kerberos(*this); myqueue.push(std::move(val)); } notify_one(); }
I would probably wrap the notification in a “SendGuard
” that contains a lock_guard
and a reference to the condition_variable
and sends the notification upon destruction:
class SendGuard { friend class Monitor; using deleter = decltype([](auto& cond){ cond->notify_one(); }); std::unique_ptr<std::condition_variable, deleter> notifier; std::lock_guard<std::mutex> kerberos; SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {} };
The move constructor and destructor should still be public and represent the whole interface! This would also make it much easier to use in add()
:
Modernes C++ Mentoring
Do you want to stay informed: Subscribe.
template <typename T> void add(T val) { auto kerberos = sendGuard(); myqueue.push(val); }
Finally, here is the full implementation of Dietmar. The numbers correspond to the numbers in my monitorObjec.cpp example.
// threadsafequeue1.cpp #include <condition_variable> #include <functional> #include <queue> #include <iostream> #include <mutex> #include <random> #include <thread> class Monitor { public: using Lock = std::unique_lock<std::mutex>; [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); } template <typename Predicate> void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); } class SendGuard { friend class Monitor; using deleter = decltype([](auto* cond){ cond->notify_one(); }); std::unique_ptr<std::condition_variable, deleter> notifier; std::lock_guard<std::mutex> kerberos; SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {} }; SendGuard sendGuard() { return {monitMutex, monitCond}; } private: mutable std::mutex monitMutex; mutable std::condition_variable monitCond; }; template <typename T> // (1) class ThreadSafeQueue: public Monitor { public: void add(T val){ auto kerberos = sendGuard(); myQueue.push(val); // (6) } T get(){ auto kerberos = receiveGuard(); wait(kerberos, [this] { return ! myQueue.empty(); } ); // (2) auto val = myQueue.front(); // (4) myQueue.pop(); // (5) return val; } private: std::queue<T> myQueue; // (3) }; class Dice { public: int operator()(){ return rand(); } private: std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), std::default_random_engine()); }; int main(){ std::cout << '\n'; constexpr auto NumberThreads = 100; ThreadSafeQueue<int> safeQueue; // (7) auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8) std::cout << val << " " << std::this_thread::get_id() << "; "; }; auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9) std::vector<std::thread> addThreads(NumberThreads); Dice dice; for (auto& thr: addThreads) thr = std::thread(addLambda, dice()); std::vector<std::thread> getThreads(NumberThreads); for (auto& thr: getThreads) thr = std::thread(getLambda); for (auto& thr: addThreads) thr.join(); for (auto& thr: getThreads) thr.join(); std::cout << "\n\n"; }
As a result of the discussion above, Frank has proposed the following version below, which has a consistent and easy-to-use interface for Monitor.
// threadSafeQueue.cpp #ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP #define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP #include <atomic> #include <algorithm> #include <condition_variable> #include <deque> #include <iterator> #include <mutex> #include <stdexcept> #include <thread> #include <vector> class Monitor { public: struct UnlockAndNotify { std::mutex d_mutex; std::condition_variable d_condition; void lock() { d_mutex.lock(); } void unlock() { d_mutex.unlock(); d_condition.notify_one(); } }; private: UnlockAndNotify d_combined; public: std::unique_lock<UnlockAndNotify> makeLockWithNotify() { return std::unique_lock{d_combined}; } template <typename PRED> std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) { std::unique_lock lock{d_combined.d_mutex}; d_combined.d_condition.wait(lock, waitForCondition); return lock; } }; class ThreadQueue { Monitor d_monitor; std::deque<int> d_numberQueue; auto makeLockWhenNotEmpty() { return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); }); } public: void addNumber(int number) { const auto lock = d_monitor.makeLockWithNotify(); d_numberQueue.push_back(number); } int removeNumber() { const auto lock = makeLockWhenNotEmpty(); const auto number = d_numberQueue.front(); d_numberQueue.pop_front(); return number; } }; #endif int main() { ThreadQueue queue; std::atomic<int> sharedSum{}; std::atomic<int> sharedCounter{}; std::vector<std::jthread> threads; threads.reserve(200); std::generate_n(std::back_inserter(threads), 100, [&] { return std::jthread{[&] { sharedSum += queue.removeNumber(); }}; }); std::generate_n(std::back_inserter(threads), 100, [&] { return std::jthread{[&] { queue.addNumber(++sharedCounter); }}; }); threads.clear(); // wait for all threads to finish if (sharedSum.load() != 5050) { throw std::logic_error("Wrong result for sum of 1..100"); } }
The implementation of the monitor pattern here is based on the flexibility of std::unique_lock
through its template parameter. All of the std RAII lock guards can be used with any class that has lock()
and unlock()
methods. The UnlockAndNotify
class implements this interface and notifies its condition variable from within the unlock()
method. On top of that, the Monitor
class provides a reduced public interface to create two different kinds of locks, one with notification and one without, by creating a std::unique_lock
on either the whole UnlockAndNotify
instance or just the contained std::mutex
.
On the choice of std::unique_lock
versus std::lock_guard
I (Frank) prefer the unique_lock
in the interface. This choice allows the user of the Monitor
class more flexibility. I value this flexibility higher than a possible performance difference to lock_guard
which anyway needs to be measured first. I acknowledge that the given examples don’t make use of this extra flexibility.
Afterward, Dietmar further developed Frank’s idea: here, the protected data is kept in the Monitor, making it harder to access it unprotected.
// threadsafequeue2.cpp #ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP #define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP #include <algorithm> #include <atomic> #include <condition_variable> #include <deque> #include <functional> #include <iostream> #include <iterator> #include <mutex> #include <random> #include <stdexcept> #include <thread> #include <tuple> #include <vector> namespace patterns::monitor3 { template <typename T> class Monitor { public: struct UnlockAndNotify { std::mutex d_mutex; std::condition_variable d_condition; void lock() { d_mutex.lock(); } void unlock() { d_mutex.unlock(); d_condition.notify_one(); } }; private: mutable UnlockAndNotify d_combined; mutable T d_data; public: std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const { return { d_data, std::unique_lock{d_combined} }; } template <typename PRED> std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const { std::unique_lock lock{d_combined.d_mutex}; d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); }); return { d_data, std::move(lock) }; } }; template <typename T> class ThreadQueue { Monitor<std::deque<T>> d_monitor; public: void add(T number) { auto[numberQueue, lock] = d_monitor.makeProducerLock(); numberQueue.push_back(number); } T remove() { auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); }); const auto number = numberQueue.front(); numberQueue.pop_front(); return number; } }; } #endif class Dice { public: int operator()(){ return rand(); } private: std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), std::default_random_engine()); }; int main(){ std::cout << '\n'; constexpr auto NumberThreads = 100; patterns::monitor3::ThreadQueue<int> safeQueue; auto addLambda = [&safeQueue](int val){ safeQueue.add(val); std::cout << val << " " << std::this_thread::get_id() << "; "; }; auto getLambda = [&safeQueue]{ safeQueue.remove(); }; std::vector<std::thread> addThreads(NumberThreads); Dice dice; for (auto& thr: addThreads) thr = std::thread(addLambda, dice()); std::vector<std::thread> getThreads(NumberThreads); for (auto& thr: getThreads) thr = std::thread(getLambda); for (auto& thr: addThreads) thr.join(); for (auto& thr: getThreads) thr.join(); std::cout << "\n\n"; }
Once more, thanks a lot to Frank and Dietmar. I didn’t want to prove, with my erroneous implementation of a thread-safe queue in my previous post, that concurrency is hard to get right. I’m particularly annoyed that I don’t put the mutex inside a lock (error 2). I teach this in my C++ classes: NNM (No Naked Mutex).
What’s next?
In my next post, I dive into the future of C++: C++23.
Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, Philipp Lenk, Charles-Jianye Chen, Keith Jeffery,and Matt Godbolt.
Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.
My special thanks to Embarcadero | |
My special thanks to PVS-Studio | |
My special thanks to Tipi.build | |
My special thanks to Take Up Code | |
My special thanks to SHAVEDYAKS |
Seminars
I’m happy to give online seminars or face-to-face seminars worldwide. Please call me if you have any questions.
Standard Seminars (English/German)
Here is a compilation of my standard seminars. These seminars are only meant to give you a first orientation.
- C++ – The Core Language
- C++ – The Standard Library
- C++ – Compact
- C++11 and C++14
- Concurrency with Modern C++
- Design Pattern and Architectural Pattern with C++
- Embedded Programming with Modern C++
- Generic Programming (Templates) with C++
- Clean Code with Modern C++
- C++20
Online Seminars (German)
- Embedded
Programmierung mit modernem C++ (24. Sep. 2024 bis 26.
Sep. 2024)
Contact Me
- Mobil: +49 176 5506 5086
- Mail: schulung@ModernesCpp.de
- German Seminar Page: www.ModernesCpp.de
- Mentoring Page: www.ModernesCpp.org
Modernes C++ Mentoring,
I think there is a design issue in the example in addition to the concurrency issues. Monitor is “encapsulated in a class and can, therefore, be reused”, but this is done using public inheritance. Public inheritance models an is-a-relationship, but here I think a uses-relationship is more appropriate. With public inheritance, I can write code like “q.lock(); q.add(42);” and this would compile. So I would propose to use private inheritance if you want to stick to inheritance, or preferably a Monitor member object as in the last examples.
Jens, you are right. The reason that I have a public inheritance in my example is historical. I started with CRTP. Afterward, I removed CRTP and ended with public inheritance (is-a relation.) As you also mentioned, I generally prefer composition (use-a relation).
“The fix is that Monitor::wait() can only be called if a unique_lock is held. ”
I feel that the wording here is a bit too relaxed as specific lifetime requirements are critical (but not clear from “a unique_lock”) although the example following the text clarified it.
It is not any/just “a unique_lock” is what is necessary to be held but one that outlives the Monitor::wait().
As shown in the example this must come from the user context (ThreadSafeQueue::get()) which fulfills the requirement that once the conditions declared by the predicate are held the lock on mutex is also held.
In my opinion the lifetime of objects must be stressed.
This is a fine point to reason about and one that creates issues when reasoning about locking.
It’s critical when the code is not executed in the same function where the condition variable wait() is called, like in this case where the design strives to achieve this separation.
This is not always obvious to someone being used with the simpler use case where lifetime is not an issue as the condition variable and the code making use of the conditions declared by the predicate are living the same lifetime (e.g. same function/method).