производитель многопоточных программ / потребитель [boost]

Я играю с буст-библиотекой и C ++. Я хочу создать многопоточную программу, которая содержит производителя, потребителя и стек. Прокурер заполняет стек, потребитель удаляет элементы (int) из стека. все работает (pop, push, mutex) Но когда я вызываю pop / push winthin в потоке, я не получаю никакого эффекта

я сделал этот простой код:

#include "stdafx.h"#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/signals2/mutex.hpp>
#include <ctime>

using namespace std;

/ *
* this class reprents a stack which is proteced by mutex
* Pop and push are executed by one thread each time.
*/
class ProtectedStack{
private :
stack<int> m_Stack;
boost::signals2::mutex m;

public :
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){

}
void push(int x){
m.lock();
m_Stack.push(x);
m.unlock();
}

void pop(){
m.lock();
//return m_Stack.top();
if(!m_Stack.empty())
m_Stack.pop();
m.unlock();
}
int size(){
return m_Stack.size();
}
bool isEmpty(){
return m_Stack.empty();
}
int top(){
return m_Stack.top();
}
};

/*
*The producer is the class that fills the stack. It encapsulate the thread object
*/

class Producer{
public:
Producer(int number ){
//create thread here but don't start here
m_Number=number;}
void fillStack (ProtectedStack& s ) {
int object = 3; //random value
s.push(object);
//cout<<"push object\n";
}

void produce (ProtectedStack & s){
//call fill within a thread
m_Thread = boost::thread(&Producer::fillStack,this, s);
}

private :
int m_Number;
boost::thread m_Thread;

};/* The consumer will consume the products produced by the producer */

class Consumer {
private :
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
m_Number = n;
}

void remove(ProtectedStack &s ) {

if(s.isEmpty()){ // if the stack is empty sleep and wait for the producer      to fill the stack
//cout<<"stack is empty\n";
boost::posix_time::seconds workTime(1);
boost::this_thread::sleep(workTime);
}
else{
s.pop(); //pop it
//cout<<"pop object\n";

}

}

void consume (ProtectedStack & s){
//call remove within a thread
m_Thread = boost::thread(&Consumer::remove, this, s);
}

};int main(int argc, char* argv[])
{ProtectedStack s;Producer p(0);
p.produce(s);

Producer p2(1);
p2.produce(s);

cout<<"size after production "<<s.size()<<endl;
Consumer c(0);
c.consume(s);
Consumer c2(1);
c2.consume(s);
cout<<"size after consumption  "<<s.size()<<endl;

getchar();
return 0;
}

После того, как я запускаю это в VC ++ 2010 / win7
я получил :
0
0

Не могли бы вы помочь мне понять, почему, когда я вызываю функцию fillStack из основного, я получаю эффект, но когда я вызываю его из потока, ничего не происходит?
Спасибо

0

Решение

Основная проблема с вашим кодом заключается в том, что ваши потоки не синхронизированы.
Помните, что по умолчанию выполнение потоков не упорядочено и не упорядочено, поэтому потребительские потоки могут быть (и в вашем конкретном случае) завершены до любой поток производителя производит любые данные.

Чтобы убедиться, что потребители будут работать после того, как производители закончили свою работу, вам нужно использовать thread::join() функция на потоках производителей, она остановит выполнение основного потока до выхода производителей:

// Start producers
...

p.m_Thread.join();  // Wait p to complete
p2.m_Thread.join(); // Wait p2 to complete

// Start consumers
...

Это сделает свое дело, но, вероятно, это не хорошо для типичного варианта использования производитель-потребитель.

Чтобы добиться более полезного случая, вам нужно исправить функцию потребителя.
Ваша потребительская функция на самом деле не ждет ожидаемых данных, она просто завершится, если стек пуст, и никогда не потребляет никаких данных, если еще не было произведено никаких данных.

Это должно быть так:

void remove(ProtectedStack &s)
{
// Place your actual exit condition here,
// e.g. count of consumed elements or some event
// raised by producers meaning no more data available etc.
// For testing/educational purpose it can be just while(true)
while(!_some_exit_condition_)
{
if(s.isEmpty())
{
// Second sleeping is too big, use milliseconds instead
boost::posix_time::milliseconds workTime(1);
boost::this_thread::sleep(workTime);
}
else
{
s.pop();
}
}
}

Другая проблема не так thread использование конструктора:

m_Thread = boost::thread(&Producer::fillStack, this, s);

Цитата из Boost.Thread документация:

Конструктор потока с аргументами

template <class F,class A1,class A2,...>
thread(F f,A1 a1,A2 a2,...);

Предпосылки:
F и каждый An должен быть копируемым или подвижным.

Последствия:
Как будто thread(boost::bind(f,a1,a2,...)), Как следствие, е и каждый копируются в
внутреннее хранилище для доступа новым потоком
.

Это означает, что каждый ваш поток получает свою собственную копию s и все модификации не применяются к s но в локальную ветку копирует. Это тот же случай, когда вы передаете объект в аргумент функции по значению. Вам нужно пройти s объект по ссылке вместо этого — используя boost::ref:

void produce(ProtectedStack& s)
{
m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
}

void consume(ProtectedStack& s)
{
m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
}

Еще одна проблема связана с использованием мьютекса. Это не самое лучшее.

  1. Почему вы используете мьютекс из библиотеки Signals2? Просто используйте boost::mutex из Boost.Thread и удалить ненужную зависимость от библиотеки Signals2.

  2. Используйте оболочку RAII boost::lock_guard вместо прямого lock/unlock звонки.

  3. Как уже упоминалось, вы должны защищать всех членов ProtectedStack,

Образец:

boost::mutex m;

void push(int x)
{
boost::lock_guard<boost::mutex> lock(m);
m_Stack.push(x);
}

void pop()
{
boost::lock_guard<boost::mutex> lock(m);
if(!m_Stack.empty()) m_Stack.pop();
}

int size()
{
boost::lock_guard<boost::mutex> lock(m);
return m_Stack.size();
}

bool isEmpty()
{
boost::lock_guard<boost::mutex> lock(m);
return m_Stack.empty();
}

int top()
{
boost::lock_guard<boost::mutex> lock(m);
return m_Stack.top();
}
2

Другие решения

Ваш пример кода страдает от пары проблем с синхронизацией, как отмечено другими:

  • Отсутствуют блокировки при вызовах некоторых участников ProtectedStack.
  • Основной поток может завершиться без разрешения присоединения рабочих потоков.
  • Производитель и потребитель не делают петли, как вы ожидаете. Производители должны всегда (когда они могут) производить, а потребители должны продолжать потреблять, так как новые элементы помещаются в стек.
  • cout’s в главном потоке вполне может быть выполнен до того, как производители или потребители уже успели поработать.

Я бы рекомендовал использовать условную переменную для синхронизации между вашими производителями и потребителями. Посмотрите на пример производителя / потребителя здесь: http://en.cppreference.com/w/cpp/thread/condition_variable
Это довольно новая функция в стандартной библиотеке C ++ 11 и поддерживается начиная с VS2012. До VS2012 вам необходимо либо повысить, либо использовать вызовы Win32.

Использование условной переменной для решения проблемы производителя / потребителя хорошо, потому что она почти навязывает использование мьютекса для блокировки общих данных и предоставляет механизм сигнализации, позволяющий потребителям знать, что что-то готово к использованию, чтобы они не вращались. (что всегда является компромиссом между отзывчивостью потребителя и использованием процессора, опрашивающим очередь). Это также делает его атомарным, что предотвращает вероятность того, что потоки пропустят сигнал о том, что есть что потреблять, как описано здесь: https://en.wikipedia.org/wiki/Sleeping_barber_problem

Чтобы дать краткий обзор того, как переменная условия заботится об этом …

  • Производитель выполняет все трудоемкие действия в своем потоке без владения мьютексом.
  • Производитель блокирует мьютекс, добавляет элемент, который он произвел, в глобальную структуру данных (возможно, какую-то очередь), отпускает мьютекс и сигнализирует, что единственный потребитель должен идти — в этом порядке.
  • Потребитель, который ожидает переменную условия, повторно получает мьютекс автоматически, удаляет элемент из очереди и выполняет с ним некоторую обработку. В течение этого времени производитель уже работает над созданием нового предмета, но должен дождаться, пока потребитель закончит, прежде чем он сможет поставить товар в очередь.

Это будет иметь следующее влияние на ваш код:

  • Больше нет необходимости в ProtectedStack, подойдет обычная структура данных стека / очереди.
  • Нет необходимости в надстройке, если вы используете достаточно новый компилятор — удаление зависимостей сборки — это всегда хорошо.

У меня такое ощущение, что многопоточность является для вас чем-то новым, поэтому я могу только посоветовать посмотреть, как другие решают проблемы с синхронизацией, поскольку очень сложно обдумать это. Путаница в отношении того, что происходит в среде с несколькими потоками и общими данными, обычно приводит к таким проблемам, как тупики в будущем.

5

Вы не проверяете, что поток создания выполнялся, прежде чем пытаться потреблять. Вы также не ограничиваетесь размером / пустым / верхом … это не безопасно, если контейнер обновляется.

1