简介

前文已经介绍了异步操作的api,今天写一个简单的异步echo服务器,以应答为主

Session类

Session类主要是处理客户端消息收发的会话类,为了简单起见,我们不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。

class Session
{
public:
    Session(boost::asio::io_context& ioc):_socket(ioc){
    }
    tcp::socket& Socket() {
        return _socket;
    }

    void Start();
private:
    void handle_read(const boost::system::error_code & error, size_t bytes_transfered);
    void handle_write(const boost::system::error_code& error);
    tcp::socket _socket;
    enum {max_length = 1024};
    char _data[max_length];
};

1   _data用来接收客户端传递的数据 2   _socket为单独处理客户端读写的socket。 3   handle_read和handle_write分别为读回调函数和写回调函数。 接下来我们实现Session类

void Session::Start(){
    memset(_data, 0, max_length);
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
        std::bind(&Session::handle_read, this, placeholders::_1,
            placeholders::_2)
    );
}

在Start方法中我们调用异步读操作,监听对端发送的消息。当对端发送数据后,触发handle_read函数

void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {

    if (!error) {
        cout << "server receive data is " << _data << endl;
        boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered), 
            std::bind(&Session::handle_write, this, placeholders::_1));
    }
    else {
        delete this;
    }
}

handle_read函数内将收到的数据发送给对端,当发送完成后触发handle_write回调函数。

void Session::handle_write(const boost::system::error_code& error) {
    if (!error) {
        memset(_data, 0, max_length);
        _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,
            this, placeholders::_1, placeholders::_2));
    }
    else {
        delete this;
    }
}

handle_write函数内又一次监听了读事件,如果对端有数据发送过来则触发handle_read,我们再将收到的数据发回去。从而达到应答式服务的效果。

Server类

Server类为服务器接收连接的管理类

class Server {
public:
    Server(boost::asio::io_context& ioc, short port);
private:
    void start_accept();
    void handle_accept(Session* new_session, const boost::system::error_code& error);
    boost::asio::io_context& _ioc;
    tcp::acceptor _acceptor;
};

start_accept将要接收连接的acceptor绑定到服务上,其内部就是将accpeptor对应的socket描述符绑定到epoll或iocp模型上,实现事件驱动。 handle_accept为新连接到来后触发的回调函数。 下面是具体实现

Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
    start_accept();
}

void Server::start_accept() {
    Session* new_session = new Session(_ioc);
    _acceptor.async_accept(new_session->Socket(),
        std::bind(&Server::handle_accept, this, new_session, placeholders::_1));
}
void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {
    if (!error) {
        new_session->Start();
    }
    else {
        delete new_session;
    }

    start_accept();
}

客户端

客户端的设计用之前的同步模式即可,客户端不需要异步的方式,因为客户端并不是以并发为主,当然写成异步收发更好一些。

#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024;
int main()
{
    try {
        //创建上下文服务
        boost::asio::io_context   ioc;
        //构造endpoint
        tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 10086);
        tcp::socket  sock(ioc);
        boost::system::error_code   error = boost::asio::error::host_not_found; ;
        sock.connect(remote_ep, error);
        if (error) {
            cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
            return 0;
        }

        std::cout << "Enter message: ";
        char request[MAX_LENGTH];
        std::cin.getline(request, MAX_LENGTH);
        size_t request_length = strlen(request);
        boost::asio::write(sock, boost::asio::buffer(request, request_length));

        char reply[MAX_LENGTH];
        size_t reply_length = boost::asio::read(sock,
            boost::asio::buffer(reply, request_length));
        std::cout << "Reply is: ";
        std::cout.write(reply, reply_length);
        std::cout << "\n";
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }
    return 0;
}

运行服务器之后再运行客户端,输入字符串后,就可以收到服务器应答的字符串了。

隐患

该demo示例为仿照asio官网编写的,其中存在隐患,就是当服务器即将发送数据前(调用async_write前),此刻客户端中断,服务器此时调用async_write会触发发送回调函数,判断ec为非0进而执行delete this逻辑回收session。但要注意的是客户端关闭后,在tcp层面会触发读就绪事件,服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0,进而再次执行delete操作,从而造成二次析构,这是极度危险的。

总结

本文介绍了异步的应答服务器设计,但是这种服务器并不会在实际生产中使用,主要有两个原因: 1   因为该服务器的发送和接收以应答的方式交互,而并不能做到应用层想随意发送的目的,也就是未做到完全的收发分离(全双工逻辑)。 2   该服务器未处理粘包,序列化,以及逻辑和收发线程解耦等问题。 3   该服务器存在二次析构的风险。 这些问题我们会在接下来的文章中不断完善 源码链接 https://gitee.com/secondtonone1/boostasio-learn

results matching ""

    No results matching ""