Loading...
主打一个C++
文章详情

iocp类封装以及调用示例

Posted on 2022-02-03 16:59:28 by 主打一个C++

CIocpServer.h


#pragma once

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <ws2tcpip.h>

#include <atomic>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")

// TCP server built on a Windows I/O completion port (IOCP).
//
// Intended call order: Initialize() -> Set*Callback() -> Start() -> Stop().
// The connect and receive callbacks are invoked from the worker threads
// created by Start(), so they may run concurrently with each other.
class CIocpServer {
public:
    // Kind of asynchronous operation an OverlappedEx describes.
    enum class OperationType {
        ACCEPT,
        RECV,
        SEND
    };

    // Per-operation context handed to the overlapped Winsock calls.
    // `overlapped` must remain the first member: the worker thread casts the
    // LPOVERLAPPED it dequeues straight back to OverlappedEx*.
    struct OverlappedEx {
        OVERLAPPED overlapped;
        OperationType operationType;
        SOCKET socket;          // socket the operation was issued on
        WSABUF wsaBuf;          // points into `buffer` for send/recv
        char buffer[4096];      // payload (or AcceptEx address block)
        int bytesTransferred;
    };

    // Per-connection state kept in m_clients.
    struct ClientContext {
        SOCKET socket;
        sockaddr_in clientAddr;
        std::vector<char> recvBuffer;
        std::mutex sendMutex;   // serialises Send() calls on this connection
    };

    // Callback signatures.
    using OnConnectCallback = std::function<void(SOCKET, const sockaddr_in&)>;
    using OnDisconnectCallback = std::function<void(SOCKET)>;
    using OnRecvCallback = std::function<void(SOCKET, const char*, int)>;

public:
    CIocpServer();
    ~CIocpServer();

    // Creates the listening socket on `port` and the completion port.
    // `maxConnections` is currently not used by the implementation.
    // Returns false on any failure.
    bool Initialize(int port, int maxConnections = 1000);

    // Starts the worker threads and posts the first accept.
    // workerThreads <= 0 selects 2 x number of processors.
    bool Start(int workerThreads = 0);

    // Stops the worker threads and closes all sockets.
    void Stop();

    // Queues `length` bytes from `data` for sending to `clientSocket`.
    // Returns false if the client is unknown or the send could not be posted.
    bool Send(SOCKET clientSocket, const char* data, int length);

    // Removes the client, fires the disconnect callback and closes the socket.
    void CloseClient(SOCKET clientSocket);

    // Callback registration. Set these before Start(); they are not
    // synchronised against the worker threads.
    void SetOnConnectCallback(OnConnectCallback callback) { m_onConnect = callback; }
    void SetOnDisconnectCallback(OnDisconnectCallback callback) { m_onDisconnect = callback; }
    void SetOnRecvCallback(OnRecvCallback callback) { m_onRecv = callback; }

public:
    // Formats an IPv4 endpoint as "a.b.c.d:port".
    std::string FormatAddress(const sockaddr_in& addr) const {
        // Zero-initialised so a failed inet_ntop yields an empty host part
        // instead of reading indeterminate bytes.
        char buffer[INET_ADDRSTRLEN] = {};
        inet_ntop(AF_INET, &addr.sin_addr, buffer, INET_ADDRSTRLEN);
        return std::string(buffer) + ":" + std::to_string(ntohs(addr.sin_port));
    }
    // Returns the current local time as "YYYY-MM-DD HH:MM:SS".
    std::string GetTimeString() const {
        time_t now = time(nullptr);
        // Both zero-initialised so a failed localtime_s/strftime cannot
        // expose uninitialised memory.
        struct tm timeinfo = {};
        char buffer[80] = {};
        localtime_s(&timeinfo, &now);
        strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &timeinfo);
        return buffer;
    }
private:
    // Worker thread body: dequeues completions and dispatches them.
    void WorkerThread();

    // Posts one asynchronous accept on the listening socket.
    bool PostAccept();

    // Posts one asynchronous receive on `clientSocket`.
    bool PostRecv(SOCKET clientSocket);

    // Completion handler for an accept.
    void HandleAccept(OverlappedEx* overlapped, DWORD bytesTransferred);

    // Completion handler for a receive.
    void HandleRecv(OverlappedEx* overlapped, DWORD bytesTransferred);

    // Completion handler for a send.
    void HandleSend(OverlappedEx* overlapped, DWORD bytesTransferred);

private:
    SOCKET m_listenSocket;                                  // listening socket
    HANDLE m_ioCompletionPort;                              // completion port handle
    std::vector<std::thread> m_workerThreads;               // worker threads
    // Written by Start()/Stop() and polled by every worker thread, so it has
    // to be atomic to avoid a data race.
    std::atomic<bool> m_running;                            // running flag

    std::unordered_map<SOCKET, std::shared_ptr<ClientContext>> m_clients;  // connected clients
    std::mutex m_clientsMutex;                              // guards m_clients

    LPFN_ACCEPTEX m_acceptEx;                               // AcceptEx function pointer
    LPFN_GETACCEPTEXSOCKADDRS m_getAcceptExSockAddrs;       // GetAcceptExSockaddrs function pointer

    OnConnectCallback m_onConnect;                          // connect callback
    OnDisconnectCallback m_onDisconnect;                    // disconnect callback
    OnRecvCallback m_onRecv;                                // receive callback
};

CIocpServer.cpp

#include "CIocpServer.h"
#include <iostream>

CIocpServer::CIocpServer() 
    : m_listenSocket(INVALID_SOCKET)
    , m_ioCompletionPort(NULL)
    , m_running(false)
    , m_acceptEx(nullptr)
    , m_getAcceptExSockAddrs(nullptr)
{
}

// Shuts the server down on destruction by delegating to Stop(), which
// returns immediately when the server is not running.
CIocpServer::~CIocpServer() {
    Stop();
}

bool CIocpServer::Initialize(int port, int maxConnections) {
    // 初始化 Winsock
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "WSAStartup failed" << std::endl;
        return false;
    }

    // 创建监听套接字
    m_listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (m_listenSocket == INVALID_SOCKET) {
        std::cerr << "Create listen socket failed: " << WSAGetLastError() << std::endl;
        WSACleanup();
        return false;
    }

    // 绑定地址
    sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddr.sin_port = htons(port);

    if (bind(m_listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        std::cerr << "Bind failed: " << WSAGetLastError() << std::endl;
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    // 开始监听
    if (listen(m_listenSocket, SOMAXCONN) == SOCKET_ERROR) {
        std::cerr << "Listen failed: " << WSAGetLastError() << std::endl;
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    // 创建完成端口
    m_ioCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_ioCompletionPort == NULL) {
        std::cerr << "Create IOCP failed: " << GetLastError() << std::endl;
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    // 将监听套接字关联到完成端口
    if (CreateIoCompletionPort((HANDLE)m_listenSocket, m_ioCompletionPort, (ULONG_PTR)m_listenSocket, 0) == NULL) {
        std::cerr << "Associate listen socket to IOCP failed: " << GetLastError() << std::endl;
        CloseHandle(m_ioCompletionPort);
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    // 获取 AcceptEx 和 GetAcceptExSockaddrs 函数指针
    GUID guidAcceptEx = WSAID_ACCEPTEX;
    GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
    DWORD dwBytes = 0;

    if (WSAIoctl(m_listenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidAcceptEx, sizeof(guidAcceptEx),
        &m_acceptEx, sizeof(m_acceptEx),
        &dwBytes, NULL, NULL) == SOCKET_ERROR) {
        std::cerr << "Get AcceptEx function failed: " << WSAGetLastError() << std::endl;
        CloseHandle(m_ioCompletionPort);
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    if (WSAIoctl(m_listenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidGetAcceptExSockAddrs, sizeof(guidGetAcceptExSockAddrs),
        &m_getAcceptExSockAddrs, sizeof(m_getAcceptExSockAddrs),
        &dwBytes, NULL, NULL) == SOCKET_ERROR) {
        std::cerr << "Get GetAcceptExSockaddrs function failed: " << WSAGetLastError() << std::endl;
        CloseHandle(m_ioCompletionPort);
        closesocket(m_listenSocket);
        WSACleanup();
        return false;
    }

    return true;
}

bool CIocpServer::Start(int workerThreads) {
    if (m_running) {
        return false;
    }

    m_running = true;

    // 确定工作线程数量
    if (workerThreads <= 0) {
        SYSTEM_INFO sysInfo;
        GetSystemInfo(&sysInfo);
        workerThreads = sysInfo.dwNumberOfProcessors * 2;
    }

    // 创建工作线程
    for (int i = 0; i < workerThreads; i++) {
        m_workerThreads.emplace_back(&CIocpServer::WorkerThread, this);
    }

    // 投递初始的接受连接请求
    if (!PostAccept()) {
        Stop();
        return false;
    }

    std::cout << "IOCP Server started with " << workerThreads << " worker threads" << std::endl;
    return true;
}

void CIocpServer::Stop() {
    if (!m_running) {
        return;
    }

    m_running = false;

    // 关闭所有客户端连接
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        for (auto& client : m_clients) {
            closesocket(client.first);
        }
        m_clients.clear();
    }

    // 关闭完成端口,使工作线程退出
    if (m_ioCompletionPort != NULL) {
        CloseHandle(m_ioCompletionPort);
        m_ioCompletionPort = NULL;
    }

    // 等待所有工作线程退出
    for (auto& thread : m_workerThreads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
    m_workerThreads.clear();

    // 关闭监听套接字
    if (m_listenSocket != INVALID_SOCKET) {
        closesocket(m_listenSocket);
        m_listenSocket = INVALID_SOCKET;
    }

    WSACleanup();
    std::cout << "IOCP Server stopped" << std::endl;
}

// Queues `length` bytes starting at `data` for asynchronous transmission to
// `clientSocket`.
//
// The payload is copied into internally owned OverlappedEx buffers, so the
// caller's memory may be reused as soon as this returns. Payloads larger
// than one buffer are split into consecutive sends; the per-client send
// mutex keeps the pieces of concurrent Send() calls from interleaving.
//
// Returns true when every piece was posted (or length == 0, which is a
// no-op). Returns false when the arguments are invalid, the client is
// unknown, or a WSASend call failed; in the last case pieces posted before
// the failure may still be transmitted.
bool CIocpServer::Send(SOCKET clientSocket, const char* data, int length) {
    if (length < 0 || (data == nullptr && length > 0)) {
        return false;
    }
    // A zero-byte send would complete with 0 bytes transferred, which the
    // worker thread interprets as a closed connection.
    if (length == 0) {
        return true;
    }

    std::shared_ptr<ClientContext> client;
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        auto it = m_clients.find(clientSocket);
        if (it == m_clients.end()) {
            return false;
        }
        client = it->second;
    }

    std::lock_guard<std::mutex> lock(client->sendMutex);

    const int maxChunk = static_cast<int>(sizeof(OverlappedEx::buffer));
    int offset = 0;
    while (offset < length) {
        const int remaining = length - offset;
        const int chunk = remaining < maxChunk ? remaining : maxChunk;

        // Owned here until the send is successfully handed to Winsock; after
        // that the completion handler is responsible for freeing it.
        std::unique_ptr<OverlappedEx> request(new OverlappedEx());
        ZeroMemory(request.get(), sizeof(OverlappedEx));
        request->operationType = OperationType::SEND;
        request->socket = clientSocket;

        // Copy this piece of the payload into the request's own buffer.
        memcpy(request->buffer, data + offset, static_cast<size_t>(chunk));
        request->wsaBuf.buf = request->buffer;
        request->wsaBuf.len = static_cast<ULONG>(chunk);

        DWORD bytesSent = 0;
        const DWORD flags = 0;
        if (WSASend(clientSocket, &request->wsaBuf, 1, &bytesSent, flags,
                    reinterpret_cast<LPWSAOVERLAPPED>(request.get()), NULL) == SOCKET_ERROR) {
            const int error = WSAGetLastError();
            if (error != WSA_IO_PENDING) {
                std::cerr << "WSASend failed: " << error << std::endl;
                return false;  // `request` is released by the unique_ptr
            }
        }

        // Posted: a completion packet will be queued for this request, so
        // give up ownership without touching the (possibly already freed)
        // object again.
        request.release();
        offset += chunk;
    }

    return true;
}

// Unregisters `clientSocket`, notifies the disconnect callback and closes
// the socket. A socket that is not in the client table is ignored, so
// calling this more than once for the same client is harmless.
void CIocpServer::CloseClient(SOCKET clientSocket) {
    // Holds a reference to the context until this function returns.
    std::shared_ptr<ClientContext> keepAlive;

    std::unique_lock<std::mutex> guard(m_clientsMutex);
    const auto entry = m_clients.find(clientSocket);
    if (entry == m_clients.end()) {
        return;
    }
    keepAlive = entry->second;
    m_clients.erase(entry);
    guard.unlock();

    // The callback runs outside the table lock and before the socket closes.
    if (m_onDisconnect) {
        m_onDisconnect(clientSocket);
    }

    closesocket(clientSocket);
}

void CIocpServer::WorkerThread() {
    while (m_running) {
        DWORD bytesTransferred = 0;
        ULONG_PTR completionKey = 0;
        LPOVERLAPPED overlapped = NULL;

        // 等待完成端口通知
        BOOL result = GetQueuedCompletionStatus(
            m_ioCompletionPort,
            &bytesTransferred,
            &completionKey,
            &overlapped,
            INFINITE);

        // 检查是否有错误或服务器已停止
        if (!m_running) {
            break;
        }

        if (overlapped == NULL) {
            continue;
        }

        // 转换为自定义重叠结构
        OverlappedEx* overlappedEx = (OverlappedEx*)overlapped;

        // 处理 I/O 操作结果
        if (!result || (result && bytesTransferred == 0 && 
            overlappedEx->operationType != OperationType::ACCEPT)) {
            // 连接已关闭或出错
            CloseClient(overlappedEx->socket);
            delete overlappedEx;
            continue;
        }

        // 根据操作类型处理
        switch (overlappedEx->operationType) {
        case OperationType::ACCEPT:
            HandleAccept(overlappedEx, bytesTransferred);
            break;
        case OperationType::RECV:
            HandleRecv(overlappedEx, bytesTransferred);
            break;
        case OperationType::SEND:
            HandleSend(overlappedEx, bytesTransferred);
            break;
        }
    }
}

bool CIocpServer::PostAccept() {
    // 创建接受套接字
    SOCKET acceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (acceptSocket == INVALID_SOCKET) {
        std::cerr << "Create accept socket failed: " << WSAGetLastError() << std::endl;
        return false;
    }

    // 创建重叠结构
    OverlappedEx* overlapped = new OverlappedEx();
    ZeroMemory(overlapped, sizeof(OverlappedEx));
    overlapped->operationType = OperationType::ACCEPT;
    overlapped->socket = acceptSocket;

    // 准备接受连接
    DWORD bytesReceived = 0;
    if (!m_acceptEx(m_listenSocket, acceptSocket, overlapped->buffer, 0,
        sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &bytesReceived, 
        (LPOVERLAPPED)overlapped)) {
        int error = WSAGetLastError();
        if (error != WSA_IO_PENDING) {
            std::cerr << "AcceptEx failed: " << error << std::endl;
            closesocket(acceptSocket);
            delete overlapped;
            return false;
        }
    }

    return true;
}

// Posts one asynchronous WSARecv on `clientSocket` using a freshly allocated
// request whose WSABUF covers the request's whole internal buffer.
//
// Returns true when the receive completed immediately or is pending (the
// request is then freed by the completion handler); returns false after
// freeing the request when WSARecv fails with any other error.
bool CIocpServer::PostRecv(SOCKET clientSocket) {
    OverlappedEx* request = new OverlappedEx();
    ZeroMemory(request, sizeof(OverlappedEx));
    request->socket = clientSocket;
    request->operationType = OperationType::RECV;
    request->wsaBuf.len = sizeof(request->buffer);
    request->wsaBuf.buf = request->buffer;

    DWORD received = 0;
    DWORD recvFlags = 0;
    const int rc = WSARecv(clientSocket, &request->wsaBuf, 1, &received, &recvFlags,
                           reinterpret_cast<LPWSAOVERLAPPED>(request), NULL);
    if (rc != SOCKET_ERROR) {
        return true;
    }

    const int error = WSAGetLastError();
    if (error == WSA_IO_PENDING) {
        return true;
    }

    std::cerr << "WSARecv failed: " << error << std::endl;
    delete request;
    return false;
}

// Completion handler for an AcceptEx request.
//
// Extracts the peer address, associates the accepted socket with the
// completion port, registers the client, fires the connect callback, posts
// the first receive and finally posts the next accept. Takes ownership of
// `overlapped` and frees it.
//
// overlapped        The completed ACCEPT request; `socket` is the accepted socket.
// bytesTransferred  Unused (the accept is posted with a zero-length data buffer).
void CIocpServer::HandleAccept(OverlappedEx* overlapped, DWORD bytesTransferred) {
    (void)bytesTransferred;
    const SOCKET clientSocket = overlapped->socket;

    // Locate the address blocks AcceptEx wrote into the request buffer.
    sockaddr* localAddr = NULL;
    sockaddr* remoteAddr = NULL;
    int localAddrLen = sizeof(sockaddr_in);
    int remoteAddrLen = sizeof(sockaddr_in);
    const DWORD addressSlot = sizeof(sockaddr_in) + 16;

    m_getAcceptExSockAddrs(overlapped->buffer, 0, addressSlot, addressSlot,
        &localAddr, &localAddrLen, &remoteAddr, &remoteAddrLen);

    // Copy the peer address out of the request buffer. If it could not be
    // parsed, fall back to an all-zero address instead of dereferencing null.
    sockaddr_in peerAddr = {};
    if (remoteAddr != NULL && remoteAddrLen >= static_cast<int>(sizeof(sockaddr_in))) {
        peerAddr = *reinterpret_cast<sockaddr_in*>(remoteAddr);
    }

    // Everything needed has been copied; the request can go now, which keeps
    // every exit path below leak-free.
    delete overlapped;
    overlapped = NULL;

    // Let the accepted socket inherit the listening socket's properties.
    // Failure is reported but not fatal: overlapped send/recv still work.
    if (setsockopt(clientSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                   reinterpret_cast<const char*>(&m_listenSocket), sizeof(m_listenSocket)) == SOCKET_ERROR) {
        std::cerr << "SO_UPDATE_ACCEPT_CONTEXT failed: " << WSAGetLastError() << std::endl;
    }

    // Associate the client socket with the completion port.
    if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(clientSocket), m_ioCompletionPort,
                               static_cast<ULONG_PTR>(clientSocket), 0) == NULL) {
        std::cerr << "Associate client socket to IOCP failed: " << GetLastError() << std::endl;
        closesocket(clientSocket);
        PostAccept();  // keep accepting new connections
        return;
    }

    // Create and register the client context.
    auto clientContext = std::make_shared<ClientContext>();
    clientContext->socket = clientSocket;
    clientContext->clientAddr = peerAddr;
    {
        std::lock_guard<std::mutex> lock(m_clientsMutex);
        m_clients[clientSocket] = clientContext;
    }

    // Notify the application.
    if (m_onConnect) {
        m_onConnect(clientSocket, peerAddr);
    }

    // Start receiving. Without a pending receive the connection could never
    // be read from or detected as closed, so drop it if posting fails.
    if (!PostRecv(clientSocket)) {
        CloseClient(clientSocket);
    }

    // Post the next accept (PostAccept reports its own errors).
    PostAccept();
}

// Completion handler for a WSARecv request.
//
// Passes the received bytes to the receive callback, frees the request and
// posts the next receive on the same socket.
//
// overlapped        The completed RECV request; `buffer` holds the data.
// bytesTransferred  Number of valid bytes in `buffer`.
void CIocpServer::HandleRecv(OverlappedEx* overlapped, DWORD bytesTransferred) {
    const SOCKET clientSocket = overlapped->socket;

    // Hand the data to the application. The buffer is only valid for the
    // duration of the callback.
    if (m_onRecv) {
        m_onRecv(clientSocket, overlapped->buffer, static_cast<int>(bytesTransferred));
    }

    delete overlapped;

    // Keep a receive pending at all times. If it cannot be posted the
    // connection would linger in the client table with nothing to detect
    // its closure, so close it now.
    if (!PostRecv(clientSocket)) {
        CloseClient(clientSocket);
    }
}

// Completion handler for a WSASend request: the data has been handed off,
// so the only work left is releasing the request.
void CIocpServer::HandleSend(OverlappedEx* overlapped, DWORD bytesTransferred) {
    (void)bytesTransferred;
    // Adopt the request; it is freed when `finished` goes out of scope.
    const std::unique_ptr<OverlappedEx> finished(overlapped);
}

服务端测试cpp

#include "../public/CIocpServer.h"
#include <iostream>
#include <string>






// Echo-server demo: accepts connections, greets each client and echoes back
// whatever it receives, until Enter is pressed on the console.
int main() {
    // Single source of truth for the port, so the log message cannot drift
    // from the value actually passed to Initialize().
    const int kPort = 1989;

    // Create the IOCP server.
    CIocpServer server;

    // Initialise the server and listen on kPort.
    if (!server.Initialize(kPort)) {
        std::cerr << "Failed to initialize server" << std::endl;
        return 1;
    }

    // Connect callback: log the peer and send a welcome message.
    server.SetOnConnectCallback([&server](SOCKET clientSocket, const sockaddr_in& clientAddr) {
        std::cout << "[" << server.GetTimeString() << "] Client connected: "
            << server.FormatAddress(clientAddr) << " (Socket: " << clientSocket << ")" << std::endl;

        const std::string welcomeMsg = "Welcome to IOCP Server!\r\n";
        server.Send(clientSocket, welcomeMsg.c_str(), static_cast<int>(welcomeMsg.length()));
    });

    // Disconnect callback: log the socket that went away.
    server.SetOnDisconnectCallback([&server](SOCKET clientSocket) {
        std::cout << "[" << server.GetTimeString() << "] Client disconnected: Socket "
            << clientSocket << std::endl;
    });

    // Receive callback: log the data and echo it back.
    server.SetOnRecvCallback([&server](SOCKET clientSocket, const char* data, int length) {
        const std::string message(data, length);
        std::cout << "[" << server.GetTimeString() << "] Received from Socket "
            << clientSocket << " (" << length << " bytes): " << message << std::endl;

        const std::string response = "Echo: " + message;
        server.Send(clientSocket, response.c_str(), static_cast<int>(response.length()));
    });

    // Start the server.
    if (!server.Start()) {
        std::cerr << "Failed to start server" << std::endl;
        return 1;
    }

    std::cout << "Server started on port " << kPort << ". Press Enter to exit." << std::endl;

    // Block until the user presses Enter.
    std::cin.get();

    // Stop the server.
    server.Stop();

    return 0;
}

客户端测试cpp

#include "../public/CIocpServer.h"




int main() {
    // 初始化 Winsock
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "WSAStartup failed" << std::endl;
        return 1;
    }

    // 创建套接字
    SOCKET clientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (clientSocket == INVALID_SOCKET) {
        std::cerr << "Create socket failed: " << WSAGetLastError() << std::endl;
        WSACleanup();
        return 1;
    }

    // 连接服务器
    sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(1989);
    inet_pton(AF_INET, "127.0.0.1", &serverAddr.sin_addr);

    if (connect(clientSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        std::cerr << "Connect failed: " << WSAGetLastError() << std::endl;
        closesocket(clientSocket);
        WSACleanup();
        return 1;
    }

    std::cout << "Connected to server. Type messages to send (type 'exit' to quit):" << std::endl;

    // 接收欢迎消息
    char recvBuffer[4096];
    int bytesReceived = recv(clientSocket, recvBuffer, sizeof(recvBuffer), 0);
    if (bytesReceived > 0) {
        std::cout << "Server: " << std::string(recvBuffer, bytesReceived) << std::endl;
    }

    // 发送和接收消息
    std::string message;
    while (true) {
        std::cout << "> ";
        std::getline(std::cin, message);

        if (message == "exit") {
            break;
        }

        // 发送消息
        if (send(clientSocket, message.c_str(), message.length(), 0) == SOCKET_ERROR) {
            std::cerr << "Send failed: " << WSAGetLastError() << std::endl;
            break;
        }

        // 接收响应
        bytesReceived = recv(clientSocket, recvBuffer, sizeof(recvBuffer), 0);
        if (bytesReceived <= 0) {
            std::cerr << "Connection closed" << std::endl;
            break;
        }

        std::cout << "Server: " << std::string(recvBuffer, bytesReceived);
    }

    // 关闭连接
    closesocket(clientSocket);
    WSACleanup();

    return 0;
}


*转载请注明出处:原文链接:https://cpp.vin/page/131.html

作者近期文章
提示
×
确定
数据库执行: 8次 总耗时: 0.01s
页面加载耗时: 



wechat +447752296473
wechat cpp-blog