Реализация condition_variable для решения многопоточного режима ожидания

Моя программа печатает несколько строк текста на консоли с помощью свободных рабочих потоков. Проблема, однако, заключается в том, что рабочие не ждут, пока предыдущие рабочие закончат работу, прежде чем печатать текст, в результате чего текст будет вставлен в текст другого рабочего потока, как показано на рисунке ниже:

введите описание изображения здесь

Мне нужно исправить эту проблему, известную как проблема занятого ожидания, с помощью std :: condition_variable. Я попытался реализовать условную переменную в приведенном ниже коде, основываясь на пример найден по этой ссылке, а также следующий вопрос stackoverflow помог мне, но не достаточно, из-за моих ограниченных знаний C ++ в целом. Так что, в конце концов, я только прокомментировал все назад, и теперь я в растерянности.

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>

class ThreadPool; // forward declare
//std::condition_variable cv;
//bool ready = false;
//bool processed = false;

class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
};

class ThreadPool {
public:
ThreadPool(size_t threads);
template<class F> void enqueue(F f);
~ThreadPool();
private:
friend class Worker;

std::vector<std::thread> workers;
std::deque<std::function<void()>> tasks;

std::mutex queue_mutex;
bool stop;
};

void Worker::operator()()
{
std::function<void()> task;
while (true)
{
std::unique_lock<std::mutex> locker(pool.queue_mutex);
//cv.wait(locker, [] {return ready; });

if (pool.stop) return;
if (!pool.tasks.empty())
{
task = pool.tasks.front();
pool.tasks.pop_front();
locker.unlock();
//cv.notify_one();
//processed = true;
task();
}
else {
locker.unlock();
//cv.notify_one();
}
}
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
for (size_t i = 0; i < threads; ++i)
workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
stop = true; // stop all threads

for (auto &thread : workers)
thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
std::unique_lock<std::mutex> lock(queue_mutex);
//cv.wait(lock, [] { return processed; });
tasks.push_back(std::function<void()>(f));
//ready = true;
}

int main()
{
ThreadPool pool(4);

for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

std::cin.ignore();
return 0;
}

3

Решение

Вот рабочий образец:

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>
#include <atomic>

class ThreadPool;

// forward declare
std::condition_variable ready_cv;
std::condition_variable processed_cv;
std::atomic<bool> ready(false);
std::atomic<bool> processed(false);

class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
};

class ThreadPool {
public:
ThreadPool(size_t threads);
template<class F> void enqueue(F f);
~ThreadPool();
private:
friend class Worker;

std::vector<std::thread> workers;
std::deque<std::function<void()>> tasks;

std::mutex queue_mutex;
bool stop;
};

void Worker::operator()()
{
std::function<void()> task;

// in real life you need a variable here like while(!quitProgram) or your
// program will never return. Similarly, in real life always use `wait_for`
// instead of `wait` so that periodically you check to see if you should
// exit the program
while (true)
{
std::unique_lock<std::mutex> locker(pool.queue_mutex);
ready_cv.wait(locker, [] {return ready.load(); });

if (pool.stop) return;
if (!pool.tasks.empty())
{
task = pool.tasks.front();
pool.tasks.pop_front();
locker.unlock();
task();
processed = true;
processed_cv.notify_one();
}
}
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
for (size_t i = 0; i < threads; ++i)
workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
stop = true; // stop all threads

for (auto &thread : workers)
thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.push_back(std::function<void()>(f));
processed = false;
ready = true;
ready_cv.notify_one();
processed_cv.wait(lock, [] { return processed.load(); });
}

int main()
{
ThreadPool pool(4);

for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

std::cin.ignore();
return 0;
}

Выход:

Text printed by worker 0
Text printed by worker 1
Text printed by worker 2
Text printed by worker 3
Text printed by worker 4
Text printed by worker 5
Text printed by worker 6
Text printed by worker 7

Почему бы не сделать это в рабочем коде

Поскольку назначение состоит в том, чтобы печатать строки по порядку, этот код не может на самом деле действительно распараллелить, и поэтому мы придумали способ заставить его работать полностью последовательно, используя необходимые Золотой молот из std::condition_variable, Но, по крайней мере, мы избавились от этого чертовски занятого ожидания!

В реальном примере вы хотите обрабатывать данные или выполнять задачи параллельно и синхронизировать просто выход, и эта структура все еще не является правильным подходом к этому, если вы делали это с нуля.

Что я изменил и почему

Я использовал атомарные bools для условий, потому что они имеют детерминированное поведение при совместном использовании несколькими потоками. Не обязательно во всех случаях, но, тем не менее, это хорошая практика.

Вы должен включить условие выхода в while(true) цикл (например, флаг, который устанавливается SIGINT обработчик или что-то) или ваша программа будет никогда выход. Это просто задание, поэтому мы его пропустили, но это очень важно, чтобы не пренебрегать в производственном коде.

Может быть, назначение можно решить с помощью одной условной переменной, но я не уверен в этом, и в любом случае лучше использовать две, потому что это гораздо яснее и понятнее, что делает каждый. В основном, мы ждем задачу, затем просим, ​​чтобы enqueuer подождал, пока она не была выполнена, затем скажите, что она действительно обработана, мы готовы к следующей. Сначала вы были на правильном пути, но я думаю, что с двумя резюме стало более очевидно, что происходит не так.

Кроме того, важно установить условие Vars (ready а также processed) до с помощью notify(),

я удалил locker.unlock() потому что дело не нужно. c ++ стандартные блокировки RAII структуры и, следовательно, блокировка будет разблокирована, когда она выйдет из области видимости, что в основном является следующей строкой. В общем, лучше избегать бессмысленных ветвлений, поскольку ваша программа излишне полна состояния.

Педагогическая напыщенная речь …

Теперь, когда эта проблема была решена и решена, у меня есть несколько вещей, которые, я думаю, нужно сказать о задании в целом, и которые, я думаю, вероятно, будут важнее для вашего обучения, чем решение проблемы, как указано.

Если вы были смущены или разочарованы назначением, тогда хорошо, ты должен быть. Имеет смысл, что вам трудно вставить квадратный колышек в круглое отверстие, и я думаю, что реальная ценность этой проблемы — научиться определять, когда вы используете правильный инструмент для правильной работы, а когда нет. ,

Условные переменные являются правильный инструмент для решения проблемы занятой петли, однако это назначение (как указано @ n.m.) простое состояние гонки. Тем не менее, это всего лишь простое условие гонки, поскольку оно включает в себя ненужный и плохо реализованный пул потоков, что делает проблему сложной и трудной для понимания без какой-либо цели. И это сказал, std::async в любом случае следует отдавать предпочтение перед ручным пулом потоков в современном c ++ (это проще для правильной реализации и более производительно на многих платформах, и для этого не требуется куча глобальных и синглтонов и исключительно выделенных ресурсов).

Если бы это было задание от вашего босса, а не от профессора, вот что вы бы сделали:

for(int i = 0; i < 8; ++i)
{
std::cout << "Text printed by worker " << i << std::endl;
}

Эта проблема решается (оптимально) простым for петля. Проблемы с занятым ожиданием / блокировкой являются результатом ужасного дизайна, и «правильное» решение — исправить конструкцию, а не перевязать ее. Я даже не думаю, что это задание поучительно, потому что нет никакой возможности или причины распараллелить вывод, так что это просто приводит в замешательство всех, включая сообщество SO. Похоже на негативное обучение, что потоки просто вносят ненужную сложность, не улучшая вычисления.

Трудно сказать, действительно ли профессор хорошо понимает концепции потоков и переменных условий из структуры задания. Задания по необходимости должны быть сведены к минимуму, упрощены и несколько упрощены для целей обучения, но на самом деле это противоположно тому, что было сделано здесь, где сложная проблема была сделана из простой.

Как правило, я никогда не отвечаю на вопросы, связанные с домашним заданием по SO, потому что я думаю, что раздавать ответы мешает обучению, и что наиболее ценный навык разработчика — это научиться биться головой о стену, пока идея не натолкнется на нее. Тем не менее, нет ничего, кроме отрицательного обучения, которое можно получить от надуманных заданий, подобных этому, и хотя в школе вы должны играть по правилам профессора, важно научиться распознавать надуманные проблемы, когда вы их видите, разбирать их и приходить к простое и правильное решение.

3

Другие решения

Я думаю, что это нормально, так как мьютекс не заблокирован перед печатью.
Для каждого поворота в цикле нет гарантии, что я буду напечатан раньше, чем i + 1.

Для хорошего приоритета печати вы должны отображать сообщения после блокировки мьютекса в очереди функций.

1