Что не так с этим шаблоном использования boost :: asio и boost :: coroutine?

В этот вопрос, который я описал повышение :: ASIO а также повышение :: сопрограмму шаблон использования, который вызывает случайные сбои моего приложения, и я опубликовал выдержку из своего кода и Valgrind а также GDB выход.

Для дальнейшего изучения проблемы я создал меньший доказательство концепции приложение, которое применяет тот же шаблон. Я увидел, что та же проблема возникает в меньшей программе, источник которой я публикую здесь.

Код запускает несколько потоков и создает пул соединений с несколькими фиктивными соединениями (предоставленные пользователем номера). Дополнительными аргументами являются целые числа без знака, которые играют роль поддельных запросов. Фиктивная реализация sendRequest Функция просто запускает асинхронный таймер для ожидания числа секунд, равного номеру входа и yileds из функции.

Может кто-то увидеть проблему с этим кодом и может ли он предложить какое-то решение для этого?

#include "asiocoroutineutils.h"#include "concurrentqueue.h"
#include <iostream>
#include <thread>

#include <boost/lexical_cast.hpp>

using namespace std;
using namespace boost;
using namespace utils;

#define id this_thread::get_id() << ": "
// ---------------------------------------------------------------------------

/*!
* \brief This is a fake Connection class
*/
class Connection
{
public:
Connection(unsigned connectionId)
: _id(connectionId)
{
}

unsigned getId() const
{
return _id;
}

void sendRequest(asio::io_service& ioService,
unsigned seconds,
AsioCoroutineJoinerProxy,
asio::yield_context yield)
{
cout << id << "Connection " << getId()
<< " Start sending: " << seconds << endl;

// waiting on this timer is palceholder for any asynchronous operation
asio::steady_timer timer(ioService);
timer.expires_from_now(chrono::seconds(seconds));
coroutineAsyncWait(timer, yield);

cout << id << "Connection " << getId()
<< " Received response: " << seconds << endl;
}

private:
unsigned _id;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
typedef std::shared_ptr<asio::steady_timer> TimerPtr;

// ---------------------------------------------------------------------------

class ConnectionPool
{
public:
ConnectionPool(size_t connectionsCount)
{
for(size_t i = 0; i < connectionsCount; ++i)
{
cout << "Creating connection: " << i << endl;
_connections.emplace_back(new Connection(i));
}
}

ConnectionPtr getConnection(TimerPtr timer,
asio::yield_context& yield)
{
lock_guard<mutex> lock(_mutex);

while(_connections.empty())
{
cout << id << "There is no free connection." << endl;

_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());

_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();

cout << id << "Connection was freed." << endl;
}

cout << id << "Getting connection: "<< _connections.front()->getId() << endl;

ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();
return connection;
}

void addConnection(ConnectionPtr connection)
{
lock_guard<mutex> lock(_mutex);

cout << id << "Returning connection " << connection->getId()
<< " to the pool." << endl;

_connections.emplace_back(std::move(connection));

if(_timers.empty())
return;

auto timer = _timers.back();
_timers.pop_back();
auto& ioService = timer->get_io_service();

ioService.post([timer]()
{
cout << id << "Wake up waiting getConnection." << endl;
timer->cancel();
});
}

private:
mutex _mutex;
deque<ConnectionPtr> _connections;
deque<TimerPtr> _timers;
};

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr;

// ---------------------------------------------------------------------------

class ScopedConnection
{
public:
ScopedConnection(ConnectionPool& pool,
asio::io_service& ioService,
asio::yield_context& yield)
: _pool(pool)
{
auto timer = make_shared<asio::steady_timer>(ioService);
_connection = _pool.getConnection(timer, yield);
}

Connection& get()
{
return *_connection;
}

~ScopedConnection()
{
_pool.addConnection(std::move(_connection));
}

private:
ConnectionPool& _pool;
ConnectionPtr _connection;
};

// ---------------------------------------------------------------------------

void sendRequest(asio::io_service& ioService,
ConnectionPool& pool,
unsigned seconds,
asio::yield_context yield)
{
cout << id << "Constructing request ..." << endl;

AsioCoroutineJoiner joiner(ioService);

ScopedConnection connection(pool, ioService, yield);

asio::spawn(ioService, bind(&Connection::sendRequest,
connection.get(),
std::ref(ioService),
seconds,
AsioCoroutineJoinerProxy(joiner),
placeholders::_1));

joiner.join(yield);

cout << id << "Processing response ..." << endl;
}

// ---------------------------------------------------------------------------

void threadFunc(ConnectionPool& pool,
ConcurrentQueue<unsigned>& requests)
{
try
{
asio::io_service ioService;

while(true)
{
unsigned request;
if(!requests.tryPop(request))
break;

cout << id << "Scheduling request: " << request << endl;

asio::spawn(ioService, bind(sendRequest,
std::ref(ioService),
std::ref(pool),
request,
placeholders::_1));
}

ioService.run();
}
catch(const std::exception& e)
{
cerr << id << "Error: " << e.what() << endl;
}
}

// ---------------------------------------------------------------------------

int main(int argc, char* argv[])
{
if(argc < 3)
{
cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..."<< endl;
return -1;
}

try
{
auto poolSize = lexical_cast<size_t>(argv[1]);
auto threadsCount = lexical_cast<size_t>(argv[2]);

ConcurrentQueue<unsigned> requests;
for(int i = 3; i < argc; ++i)
{
auto request = lexical_cast<unsigned>(argv[i]);
requests.tryPush(request);
}

ConnectionPoolPtr pool(new ConnectionPool(poolSize));

vector<unique_ptr<thread>> threads;
for(size_t i = 0; i < threadsCount; ++i)
{
threads.emplace_back(
new thread(threadFunc, std::ref(*pool), std::ref(requests)));
}

for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
}
catch(const std::exception& e)
{
cerr << "Error: " << e.what() << endl;
}

return 0;
}

Вот некоторые вспомогательные утилиты, используемые приведенным выше кодом:

#pragma once

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>

namespace utils
{

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context& yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}

class AsioCoroutineJoiner
{
public:
explicit AsioCoroutineJoiner(boost::asio::io_service& io)
: _timer(io), _count(0) {}

void join(boost::asio::yield_context yield)
{
assert(_count > 0);
_timer.expires_from_now(
boost::asio::steady_timer::clock_type::duration::max());
coroutineAsyncWait(_timer, yield);
}

void inc()
{
++_count;
}

void dec()
{
assert(_count > 0);
--_count;
if(0 == _count)
_timer.cancel();
}

private:
boost::asio::steady_timer _timer;
std::size_t _count;

}; // AsioCoroutineJoiner class

class AsioCoroutineJoinerProxy
{
public:
AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner)
: _joiner(joiner)
{
_joiner.inc();
}

AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy)
: _joiner(joinerProxy._joiner)
{
_joiner.inc();
}

~AsioCoroutineJoinerProxy()
{
_joiner.dec();
}

private:
AsioCoroutineJoiner& _joiner;

}; // AsioCoroutineJoinerProxy class

} // utils namespace

Для полноты кода последняя отсутствующая часть ConcurrentQueue учебный класс. Это слишком долго, чтобы вставить это здесь, но если вы хотите, вы можете найти его Вот.

Пример использования приложения:

./ connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

где первое число 3 — число поддельных соединений, а второе число 3 — количество используемых потоков. Цифры после них являются поддельными запросами.

Выход из Valgrind а также GDB такой же, как в упомянутом выше вопрос.

Используемая версия увеличение является 1,57. Компилятор GCC 4.8.3. Операционная система CentOS Linux выпуск 7.1.1503

6

Решение

Кажется, что все Valgrind ошибки вызваны из-за BOOST_USE_VALGRIND макрос не определяется как Тэннер Сансбери точки в комментарии, связанные с этот вопрос. Похоже, что кроме этой программы это правильно.

1

Другие решения