Блокировка потоков происходит с помощью boost :: thread. Что не так с моими переменными условия?

Я написал класс Link для передачи данных между двумя узлами в сети. Я реализовал это с двумя запросами (один для данных, идущих от узла 0 к узлу 1, а другой для данных, идущих от узла 1 к узлу 0). Я пытаюсь многопоточности приложения, но я получаю блокировки потоков. Я пытаюсь предотвратить чтение и запись в один и тот же раздел одновременно. Читая больше о том, как я изначально реализовал это, я думаю, что я неправильно использую условные переменные (и, возможно, не следует использовать логические переменные?). Должен ли я иметь два мьютекса, по одному на каждую деку? Пожалуйста, помоги, если можешь. Спасибо!

class Link {
public:
// other stuff...
void push_back(int sourceNodeID, Data newData);
void get(int destinationNodeID, std::vector<Data> &linkData);

private:
// other stuff...
std::vector<int> nodeIDs_;
// qVector_ has two deques, one for Data from node 0 to node 1 and
// one for Data from node 1 to node 0
std::vector<std::deque<Data> > qVector_;
void initialize(int nodeID0, int nodeID1);

boost::mutex mutex_;
std::vector<boost::shared_ptr<boost::condition_variable> > readingCV_;
std::vector<boost::shared_ptr<boost::condition_variable> > writingCV_;
std::vector<bool> writingData_;
std::vector<bool> readingData_;
};

Функция push_back:

void Link::push_back(int sourceNodeID, Data newData)
{
int idx;
if (sourceNodeID == nodeIDs_[0]) idx = 1;
else
{
if (sourceNodeID == nodeIDs_[1]) idx = 0;
else throw runtime_error("Link::push_back: Invalid node ID");
}

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (readingData_[idx]) readingCV_[idx]->wait(lock);

writingData_[idx] = true;
qVector_[idx].push_back(newData);
writingData_[idx] = false;
writingCV_[idx]->notify_all();
}

Функция get:

void Link::get(int destinationNodeID,
std::vector<Data> &linkData)
{
int idx;
if (destinationNodeID == nodeIDs_[0]) idx = 0;
else
{
if (destinationNodeID == nodeIDs_[1]) idx = 1;
else throw runtime_error("Link::get: Invalid node ID");
}

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (writingData_[idx]) writingCV_[idx]->wait(lock);
readingData_[idx] = true;

std::copy(qVector_[idx].begin(),qVector_[idx].end(),back_inserter(linkData));
qVector_[idx].erase(qVector_[idx].begin(),qVector_[idx].end());
readingData_[idx] = false;
readingCV_[idx]->notify_all();
return;
}

и здесь инициализировать (в случае, если это полезно)

void Link::initialize(int nodeID0, int nodeID1)
{
readingData_ = std::vector<bool>(2,false);
writingData_ = std::vector<bool>(2,false);
for (int i = 0; i < 2; ++i)
{
readingCV_.push_back(make_shared<boost::condition_variable>());
writingCV_.push_back(make_shared<boost::condition_variable>());
}
nodeIDs_.reserve(2);
nodeIDs_.push_back(nodeID0);
nodeIDs_.push_back(nodeID1);
qVector_.reserve(2);
qVector_.push_back(std::deque<Data>());
qVector_.push_back(std::deque<Data>());
}

0

Решение

Я пытаюсь многопоточности приложения, но я получаю блокировки потоков.

Что такое «нить»? Трудно понять, чего пытается добиться ваш код. Сначала рассмотрим код push_back (), синхронизированная часть которого выглядит следующим образом:

boost::unique_lock<boost::mutex> lock(mutex_);

while (readingData_[idx]) readingCV_[idx]->wait(lock);

writingData_[idx] = true;
qVector_[idx].push_back(newData);
writingData_[idx] = false;
writingCV_[idx]->notify_all();

Ваш writingData[idx] логическое значение начинается с ложного значения и становится истинным только на мгновение, пока поток заблокирован мьютекс. К тому времени, когда мьютекс освобождается, он снова становится ложным. Так что для любого Другой поток, который должен ждать, чтобы получить мьютекс, writingData[idx] будут никогда быть правдой.

Но в вашем коде get () у вас есть

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (writingData_[idx]) writingCV_[idx]->wait(lock);

К тому времени, когда поток получает блокировку мьютекса, writingData[idx] возвращается к ложному и поэтому цикл while (и ожидание в резюме) никогда поступил.

Точно симметричный анализ относится к readingData[idx] логическое значение, которое также является всегда ложь за пределами блокировки мьютекса.

Таким образом, ваши переменные условия никогда ждал Вы должны полностью переосмыслить свой дизайн.

Начните с одного мьютекса на очередь (для простой передачи данных очередь излишня), и для каждой очереди свяжите переменную условия с непустой очередью. get() Таким образом, метод будет ждать, пока очередь не станет пустой, о чем будет сообщено в push_back() метод. Примерно так (непроверенный код):

template <typename Data>
class BasicQueue
{
public:
void push( Data const& data )
{
boost::unique_lock  _lock( mutex_ );
queue_.push_back( data );
not_empty_.notify_all();
}

void get ( Data& data )
{
boost::unique_lock  _lock( mutex_ );
while ( queue_.size() == 0 )
not_empty_.wait( _lock ); // this releases the mutex
// mutex is reacquired here, with queue_.size() > 0
data = queue_.front();
queue_.pop_front();
}

private:
std::queue<Data>            queue_;
boost::mutex                mutex_;
boost::condition_variable   not_empty_;
};
2

Другие решения

Да. Вам нужны два мьютекса. Ваши взаимные блокировки почти наверняка являются результатом раздора в единственном мьютексе. Если вы взломаете свою запущенную программу с помощью отладчика, вы увидите, где висят потоки. Также я не понимаю, зачем вам нужны bools. (РЕДАКТИРОВАТЬ: может быть возможно придумать дизайн, который использует один мьютекс, но проще и безопаснее придерживаться одного мьютекса на общую структуру данных)

Практическое правило — иметь один мьютекс на общую структуру данных, которую вы пытаетесь защитить. Этот мьютекс защищает структуру данных от одновременного доступа и обеспечивает безопасность потоков. В вашем случае один мьютекс на дек. Например.:

class SafeQueue
{
private:
std::deque<Data> q_;
boost::mutex m_;
boost::condition_variable v_;

public:
void push_back(Data newData)
{
boost::lock_guard<boost::mutex> lock(m_);
q_.push_back(newData);
// notify etc.
}
// ...
};

С точки зрения уведомления через условные переменные смотрите здесь:

Использование условной переменной в ситуации производитель-потребитель

Так что будет также один condition_variable за объект, о котором производитель уведомит, а потребитель будет ждать. Теперь вы можете создать две из этих очередей для общения в обоих направлениях. Имейте в виду, что только с двумя потоками вы все еще можете заблокировать, если оба потока заблокированы (ожидают данные), и обе очереди пусты.

1