文章详情
IOCP 类封装以及调用示例
Posted on 2022-02-03 16:59:28 by 主打一个C++
CIocpServer.h
#pragma once
// Winsock / Win32 headers. WinSock2.h has to precede Windows.h so that the
// legacy winsock.h is not pulled in.
#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <ws2tcpip.h>
// C++ standard library.
#include <atomic>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
/// TCP server built on a Windows I/O completion port (IOCP).
///
/// Typical use: Initialize() -> Set*Callback() -> Start() -> ... -> Stop().
/// The connect and receive callbacks are invoked from the completion handlers
/// that the worker threads run, so user callbacks must be thread-safe.
class CIocpServer {
public:
    /// Kind of asynchronous operation an OverlappedEx describes.
    enum class OperationType {
        ACCEPT,
        RECV,
        SEND
    };

    /// Per-operation context handed to the overlapped Winsock calls.
    struct OverlappedEx {
        // Must remain the first member: the implementation converts between
        // OVERLAPPED* and OverlappedEx* when a completion is dequeued.
        OVERLAPPED overlapped;
        OperationType operationType;  // which handler processes the completion
        SOCKET socket;                // socket the operation was issued on
        WSABUF wsaBuf;                // describes the used part of `buffer`
        char buffer[4096];            // payload / AcceptEx address storage
        int bytesTransferred;         // not read by the code visible in this file
    };

    /// State kept for each connected client.
    struct ClientContext {
        SOCKET socket;
        sockaddr_in clientAddr;
        std::vector<char> recvBuffer;  // not used by the code visible in this file
        std::mutex sendMutex;          // serialises Send() calls for this client
    };

    // Callback signatures.
    using OnConnectCallback = std::function<void(SOCKET, const sockaddr_in&)>;
    using OnDisconnectCallback = std::function<void(SOCKET)>;
    using OnRecvCallback = std::function<void(SOCKET, const char*, int)>;

public:
    CIocpServer();
    ~CIocpServer();

    // The server owns sockets, a completion port and threads; it is not copyable.
    CIocpServer(const CIocpServer&) = delete;
    CIocpServer& operator=(const CIocpServer&) = delete;

    /// Creates the listening socket and the completion port.
    /// @return true on success.
    bool Initialize(int port, int maxConnections = 1000);

    /// Starts the worker threads and begins accepting connections.
    /// @param workerThreads number of workers; a value <= 0 lets the server choose.
    bool Start(int workerThreads = 0);

    /// Stops the server and releases its resources.
    void Stop();

    /// Queues `length` bytes of `data` for sending to `clientSocket`.
    /// @return false if the client is unknown or the send could not be posted.
    bool Send(SOCKET clientSocket, const char* data, int length);

    /// Closes a client connection and fires the disconnect callback.
    void CloseClient(SOCKET clientSocket);

    // Callback registration. Callbacks are stored by value.
    void SetOnConnectCallback(OnConnectCallback callback) { m_onConnect = std::move(callback); }
    void SetOnDisconnectCallback(OnDisconnectCallback callback) { m_onDisconnect = std::move(callback); }
    void SetOnRecvCallback(OnRecvCallback callback) { m_onRecv = std::move(callback); }

public:
    /// Formats an IPv4 endpoint as "a.b.c.d:port".
    /// If the address cannot be converted, "<unknown>" replaces the host part.
    std::string FormatAddress(const sockaddr_in& addr) const {
        char buffer[INET_ADDRSTRLEN] = {};
        const std::string port = std::to_string(ntohs(addr.sin_port));
        // inet_ntop returns NULL on failure; the buffer must not be trusted then.
        if (inet_ntop(AF_INET, &addr.sin_addr, buffer, INET_ADDRSTRLEN) == nullptr) {
            return "<unknown>:" + port;
        }
        return std::string(buffer) + ":" + port;
    }

    /// Returns the current local time as "YYYY-MM-DD HH:MM:SS",
    /// or an empty string if the time cannot be converted or formatted.
    std::string GetTimeString() const {
        const time_t now = time(nullptr);
        struct tm timeinfo = {};
        // localtime_s reports failure through a non-zero return value.
        if (localtime_s(&timeinfo, &now) != 0) {
            return std::string();
        }
        char buffer[80] = {};
        if (strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &timeinfo) == 0) {
            return std::string();
        }
        return buffer;
    }

private:
    // Worker thread body: dequeues completions and dispatches them.
    void WorkerThread();
    // Posts one asynchronous accept request.
    bool PostAccept();
    // Posts one asynchronous receive request on a client socket.
    bool PostRecv(SOCKET clientSocket);
    // Completion handlers, one per OperationType.
    void HandleAccept(OverlappedEx* overlapped, DWORD bytesTransferred);
    void HandleRecv(OverlappedEx* overlapped, DWORD bytesTransferred);
    void HandleSend(OverlappedEx* overlapped, DWORD bytesTransferred);

private:
    SOCKET m_listenSocket;                     // listening socket
    HANDLE m_ioCompletionPort;                 // completion port handle
    std::vector<std::thread> m_workerThreads;  // worker threads
    // Written by Start()/Stop() and read concurrently by every worker thread,
    // hence atomic rather than a plain bool.
    std::atomic<bool> m_running;
    std::unordered_map<SOCKET, std::shared_ptr<ClientContext>> m_clients;  // connected clients
    std::mutex m_clientsMutex;                         // guards m_clients
    LPFN_ACCEPTEX m_acceptEx;                          // AcceptEx function pointer
    LPFN_GETACCEPTEXSOCKADDRS m_getAcceptExSockAddrs;  // GetAcceptExSockaddrs function pointer
    OnConnectCallback m_onConnect;        // connect callback
    OnDisconnectCallback m_onDisconnect;  // disconnect callback
    OnRecvCallback m_onRecv;              // receive callback
};
CIocpServer.cpp
#include "CIocpServer.h"
#include <iostream>
// Puts every handle and extension-function pointer into its "not set" state.
// The constructor body is empty; sockets and the completion port are created
// later by Initialize().
CIocpServer::CIocpServer()
    : m_listenSocket(INVALID_SOCKET),
      m_ioCompletionPort(NULL),
      m_running(false),
      m_acceptEx(nullptr),
      m_getAcceptExSockAddrs(nullptr) {}
// Destruction shuts the server down by delegating to Stop().
CIocpServer::~CIocpServer() { this->Stop(); }
bool CIocpServer::Initialize(int port, int maxConnections) {
// 初始化 Winsock
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "WSAStartup failed" << std::endl;
return false;
}
// 创建监听套接字
m_listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (m_listenSocket == INVALID_SOCKET) {
std::cerr << "Create listen socket failed: " << WSAGetLastError() << std::endl;
WSACleanup();
return false;
}
// 绑定地址
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddr.sin_port = htons(port);
if (bind(m_listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
std::cerr << "Bind failed: " << WSAGetLastError() << std::endl;
closesocket(m_listenSocket);
WSACleanup();
return false;
}
// 开始监听
if (listen(m_listenSocket, SOMAXCONN) == SOCKET_ERROR) {
std::cerr << "Listen failed: " << WSAGetLastError() << std::endl;
closesocket(m_listenSocket);
WSACleanup();
return false;
}
// 创建完成端口
m_ioCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_ioCompletionPort == NULL) {
std::cerr << "Create IOCP failed: " << GetLastError() << std::endl;
closesocket(m_listenSocket);
WSACleanup();
return false;
}
// 将监听套接字关联到完成端口
if (CreateIoCompletionPort((HANDLE)m_listenSocket, m_ioCompletionPort, (ULONG_PTR)m_listenSocket, 0) == NULL) {
std::cerr << "Associate listen socket to IOCP failed: " << GetLastError() << std::endl;
CloseHandle(m_ioCompletionPort);
closesocket(m_listenSocket);
WSACleanup();
return false;
}
// 获取 AcceptEx 和 GetAcceptExSockaddrs 函数指针
GUID guidAcceptEx = WSAID_ACCEPTEX;
GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD dwBytes = 0;
if (WSAIoctl(m_listenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx, sizeof(guidAcceptEx),
&m_acceptEx, sizeof(m_acceptEx),
&dwBytes, NULL, NULL) == SOCKET_ERROR) {
std::cerr << "Get AcceptEx function failed: " << WSAGetLastError() << std::endl;
CloseHandle(m_ioCompletionPort);
closesocket(m_listenSocket);
WSACleanup();
return false;
}
if (WSAIoctl(m_listenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockAddrs, sizeof(guidGetAcceptExSockAddrs),
&m_getAcceptExSockAddrs, sizeof(m_getAcceptExSockAddrs),
&dwBytes, NULL, NULL) == SOCKET_ERROR) {
std::cerr << "Get GetAcceptExSockaddrs function failed: " << WSAGetLastError() << std::endl;
CloseHandle(m_ioCompletionPort);
closesocket(m_listenSocket);
WSACleanup();
return false;
}
return true;
}
bool CIocpServer::Start(int workerThreads) {
if (m_running) {
return false;
}
m_running = true;
// 确定工作线程数量
if (workerThreads <= 0) {
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
workerThreads = sysInfo.dwNumberOfProcessors * 2;
}
// 创建工作线程
for (int i = 0; i < workerThreads; i++) {
m_workerThreads.emplace_back(&CIocpServer::WorkerThread, this);
}
// 投递初始的接受连接请求
if (!PostAccept()) {
Stop();
return false;
}
std::cout << "IOCP Server started with " << workerThreads << " worker threads" << std::endl;
return true;
}
void CIocpServer::Stop() {
if (!m_running) {
return;
}
m_running = false;
// 关闭所有客户端连接
{
std::lock_guard<std::mutex> lock(m_clientsMutex);
for (auto& client : m_clients) {
closesocket(client.first);
}
m_clients.clear();
}
// 关闭完成端口,使工作线程退出
if (m_ioCompletionPort != NULL) {
CloseHandle(m_ioCompletionPort);
m_ioCompletionPort = NULL;
}
// 等待所有工作线程退出
for (auto& thread : m_workerThreads) {
if (thread.joinable()) {
thread.join();
}
}
m_workerThreads.clear();
// 关闭监听套接字
if (m_listenSocket != INVALID_SOCKET) {
closesocket(m_listenSocket);
m_listenSocket = INVALID_SOCKET;
}
WSACleanup();
std::cout << "IOCP Server stopped" << std::endl;
}
// Queues `length` bytes starting at `data` for asynchronous sending.
//
// The payload is copied into one or more OverlappedEx objects, each carrying
// at most sizeof(OverlappedEx::buffer) bytes, so `data` may be released as soon
// as this function returns and payloads larger than one buffer no longer
// overflow it. All chunks are posted while holding the client's send mutex,
// which keeps chunks of concurrent Send() calls from interleaving.
//
// Returns false if the arguments are invalid, the client is unknown, or a
// chunk could not be posted (earlier chunks may already have been queued).
// A zero-length send to a known client is a successful no-op.
bool CIocpServer::Send(SOCKET clientSocket, const char* data, int length) {
    if (length < 0 || (data == nullptr && length > 0)) {
        return false;
    }

    std::shared_ptr<ClientContext> client;
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        auto it = m_clients.find(clientSocket);
        if (it == m_clients.end()) {
            return false;
        }
        client = it->second;
    }
    if (length == 0) {
        return true;  // nothing to transmit
    }

    const int kChunkCapacity = static_cast<int>(sizeof(OverlappedEx::buffer));

    std::lock_guard<std::mutex> lock(client->sendMutex);
    for (int offset = 0; offset < length;) {
        const int remaining = length - offset;
        const int chunk = remaining < kChunkCapacity ? remaining : kChunkCapacity;

        // Each chunk gets its own overlapped context; it is released by the
        // completion handling once the send has completed.
        OverlappedEx* overlapped = new OverlappedEx();
        ZeroMemory(overlapped, sizeof(OverlappedEx));
        overlapped->operationType = OperationType::SEND;
        overlapped->socket = clientSocket;
        memcpy(overlapped->buffer, data + offset, static_cast<size_t>(chunk));
        overlapped->wsaBuf.buf = overlapped->buffer;
        overlapped->wsaBuf.len = static_cast<ULONG>(chunk);

        DWORD bytesSent = 0;
        const DWORD flags = 0;
        if (WSASend(clientSocket, &overlapped->wsaBuf, 1, &bytesSent, flags,
                    &overlapped->overlapped, NULL) == SOCKET_ERROR) {
            const int error = WSAGetLastError();
            // WSA_IO_PENDING means the send was queued successfully.
            if (error != WSA_IO_PENDING) {
                std::cerr << "WSASend failed: " << error << std::endl;
                delete overlapped;  // no completion will arrive for it
                return false;
            }
        }
        offset += chunk;
    }
    return true;
}
// Removes a client from the connection table, notifies the disconnect
// callback and closes the socket. A socket that is not in the table is
// ignored, so only the first call for a given client has any effect.
void CIocpServer::CloseClient(SOCKET clientSocket) {
    // Holds the context until this function returns, after the socket is closed.
    std::shared_ptr<ClientContext> detached;

    const bool wasRegistered = [&]() {
        std::lock_guard<std::mutex> guard(m_clientsMutex);
        const auto entry = m_clients.find(clientSocket);
        if (entry == m_clients.end()) {
            return false;
        }
        detached = entry->second;
        m_clients.erase(entry);
        return true;
    }();

    if (!wasRegistered) {
        return;
    }

    // Notify first, then close the socket.
    if (m_onDisconnect) {
        m_onDisconnect(clientSocket);
    }
    closesocket(clientSocket);
}
void CIocpServer::WorkerThread() {
while (m_running) {
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = 0;
LPOVERLAPPED overlapped = NULL;
// 等待完成端口通知
BOOL result = GetQueuedCompletionStatus(
m_ioCompletionPort,
&bytesTransferred,
&completionKey,
&overlapped,
INFINITE);
// 检查是否有错误或服务器已停止
if (!m_running) {
break;
}
if (overlapped == NULL) {
continue;
}
// 转换为自定义重叠结构
OverlappedEx* overlappedEx = (OverlappedEx*)overlapped;
// 处理 I/O 操作结果
if (!result || (result && bytesTransferred == 0 &&
overlappedEx->operationType != OperationType::ACCEPT)) {
// 连接已关闭或出错
CloseClient(overlappedEx->socket);
delete overlappedEx;
continue;
}
// 根据操作类型处理
switch (overlappedEx->operationType) {
case OperationType::ACCEPT:
HandleAccept(overlappedEx, bytesTransferred);
break;
case OperationType::RECV:
HandleRecv(overlappedEx, bytesTransferred);
break;
case OperationType::SEND:
HandleSend(overlappedEx, bytesTransferred);
break;
}
}
}
bool CIocpServer::PostAccept() {
// 创建接受套接字
SOCKET acceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (acceptSocket == INVALID_SOCKET) {
std::cerr << "Create accept socket failed: " << WSAGetLastError() << std::endl;
return false;
}
// 创建重叠结构
OverlappedEx* overlapped = new OverlappedEx();
ZeroMemory(overlapped, sizeof(OverlappedEx));
overlapped->operationType = OperationType::ACCEPT;
overlapped->socket = acceptSocket;
// 准备接受连接
DWORD bytesReceived = 0;
if (!m_acceptEx(m_listenSocket, acceptSocket, overlapped->buffer, 0,
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &bytesReceived,
(LPOVERLAPPED)overlapped)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
std::cerr << "AcceptEx failed: " << error << std::endl;
closesocket(acceptSocket);
delete overlapped;
return false;
}
}
return true;
}
// Posts one asynchronous receive on `clientSocket`, using a fresh OverlappedEx
// whose whole buffer is offered to WSARecv. Returns true when the request
// completed immediately or is pending; otherwise the context is released and
// false is returned.
bool CIocpServer::PostRecv(SOCKET clientSocket) {
    // Operation context for this receive.
    OverlappedEx* request = new OverlappedEx();
    ZeroMemory(request, sizeof(OverlappedEx));
    request->socket = clientSocket;
    request->operationType = OperationType::RECV;
    request->wsaBuf.len = sizeof(request->buffer);
    request->wsaBuf.buf = request->buffer;

    DWORD received = 0;
    DWORD recvFlags = 0;
    const int rc = WSARecv(clientSocket, &request->wsaBuf, 1, &received, &recvFlags,
                           &request->overlapped, NULL);
    if (rc != SOCKET_ERROR) {
        return true;
    }

    const int error = WSAGetLastError();
    if (error == WSA_IO_PENDING) {
        return true;  // queued; the completion arrives on the port
    }

    std::cerr << "WSARecv failed: " << error << std::endl;
    delete request;
    return false;
}
// Handles a completed AcceptEx: extracts the peer address, finishes setting up
// the accepted socket, registers the client, fires the connect callback,
// starts receiving on the new connection and posts the next accept.
//
// overlapped       - context of the completed accept; released by this function.
// bytesTransferred - not used (the accept is posted with a zero data length).
//
// If the socket cannot be associated with the completion port it is closed
// and dropped. If the first receive cannot be posted the client is closed
// again through CloseClient(), so it does not linger in the client table
// without any pending I/O.
void CIocpServer::HandleAccept(OverlappedEx* overlapped, DWORD bytesTransferred) {
    (void)bytesTransferred;
    const SOCKET clientSocket = overlapped->socket;

    // Decode the addresses AcceptEx wrote into the buffer. The layout
    // parameters must match the ones used when the accept was posted.
    const DWORD addressSlot = sizeof(sockaddr_in) + 16;
    sockaddr* localAddr = nullptr;
    sockaddr* remoteAddr = nullptr;
    int localAddrLen = 0;
    int remoteAddrLen = 0;
    m_getAcceptExSockAddrs(overlapped->buffer, 0, addressSlot, addressSlot,
                           &localAddr, &localAddrLen, &remoteAddr, &remoteAddrLen);

    // Copy the peer address out of the buffer so the context can be released
    // right away; every path below then frees it exactly once.
    sockaddr_in peerAddr = {};
    if (remoteAddr != nullptr && remoteAddrLen >= static_cast<int>(sizeof(sockaddr_in))) {
        peerAddr = *reinterpret_cast<const sockaddr_in*>(remoteAddr);
    }
    delete overlapped;

    // Let the accepted socket inherit the listening socket's properties.
    // Failure is reported but not treated as fatal.
    if (setsockopt(clientSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                   reinterpret_cast<const char*>(&m_listenSocket), sizeof(m_listenSocket)) == SOCKET_ERROR) {
        std::cerr << "SO_UPDATE_ACCEPT_CONTEXT failed: " << WSAGetLastError() << std::endl;
    }

    // Associate the client socket with the completion port.
    if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(clientSocket), m_ioCompletionPort,
                               static_cast<ULONG_PTR>(clientSocket), 0) == NULL) {
        std::cerr << "Associate client socket to IOCP failed: " << GetLastError() << std::endl;
        closesocket(clientSocket);
        PostAccept();  // keep accepting
        return;
    }

    // Create and register the client context.
    auto clientContext = std::make_shared<ClientContext>();
    clientContext->socket = clientSocket;
    clientContext->clientAddr = peerAddr;
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        m_clients[clientSocket] = clientContext;
    }

    // Notify the application.
    if (m_onConnect) {
        m_onConnect(clientSocket, peerAddr);
    }

    // Start receiving; a client without a pending receive would never be
    // serviced or cleaned up, so close it if the request cannot be posted.
    if (!PostRecv(clientSocket)) {
        CloseClient(clientSocket);
    }

    // Keep accepting.
    PostAccept();
}
// Handles a completed receive: passes the received bytes to the receive
// callback and posts the next receive on the same socket.
//
// overlapped       - context of the completed receive; released by this function.
// bytesTransferred - number of valid bytes in overlapped->buffer.
//
// If the callback closed the client, no further receive is posted. If the
// next receive cannot be posted, the client is closed so it does not remain
// registered without any pending I/O.
void CIocpServer::HandleRecv(OverlappedEx* overlapped, DWORD bytesTransferred) {
    const SOCKET clientSocket = overlapped->socket;

    // Deliver the data. The buffer is only valid during the callback.
    if (m_onRecv) {
        m_onRecv(clientSocket, overlapped->buffer, static_cast<int>(bytesTransferred));
    }
    delete overlapped;

    // The callback may have closed this client; then there is nothing to re-arm.
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        if (m_clients.find(clientSocket) == m_clients.end()) {
            return;
        }
    }

    // Re-arm the receive.
    if (!PostRecv(clientSocket)) {
        CloseClient(clientSocket);
    }
}
// Handles a completed send. The only work left is releasing the operation
// context; the byte count is not inspected.
void CIocpServer::HandleSend(OverlappedEx* overlapped, DWORD bytesTransferred) {
    static_cast<void>(bytesTransferred);
    delete overlapped;
}
服务端测试(.cpp)
#include "../public/CIocpServer.h"
#include <iostream>
#include <string>
// Demo echo server: logs connections, greets each client and echoes every
// received message back prefixed with "Echo: ". Runs until Enter is pressed.
int main() {
    // Single source of truth for the port, so the log message cannot drift
    // away from the port that is actually opened.
    const int kListenPort = 1989;

    // Create the IOCP server.
    CIocpServer server;

    // Initialise the server on the configured port.
    if (!server.Initialize(kListenPort)) {
        std::cerr << "Failed to initialize server" << std::endl;
        return 1;
    }

    // Connect callback: log the peer and send a welcome message.
    server.SetOnConnectCallback([&server](SOCKET clientSocket, const sockaddr_in& clientAddr) {
        std::cout << "[" << server.GetTimeString() << "] Client connected: "
                  << server.FormatAddress(clientAddr) << " (Socket: " << clientSocket << ")" << std::endl;
        const std::string welcomeMsg = "Welcome to IOCP Server!\r\n";
        server.Send(clientSocket, welcomeMsg.c_str(), static_cast<int>(welcomeMsg.length()));
    });

    // Disconnect callback: log the socket.
    server.SetOnDisconnectCallback([&server](SOCKET clientSocket) {
        std::cout << "[" << server.GetTimeString() << "] Client disconnected: Socket "
                  << clientSocket << std::endl;
    });

    // Receive callback: log the payload and echo it back.
    server.SetOnRecvCallback([&server](SOCKET clientSocket, const char* data, int length) {
        // `data` is not null-terminated; build the string from the byte count.
        const std::string message(data, static_cast<size_t>(length));
        std::cout << "[" << server.GetTimeString() << "] Received from Socket "
                  << clientSocket << " (" << length << " bytes): " << message << std::endl;
        const std::string response = "Echo: " + message;
        server.Send(clientSocket, response.c_str(), static_cast<int>(response.length()));
    });

    // Start the server.
    if (!server.Start()) {
        std::cerr << "Failed to start server" << std::endl;
        return 1;
    }
    std::cout << "Server started on port " << kListenPort << ". Press Enter to exit." << std::endl;

    // Block until the user presses Enter.
    std::cin.get();

    // Stop the server.
    server.Stop();
    return 0;
}
客户端测试(.cpp)
#include "../public/CIocpServer.h"
int main() {
// 初始化 Winsock
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "WSAStartup failed" << std::endl;
return 1;
}
// 创建套接字
SOCKET clientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (clientSocket == INVALID_SOCKET) {
std::cerr << "Create socket failed: " << WSAGetLastError() << std::endl;
WSACleanup();
return 1;
}
// 连接服务器
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(1989);
inet_pton(AF_INET, "127.0.0.1", &serverAddr.sin_addr);
if (connect(clientSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
std::cerr << "Connect failed: " << WSAGetLastError() << std::endl;
closesocket(clientSocket);
WSACleanup();
return 1;
}
std::cout << "Connected to server. Type messages to send (type 'exit' to quit):" << std::endl;
// 接收欢迎消息
char recvBuffer[4096];
int bytesReceived = recv(clientSocket, recvBuffer, sizeof(recvBuffer), 0);
if (bytesReceived > 0) {
std::cout << "Server: " << std::string(recvBuffer, bytesReceived) << std::endl;
}
// 发送和接收消息
std::string message;
while (true) {
std::cout << "> ";
std::getline(std::cin, message);
if (message == "exit") {
break;
}
// 发送消息
if (send(clientSocket, message.c_str(), message.length(), 0) == SOCKET_ERROR) {
std::cerr << "Send failed: " << WSAGetLastError() << std::endl;
break;
}
// 接收响应
bytesReceived = recv(clientSocket, recvBuffer, sizeof(recvBuffer), 0);
if (bytesReceived <= 0) {
std::cerr << "Connection closed" << std::endl;
break;
}
std::cout << "Server: " << std::string(recvBuffer, bytesReceived);
}
// 关闭连接
closesocket(clientSocket);
WSACleanup();
return 0;
}
转载请注明出处。原文链接:https://cpp.vin/page/131.html