Move In, Move Out
In many application domains, the Producer-Queue-Consumer pattern is used to transport data from input to output within a multi-threaded program:
The ProducerThread creates Messages in accordance with the application’s requirements and pushes pointers to them into a lock-protected queue. The ConsumerThread, running asynchronous to the ProducerThread, pops sequences of Message pointers from the queue and processes them accordingly. The ConsumerThread may be notified by the ProducerThread when one or more Messages are available for processing or it can periodically poll the queue.
Instead of passing Message pointers, Message copies can be passed between the threads. However, copying the content can be expensive for large messages.
When using pointers to pass messages between threads, the memory to hold the data content must come from somewhere. One way to provide this memory is to use a Message buffer pool allocated on startup.
Another, simpler way that avoids the complexity of managing a Message buffer pool, is to manually “new” up the memory in the ProducerThread and then manually “delete” memory in the ConsumerThread.
Since the introduction of smart pointers in C++11, a third way of communicating messages between threads is to “move” std::unique_ptrs into and out of the InterThreadQueue:
The advantage of using smart pointers is that no “deletes” need to be manually written in the ConsumerThread code.
The following code shows the implementation and usage of a simple InterThreadQueue that moves std::unique_ptrs into and out of a lock protected std::deque.
#include "catch.hpp" #include <memory> #include <deque> #include <mutex> #include <vector> #include <stdexcept> template<typename Msg> class InterThreadQueue { public: InterThreadQueue(int32_t capacity) : _capacity(capacity) {} void push(std::unique_ptr<Msg> msg) { std::lock_guard<std::mutex> lg(_mtx); if(_queue.size() not_eq _capacity) { _queue.push_back(std::move(msg)); } else { throw std::runtime_error{"Capacity Exceeded"}; } } std::vector<std::unique_ptr<Msg>> pop() { std::vector<std::unique_ptr<Msg>> msgs{}; std::lock_guard<std::mutex> lg(_mtx); while(not _queue.empty()) { msgs.emplace_back(std::move(_queue.front())); _queue.pop_front(); } return msgs; //Move the vector to the caller } private: mutable std::mutex _mtx{}; const std::size_t _capacity; std::deque<std::unique_ptr<Msg>> _queue; }; TEST_CASE( "InterThreadQueue" ) { //Create our object under test InterThreadQueue<int32_t> itq{2}; //Note: my compiler version doesn't have std::make_unique<T>() std::unique_ptr<int32_t> dataIn{new int32_t{5}}; itq.push(std::move(dataIn)); dataIn = std::unique_ptr<int32_t>{new int32_t{10}}; itq.push(std::move(dataIn)); dataIn = std::unique_ptr<int32_t>{new int32_t{15}}; //Queue capacity is only 2 REQUIRE_THROWS(itq.push(std::move(dataIn))); auto dataOut = itq.pop(); REQUIRE(2 == dataOut.size()); REQUIRE(5 == *dataOut[0]); REQUIRE(10 == *dataOut[1]); REQUIRE(0 == itq.pop().size()); }
Why do you use a deque for _queue? If you used a std::vector, you could simplify your ::pop() method to basically
{
std::lock_guard lck(_mutex);
std::vector<…> ret;
std::swap(ret, _queue);
return ret;
}
Simply because I didn’t think of using a vector. You’re swap idea is a terrific suggestion. Thanks for the optimization!