应用场景说明:完成端口在面向现实应用的许多网络通信中应用很广泛,例如大型多人在线游戏,大型即时通信系统,网吧管理系统以及企业管理系统等具有大量并发用户请求的场合。
实现目标说明:通过完成端口模型构建一款服务器软件,能够接受多个客户端的同时访问,并且每个客户端都可以给服务器发送数据,服务器能接收到客户发来的数据并且统计并打印字节数,并且把打印的字节数发送给客户端。
关键功能就是完成端口模型的设计,完成端口内部提供了线程池的管理,可以避免反复创建线程的开销,同时可以根据CPU的个数灵活地决定线程个数,减少线程调度的次数,从而提高了程序的并行处理能力。
整个程序开始后,完成端口进行初始化工作线程启动,套接字初始化连接初始化,然后将套接字与完成端口相关联,异步接收数据,然后对于结果结果进行判断,如果错误需要处理错误然后结束;如果正确并且没有新的完成端口对象则借书;如果正确并且有新的完成端口对象则继续轮回到将套接字与完成端口相关联这一步,继续完成程序。
1、程序设计的流程图

- 判断系统中安装了多少个处理器,创建n个工作线程,n一般取当前计算机中处理器个数。工作线程的主要功能是检测完成端口的状态,如果有来自客户的数据,则接收数据,处理请求;
- 初始化Windows Sockets环境,初始化套接字;
- 创建完成端口对象,将待处理网络请求的套接字与完成端口对象关联;
- 异步接收数据,无论能否接收到数据,都会直接返回。
2、主要接口函数说明
1.完成端口对象创建函数:
HANDLE WINAPI CreateIoCompletionPort(_in HANDLE FileHandle,_in HANDLE ExistingCompletionPort,_in ULONG_PTR CompletionKey,_in DWOR NumberOfConcurrentThreads);
FileHandle:是重叠I/O操作关联的文件句柄。
ExistingCompletionPort:是已经存在的完成端口句柄。
CompletionKey:包含在每个I/O完成数据包中用于指定文件句柄的单句柄数据,它将与FileHandle文件句柄关联在一起,应用程序可以再此存储任意类型的信息,通常是一个指针。
NumberOfConcurrentThreads:指定I/O完成端口上操作系统允许的并发处理I/O完成数据包的最大线程数量。
返回值含义:若函数执行成功,返回与套接字句柄相关联的I/O完成端口句柄;若函数执行失败,返回NULL。
2.等待重叠I/O操作结果函数:
BOOL WINAPI GetQueuedCompletionStatus(_in HANDLE CompletionPort,_out LPDWORD lpNumberOfBytes,_out PULONG_PTR lpCompletionKey,_out LPOVERLANPPED* lpOverlapped,_in DWORD dwMilliseconds)
CompletionPort:完成端口句柄。
lpNumberOfBytes:获取已经完成的I/O操作中传输的字节数
lpCompletionKey:或区域已经完成的I/O操作的文件句柄相关联的点句柄数据,在一个套接字首次与完成端口关联到一起的时候,那些数据便于一个特定的逃跑将诶自己并对应起来了,这些数据是运行CreateIoCompletionPort()函数时通过 CompletionKey参数传递的。
lpOverlapped:在完成的I/O操作开始时指定的重叠结构地址,在它后面跟对单I/O操作数据。
dwMilliseconds:函数在完成端口上等待的时间。
返回值含义:若函数从完成端口上获取到成功的I/O操作完成通知包,返回非0值;若函数从完成端口上获取到失败的I/O操作完成通知包或是函数调用超时,返回0值。
3.数据接收函数,覆盖了标准的recv函数,用于客户端对于服务器的数据发送功能。
WSARecv(_in SOCKET s,_inout LPWSABUF lpBuffers,_in WDORD dwBufferCount,_out LPDWORD lpNumberOfBytesRecvd,_inout LPDWORD lpFlags,_in LPWSAOVERLAPPED lpOverlapped,_in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine)
s:表示一个已连接套接字的描述符。
lpBuffers:一个指向WSABUF结构数组的指针,每个WSABUF结构包含缓冲区的指针和缓冲区的大小。
dwBufferCount:记录lpBuffers数组中的WSABUF结构的数目。
lpNumberOfBytesRecvd:是一个返回值,如果I/O操作立即完成,则该参数指令接收数据的字节数。
lpFlags:标志位。
lpOverlapped:指向WSAOVERLAPPED机构的指针。
lpCompletionRoutine:指向完成例程,只一个指向接收操作完成调用的完成例程的指针。
返回值含义:指示了实际接收到的的字节总数。
4.数据发送函数,在该程序中主要用于服务器对客户端回射服务功能。
Send(_in SOCKET s,_in const char *buf,_in int len,_in int flags)
S:已连接套接字的描述符,数据的发送将会通过它参考指向的套接字结构,获得数据通信的对方地址,然后把数据发送出去。Buf:指向要发送的字节序列。
Len:要发送的字节数。
Flags:提供了一种改变套接字调用默认行为的方式,在本次实验中默认为零。
返回值含义:指示了实际发送的字节总数。
3、主要的结构体说明
(1)PER_IO_DATA结构用于保存单I/O操作的相关数据,包含了重叠结构、缓冲区对象、缓冲区数组、接受的字节数等,定义如下:
//定义PER_IO_DATA结构typedef struct{OVERLAPPED Overlapped; //重叠结构WSABUF DataBuf; //缓冲区对象CHAR Buffer[DEFAULT_BUFLEN]; //缓冲区数组DWORD BytesRECV; //接受的字节数}PER_IO_DATA, * LPPER_IO_DATA;
(2)PER_HANBLE_DATA结构用于保存单句柄数据,此处为与客户进行通信的套接字,定义如下:
//PER_HANDLE_DATA结构typedef struct{SOCKET Socket;}PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
该系统主要使用了流式套接字进行通信。
进行基于流式套接字的服务器程序初始化,首先初始化Windos Sockets环境,然后创建流式套接字,将其绑定到本地地址的27015端口上。
在while循环上处理来自客户的连接请求,接收连接,并将得到的与客户进行通信的套接字保存在LPPER_HANDLE_DATA结构对象RerHandelData中,调用CreateIoCompletionPort()函数将AcceptSocket与前面的完成端口CompletionPort相关联,在AcceptSocket上调用WSARecv()函数,异步接收套接字上来自客户的数据,此时WSARecv()是异步调用的,另外,在工作线程中会检测完成端口的状态,并接收来自客户的数据。
在每个工作线程中调用GetQueuedCompletionStatus()函数检查完成端口端口的状态,参数BytesTransferred用于接收数据的字节数,并且运用send()函数对于统计得到的客户字节数进行回射服务发送给客户端。如果GetQueuedCompletionStatus()函数返回,单参数BytesTransferred为0,这说明客户端已经退出,则关闭与客户进行通信的套接字,释放占用资源。
PER_IO_DATA结构对象PerIoData用于保存I/O操作中的数据。如果其BytesRECV字段值非0,则打印接收到的字节数,之后再次调用WSARecv()函数,投递另一个重叠I/O操作。
4、程序运行结果展示
该系统主要是服务器能够接受多个客户端的同时访问,并且每个客户端都可以给服务器发送数据,服务器能接收到客户发来的数据并且统计并打印字节数然后进行回复。
如下图两个客户端先后与服务器进行连接,都能连通并且都能对服务器发送数据,也可以分别收到服务器的回复。


如下图服务器先收到了客户端1发送的数据并且成功统计打印字节数,然后收到客户端2发送的数据也成功统计并打印了字节数,并且成功发送回去。

客户端代码:
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdlib.h>
#include <stdio.h>
// 连接到WinSock 2对应的lib文件:Ws2_32.lib, Mswsock.lib, Advapi32.lib
#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "AdvApi32.lib")
// 定义默认的缓冲区长度和端口号
#define DEFAULT_BUFLEN 512
#define DEFAULT_PORT "27015"#pragma warning(disable:4996)int __cdecl main(int argc, char** argv)
{WSADATA wsaData;SOCKET ConnectSocket = INVALID_SOCKET;struct addrinfo* result = NULL, * ptr = NULL, hints;char sendbuf[DEFAULT_BUFLEN];char recvbuf[DEFAULT_BUFLEN];int iResultt;int recvbuflen = DEFAULT_BUFLEN;char servIP[20] = "127.0.0.1"; // 保存输入的服务器IP// 初始化套接字iResultt = WSAStartup(MAKEWORD(2, 2), &wsaData);if (iResultt != 0) {printf("WSAStartup failed with error: %d\n", iResultt);return 1;}ZeroMemory(&hints, sizeof(hints));hints.ai_family = AF_UNSPEC;hints.ai_socktype = SOCK_STREAM;hints.ai_protocol = IPPROTO_TCP;while (true){printf("请输入服务器域名/IP地址:\n>>>");fflush(stdout);scanf("%s", servIP); // 输入服务器IP// 解析服务器地址和端口号iResultt = getaddrinfo(servIP, DEFAULT_PORT, &hints, &result);if (iResultt != 0) {printf("getaddrinfo failed with error: %d\n", iResultt);fflush(stdout);continue;//WSACleanup();//return 1;}for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {// 创建套接字ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype,ptr->ai_protocol);if (ConnectSocket == INVALID_SOCKET) {printf("socket failed with error: %ld\n", WSAGetLastError());WSACleanup();return 1;}// 向服务器请求连接iResultt= connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);if (iResultt == SOCKET_ERROR) {closesocket(ConnectSocket);ConnectSocket = INVALID_SOCKET;continue;}break;}break;}freeaddrinfo(result);if (ConnectSocket == INVALID_SOCKET) {printf("Unable to connect to server!\n");WSACleanup();return 1;}printf(">>>连接到服务器(%s)...\n>>>连接成功...[ok]\n", servIP);struct sockaddr_in sa; // 客户端ipint len = sizeof(sa);if (!getsockname(ConnectSocket, (struct sockaddr*)&sa, &len)){printf("Client Ip:%s ", inet_ntoa(sa.sin_addr));printf("Client Port:%d \n\n", ntohs(sa.sin_port));}printf("====================================================\n");char buff[40] = "";sprintf(buff, "title client:[%s: %d]", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));system(buff); // 设置客户端标题// 发送数据bool flag = true;while (flag){printf("send: ");fflush(stdout);scanf("%s", sendbuf); // 键盘输入数据if (0 == strncmp(sendbuf, "end", 3)) // 输入end 结束发送{// 数据发送结束,调用shutdown()函数声明不再发送数据,此时客户端仍可以接收数据iResultt = shutdown(ConnectSocket, SD_SEND);if (iResultt == SOCKET_ERROR) {printf("shutdown failed with error: %d\n", WSAGetLastError());closesocket(ConnectSocket);WSACleanup();return 1;}flag = false;}else{iResultt = send(ConnectSocket, sendbuf, (int)strlen(sendbuf), 0);if (iResultt == SOCKET_ERROR) {printf("send failed with error: %d\n", WSAGetLastError());closesocket(ConnectSocket);WSACleanup();return 1;}printf("@%s: Bytes Sent: %ld\n-----------------------\n", servIP, iResultt);}// 持续接收数据,直到服务器关闭连接iResultt = recv(ConnectSocket, recvbuf, recvbuflen, 0);if (iResultt > 0)printf("Bytes received: %d\n\n", iResultt);else if (iResultt == 0)printf("Connection closed\n");elseprintf("recv failed with error: %d\n", WSAGetLastError());}// 关闭套接字closesocket(ConnectSocket);// 释放资源WSACleanup();return 0;
}
服务器代码:
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <Windows.h>
#include <stdio.h>
#include <conio.h>
#include <string.h>
#pragma comment (lib, "ws2_32.lib")
#define DEFAULT_BUFLEN 512
#define DEFAULT_PORT 27015
SOCKET AcceptSocket = INVALID_SOCKET; //与客户进行通信的套接字
//定义PER_IO_DATA结构
typedef struct
{OVERLAPPED Overlapped; //重叠结构WSABUF DataBuf; //缓冲区对象CHAR Buffer[DEFAULT_BUFLEN]; //缓冲区数组DWORD BytesRECV; //接受的字节数
}PER_IO_DATA, * LPPER_IO_DATA;//PER_HANDLE_DATA结构
typedef struct
{SOCKET Socket;
}PER_HANDLE_DATA, * LPPER_HANDLE_DATA;//实现工作线程ServerWorkerThread()
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
{HANDLE CompletionPort = (HANDLE)CompletionPortID;DWORD BytesTransferred;LPPER_HANDLE_DATA PerHandleData;LPPER_IO_DATA PerIoData;DWORD RecvBytes;DWORD Flags;int iResult;int d = 0;while (TRUE){//检查完成端口的状态if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0){printf("GetQueuedCompletionStatus failed! \n");return 0;}//如果数据传送完了,则退出if (BytesTransferred == 0){printf("Closing socket %d\n", PerHandleData->Socket);//关闭套接字if (closesocket(PerHandleData->Socket) == SOCKET_ERROR){printf("closesocket failed with error! %d\n", WSAGetLastError());return 0;}//释放结构资源GlobalFree(PerHandleData);GlobalFree(PerIoData);continue;}//如果还没有记录接受的数据数量,则将收到的字节数保存在PerIoData->BytesRECV中if (PerIoData->BytesRECV == 0){PerIoData->BytesRECV = BytesTransferred;}d++;//成功接受到数据printf("\nNo%d Client Bytes received :%d\n", d, BytesTransferred);printf("Bytes send: %d", PerIoData->BytesRECV);int isendResult = send(AcceptSocket, PerIoData->Buffer, PerIoData->BytesRECV, 0);//处理数据请求PerIoData->BytesRECV = 0;Flags = 0;ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));PerIoData->DataBuf.len = DEFAULT_BUFLEN;PerIoData->DataBuf.buf = PerIoData->Buffer;iResult = WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);if (iResult == SOCKET_ERROR){if (WSAGetLastError() != ERROR_IO_PENDING){printf("WSARecv() failed with error %d\n", WSAGetLastError());return 0;}}}
}//实现主函数
int main(int argc, char* agrv[])
{SOCKADDR_IN InternetAddr; //服务器地址
SOCKET ServerSocket = INVALID_SOCKET; //监听套接字HANDLE CompletionPort; //完成端口句柄SYSTEM_INFO SystemInfo; //系统信息(主要用于获取CPU数量)LPPER_HANDLE_DATA PerHandleData; //套接字句柄结构LPPER_IO_DATA PerIoData; //I/O操作结构DWORD RecvBytes; //接收到的字节数DWORD Flags; //WSARecv()函数中制定的标志位DWORD ThreadID; //工作线程编号WSADATA wsaData; //Windows Socket初始化信息DWORD Ret; //函数返回值int iResult;//创建新的完成端口if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL){printf("CreateIoCompletionPort failed! \n");return -1;}//获取系统信息GetSystemInfo(&SystemInfo);//根据CPU数量启动线程for (int i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++){HANDLE ThreadHandle;//创建线程,运行ServerWorkerThread()函数if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort, 0, &ThreadID)) == NULL){printf("CreateThread() failed with error! %d\n", GetLastError());return -1;}CloseHandle(ThreadHandle);}//初始化Windows Sockets环境if ((Ret = WSAStartup(0x0202, &wsaData)) != 0){printf("WSAStartup failed with error &d\n", Ret);return -1;}//创建监听套接字ServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);if (ServerSocket == INVALID_SOCKET){printf("WSASocket() failed with error %d\n", WSAGetLastError());return -1;}//绑定到本地地址的端口InternetAddr.sin_family = AF_INET;InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);InternetAddr.sin_port = htons(DEFAULT_PORT);iResult = bind(ServerSocket, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr));if (iResult == SOCKET_ERROR){printf("bind() failed with error %d\n", WSAGetLastError());return -1;}//开始监听if (listen(ServerSocket, 5) == SOCKET_ERROR){printf("listen() failed with error! %d\n", WSAGetLastError());return -1;}printf("TCP server starting\n");//监听端口打开,就开始在这里循环,一有套接字连上,WSAAccept就创建一个套接字,这个套接字和完成端口关联上sockaddr_in addrClient;int addrClientlen = sizeof(sockaddr_in);while (TRUE){//等待客户连接AcceptSocket = WSAAccept(ServerSocket, (sockaddr*)&addrClient, &addrClientlen, NULL, 0);if (AcceptSocket == SOCKET_ERROR){printf("WSAAccept() failed with error %d\n", WSAGetLastError());return -1;}//分配并设置套接字句柄结构PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));if (PerHandleData == NULL){printf("GlobalAlloc() failed with error! %d\n", GetLastError());return -1;}PerHandleData->Socket = AcceptSocket;//将与客户进行通信的套接字Accept与完成端口CompletionPort相关联if (CreateIoCompletionPort((HANDLE)AcceptSocket, CompletionPort, (DWORD)PerHandleData, 0) == NULL){printf("CreateIoCompletionPort failed! \n");return -1;}//为I/O操作结构分配内存空间PerIoData = (LPPER_IO_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_DATA));if (PerIoData == NULL){printf("GlobalAlloc() failed with error! %d\n", GetLastError());return -1;}//初始化I/O操作结构ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));PerIoData->BytesRECV = 0;PerIoData->DataBuf.len = DEFAULT_BUFLEN;PerIoData->DataBuf.buf = PerIoData->Buffer;Flags = 0;//接收数据,放到PerIoData中,通过工作线程函数取出iResult = WSARecv(AcceptSocket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);if (iResult == SOCKET_ERROR){if (WSAGetLastError() != ERROR_IO_PENDING){printf("WSARecv() failed! \n");return -1;}}}return 0;
}

















