Создание очереди блокировки

Иногда это реализация и исполнение BlockingQueue просто работает. Иногда это segfaults. Есть идеи почему?

#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
#include <iostream>
using std::cout;
using std::endl;
#include <queue>
using std::queue;
#include <string>
using std::string;
using std::to_string;
#include <functional>
using std::ref;

template <typename T>
class BlockingQueue {
private:
mutex mutex_;
queue<T> queue_;
public:
T pop() {
this->mutex_.lock();
T value = this->queue_.front();
this->queue_.pop();
this->mutex_.unlock();
return value;
}

void push(T value) {
this->mutex_.lock();
this->queue_.push(value);
this->mutex_.unlock();
}

bool empty() {
this->mutex_.lock();
bool check = this->queue_.empty();
this->mutex_.unlock();
return check;
}
};

void fillWorkQueue(BlockingQueue<string>& workQueue) {
int size = 40000;
for(int i = 0; i < size; i++)
workQueue.push(to_string(i));
}

void doWork(BlockingQueue<string>& workQueue) {
while(!workQueue.empty()) {
workQueue.pop();
}
}

void multiThreaded() {
BlockingQueue<string> workQueue;
fillWorkQueue(workQueue);
thread t1(doWork, ref(workQueue));
thread t2(doWork, ref(workQueue));
t1.join();
t2.join();
cout << "done\n";
}

int main() {
cout << endl;

// Multi Threaded
cout << "multiThreaded\n";
multiThreaded();
cout << endl;
}

1

Решение

Посмотреть здесь:

Что я получу от front () пустого стандартного контейнера?

Плохие вещи случаются, если вы звоните .front() на пустой контейнер, лучше проверьте .empty() первый.

Пытаться:

T pop() {
this->mutex_.lock();
T value;
if( !this->queue_.empty() )
{
value = this->queue_.front();  // undefined behavior if queue_ is empty
// may segfault, may throw, etc.
this->queue_.pop();
}
this->mutex_.unlock();
return value;
}

Примечание. Поскольку атомарные операции важны в такой очереди, я рекомендую изменения API:

bool pop(T &t);  // returns false if there was nothing to read.

Еще лучше, если вы на самом деле используете это там, где это имеет значение, вы, вероятно, захотите пометить используемые элементы перед удалением в случае сбоя.

bool peekAndMark(T &t);  // allows one "marked" item per thread
void deleteMarked();     // if an item is marked correctly, pops it.
void unmark();           // abandons the mark. (rollback)
3

Другие решения

Проблема должна лежать здесь:

while(!itemQueue.empty()) {
itemQueue.pop();
}

Вы резервируете мьютекс, когда проверка значения оставлена, затем вы освобождаете мьютекс, и может случиться, что другой поток будет выполнен, обнаружит, что значение осталось, и отобразит его. В худшем случае ни один элемент не останется после этого, и первый поток попытается всплыть, пока не останется ни одного элемента.

Решение состоит в том, чтобы сделать front / pop вызовы во внутренней очереди в том же разделе, чем проверка на пустое в том же заблокированном разделе, тогда поведение всегда будет определяться.

Другое предложение будет использовать std::lock_guard при работе с мьютексом, потому что он улучшает читаемость и обеспечивает освобождение мьютекса независимо от того, что происходит.

Учитывая тот факт, что эти два совета, ваш pop метод может выглядеть так:

T pop() {
std::lock_guard lock(this->mutex_); //mutex_ is locked
T value;
if( !this->queue_.empty() )
{
value = this->queue_.front();
this->queue_.pop();
}
return value;
} //mutex_ is freed
4