многопоточность — блокировка очереди C ++ Segfault w / Boost

У меня была потребность в очереди блокировки в C ++ с возможностью тайм-аута offer(), Очередь предназначена для несколько производителей, один потребитель. Когда я реализовывал, я не нашел хороших существующих очередей, которые бы соответствовали этой потребности, поэтому я сам кодировал их.

Я вижу, что сегфолты выходят из take() метод в очереди, но они с перебоями. Я искал код для проблем, но я не вижу ничего, что выглядит проблематичным.

Мне интересно, если:

  • Существует существующая библиотека, которая делает это надежно, что я должен
    использовать (предпочтительнее повышение или только заголовок).
  • Любой видит в моем коде очевидный недостаток, который мне нужно исправить.

Вот заголовок:

class BlockingQueue
{
public:
BlockingQueue(unsigned int capacity) : capacity(capacity) { };
bool offer(const MyType & myType, unsigned int timeoutMillis);
MyType take();
void put(const MyType & myType);
unsigned int getCapacity();
unsigned int getCount();

private:
std::deque<MyType> queue;
unsigned int capacity;
};

И соответствующие реализации:

boost::condition_variable cond;
boost::mutex mut;

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
Timer timer;

// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);

// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());

if (monitorTimeout <= 0)
{
return false;
}

if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
{
return false;
}
}

cond.notify_all();

queue.push_back(myType);

return true;
}

void BlockingQueue::put(const MyType & myType)
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);

// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}

cond.notify_all();

queue.push_back(myType);
}

MyType BlockingQueue::take()
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);

while (queue.size() == 0)
{
cond.wait(lock);
}

cond.notify_one();

MyType myType = this->queue.front();

this->queue.pop_front();

return myType;
}

unsigned int BlockingQueue::getCapacity()
{
return this->capacity;
}

unsigned int BlockingQueue::getCount()
{
return this->queue.size();
}

И да, я не реализовывал класс с помощью шаблонов — это следующий в списке 🙂

Любая помощь с благодарностью. Проблемы с потоками могут быть действительно трудно определить.

-Бен

2

Решение

Почему cond и mut global? Я ожидаю, что они будут членами вашего объекта BlockingQueue. Я не знаю, что еще касается этих вещей, но там может быть проблема.

Я также реализовал ThreadSafeQueue как часть более крупного проекта:

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

Это похоже на вашу концепцию, за исключением того, что функции enqueue (иначе предложение) не блокируют, потому что в основном нет максимальной емкости. Для обеспечения емкости у меня обычно есть пул с N буферами, добавляемыми во время инициализации системы, и очередь для передачи сообщений во время выполнения, это также устраняет необходимость в выделении памяти во время выполнения, что я считаю хорошей вещью (обычно я работа над встроенными приложениями).

Единственная разница между пулом и очередью состоит в том, что пул получает несколько буферов, поставленных в очередь во время инициализации системы. Итак, у вас есть что-то вроде этого:

ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;

void init()
{
for (int i = 0; i < NUM_BUFS; i++)
{
pool.enqueue(new BufferDataType);
}
}

Затем, когда вы хотите отправить сообщение, вы делаете что-то вроде следующего:

void producerA()
{
BufferDataType *buf;
if (pool.waitDequeue(buf, timeout) == true)
{
initBufWithMyData(buf);
queue.enqueue(buf);
}
}

Таким образом, функция enqueue выполняется быстро и легко, но если пул пуст, вы будете блокировать, пока кто-нибудь не поместит буфер обратно в пул. Идея состоит в том, что какой-то другой поток будет блокировать очередь и будет возвращать буфера в пул, когда они будут обработаны следующим образом:

void consumer()
{
BufferDataType *buf;
if (queue.waitDequeue(buf, timeout) == true)
{
processBufferData(buf);
pool.enqueue(buf);
}
}

В любом случае, посмотрите на это, может быть, это поможет.

0

Другие решения

Я полагаю, что проблема в вашем коде заключается в изменении deque несколькими потоками. Посмотрите:

  1. ты ждешь кодирования из другого потока;
  2. и затем немедленно отправлять сигнал другим потокам, что deque разблокирован непосредственно перед тем, как вы захотите его изменить;
  3. затем вы модифицируете deque, в то время как другие потоки думают, что deque уже разблокирован и начинает делать то же самое.

Итак, попробуйте разместить все cond.notify_*() после модификации дек. т.е .:

void BlockingQueue::put(const MyType & myType)
{
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}

queue.push_back(myType);  // <- modify first

cond.notify_all();        // <- then say to others that deque is free
}

Для лучшего понимания предлагаю прочитать о pthread_cond_wait().

0