简介
本文概述基于boost::asio
实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。
服务器架构设计
之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图
我们接下来要设计的服务器结构是这样的
消息头完善
我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。 之前我们设计的消息结构是这样的
现在将其完善为如下的样子
为了减少耦合和歧义,我们重新设计消息节点。
MsgNode
表示消息节点的基类,头部的消息用这个结构存储。
RecvNode
表示接收消息的节点。
SendNode
表示发送消息的节点。
我们将上述结构定义在MsgNode.h中
class MsgNode
{
public:
MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
_data = new char[_total_len + 1]();
_data[_total_len] = '\0';
}
~MsgNode() {
std::cout << "destruct MsgNode" << endl;
delete[] _data;
}
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
short _cur_len;
short _total_len;
char* _data;
};
class RecvNode :public MsgNode {
public:
RecvNode(short max_len, short msg_id);
private:
short _msg_id;
};
class SendNode:public MsgNode {
public:
SendNode(const char* msg,short max_len, short msg_id);
private:
short _msg_id;
};
实现如下
#include "MsgNode.h"
RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
_msg_id(msg_id){
}
SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){
//先发送id, 转为网络字节序
short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
memcpy(_data, &msg_id_host, HEAD_ID_LEN);
//转为网络字节序
short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}
SendNode
发送节点构造时,先将id转为网络字节序,然后写入_data
数据域。
然后将要发送数据的长度转为大端字节序,写入_data
数据域,注意要偏移HEAD_ID_LEN
长度。
最后将要发送的数据msg
写入_data
数据域,注意要偏移HEAD_ID_LEN
+HEAD_DATA_LEN
Session类改写
因为消息结构改变了,所以我们接收和发送数据的逻辑要做对应的修改,我们先修改Session类中收发消息结构如下
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN
(4字节)大小。
CSession::CSession(boost::asio::io_context& io_context, CServer* server):
_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
发送时我们构造发送节点,放到队列中即可
void CSession::Send(char* msg, short max_length, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}
_send_que.push(make_shared<SendNode>(msg, max_length, msgid));
if (send_que_size>0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}
当然我们也实现了一个重载版本
void CSession::Send(std::string msg, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}
_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
if (send_que_size > 0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}
在接收数据时我们解析头部也要解析id字段
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
try {
if (!error) {
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred > 0) {
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
//id非法
if (msg_id > MAX_LENGTH) {
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}
short msg_len = 0;
memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;
//id非法
if (msg_len > MAX_LENGTH) {
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
//已经处理完头部,处理上次未接受完的消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cout << "Exception code is " << e.what() << endl;
}
}
先解析头部id,再解析长度,然后根据id和长度构造消息节点,copy剩下的消息体, 把上面代码中处理消息头的逻辑截取如下
//获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
//id非法
if (msg_id > MAX_LENGTH) {
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}
short msg_len = 0;
memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;
//id非法
if (msg_len > MAX_LENGTH) {
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
其余的没什么变动。
总结
本文介绍了服务器逻辑和网络层的设计,并且基于这个架构,完善了消息发送结构,下一篇带着大家设计逻辑类和逻辑队列。