возможная ошибка реализации std :: async для Windows

Кажется, что есть ошибка в реализации Windows std :: async. При большой нагрузке (порядка 1000 потоков, запускаемых асинхронно в секунду), асинхронные задачи никогда не планируются, и ожидание возвращаемого фьючерса приводит к взаимоблокировкам. Посмотрите на этот кусок кода (измененный с политикой запуска, отложенной вместо асинхронного):

BundlingChunk(size_t numberOfInputs, Bundler* parent, ChunkIdType chunkId)
: m_numberOfInputs(numberOfInputs), m_parent(parent), m_chunkId(chunkId)
{
const BundlerChunkDescription& chunk = m_parent->m_chunks[m_chunkId];
const ChunkInfo& original = chunk.m_original;
auto& deserializers = m_parent->m_deserializers;

// Fetch all chunks in parallel.
std::vector<std::map<ChunkIdType, std::shared_future<ChunkPtr>>> chunks;
chunks.resize(chunk.m_secondaryChunks.size());
static std::atomic<unsigned long long int> chunksInProgress = 0;

for (size_t i = 0; i < chunk.m_secondaryChunks.size(); ++i)
{
for (const auto& c : chunk.m_secondaryChunks[i])
{
const auto chunkCreationLambda = ([this, c, i] {
chunksInProgress++;
ChunkPtr chunk = m_parent->m_weakChunkTable[i][c].lock();
if (chunk) {
chunksInProgress--;
return chunk;
}
chunksInProgress--;
return m_parent->m_deserializers[i]->GetChunk(c);
});
std::future<ChunkPtr> chunkCreateFuture = std::async(std::launch::deferred, chunkCreationLambda);
chunks[i].emplace(c, chunkCreateFuture.share());
}
}

std::vector<SequenceInfo> sequences;
sequences.reserve(original.m_numberOfSequences);

// Creating chunk mapping.
m_parent->m_primaryDeserializer->SequenceInfosForChunk(original.m_id, sequences);
ChunkPtr drivingChunk = chunks.front().find(original.m_id)->second.get();
m_sequenceToSequence.resize(deserializers.size() * sequences.size());
m_innerChunks.resize(deserializers.size() * sequences.size());
for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
{
if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
{
continue;
}

size_t currentIndex = sequenceIndex * deserializers.size();
m_sequenceToSequence[currentIndex] = sequences[sequenceIndex].m_indexInChunk;
m_innerChunks[currentIndex] = drivingChunk;
}

// Creating sequence mapping and requiring underlying chunks.
SequenceInfo s;
for (size_t deserializerIndex = 1; deserializerIndex < deserializers.size(); ++deserializerIndex)
{
auto& chunkTable = m_parent->m_weakChunkTable[deserializerIndex];
for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
{
if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
{
continue;
}

size_t currentIndex = sequenceIndex * deserializers.size() + deserializerIndex;
bool exists = deserializers[deserializerIndex]->GetSequenceInfo(sequences[sequenceIndex], s);
if (!exists)
{
if(m_parent->m_verbosity >= (int)TraceLevel::Warning)
fprintf(stderr, "Warning: sequence '%s' could not be found in the deserializer responsible for stream '%ls'\n",
m_parent->m_corpus->IdToKey(sequences[sequenceIndex].m_key.m_sequence).c_str(),
deserializers[deserializerIndex]->StreamInfos().front().m_name.c_str());
m_sequenceToSequence[currentIndex] = SIZE_MAX;
continue;
}

m_sequenceToSequence[currentIndex] = s.m_indexInChunk;
ChunkPtr secondaryChunk = chunkTable[s.m_chunkId].lock();
if (!secondaryChunk)
{
secondaryChunk = chunks[deserializerIndex].find(s.m_chunkId)->second.get();
chunkTable[s.m_chunkId] = secondaryChunk;
}

m_innerChunks[currentIndex] = secondaryChunk;
}
}
}

Моя версия выше изменена так, что асинхронные задачи запускаются как отложенные, а не как асинхронные, что решает проблему. Кто-нибудь еще видел что-то подобное по состоянию на VS2017 распространяемый 14.12.25810? Воспроизвести эту проблему так же просто, как обучить модель CNTK, которая использует устройства чтения текста и изображений на компьютере с графическим процессором и твердотельным накопителем, так что десериализация процессора становится узким местом. Примерно после 30 минут тренировки обычно возникает тупик. Кто-нибудь видел подобную проблему в Linux? Если это так, это может быть ошибка в коде, хотя я сомневаюсь, потому что счетчик отладки chunksInProgress всегда 0 после тупика. Для справки, весь исходный файл находится по адресу https://github.com/Microsoft/CNTK/blob/455aef80eeff675c0f85c6e34a03cb73a4693bff/Source/Readers/ReaderLib/Bundler.cpp.

1

Решение

Новый день, лучше ответь (много лучше). Читать дальше.

Я провел некоторое время, исследуя поведение std::async на винде и ты прав. Это другое животное, видите Вот.

Итак, если ваш код опирается на std::async всегда запуск нового потока выполнения и немедленный возврат, тогда вы не сможете его использовать. Во всяком случае, не в Windows. На моей машине предел, по-видимому, составляет 768 фоновых потоков, которые более или менее соответствуют тому, что вы наблюдали.

Во всяком случае, я хотел узнать немного больше о современном C ++, поэтому у меня была возможность развернуть собственную замену std::async это может использоваться в Windows с семантикой, отмененной OP. Поэтому я смиренно представляю следующее:

AsyncTask: вставная замена для std::async

#include <future>
#include <thread>

template <class Func, class... Args>
std::future <std::result_of_t <std::decay_t <Func> (std::decay_t <Args>...)>>
AsyncTask (Func&& f, Args&&... args)
{
using decay_func = std::decay_t <Func>;
using return_type = std::result_of_t <decay_func (std::decay_t <Args>...)>;

std::packaged_task <return_type (decay_func f, std::decay_t <Args>... args)>
task ([] (decay_func f, std::decay_t <Args>... args)
{
return f (args...);
});

auto task_future = task.get_future ();
std::thread t (std::move (task), f, std::forward <Args> (args)...);
t.detach ();
return task_future;
};

Тестовая программа

#include <iostream>
#include <string>

int add_two_integers (int a, int b)
{
return a + b;
}

std::string append_to_string (const std::string& s)
{
return s + " addendum";
}

int main ()
{
auto /* i.e. std::future <int> */ f1 = AsyncTask (add_two_integers , 1, 2);
auto /* i.e. int */  i = f1.get ();
std::cout << "add_two_integers : " << i << std::endl;

auto  /* i.e. std::future <std::string> */ f2 = AsyncTask (append_to_string , "Hello world");
auto /* i.e. std::string */ s = f2.get ();        std::cout << "append_to_string : " << s << std::endl;
return 0;
}

Выход

add_two_integers : 3
append_to_string : Hello world addendum

Живая демо Вот (GCC) и Вот (Лязг).

Я многому научился, написав это, и это было очень весело. Я довольно новичок в этом деле, поэтому все комментарии приветствуются. Я буду рад обновить этот пост, если у меня что-то не так.

1

Другие решения

Других решений пока нет …