优雅退出
服务器优雅退出一直是服务器设计必须考虑的一个方向,意在能通过捕获信号使服务器安全退出。我们可以通过asio提供的信号机制绑定回调函数即可实现优雅退出。在主函数中我们添加
int main()
{
try {
boost::asio::io_context io_context;
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&io_context](auto, auto) {
io_context.stop();
});
CServer s(io_context, 10086);
io_context.run();
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
}
利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
。
单例模板类
接下来我们实现一个单例模板类,因为服务器的逻辑处理需要单例模式,后期可能还会有一些模块的设计也需要单例模式,所以先实现一个单例模板类,然后其他想实现单例类只需要继承这个模板类即可。
#include <memory>
#include <mutex>
#include <iostream>
using namespace std;
template <typename T>
class Singleton {
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = shared_ptr<T>(new T);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << endl;
}
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
单例模式模板类将无参构造,拷贝构造,拷贝赋值都设定为protected属性,其他的类无法访问,其实也可以设置为私有属性。析构函数设置为公有的,其实设置为私有的更合理一点。
Singleton
有一个static类型的属性_instance
, 它是我们实际要开辟类型的智能指针类型。
s_flag
是函数GetInstance
内的局部静态变量,该变量在函数GetInstance
第一次调用时被初始化。以后无论调用多少次GetInstance
s_flag
都不会被重复初始化,而且s_flag
存在静态区,会随着进程结束而自动释放。
call_once
只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance
内部还会调用call_once
, 只是call_once
判断s_flag
已经被初始化了就不执行初始化智能指针的操作了。
LogicSystem单例类
我们实现逻辑系统的单例类,继承自Singleton<LogicSystem>
,这样LogicSystem的构造函数和拷贝构造函数就都变为私有的了,因为基类的构造函数和拷贝构造函数都是私有的。另外LogicSystem也用了基类的成员_instance
和GetInstance
函数。从而达到单例效果。
typedef function<void(shared_ptr<CSession>, short msg_id, string msg_data)> FunCallBack;
class LogicSystem:public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>;
public:
~LogicSystem();
void PostMsgToQue(shared_ptr < LogicNode> msg);
private:
LogicSystem();
void DealMsg();
void RegisterCallBacks();
void HelloWordCallBack(shared_ptr<CSession>, short msg_id, string msg_data);
std::thread _worker_thread;
std::queue<shared_ptr<LogicNode>> _msg_que;
std::mutex _mutex;
std::condition_variable _consume;
bool _b_stop;
std::map<short, FunCallBack> _fun_callbacks;
};
1 FunCallBack
为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。
2 _msg_que
为逻辑队列
3 _mutex
为保证逻辑队列安全的互斥量
4 _consume
表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
5 _fun_callbacks
表示回调函数的map,根据id查找对应的逻辑处理函数。
6 _worker_thread
表示工作线程,用来从逻辑队列中取数据并执行回调函数。
7 _b_stop
表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。
LogicNode
定义在CSession.h中
class LogicNode {
friend class LogicSystem;
public:
LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:
shared_ptr<CSession> _session;
shared_ptr<RecvNode> _recvnode;
};
其包含算了会话类的智能指针,主要是为了实现伪闭包,防止session被释放。 其次包含了接收消息的节点类的智能指针。 实现如下
LogicNode::LogicNode(shared_ptr<CSession> session,
shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {
}
LogicSystem
的构造函数如下
LogicSystem::LogicSystem():_b_stop(false){
RegisterCallBacks();
_worker_thread = std::thread (&LogicSystem::DealMsg, this);
}
构造函数中将停止信息初始化为false,注册消息处理函数并且启动了一个工作线程,工作线程执行DealMsg逻辑。 注册消息处理函数的逻辑如下
void LogicSystem::RegisterCallBacks() {
_fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,
placeholders::_1, placeholders::_2, placeholders::_3);
}
MSG_HELLO_WORD
定义在const.h中
enum MSG_IDS {
MSG_HELLO_WORD = 1001
};
MSG_HELLO_WORD
表示消息id,HelloWordCallBack
为对应的回调处理函数
void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, short msg_id, string msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
session->Send(return_str, root["id"].asInt());
}
在HelloWordCallBack
里我们根据消息id和收到的消息,做了相应的处理并且回应给客户端。
工作线程的处理函数DealMsg
逻辑
void LogicSystem::DealMsg() {
for (;;) {
std::unique_lock<std::mutex> unique_lk(_mutex);
//判断队列为空则用条件变量阻塞等待,并释放锁
while (_msg_que.empty() && !_b_stop) {
_consume.wait(unique_lk);
}
//判断是否为关闭状态,把所有逻辑执行完后则退出循环
if (_b_stop ) {
while (!_msg_que.empty()) {
auto msg_node = _msg_que.front();
cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;
auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
if (call_back_iter == _fun_callbacks.end()) {
_msg_que.pop();
continue;
}
call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
break;
}
//如果没有停服,且说明队列中有数据
auto msg_node = _msg_que.front();
cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;
auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
if (call_back_iter == _fun_callbacks.end()) {
_msg_que.pop();
continue;
}
call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
}
1 DealMsg
逻辑中初始化了一个unique_lock,主要是用来控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。
2 我们判断队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。
LogicSystem的析构函数需要等待工作线程处理完再退出,但是工作线程可能处于挂起状态,所以要发送一个激活信号唤醒工作线程。并且将_b_stop标记设置为true。
LogicSystem::~LogicSystem(){
_b_stop = true;
_consume.notify_one();
_worker_thread.join();
}
因为网络层收到消息后我们需要将消息投递给逻辑队列进行处理,那么LogicSystem就要封装一个投递函数
void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {
std::unique_lock<std::mutex> unique_lk(_mutex);
_msg_que.push(msg);
//由0变为1则发送通知信号
if (_msg_que.size() == 1) {
_consume.notify_one();
}
}
在Session收到数据时这样调用
LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
再次启动服务器,编译启动,和之前一样可以看到数据收发正常。如下图:
总结
本文实现了服务器的逻辑类,包括并发控制等手段。
视频连接https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101