优雅退出

服务器优雅退出一直是服务器设计必须考虑的一个方向,意在能通过捕获信号使服务器安全退出。我们可以通过asio提供的信号机制绑定回调函数即可实现优雅退出。在主函数中我们添加

int main()
{
    try {
        boost::asio::io_context  io_context;
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&io_context](auto, auto) {
            io_context.stop();
            });
        CServer s(io_context, 10086);
        io_context.run();
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }
}

利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop

单例模板类

接下来我们实现一个单例模板类,因为服务器的逻辑处理需要单例模式,后期可能还会有一些模块的设计也需要单例模式,所以先实现一个单例模板类,然后其他想实现单例类只需要继承这个模板类即可。

#include <memory>
#include <mutex>
#include <iostream>
using namespace std;
template <typename T>
class Singleton {
protected:
    Singleton() = default;
    Singleton(const Singleton<T>&) = delete;
    Singleton& operator=(const Singleton<T>& st) = delete;

    static std::shared_ptr<T> _instance;
public:
    static std::shared_ptr<T> GetInstance() {
        static std::once_flag s_flag;
        std::call_once(s_flag, [&]() {
            _instance = shared_ptr<T>(new T);
            });

        return _instance;
    }
    void PrintAddress() {
        std::cout << _instance.get() << endl;
    }
    ~Singleton() {
        std::cout << "this is singleton destruct" << std::endl;
    }
};

template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

单例模式模板类将无参构造,拷贝构造,拷贝赋值都设定为protected属性,其他的类无法访问,其实也可以设置为私有属性。析构函数设置为公有的,其实设置为私有的更合理一点。 Singleton有一个static类型的属性_instance, 它是我们实际要开辟类型的智能指针类型。 s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstance s_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。 call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance 内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。

LogicSystem单例类

我们实现逻辑系统的单例类,继承自Singleton<LogicSystem>,这样LogicSystem的构造函数和拷贝构造函数就都变为私有的了,因为基类的构造函数和拷贝构造函数都是私有的。另外LogicSystem也用了基类的成员_instanceGetInstance函数。从而达到单例效果。

typedef  function<void(shared_ptr<CSession>, short msg_id, string msg_data)> FunCallBack;
class LogicSystem:public Singleton<LogicSystem>
{
    friend class Singleton<LogicSystem>;
public:
    ~LogicSystem();
    void PostMsgToQue(shared_ptr < LogicNode> msg);
private:
    LogicSystem();
    void DealMsg();
    void RegisterCallBacks();
    void HelloWordCallBack(shared_ptr<CSession>, short msg_id, string msg_data);
    std::thread _worker_thread;
    std::queue<shared_ptr<LogicNode>> _msg_que;
    std::mutex _mutex;
    std::condition_variable _consume;
    bool _b_stop;
    std::map<short, FunCallBack> _fun_callbacks;
};

1   FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。

2   _msg_que为逻辑队列

3   _mutex 为保证逻辑队列安全的互斥量

4   _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。

5   _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。

6   _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。

7   _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

LogicNode定义在CSession.h中

class LogicNode {
    friend class LogicSystem;
public:
    LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:
    shared_ptr<CSession> _session;
    shared_ptr<RecvNode> _recvnode;
};

其包含算了会话类的智能指针,主要是为了实现伪闭包,防止session被释放。 其次包含了接收消息的节点类的智能指针。 实现如下

LogicNode::LogicNode(shared_ptr<CSession>  session, 
    shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {

}

LogicSystem的构造函数如下

LogicSystem::LogicSystem():_b_stop(false){
    RegisterCallBacks();
    _worker_thread = std::thread (&LogicSystem::DealMsg, this);
}

构造函数中将停止信息初始化为false,注册消息处理函数并且启动了一个工作线程,工作线程执行DealMsg逻辑。 注册消息处理函数的逻辑如下

void LogicSystem::RegisterCallBacks() {
    _fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,
        placeholders::_1, placeholders::_2, placeholders::_3);
}

MSG_HELLO_WORD定义在const.h中

enum MSG_IDS {
    MSG_HELLO_WORD = 1001
};

MSG_HELLO_WORD表示消息id,HelloWordCallBack为对应的回调处理函数

void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, short msg_id, string msg_data) {
    Json::Reader reader;
    Json::Value root;
    reader.parse(msg_data, root);
    std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
        << root["data"].asString() << endl;
    root["data"] = "server has received msg, msg data is " + root["data"].asString();
    std::string return_str = root.toStyledString();
    session->Send(return_str, root["id"].asInt());
}

HelloWordCallBack里我们根据消息id和收到的消息,做了相应的处理并且回应给客户端。

工作线程的处理函数DealMsg逻辑

void LogicSystem::DealMsg() {
    for (;;) {
        std::unique_lock<std::mutex> unique_lk(_mutex);
        //判断队列为空则用条件变量阻塞等待,并释放锁
        while (_msg_que.empty() && !_b_stop) {
            _consume.wait(unique_lk);
        }

        //判断是否为关闭状态,把所有逻辑执行完后则退出循环
        if (_b_stop ) {
            while (!_msg_que.empty()) {
                auto msg_node = _msg_que.front();
                cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
                auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
                if (call_back_iter == _fun_callbacks.end()) {
                    _msg_que.pop();
                    continue;
                }
                call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
                    std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
                _msg_que.pop();
            }
            break;
        }

        //如果没有停服,且说明队列中有数据
        auto msg_node = _msg_que.front();
        cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
        auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
        if (call_back_iter == _fun_callbacks.end()) {
            _msg_que.pop();
            continue;
        }
        call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, 
            std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
        _msg_que.pop();
    }
}

1   DealMsg逻辑中初始化了一个unique_lock,主要是用来控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。 2   我们判断队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。

LogicSystem的析构函数需要等待工作线程处理完再退出,但是工作线程可能处于挂起状态,所以要发送一个激活信号唤醒工作线程。并且将_b_stop标记设置为true。

LogicSystem::~LogicSystem(){
    _b_stop = true;
    _consume.notify_one();
    _worker_thread.join();
}

因为网络层收到消息后我们需要将消息投递给逻辑队列进行处理,那么LogicSystem就要封装一个投递函数

void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {
    std::unique_lock<std::mutex> unique_lk(_mutex);
    _msg_que.push(msg);
    //由0变为1则发送通知信号
    if (_msg_que.size() == 1) {
        _consume.notify_one();
    }
}

在Session收到数据时这样调用

LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

再次启动服务器,编译启动,和之前一样可以看到数据收发正常。如下图:

https://cdn.llfc.club/1685849713914.jpg

总结

本文实现了服务器的逻辑类,包括并发控制等手段。

视频连接https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101

源码链接https://gitee.com/secondtonone1/boostasio-learn

results matching ""

    No results matching ""