WinSock I/O 模型 -- OVERLAPPED I/O 模型-爱代码爱编程
简介
OVERLAPPED I/O 模型也是 WinSock 中常见的异步 I/O 模型,相比于我们之前提到的 Select 模型,WSAAsyncSelect 模型 和 WSAEventSelect 模型有更好的性能.
为了方便描述,下文我们将称 Overlapped I/O 模型为 “重叠模型”.
重叠模型的基本设计原理便是让应用程序使用一个
重叠的数据结构(Overlapped),一次投递一个或多个 Winsock I/O 请求。针对那些提交的请求,在它们完成
之后,应用程序可为它们提供服务
使用这个模型,网络应用程序通过接收以 Windows 消息为基础的网络事件通知来处理网络请求。
这篇文章我们就来看看如何使用 重叠 I/O 相关的 api 来实现一个简单的 TCP 服务器.
这里我们介绍基于 Event 的实现.
API 基础
这里我们不再介绍 WSAEvent 类型相关的API,之前的文章中已经涉及过.
Overlapped 结构体
对于该结构体,官方的描述为:
一个包含异步输入输出任务信息的结构体
typedef struct _OVERLAPPED {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
struct {
DWORD Offset;
DWORD OffsetHigh;
} DUMMYSTRUCTNAME;
PVOID Pointer;
} DUMMYUNIONNAME;
HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
对于该结构体中的字段,我们这里不详细描述,因为大部分虽然当前官方文档中有详细描述,但是同时也声明了未来可能会改变,因此我们的应用程序不应该依赖于这些字段的任何特定值. 而是应该通过对应的 API 方法来获取自己感兴趣的信息.
使用是应该总是将所有字段置为 0 或这 NULL, 除了 hEvent 字段.
唯一非常重要的字段是:
hEvent:一个 WSAEvent 事件的 handle. 当与当前 Overlapped 结构体关联的异步任务完成时,该 hEvent 会被触发.
WSAGetOverlappedResult
WSAGetOverlappedResult 用于获取某 SOCKET 异步任务的结果.
BOOL WSAAPI WSAGetOverlappedResult(
SOCKET s,
LPWSAOVERLAPPED lpOverlapped,
LPDWORD lpcbTransfer,
BOOL fWait,
LPDWORD lpdwFlags
);
- s: SOCKET s 为当通过特定 API(AcceptEx, ConnectEx, DisconnectEx, TransmitFile, TransmitPackets, WSARecv, WSARecvFrom, LPFN_WSARECVMSG (WSARecvMsg), WSASend, WSASendMsg, WSASendTo, 和 WSAIoctl) 添加这个异步任务时,这个异步任务所关联的 SOCKET。
- lpOverlapped: 一个 OVERLAPPED 结构体的指针,为添加该异步任务时所使用的 Overlapped 结构体. 该参数不能为 NULL.
- lpcbTransfer: 返回当前异步任务上已经传输的字节数(发送或者接收)。该参数不能为 NULL
- fWait:指定当前方法调用是否等待当前异步任务结束. 当指定为 TRUE时,该方法会一直阻塞直到当前异步任务完成. 当指定为 FALSE 时,如果当前异步任务还未完成,这个方法会返回 FALSE, 此时调用 WSAGetLastError 将会返回 WSA_IO_INCOMPLETE。
- lpdwFlags:略
AcceptEx
该 API 也可以在 重叠 I/O 模式下使用,并且该方法的性能高于传统的 accept 方法,这里我们为了简单,先不使用 AcceptEx 方法,在 IOCP 模式我们再介绍该方法.
WSARecv
WSARecv 用于从一个已经连接的 SOCKET 接收数据.
int WSAAPI WSARecv(
SOCKET s,
LPWSABUF lpBuffers,
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesRecvd,
LPDWORD lpFlags,
LPWSAOVERLAPPED lpOverlapped,
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
- s: SOCKET handle
- lpBufffers: 一个 WSABuf 结构体的数组. 该结构体比较简单,我们在实例小节描述其用法.
- dwBufferCount: lpBuffers 数组中元素的数量
- lpNumberOfBytesRecvd: 当此次方法调用,函数返回时已经成功的在 SOCKET 上读取到了数据,这个参数保存读取到的字节数. 当 lpOverlapped 参数不为空时,该参数可以为空.
- lpOverlapped: 与当前异步接收任务关联的 Overlapped 结构体.
- lpCompletionRoutine: 本文中我们使用基于事件的重叠I/O模型,因此我们不使用这个字段.
- 返回值: 如果当前读操作立马成功,返回值为 0. 否则,返回 SOCKET_ERROR. 具体的错误码通过 WSAGetLastError 获取。 如果具体的错误码为 WSA_IO_PENDING 表明当前异步任务已经成功提交,在该任务完成后 lpCompletionRoutine 会被调用或者 Overlapped 结构体中的 hEvent 事件会被触发。本文,我们将依赖于 hEvent 参数来处理异步完成的任务. 对于其他的错误码,请参考该 API 的官方文档.
WSASend 与 WSARecv 类似,我们不再赘述.
实现思路
- 创建一个 socket 作为监听 socket
- 创建子线程用于等待并处理异步 I/O 任务的结果。
- 在主线程中循环等待新连接的到来。注意,这里我们为了简单使用阻塞的 Accept 方法。 使用 AcceptEx 方法可以异步的来接收新的连接。 但是我们使用较简单的 Accept 方法.
- 在主线程中,当新连接到来,接收它,并为他创建对应的 OVERLAPPED 结构体和 WSAEvent 对象。将 WSAEvent 对象设置到 OVERLAPPED 对象的 hEvent 字段. 然后使用 WSARecv api 来从该客户端链接上接收数据. 注意该读不会阻塞主线程,它是异步的.
- 在子线程中,使用 WSAWaitForMultipleEvents 来等待我们所创建中的所有 Event 中任何一个被触发的事件. 否则阻塞子线程.
- 当有新的 event 被触发时,使用 WSAGetOverlappedResult 来获取当前任务的完成结果, 并处理它(一般都会再次提交新的异步 I/O 任务).
实例
#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#pragma comment(lib,"ws2_32.lib")
#define PORT 8080
#define DATA_BUFSIZE 8192
typedef struct _SOCKET_CONTEXT {
CHAR Buffer[DATA_BUFSIZE];
WSABUF DataBuf;
SOCKET Socket;
WSAOVERLAPPED Overlapped;
DWORD BytesSEND;
DWORD BytesRECV;
} SOCKET_CONTEXT, * LPSOCKET_CONTEXT;
DWORD WINAPI ProcessIO(LPVOID lpParameter);
DWORD EventTotal = 0;
WSAEVENT EventArray[WSA_MAXIMUM_WAIT_EVENTS];
LPSOCKET_CONTEXT SocketArray[WSA_MAXIMUM_WAIT_EVENTS];
CRITICAL_SECTION CriticalSection;
int main() {
WSADATA wsaData;
SOCKET ListenSocket, AcceptSocket;
SOCKADDR_IN Addr;
DWORD Flags;
DWORD ThreadId;
DWORD RecvBytes;
// 我们是多线程程序,锁是必不可少的
InitializeCriticalSection(&CriticalSection);
if (WSAStartup(0x0202, &wsaData) != 0) {
printf("WSAStartup() failed with error %d\n", WSAGetLastError());
return 1;
}
if ((ListenSocket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
printf("socket() failed with error %d\n", WSAGetLastError());
return 1;
}
Addr.sin_family = AF_INET;
Addr.sin_addr.s_addr = htonl(INADDR_ANY);
Addr.sin_port = htons(PORT);
if (bind(ListenSocket, (PSOCKADDR) &Addr, sizeof(Addr)) == SOCKET_ERROR) {
printf("bind() failed with error %d\n", WSAGetLastError());
return 1;
}
if (listen(ListenSocket, 10)) {
printf("listen() failed with error %d\n", WSAGetLastError());
return 1;
}
if ((AcceptSocket = WSASocketW(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
printf("Failed to get a socket %d\n", WSAGetLastError());
return 1;
}
if ((EventArray[0] = WSACreateEvent()) == WSA_INVALID_EVENT) {
printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
return 1;
}
// 创建子线程,用来处理异步任务的结果
if (CreateThread(NULL, 0, ProcessIO, NULL, 0, &ThreadId) == NULL) {
printf("CreateThread() failed with error %d\n", GetLastError());
return 1;
}
EventTotal = 1;
while(TRUE) {
// 阻塞的接收新的客户端连接
if ((AcceptSocket = accept(ListenSocket, NULL, NULL)) == INVALID_SOCKET) {
printf("accept() failed with error %d\n", WSAGetLastError());
return 1;
}
EnterCriticalSection(&CriticalSection);
// 新连接到来,为该新连接创建的必要的数据结构,维护该SOCKET的信息
if ((SocketArray[EventTotal] = (LPSOCKET_CONTEXT) GlobalAlloc(GPTR, sizeof(SOCKET_CONTEXT))) == NULL) {
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return 1;
}
// 为该 SOCKE 创建关联的 OVERLAPPED 结构体,
// 初始化 DataBuf 字段,无论我们是接收数据还是发送数据,我们都会使用它
SocketArray[EventTotal]->Socket = AcceptSocket;
ZeroMemory(&(SocketArray[EventTotal]->Overlapped), sizeof(OVERLAPPED));
SocketArray[EventTotal]->BytesSEND = 0;
SocketArray[EventTotal]->BytesRECV = 0;
SocketArray[EventTotal]->DataBuf.len = DATA_BUFSIZE;
SocketArray[EventTotal]->DataBuf.buf = SocketArray[EventTotal]->Buffer;
// 初始化该 Overlapped 结构体的 hEvent 字段,我们异步完成时,我们便可以通过该事件得到通知
// 这样我们便不需要轮询该异步任务的结果,而是直接等到该 Event 被触发,然后区处理便可.
if ((SocketArray[EventTotal]->Overlapped.hEvent = EventArray[EventTotal] = WSACreateEvent()) == WSA_INVALID_EVENT) {
printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
return 1;
}
// 从该连接上读取数据.
Flags = 0;
if (WSARecv(SocketArray[EventTotal]->Socket, &(SocketArray[EventTotal]->DataBuf), 1, &RecvBytes, &Flags, &(SocketArray[EventTotal]->Overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != ERROR_IO_PENDING) {
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return 1;
}
// else 表示我们已经成功的提交了异步读任务,该任务目前还在进行中。
// 当它完成时,我们在子线程中处理
}
// else 说明我们已经成功的读取到了数据,
// 读取到的数据存储在 DataBuf 中,
// RecvBytes 存储接收到的数据长度
EventTotal++;
LeaveCriticalSection(&CriticalSection);
if (WSASetEvent(EventArray[0]) == FALSE) {
printf("WSASetEvent() failed with error %d\n", WSAGetLastError());
return 1;
}
}
}
DWORD WINAPI ProcessIO(LPVOID lpParameter) {
DWORD Index;
DWORD Flags;
LPSOCKET_CONTEXT SocketContext;
DWORD BytesTransferred;
DWORD i;
DWORD RecvBytes, SendBytes;
while(TRUE) {
// 等待我们提交的异步任务完成的事件
if ((Index = WSAWaitForMultipleEvents(EventTotal, EventArray, FALSE, WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED) {
printf("WSAWaitForMultipleEvents() failed %d\n", WSAGetLastError());
return 0;
}
if ((Index - WSA_WAIT_EVENT_0) == 0) {
WSAResetEvent(EventArray[0]);
continue;
}
SocketContext = SocketArray[Index - WSA_WAIT_EVENT_0];
WSAResetEvent(EventArray[Index - WSA_WAIT_EVENT_0]); // ResetEvent,以便后边重用该事件
// 获取当前完成的异步任务的结果
if (WSAGetOverlappedResult(SocketContext->Socket, &(SocketContext->Overlapped), &BytesTransferred, FALSE, &Flags) == FALSE || BytesTransferred == 0) {
printf("Closing socket %d\n", SocketContext->Socket);
if (closesocket(SocketContext->Socket) == SOCKET_ERROR) {
printf("closesocket() failed with error %d\n", WSAGetLastError());
}
GlobalFree(SocketContext);
WSACloseEvent(EventArray[Index - WSA_WAIT_EVENT_0]);
// Cleanup SocketArray and EventArray by removing the socket event handle
// and socket information structure if they are not at the end of the array
EnterCriticalSection(&CriticalSection);
if ((Index - WSA_WAIT_EVENT_0) + 1 != EventTotal)
for (i = Index - WSA_WAIT_EVENT_0; i < EventTotal; i++) {
EventArray[i] = EventArray[i + 1];
SocketArray[i] = SocketArray[i + 1];
}
EventTotal--;
LeaveCriticalSection(&CriticalSection);
continue;
}
if (SocketContext->BytesRECV == 0) {
SocketContext->BytesRECV = BytesTransferred;
SocketContext->BytesSEND = 0;
} else {
SocketContext->BytesSEND += BytesTransferred;
}
if (SocketContext->BytesRECV > SocketContext->BytesSEND) {
// 重置 Overlapped 结构体,我们要重用这个结构体,提交一个新的任务
ZeroMemory(&(SocketContext->Overlapped), sizeof(WSAOVERLAPPED));
SocketContext->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
SocketContext->DataBuf.buf = SocketContext->Buffer + SocketContext->BytesSEND;
SocketContext->DataBuf.len = SocketContext->BytesRECV - SocketContext->BytesSEND;
if (WSASend(SocketContext->Socket, &(SocketContext->DataBuf), 1, &SendBytes, 0, &(SocketContext->Overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != ERROR_IO_PENDING) {
printf("WSASend() failed with error %d\n", WSAGetLastError());
return 0;
}
}
} else {
SocketContext->BytesRECV = 0;
// Now that there are no more bytes to send post another WSARecv() request
Flags = 0;
ZeroMemory(&(SocketContext->Overlapped), sizeof(WSAOVERLAPPED));
SocketContext->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
SocketContext->DataBuf.len = DATA_BUFSIZE;
SocketContext->DataBuf.buf = SocketContext->Buffer;
if (WSARecv(SocketContext->Socket, &(SocketContext->DataBuf), 1, &RecvBytes, &Flags, &(SocketContext->Overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != ERROR_IO_PENDING) {
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return 0;
}
}
}
}
}
END !!!
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/zhaoruixiang1111/article/details/109684786