Segfault происходит при попытке синхронизации очереди

Я изучаю многопоточность, и я хотел смоделировать проблему производителя-потребителя (используя семафор, если я могу это так назвать).

У меня есть класс, который содержит очередь, производитель вставляет в очередь, а потребитель получает его и печатает. Я смоделировал это следующим образом

class TestClass{
public:
void producer( int i ){
unique_lock<mutex> l(m);
q.push(i);
if( q.size() )
cnd.notify_all();
}

void consumer(){
unique_lock<mutex> l(m);
while( q.empty() ){
cnd.wait(l);
}
int tmp = q.front();
q.pop();
cout << "Producer got " << tmp << endl;
}
void ConsumerInit( int threads ){
for( int i = 0; i < threads; i++ ){
thrs[i] = thread(&TestClass::consumer, this );
}
for( auto &a : thrs )
a.join();
}private:
queue<int> q;
vector<thread> thrs;
mutex m;
condition_variable cnd;
};

И я использовал небольшое консольное приложение для вызова данных:

int main(){
int x;
TestClass t;
int counter = 0;
while( cin >> x ){
if( x == 0 )
break;
if( x == 1)
t.producer(counter++);
if( x == 2 )
t.ConsumerInit(5);
}
}

Поэтому, когда пользователь вводит 1, данные помещаются в очередь, если пользователь нажимает 2 потока, создаются.

В любом порядке, например, нажмите 1 1, а затем 2 или 2 1 1.
это бросает segfault. Я не уверен, почему мое понимание моего кода выглядит следующим образом: давайте предположим, что порядок 2 1 1

Я инициализирую 5 потоков, они видят, что очередь пуста, поэтому они идут спать. Когда я помещаю число в очередь, оно уведомляет все спящие потоки.
Первый снова пробуждает блокировку мьютекса и продолжает извлекать номер из очереди, а затем освобождает мьютекс, когда мьютекс освобождается, другой поток делает то же самое и разблокирует мьютекс, третий поток после разблокировки мьютекса все еще находится в цикле и видит, что Очередь снова пуста и снова переходит в спящий режим, как и все остальные потоки.

Правильна ли эта логика? Если так, то почему это все-таки выдает segfault, если нет, я ценю все объяснения.

Спасибо за помощь!

//редактировать
Судя по ответам suggets, я заменил [] на vector.push_back, но потребитель сейчас ничего не делает с данными, не берет их и не печатает.

2

Решение

Вы не расширяете вектор thrs, когда делаете

thrs[i] = thread(&CTest::consumer, this );

Ты должен сделать

thrs.emplace_back(&CTest::consumer, this);

Вот где будет авария.

1

Другие решения

Ваша проблема не имеет ничего общего с многопоточностью. Вы получаете доступ к std::vector за границами:

  for (int i = 0; i < threads; i++) {
thrs[i] = thread(&CTest::consumer, this);

//...
vector<thread> thrs;

thrs вектор пуст, и вы пытаетесь получить доступ, как если бы он имел записи.

Чтобы показать ошибку, используйте:

        thrs.at(i) = thread(&CTest::consumer, this);

и вас встретят с std::out_of_range исключение вместо ошибки сегментации.

1

Ваша программа блокируется, если последовательность ввода не имеет вид 1 1 1 1 1 ... 2, Это если число, если 1s предшествующий 2 меньше пяти.

Вот причина:

Если общее количество элементов в очереди меньше 5 и основной поток вызывает consumerInitнекоторые из пяти созданных пользовательских потоков будут блокировать ожидание очереди на получение элементов. Между тем основной поток блоков на join операция. Поскольку основной поток будет ожидать завершения потока потребителя, в то время как некоторые из этих потоков ожидают получения данных, никакого прогресса не будет. Отсюда тупик.

1

Проблема здесь:

  for( auto &a : thrs )
a.join();

Основной поток блокируется здесь после входа 2 в ожидании потребителей, чтобы закончить. Таким образом, после этого момента вы думаете, что вводите входные данные, в то время как нет cin происходит.

Удалите эти две строки, а затем вы можете ввести 1 и производитель / потребитель сделает свою работу.

1