Синхронизация производителя / потребителя с барьером

У меня есть ветка продюсера, которая производит работу для трех потребительских веток. Когда работа выполнена, поток производителя ожидает, пока потоки потребителя не закончат обработку работы. Затем поток продюсера продолжает обрабатывать результаты.

#include <condition_variable>
#include <mutex>
#include <boost/thread/barrier.hpp>
#include <vector>
#include <queue>std::condition_variable cond;
std::mutex mutex;
boost::barrier barrier(4);

std::vector<std::thread> workers;
std::queue<unsigned int> work;
std::queue<unsigned int> results;

void worker();

int main()
{
// 1 producer and 3 consumers
for(unsigned int i = 0; i < 3; i++)
workers.push_back(std::thread(worker));

// Wait here so the three workers can get to cond.wait();
barrier.wait();

std::unique_lock<std::mutex> lock(mutex);
while(true)
{
// Generate work
std::cout << "gen" << std::endl;
for(unsigned int i = 0; i < 10; i++)
work.push(i);

cond.notify_all();

lock.unlock();
barrier.wait();

// Handle the results
while(results.size() > 0)
results.pop();

lock.lock();
}

return 0;
}

void worker()
{
while(true)
{
std::unique_lock<std::mutex> lock(mutex);
while(results.size() == 0)
{
lock.unlock();
barrier.wait();
lock.lock();
cond.wait(lock);
}

// Get work
unsigned int next = work.front();
work.pop();

// Store the result
results.push(next);

lock.unlock();}
}

Проблема в том, что мне нужно убедиться, что все потребительские потоки вошли cond.wait(lock) перед тем, как поток производителя начинает свою следующую итерацию:

  1. Все 4 темы достигли барьера. Барьер освобождается и нити могут продолжаться.
  2. Поток производителя блокирует мьютекс до того, как все потребительские потоки достигнут cond.wait(lock), Таким образом, по крайней мере один потребительский поток заблокирован lock.lock(),
  3. Поток производителя начинает свою следующую итерацию, создает работу и уведомляет потребителей. Так как по крайней мере один потребительский поток еще не достиг cond.wait(lock) notify_all() будет пропущен хотя бы одним потребительским потоком. Эти темы теперь ждут следующего notify_all() — который никогда не прибудет.
  4. В следующий раз, когда барьер достигнут, по крайней мере, один потребительский поток все еще ожидает следующего notify_all(), Таким образом, барьер не будет разблокирован и возникнет тупик.

Как я могу разрешить эту ситуацию?

2

Решение

Переменная condition_variable должна использоваться вместе с флагом, чтобы помочь предотвратить ложные пробуждения. Этот же флаг можно также использовать для проверки того, должен ли поток вообще ждать или просто приступить к работе.

Добавить bool go_to_work=false;затем мы просто добавляем его в качестве предиката в вызове wait и убедитесь, что мы установили / удалили его из основного потока.

В главном потоке перед вызовом notify_all мы устанавливаем наш bool

go_to_work=true;
cond.notify_all();

В нашем рабочем потоке мы добавляем предикат к нашему wait вызов

cond.wait(lock, [](){ return go_to_work; });

Наконец, в нашем главном потоке мы хотим установить флаг обратно в false после того, как вся работа была выполнена.

barrier.wait();
lock.lock();  // We need to lock the mutex before modifying the bool
go_to_work=false;
lock.unlock();

//Handle result...

Теперь, если поток достигает wait вызов после того, как основной поток установил go_to_work=true это не будет ждать вообще и просто идти вперед и делать работу. В качестве бонуса это также защищает от ложных пробуждений.

1

Другие решения

Других решений пока нет …