сокеты — TCP / IP IOCP иногда получал поврежденные данные — Visual C ++ в Windows

Я пишу простой тестовый клиент и сервер ICOP, чтобы убедиться, что я правильно использую API и что данные, отправляемые клиентом, правильно принимаются сервером. Я включил весь код для этого вопроса.

Именно здесь я столкнулся с некоторыми проблемами: иногда данные в приемных буферах кажутся поврежденными (поврежденными в том смысле, что порции данных в буферах могут быть не в порядке или отсутствовать). Чтобы было ясно, это данные в отдельных буферах приема, я не имею в виду неупорядоченность между несколькими буферами из-за проблем планирования потоков. Я ранее разместил вопрос, связанный с этим Вот. Однако я проделал большую работу по получению правильного примера кода, поэтому выкладываю новый вопрос и буду ссылаться на него. Я надеюсь, что другие смогут запустить этот код и испытать такое же странное поведение.

Тестовый код

Тестовое приложение может работать в двух режимах: клиентском и серверном. Запустите сервер, и он начнет слушать, запустит клиент и подключится к серверу, и, как только он подключится, начнет сбрасывать данные на сервер так быстро, как это позволит. Затем сервер проверяет данные в каждом буфере, которые возвращаются из GetQueuedCompletionStatus после обращений к WSARecv. Каждый раз, когда завершается WSASend, я обнуляю раздел OVERLAPPED структуры и снова вызываю WSASend с исходным буфером данных.

Каждый буфер данных, который отправляет клиент, представляет собой последовательность байтов, которые увеличиваются один за другим до заданного максимума. Я не посылаю полный диапазон 0..255, если этот размер аккуратно умещается в кратные пакеты и каким-то образом скрывает проблему, поэтому в моем примере байт кода варьируется от 0..250. Для каждого созданного буфера отправки я повторяю этот шаблон numberOfGroups раз.

Этот формат должен означать, что я могу иметь несколько ожидающих WSARecv, а затем полностью проверять данные в возвращенных буферах независимо от любого другого буфера, то есть не требуется никакой синхронизации или восстановления порядка. т.е. я могу начать с первого байта и проверить, что они увеличивают один за другим до максимума, а затем сбрасывают до 0. Как только у меня этот тест работает без проблем, я могу перейти к чему-то более сложному, упорядочив полученные буферы и проверив более сложные данные.

В командной строке вы можете указать, сколько может быть одновременных ожидающих вызовов WSASend и WSARecv. Эта проблема, кажется, случается далеко чаще, когда есть 2 или более ожидающих вызовов WSARecv. С 1 он может работать довольно долго, прежде чем случайно обнаружит проблему.

Я тестировал на Windows 7 и использую Visual Studio 2010 C ++.

Количество одновременных вызовов как на клиенте, так и на сервере, похоже, влияет. Использование 2 для обоих может привести к повреждению данных больше, чем к некоторым комбинациям.

Сокеты и IOCP, по-видимому, требуют довольно много стандартного кода только для того, чтобы запустить очень простое клиентское и серверное приложение. Фактический код, который выполняет прием буферов, состоит всего из нескольких строк и включает вызов WSARecv и обработку завершенных вызовов из GetQueuedCompletionStatus.

Этот код вызывает WSARecv

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}

Когда вызовы WSARecv завершены, они обрабатываются рабочими потоками — я удалил строки, не связанные с получением данных из этого фрагмента

void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
default:;
}
}
}
}

При вызове connection-> onRecv я проверяю данные. Здесь что-то выглядит явно не так?

Я включил полный код для справки и должен скомпилировать, если вы чувствуете себя авантюрным.


Полный источник для справки

Пример сервера, прослушивающий порт 3000 и имеющий не более 2 ожидающих вызовов WSARecv

> IOCPTest.exe server 3000 2

Пример клиента, подключающегося к 127.0.0.1 через порт 3000 с не более чем 2 ожидающими вызовами WSASend

> IOCPTest.exe client 127.0.0.1 3000 2

Программа состоит из небольшого количества классов

IOCPConnectionManager

Этот класс обрабатывает прослушивание соединений, а также запускает рабочие потоки.

IOCPConnection

Просто отслеживает SOCKET и несколько методов для обработки асинхронных вызовов. IOCPConnection :: onRecv вызывается, когда WSARecv возвращает и проверяет данные в буфере. Он просто печатает сообщение и возвращает результат, если обнаружены данные вне последовательности.

IOCPWorker

Рабочая нить. IOCPWorker :: execute () — это место, где вызывается GetQueuedCompletionStatus.

TestOverlapped

Требуемая ПЕРЕКРЫТНАЯ структура.

Вам также нужно включить Ws2_32.lib и Mswsock.lib для компоновщика.

Основной файл cpp

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  Some allocated objects are not freed at the end, this is a test only *
*                                                                       *
************************************************************************/

#include "stdafx.h"#include <iostream>
#include <string>
#include "IOCPTest.h"#include <Windows.h>

void printUse()
{
std::cout << "Invalid arguments" << std::endl;
std::cout << "This test app has very limited error handling or memory management" << std::endl;
std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}

int main(int argc, char *argv[])
{
if (argc < 4)
{
printUse();
return 0;
}
std::string mode(argv[1]);
if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
{
printUse();
return 0;
}

IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();

bool server = mode.compare("server") == 0;
if (server)
{
std::string listenPort(argv[2]);
std::string postedReceiveCount(argv[3]);

manager->listenPort = atoi(listenPort.c_str());
manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
manager->postedSendCount = 1; // Not really used in this mode
manager->startListening();
}
else
{
if (argc < 5)
{
printUse();
return 0;
}

std::string host(argv[2]);
std::string port(argv[3]);
std::string postedSendCount(argv[4]);

manager->postedReceiveCount = 1; // Not really used in this mode
manager->postedSendCount = atoi(postedSendCount.c_str());

IOCPTest::IOCPConnection *connection = manager->createConnection();

connection->host = host;
connection->port = atoi(port.c_str());
connection->connect();
}
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}

IOCPTest.h

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  std::cout might not be the best approach in a multithreaded          *
*  environment but this is just a simple test app.                      *
*  Some allocated objects are not cleaned up at the end either, but     *
*  again this is just a test.                                           *
*                                                                       *
************************************************************************/

#ifndef IOCPTestH
#define IOCPTestH
#endif

#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>

namespace IOCPTest
{

class IOCPConnection;

enum IOCPSocketOperation
{
soUnknown,
soAccept,
soConnect,
soDisconnect,
soSend,
soRecv,
soQuit
};

struct TestOverlapped
{
OVERLAPPED overlapped;
WSABUF buffer;
IOCPSocketOperation operation;
IOCPConnection *connection;
bool resend; // Set this to keep sending the same data over and over

TestOverlapped(int bufferSize);
~TestOverlapped();
void reset();
};

typedef TestOverlapped *PTestOverlapped;

class IOCPConnectionManager
{
public:
static const int NUMACCEPTS = 5;

WSADATA wsaData;
HANDLE iocp;
SOCKET listenSocket;
USHORT listenPort;
int postedReceiveCount;
int postedSendCount;

void startListening();
void postAcceptEx();

IOCPConnection *createConnection();

IOCPConnectionManager();
};

class IOCPConnection
{
public:
SOCKET socket;
IOCPConnectionManager *manager;
std::string host;
USHORT port;

void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void postRecv(PTestOverlapped overlapped = nullptr);
void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void send(PTestOverlapped overlapped);
void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);

void connect();
};

class IOCPWorker
{
public:
HANDLE threadHandle;
DWORD threadId;
IOCPConnectionManager *manager;

IOCPWorker(bool suspended);

void start();
void execute();
};

}

IOCPTest.cpp

#include "stdafx.h"#include "IOCPTest.h"#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>

namespace IOCPTest
{

LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;

BOOL AcceptEx
(
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
)
{
if (fnAcceptEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
return false;
}
}
return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}

BOOL ConnectEx(
SOCKET s,
const struct sockaddr FAR *name,
int namelen,
PVOID lpSendBuffer,
DWORD dwSendDataLength,
LPDWORD lpdwBytesSent,
LPOVERLAPPED lpOverlapped
)
{
if (fnConnectEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
return false;
}
}
return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}

// TestOverlapped

TestOverlapped::TestOverlapped(int bufferSize):
overlapped(),
operation(soUnknown),
connection(nullptr),
buffer(),
resend(false)
{
if (bufferSize > 0)
{
buffer.len = bufferSize;
buffer.buf = (CHAR*) malloc(bufferSize);
}
}

TestOverlapped::~TestOverlapped()
{
if (buffer.buf != nullptr)
{
free(buffer.buf);
}
}

void TestOverlapped::reset()
{
overlapped = OVERLAPPED();
}

// IOCPConnectionManager

IOCPConnectionManager::IOCPConnectionManager():
wsaData(),
listenSocket(0),
listenPort(0),
postedReceiveCount(1)
{
WSAStartup(WINSOCK_VERSION, &wsaData);
iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

SYSTEM_INFO systemInfo = SYSTEM_INFO();
GetSystemInfo(&systemInfo);

for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
{
IOCPWorker* worker = new IOCPWorker(true);
worker->manager = this;
worker->start();
}
}

void IOCPConnectionManager::startListening()
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);

sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
localAddress.sin_port = htons(listenPort);

if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error in binding listening socket" << std::endl;
}
if (listen(listenSocket, SOMAXCONN) == 0)
{
std::cout << "Listening on port " << listenPort << std::endl;
}
for (int i = 0; i < NUMACCEPTS; i++)
{
postAcceptEx();
}
}

void IOCPConnectionManager::postAcceptEx()
{
SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
connection->socket = acceptSocket;

CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket

PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
overlapped->operation = soAccept;
overlapped->connection = connection;
DWORD byesReceived = 0;
int result = IOCPTest::AcceptEx
(
listenSocket,
acceptSocket,
overlapped->buffer.buf,
0, // Size of initial receiving buffer, excluding the space at the end for the two addressed
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
&byesReceived,
(LPOVERLAPPED) overlapped
);
if (!result)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
}
}
}

IOCPConnection *IOCPConnectionManager::createConnection()
{
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;

return connection;
}

// IOCPConnection

void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
manager->postAcceptEx(); // Replace this accept
auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
if (returnCode == SOCKET_ERROR)
{
std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
}
std::cout << "Connection Accepted" << std::endl;
for (int i = 0; i < manager->postedReceiveCount; ++i)
{
postRecv();
}
}

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}

void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
if (numberOfBytesTransferred > 0)
{
byte *data = (byte *)overlapped->buffer.buf;
if (data[0] > maxByteExpected)
{
std::cerr << "Byte greater than max expected found. Max Expected: " << maxByteExpected << "; Found: " << data[0] << std::endl;
return;
}
byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
{
if (data[i] != next)
{
// Not really the best solution for writing data out from multiple threads. Test app only.
std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
return;
}
else if (next == maxByteExpected)
{
next = 0;
}
else
{
++next;
}
}
//std::cout << "Valid buffer processed" << std::endl;
}
}

void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
for (int i = 0; i < manager->postedSendCount; ++i)
{
// Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
sendOverlapped->connection = this;

for (int j = 0; j < numberOfGroups; ++j)
{
for (byte k = 0; k <= maxByteExpected; ++k)
{
((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
}
}
sendOverlapped->resend = true; // Repeat sending this data
send(sendOverlapped);
}
}

void IOCPConnection::send(PTestOverlapped overlapped)
{
overlapped->reset();
overlapped->operation = soSend;

DWORD bytesSent = 0;
DWORD flags = 0;

if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
}
}
}

void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}

void IOCPConnection::connect()
{
socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket == INVALID_SOCKET)
{
std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
return;
}
CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket

sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY;
localAddress.sin_port = 0;

if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
return;
}

addrinfo hints = addrinfo();
addrinfo *remoteAddress = nullptr;

hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;

std::stringstream ss;
ss << port;
//std::cout << ss.str() << std::endl;
if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
{
std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
return;
}

TestOverlapped *overlapped = new TestOverlapped(0);
overlapped->connection = this;
overlapped->operation = soConnect;

BOOL result = IOCPTest::ConnectEx
(
socket,
remoteAddress->ai_addr,
remoteAddress->ai_addrlen,
nullptr,
0,
nullptr,
LPOVERLAPPED(overlapped)
);
if (result == FALSE)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
//std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
}
}

freeaddrinfo(remoteAddress);
}

// IOCPWorker

DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
((IOCPWorker*)lpParam)->execute();
return 0;
}

IOCPWorker::IOCPWorker(bool suspended)
{
threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}

void IOCPWorker::start()
{
ResumeThread(threadHandle);
}

void IOCPWorker::execute()
{
//std::cout << "TMVIOCPWorker::execute()" << std::endl;
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soAccept:
{
IOCPConnection *connection = overlapped->connection;
connection->onAcceptEx(overlapped, numberOfBytesTransferred);

delete overlapped;
overlapped = nullptr;
break;
}
case soConnect:
{
std::cout << "ConnectEx returned" << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onConnect(overlapped, numberOfBytesTransferred); // This method validates the received data
delete overlapped;
overlapped = nullptr;
break;
}
case soRecv:
{
//std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

overlapped->reset();
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
case soSend:
{
IOCPConnection *connection = overlapped->connection;
connection->onSent(overlapped, numberOfBytesTransferred);

// Send the same data over and over
std::cout << "Resending buffer" << std::endl;
if (overlapped->resend)
{
connection->send(overlapped);
}
else
{
delete overlapped;
}
overlapped = nullptr;
break;
}
default:;
}
}
}
}

}

Большинство полученных буферов правильные, однако у меня все еще есть много прокрутки, как это при работе с 2 буферами приема и 2 буфера отправки для сокета:

Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115

Я надеюсь, что это просто что-то, что я делаю неправильно. Я проверил ту же проверку в буфере данных перед отправкой, как и при получении, чтобы убедиться, что я не сделал что-то глупое, но прошел эту проверку. Я написал сервер на другом языке не используя IOCP, и это, кажется, получает данные правильно. Я также написал клиент на другом языке, и сервер IOCP, похоже, также обнаруживает повреждения в этом случае. Но при этом могут возникнуть проблемы как с клиентом, так и с сервером. Я ценю любое время, которое каждый желающий может потратить на это.

6

Решение

Хорошо, возможно, я нашел твою проблему. Если вы посмотрите на данные, которые вы получаете, все байты в порядке, но внезапно переходят в последовательность, как если бы она была прервана другим вызовом. Теперь из документации MSDN по WSASend а также WSARecv :

Если вы используете порты завершения ввода / вывода, помните, что порядок вызовов, выполняемых в WSASend, также является порядком заполнения буферов. WSASend не должен вызываться на одном сокете одновременно из разных потоков, поскольку это может привести к непредсказуемому порядку буфера.

Если вы используете порты завершения ввода / вывода, имейте в виду, что порядок вызовов, выполняемых в WSARecv, также является порядком заполнения буферов. WSARecv не должен вызываться на одном сокете одновременно из разных потоков, поскольку это может привести к непредсказуемому порядку буфера.

Вот и все. Сейчас я не совсем хороший способ для того, что вы хотите, но то, что вы делаете, вероятно, не так, как это должно быть использовано.

Вы пробовали это через реальную сеть? Интерфейс обратной связи — это специальная схема, которая может вести себя по-другому, но все же она не определена, поэтому не стоит полагаться на это.

4

Другие решения

После проверки рассматриваемого кода кажется, что несколько одновременных вызовов WSARecv на одном сокете могут привести к повреждению данных в результирующих буферах, которые передаются обработчику завершения. Блокировка, которая гарантирует, что каждое соединение выполняет только один вызов WSARecv за раз, исправит это.

Это согласуется с текущей документацией MSDN для WSARecv.

Если вы используете порты завершения ввода / вывода, имейте в виду, что порядок вызовов, выполняемых в WSARecv, также является порядком заполнения буферов. WSARecv не должен вызываться на одном сокете одновременно из разных потоков, поскольку это может привести к непредсказуемому порядку буфера.

Хотя лично я чувствую, что документация может быть более понятной, так как подразумевает, что «порядок заполнения буферов» может быть проблемой, которая хорошо известна и документирована и не учитывает тот факт, что входящий поток данных может действительно распространяться непредсказуемо между буферами.


Это интересно для меня тем, что я никогда не знал, что это проблема, и за 15 лет я никогда ее не видел 🙂 Однако я полагаюсь на последовательность нескольких завершений WSARecv, чтобы избежать хорошо известной и задокументированной проблемы планирования потоков, влияющей на прикажите, чтобы вы обработали завершение чтения, даже если они гарантированно выйдут из IOCP в том порядке, в котором они были введены. Для моей последовательности требуется порядковый номер в каждом буфере чтения, и поэтому у меня есть блокировка приращения порядкового номера и позвонить в WSARecv.

Учитывая, что невозможно выпустить несколько WSARecv из нескольких потоков и успешно воссоздать входящий поток данных, если вы не можете каким-то образом определить последовательность, в которой WSARecv были выпущены из завершений, я не могу понять, как это может быть реальной проблемой с TCP Розетки. Однако это может вызвать проблемы с UDP, поскольку нет необходимости в последовательности и, следовательно, нет необходимости в блокировке, кроме как для предотвращения этой проблемы, и хотя я не думаю, что когда-либо осознавал, что видел это, я думаю, что это может быть проблема в одной системе, с которой я был связан …

Мне нужно больше тестировать со стороной WSASend, но у меня нет оснований думать, что это скорее потокобезопасно, чем вызов WSARecv. Ах, хорошо, вы узнаете что-то новое каждый день …

Я писал об этом Вот.

3

Я думаю, что НЕ публиковать получать многократные
сделать только один полученный буфер для одного сокета в любое время.

после обработки всех данных снова вызовите WSARecv для получения дополнительных данных.

0