Как я могу использовать переменные условия Boost в сценарии производитель-потребитель?

РЕДАКТИРОВАТЬ: ниже

У меня есть один поток, отвечающий за потоковую передачу данных с устройства в буферах. Кроме того, у меня N потоков, выполняющих некоторую обработку этих данных. В моей настройке я бы хотел, чтобы поток стримеров извлекал данные с устройства и ждал, пока N потоков не завершат обработку, прежде чем будут получены новые данные или истечет время ожидания. N потоков должны дождаться получения новых данных, прежде чем продолжить обработку. Я считаю, что эта структура должна работать, если я не хочу, чтобы N потоков повторяли обработку в буфере, и если я хочу, чтобы все буферы обрабатывались без пропуска.

После внимательного прочтения я обнаружил, что условные переменные — это то, что мне нужно. Я следовал инструкциям и другим вопросам переполнения стека, и вот что у меня есть:

глобальные переменные:

boost::condition_variable cond;
boost::mutex mut;

переменные-члены:

std::vector<double> buffer
std::vector<bool> data_ready       // Size equal to number of threads

цикл приема данных (1 поток выполняет это):

while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(any(data_ready))
cond.wait(ll);
}

receive_data(buffer);

{
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
}

cond.notify_all();
}

цикл обработки данных (N потоков выполняют это)

while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(!data_ready[thread_id])
cond.wait(ll);
}

process_data(buffer);

{
boost::lock_guard<boost::mutex> ll(mut);
data_ready[thread_id] = false;
}
cond.notify_all();
}

Эти два цикла находятся в своих собственных функциях-членах одного и того же класса. Переменная buffer является переменной-членом, поэтому она может быть разделена между потоками.

Поток получателя будет запущен первым. Переменная data_ready — это вектор значений типа bool N. data_ready [i] — true, если данные готовы к обработке, и false, если поток уже обработал данные. Функция any (data_ready) выводит true, если любой из элементов data_ready имеет значение true, и false в противном случае. Функция set_true (data_ready) устанавливает для всех элементов data_ready значение true. Поток получателя проверит, обрабатывает ли еще какой-либо поток обработки. Если нет, он будет извлекать данные, устанавливать флаги data_ready, уведомлять потоки и продолжать цикл, который останавливается в начале, пока не будет завершена обработка. Потоки обработки будут проверять свой соответствующий флаг data_ready, чтобы быть истинным. Как только это правда, поток обработки выполнит некоторые вычисления, установит свой соответствующий флаг data_ready в 0 и продолжит цикл.

Если у меня только один поток обработки, программа работает нормально. Как только я добавляю больше потоков, я сталкиваюсь с проблемами, когда результатом обработки является мусор. Кроме того, порядок потоков обработки имеет значение по некоторым причинам; другими словами, запущенный мной последний поток будет выводить правильные данные, тогда как предыдущие потоки будут выводить мусор, независимо от того, какие входные параметры используются для обработки (при условии допустимых параметров). Я не знаю, связана ли эта проблема с моим многопоточным кодом или что-то не так с моим устройством или настройкой обработки данных. Я пытаюсь использовать couts на этапах обработки и получения, и с N потоков обработки, я вижу вывод, как следует:

receive data
process 1
process 2
...
process N
receive data
process 1
process 2
...

Является ли использование условных переменных правильным? В чем может быть проблема?

РЕДАКТИРОВАТЬ: я последовал советам форка и изменил код:

цикл приема данных (1 поток выполняет это):

while (!gotExitSignal())
{
if(!any(data_ready))
{
receive_data(buffer);
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
cond.notify_all();
}
}

цикл обработки данных (N потоков выполняют это)

while (!gotExitSignal())
{
// boost::unique_lock<boost::mutex> ll(mut);
boost::mutex::scoped_lock ll(mut);
cond.wait(ll);

process_data(buffer);

data_ready[thread_id] = false;
}

Это работает несколько лучше. Я использую правильные замки?

1

Решение

Я не прочитал всю вашу историю, но если я быстро посмотрю на код, я увижу, что вы используете неверные условия.
Условие похоже на состояние: когда вы устанавливаете поток в состояние ожидания, оно выдает процессор. Таким образом, ваш поток будет эффективно останавливаться, пока какой-либо другой процесс / поток не уведомит об этом.

В вашем коде есть цикл while, и каждый раз, когда вы проверяете данные, вы ждете. Это неправильно, это должно быть если вместо времени. Но опять же, это не должно быть там. Проверка данных должна быть сделана где-то еще. И ваш рабочий поток должен поставить себя в состояние ожидания после того, как он выполнит свою работу.

Ваши рабочие потоки являются потребителями. И производители — те, кто поставляет данные.
Я думаю, что лучшей конструкцией было бы сделать проверку потока, если есть данные, и уведомить работника (ов).

КОД ПСЕВДО:

//producer
while (true) {

1. lock mutex
2. is data available
3. unlock mutex

if (dataAvailableVariable) {
4. notify a worker
5. set waiting condition
}
}//consumer
while (true) {
1. lock mutex
2. do some work
3. unlock mutex
4. notify producer that work is done
5. set wait condition
}

Вам также следует позаботиться о том, чтобы какой-то поток должен быть живым, чтобы избежать тупика, то есть все потоки находятся в состоянии ожидания.

Надеюсь, это вам немного поможет.

0

Другие решения

Других решений пока нет …