многопоточность — тупик с очередью блокировки и барьером в переполнении стека

У меня есть эта очень простая и маленькая программа C ++, которая создает пул потоков, затем положить Сообщения в блокировка очереди делится между потоками, чтобы сказать каждому потоку, что делать.

Сообщение может быть: -1 (конец потока -> прекратить), -2 (барьер -> дождаться, пока все потоки дойдут до него, затем продолжить), другие значения делать случайные вычисления. Цикл выполняется в следующем порядке: некоторые вычисления, барьер, некоторые вычисления, барьер, …, барьер, конец потока, объединение потоков, выход.

Я не могу понять почему я захожу в тупик даже с 2 потоками в бассейне. Очередь не может стать пустой, но порядок, в котором я нажимаю и выскакиваю сообщения, всегда приводит к пустой очереди!

Реализация очереди блокировки — та, которая предложена здесь (C ++, эквивалентный Java BlockingQueue) только с двумя добавленными методами. Я также копирую код очереди ниже.

Любая помощь?

main.cpp

#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"
using namespace std;

// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
int j= q.pop();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
}
}
}

int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations

// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));

for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

// push random number
for(int j=0;j<dist;j++){
queue.push(4);
}

// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}

// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}

Queue.hpp

#include <deque>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
private:
std::mutex              d_mutex;
std::condition_variable d_condition;
std::deque<T>           d_queue;
public:

void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}

T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}

bool empty(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.empty();
}

int size(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.size();
}
};

0

Решение

Я запустил ваш код, и я понимаю проблему. Проблема с опцией «-2». Когда два потока достигают этой точки, ваш основной поток уже поместил в очередь другие значения. Таким образом, если ваша очередь увеличила свой размер между временем, когда ваши потоки получили значение «-2», и до того, как они достигнут опции «-2», ваш код застрянет:
Тема 1: получить -2.
Тема 2: получить -2.
Основная нить: нажать -1.
Основная нить: нажать -1.
Поток 1: подождите, пока вся очередь не станет пустой.
Поток 2: подождите, пока вся очередь не станет пустой.

очередь:
-1
-1

^ это в случае, если dim равно 1. В вашем коде dim равно 8, вы не хотите видеть, как это выглядит ..
Чтобы решить эту проблему, я только отключил следующий цикл:

for(int i=0;i<nt;i++){
queue.push(-2);
}

Когда это пард отключено, код работает отлично.
Вот как я это проверил:

std::mutex guarder;

// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
guarder.lock();
int j= q.pop();
guarder.unlock();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
guarder.lock();
cout << x << std::endl;
guarder.unlock();
}
}
}

int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations

// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));

for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

// push random number
for(int j=0;j<dist;j++){
queue.push(dist);
}

/*// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}*/

// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}

Результат:

4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32

Кстати, зависание не произошло, потому что ваша часть «активного ожидания». Это не хорошо, но обычно вызывает другие проблемы (например, замедляет работу вашей системы).

-1

Другие решения

Я думаю, что проблема заключается в вашем активном ожидании, которое вы описываете как «ПЛОХО, но в любом случае …» и используете размер очереди в качестве барьера вместо использования истинного барьер синхронизации

Для dim = 1 вы выдвигаете очередь с 4, -2, -2. Одна нить захватит 4 и -2, а другая — оставшиеся -2. На данный момент очередь пуста, и у вас есть три потока (два рабочих и основной поток), которые проводят активные гонки ожидания, чтобы увидеть, была ли очередь очищена. Существует мьютекс по размеру, который позволяет только одному читать размер за раз. Если основной поток запланирован первым и определит, что очередь пуста, он будет нажимать -1, -1, чтобы сигнализировать об окончании потока. Теперь очередь больше не пуста, но один или оба рабочих потока ожидают ее опустошения. Так как они ждут, пока он опустеет, прежде чем брать другой элемент, очередь в этом состоянии заблокирована.

Для случая, когда dim> 1, вероятно, существует аналогичная проблема с помещением следующего набора значений в очередь в главном потоке до того, как обе работы подтвердят пустую очередь и выйдут из активного ожидания.

0