简介

本文概述基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。

服务器架构设计

之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图

https://cdn.llfc.club/1685620269385.jpg

我们接下来要设计的服务器结构是这样的

https://cdn.llfc.club/1685621283099.jpg

消息头完善

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。 之前我们设计的消息结构是这样的

https://cdn.llfc.club/1683368829739.jpg

现在将其完善为如下的样子

https://cdn.llfc.club/1683367901552.jpg

为了减少耦合和歧义,我们重新设计消息节点。 MsgNode表示消息节点的基类,头部的消息用这个结构存储。 RecvNode表示接收消息的节点。 SendNode表示发送消息的节点。 我们将上述结构定义在MsgNode.h中

class MsgNode
{
public:
    MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
        _data = new char[_total_len + 1]();
        _data[_total_len] = '\0';
    }

    ~MsgNode() {
        std::cout << "destruct MsgNode" << endl;
        delete[] _data;
    }

    void Clear() {
        ::memset(_data, 0, _total_len);
        _cur_len = 0;
    }

    short _cur_len;
    short _total_len;
    char* _data;
};

class RecvNode :public MsgNode {
public:
    RecvNode(short max_len, short msg_id);
private:
    short _msg_id;
};

class SendNode:public MsgNode {
public:
    SendNode(const char* msg,short max_len, short msg_id);
private:
    short _msg_id;
};

实现如下

#include "MsgNode.h"
RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
_msg_id(msg_id){

}


SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){
    //先发送id, 转为网络字节序
    short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
    memcpy(_data, &msg_id_host, HEAD_ID_LEN);
    //转为网络字节序
    short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
    memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
    memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

SendNode发送节点构造时,先将id转为网络字节序,然后写入_data数据域。 然后将要发送数据的长度转为大端字节序,写入_data数据域,注意要偏移HEAD_ID_LEN长度。 最后将要发送的数据msg写入_data数据域,注意要偏移HEAD_ID_LEN+HEAD_DATA_LEN

Session类改写

因为消息结构改变了,所以我们接收和发送数据的逻辑要做对应的修改,我们先修改Session类中收发消息结构如下

    std::queue<shared_ptr<MsgNode> > _send_que;
    std::mutex _send_lock;
    //收到的消息结构
    std::shared_ptr<MsgNode> _recv_msg_node;
    bool _b_head_parse;
    //收到的头部结构
    std::shared_ptr<MsgNode> _recv_head_node;

因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN(4字节)大小。

CSession::CSession(boost::asio::io_context& io_context, CServer* server):
    _socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
    boost::uuids::uuid  a_uuid = boost::uuids::random_generator()();
    _uuid = boost::uuids::to_string(a_uuid);
    _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}

发送时我们构造发送节点,放到队列中即可

void CSession::Send(char* msg, short max_length, short msgid) {
    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg, max_length, msgid));
    if (send_que_size>0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), 
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

当然我们也实现了一个重载版本

void CSession::Send(std::string msg, short msgid) {
    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
    if (send_que_size > 0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

在接收数据时我们解析头部也要解析id字段

void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
    try {
        if (!error) {
            //已经移动的字符数
            int copy_len = 0;
            while (bytes_transferred > 0) {
                if (!_b_head_parse) {
                    //收到的数据不足头部大小
                    if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
                        memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_head_node->_cur_len += bytes_transferred;
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return;
                    }
                    //收到的数据比头部多
                    //头部剩余未复制的长度
                    int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                    //更新已处理的data长度和剩余未处理的长度
                    copy_len += head_remain;
                    bytes_transferred -= head_remain;
                    //获取头部MSGID数据
                    short msg_id = 0;
                    memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
                    //网络字节序转化为本地字节序
                    msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
                    std::cout << "msg_id is " << msg_id << endl;
                    //id非法
                    if (msg_id > MAX_LENGTH) {
                        std::cout << "invalid msg_id is " << msg_id << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }
                    short msg_len = 0;
                    memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
                    //网络字节序转化为本地字节序
                    msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
                    std::cout << "msg_len is " << msg_len << endl;
                    //id非法
                    if (msg_len > MAX_LENGTH) {
                        std::cout << "invalid data length is " << msg_len << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }

                    _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

                    //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
                    if (bytes_transferred < msg_len) {
                        memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_msg_node->_cur_len += bytes_transferred;
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        //头部处理完成
                        _b_head_parse = true;
                        return;
                    }

                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
                    _recv_msg_node->_cur_len += msg_len;
                    copy_len += msg_len;
                    bytes_transferred -= msg_len;
                    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                    //cout << "receive data is " << _recv_msg_node->_data << endl;
                    //此处可以调用Send发送测试
                    Json::Reader reader;
                    Json::Value root;
                    reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
                    std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
                        << root["data"].asString() << endl;
                    root["data"] = "server has received msg, msg data is " + root["data"].asString();
                    std::string return_str = root.toStyledString();
                    Send(return_str, root["id"].asInt());
                    //继续轮询剩余未处理数据
                    _b_head_parse = false;
                    _recv_head_node->Clear();
                    if (bytes_transferred <= 0) {
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return;
                    }
                    continue;
                }

                //已经处理完头部,处理上次未接受完的消息数据
                //接收的数据仍不足剩余未处理的
                int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
                if (bytes_transferred < remain_msg) {
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
                _recv_msg_node->_cur_len += remain_msg;
                bytes_transferred -= remain_msg;
                copy_len += remain_msg;
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                //cout << "receive data is " << _recv_msg_node->_data << endl;
                    //此处可以调用Send发送测试
                Json::Reader reader;
                Json::Value root;
                reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
                std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
                    << root["data"].asString() << endl;
                root["data"] = "server has received msg, msg data is " + root["data"].asString();
                std::string return_str = root.toStyledString();
                Send(return_str, root["id"].asInt());
                //继续轮询剩余未处理数据
                _b_head_parse = false;
                _recv_head_node->Clear();
                if (bytes_transferred <= 0) {
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                continue;
            }
        }
        else {
            std::cout << "handle read failed, error is " << error.what() << endl;
            Close();
            _server->ClearSession(_uuid);
        }
    }
    catch (std::exception& e) {
        std::cout << "Exception code is " << e.what() << endl;
    }
}

先解析头部id,再解析长度,然后根据id和长度构造消息节点,copy剩下的消息体, 把上面代码中处理消息头的逻辑截取如下

    //获取头部MSGID数据
    short msg_id = 0;
    memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
    //网络字节序转化为本地字节序
    msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
    std::cout << "msg_id is " << msg_id << endl;
    //id非法
    if (msg_id > MAX_LENGTH) {
        std::cout << "invalid msg_id is " << msg_id << endl;
        _server->ClearSession(_uuid);
        return;
    }

    short msg_len = 0;
    memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
    //网络字节序转化为本地字节序
    msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
    std::cout << "msg_len is " << msg_len << endl;
    //id非法
    if (msg_len > MAX_LENGTH) {
        std::cout << "invalid data length is " << msg_len << endl;
        _server->ClearSession(_uuid);
        return;
    }

    _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

其余的没什么变动。

总结

本文介绍了服务器逻辑和网络层的设计,并且基于这个架构,完善了消息发送结构,下一篇带着大家设计逻辑类和逻辑队列。

源码链接https://gitee.com/secondtonone1/boostasio-learn

results matching ""

    No results matching ""