如何使用c++,借助完成端口完成大并发服务器的搭建,是今天要讨论的问题,套路如下:
套路总结一下:
创建完成端口
依据CPU核数创建一定数量的线程
线程中不断调用GetQueuedCompletionStatus检查完成端口状态,分别给予处理
创建一个socket,绑定IP和端口
将这个socket绑定到第一步创建的完成端口上
获取当前服务器的CPU核数,根据核数创建一定数量的线程
GetQueuedCompletionStatus这个函数非常重要,上面创建的线程不断检查完成端口的状态,有则处理
void CIocpCtrl::OnExecute()
{SPerHandleData* pstPerHandleData;SPerIoData* pstPerIoData;CCPSock* poSock;CCpListener* poListener;BOOL bRet;DWORD dwByteTrabsferred;//通过while(true)循环,不断的检查while(true) {pstPerHandleData = NULL;pstPerIoData = NULL;dwByteTrabsferred = 0;//最重要的就是GetQueuedCompletionStatus这个函数bRet = GetQueuedCompletionStatus(m_hCompletionPort,&dwByteTrabsferred,(PULONG_PTR)&pstPerHandleData,(LPOVERLAPPED*)&pstPerIoData,INFINITE);// 检查是否是线程退出if(NULL == pstPerHandleData){return;}if(pstPerHandleData->bListen){// for listen eventpoListener = (CCpListener*)pstPerHandleData->ptr; //注意这里if(NULL != poListener && NULL != pstPerIoData){poListener->OnAccept(bRet, pstPerIoData); //接收一个新连接}else {SDASSERT(false);}}else {//for non-listen event poSock = (CCPSock*)pstPerHandleData->ptr; //注意这里if ( NULL == poSock ){continue;}if( FALSE == bRet || NULL == pstPerIoData ){ if (::WSAGetLastError()!=ERROR_IO_PENDING){INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"), MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());poSock->OnClose();} }else{ //判断类型switch(pstPerIoData->nOp){case IOCP_RECV:{poSock->DecPostRecv();if (dwByteTrabsferred > 0){poSock->OnRecv(dwByteTrabsferred); //处理接收到的数据}else{INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it, socket :%d "), MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError(), poSock->GetSock());poSock->OnClose();}}break;case IOCP_SEND:{poSock->DecPostSend();if (dwByteTrabsferred > 0){poSock->OnSend(dwByteTrabsferred); //处理发送的数据}else{INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"), MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());poSock->OnClose();}}break;case IOCP_CLOSE:{poSock->OnClose(false); //关闭连接}break;default:;}}}}
}
//提供一个入口,供其它socket绑定完成端口
bool CIocpCtrl::AssociateWithIocp(SOCKET hSock, SPerHandleData* pstData)
{if (NULL == m_hCompletionPort){return false;}if(NULL == CreateIoCompletionPort((HANDLE)hSock, m_hCompletionPort, (ULONG_PTR)pstData, 0)){WARN(_SDT("CIocpCtrl::AssociateWithIocp, failed, errno %d"), WSAGetLastError());return false;}return true;
}
下面就是一个接收处理:
void CCpListener::OnAccept(BOOL bSucc, SPerIoData* pstPerIoData)
{SOCKET hSock = pstPerIoData->hSock; //取出socketif(false == m_bStart){InterlockedIncrement((volatile LONG*)&m_dwReleaseCount);closesocket(hSock);return;}this->PostAcceptEx(pstPerIoData); //接收一个连接之后,就要顺道再投递一个AcceptEx if(FALSE == bSucc){WARN(_SDT("CCpListener::OnAccept, accept failed, errno %d"), ::WSAGetLastError());closesocket(hSock);}else{//下面就是封装这个连接,以及在这个新连接上投递接收请求CConnData* pConnData = CConnDataMgr::Instance()->Alloc(m_dwRecvBufSize, m_dwSendBufSize);CCPSock * poSock = &pConnData->sock; CUCConnection* poConnection = & pConnData->connection;//+lcjif (0 != ::setsockopt(hSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&m_hListenSock, sizeof(SOCKET))){WARN(_SDT("setsockopt for new socket on UpdateConetext failed, errno=%d"), ::WSAGetLastError());}if (g_bNodelay){const CHAR szOpt = 1;if (0 != ::setsockopt(hSock, IPPROTO_TCP, TCP_NODELAY, (char *)&szOpt, sizeof(char))){WARN(_SDT("setsockopt for new socket on UpdateConetext failed, errno=%d"), ::WSAGetLastError());}}poSock->SetSock(hSock);poSock->SetPacketParser(m_poPacketParser);poConnection->SetAccept(true);poConnection->SetParentID(0);sockaddr_in RemoteAddr, LocalAddr;GetSockAddress(pstPerIoData, RemoteAddr, LocalAddr);poConnection->SetLocalIP(LocalAddr.sin_addr.s_addr);poConnection->SetLocalPort(SDNtohs(LocalAddr.sin_port));poConnection->SetRemoteIP(RemoteAddr.sin_addr.s_addr);poConnection->SetRemotePort(SDNtohs(RemoteAddr.sin_port));//注意这里,session和Connection连接起来ISDSession* poSession = m_poSessionFactory->CreateSession(poConnection);if(NULL == poSession){DBG(_SDT("CCpListener::OnAccept, CreateSession failed"));closesocket(hSock);CConnDataMgr::Instance()->Release(pConnData);return;}poConnection->SetSession(poSession);poSock->SetConnect(TRUE);
#ifndef SDENT_HAS_RECV_QUEUEpoConnection->OnAssociate();
#endifif(false == poSock->AssociateWithIocp()) //将新接收的socket绑定完成端口{poSock->DoClose();}else{if(false == poSock->PostRecv()) //投递一个接收请求{poSock->DoClose();}}}
FR:海涛高软(hunk Xu)