认证服务
我们的认证服务要给邮箱发送验证码,所以用nodejs较为合适,nodejs是一门IO效率很高而且生态完善的语言,用到发送邮件的库也方便。
nodejs可以去官网下载https://nodejs.org/en,一路安装就可以了
我们新建VarifyServer文件夹,在文件夹内部初始化server要用到的nodejs库的配置文件
npm init
根据提示同意会创建一个package.json文件
接下来安装grpc-js包,也可以安装grpc,grpc是C++版本,grpc-js是js版本,C++版本停止维护了。所以用grpc-js版本。
安装过程出现了错误,因为淘宝镜像地址过期了
清除之前npm镜像地址
npm cache clean --force
重新设置新的淘宝镜像
npm config set registry https://registry.npmmirror.com
接着下载grpc-js就成功了
接着安装proto-loader用来动态解析proto文件
npm install @grpc/proto-loader
我们再安装email处理的库
npm install nodemailer
我们将proto文件放入VarifyServer文件夹,并且新建一个proto.js用来解析proto文件
const path = require('path')
const grpc = require('@grpc/grpc-js')
const protoLoader = require('@grpc/proto-loader')
const PROTO_PATH = path.join(__dirname, 'message.proto')
const packageDefinition = protoLoader.loadSync(PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true })
const protoDescriptor = grpc.loadPackageDefinition(packageDefinition)
const message_proto = protoDescriptor.message
module.exports = message_proto
keepCase: 如果为 true,则保留字段名的原始大小写。如果为 false,则将所有字段名转换为驼峰命名法。
longs: 控制如何表示 Protocol Buffers 中的 long 类型。如果设置为 String,则长整数会被转换为字符串,以避免 JavaScript 中的整数溢出问题。
enums: 控制如何表示 Protocol Buffers 中的枚举类型。如果设置为 String,则枚举值会被转换为字符串。
defaults: 如果为 true,则为未明确设置的字段提供默认值。
oneofs: 如果为 true,则支持 Protocol Buffers 中的 oneof 特性。
在写代码发送邮件之前,我们先去邮箱开启smtp服务。我用的163邮箱,在邮箱设置中查找smtp服务器地址,需要开启smtp服务。这个是固定的,不需要修改。
网易163邮箱的 SMTP 服务器地址为: smtp.163.com
发送邮件,建议使用授权码(有的邮箱叫 独立密码),确保邮箱密码的安全性。授权码在邮箱设置中进行设置。如果开启了授权码,发送邮件的时候,必须使用授权码。
这里设置开启smtp服务和授权码。我这里已经是设置好的。
新增一个授权码用于发邮件
读取配置
因为我们要实现参数可配置,所以要读取配置,先在文件夹内创建一个config.json文件
{
"email": {
"user": "secondtonone1@163.com",
"pass": "CRWTAZOSNCWDDQQTllfc"
},
}
user是我们得邮箱地址,pass是邮箱得授权码,只有有了授权码才能用代码发邮件。大家记得把授权码改为你们自己的,否则用我的无法发送成功。
另外我们也要用到一些常量和全局得变量,所以我们定义一个const.js
let code_prefix = "code_";
const Errors = {
Success : 0,
RedisErr : 1,
Exception : 2,
};
module.exports = {code_prefix,Errors}
新建config.js用来读取配置
const fs = require('fs');
let config = JSON.parse(fs.readFileSync('config.json', 'utf8'));
let email_user = config.email.user;
let email_pass = config.email.pass;
let mysql_host = config.mysql.host;
let mysql_port = config.mysql.port;
let redis_host = config.redis.host;
let redis_port = config.redis.port;
let redis_passwd = config.redis.passwd;
let code_prefix = "code_";
module.exports = {email_pass, email_user, mysql_host, mysql_port,redis_host, redis_port, redis_passwd, code_prefix}
接下来封装发邮件的模块,新建一个email.js文件
const nodemailer = require('nodemailer');
const config_module = require("./config")
/**
* 创建发送邮件的代理
*/
let transport = nodemailer.createTransport({
host: 'smtp.163.com',
port: 465,
secure: true,
auth: {
user: config_module.email_user, // 发送方邮箱地址
pass: config_module.email_pass // 邮箱授权码或者密码
}
});
接下来实现发邮件函数
/**
* 发送邮件的函数
* @param {*} mailOptions_ 发送邮件的参数
* @returns
*/
function SendMail(mailOptions_){
return new Promise(function(resolve, reject){
transport.sendMail(mailOptions_, function(error, info){
if (error) {
console.log(error);
reject(error);
} else {
console.log('邮件已成功发送:' + info.response);
resolve(info.response)
}
});
})
}
module.exports.SendMail = SendMail
因为transport.SendMail相当于一个异步函数,调用该函数后发送的结果是通过回调函数通知的,所以我们没办法同步使用,需要用Promise封装这个调用,抛出Promise给外部,那么外部就可以通过await或者then catch的方式处理了。
我们新建server.js,用来启动grpc server
async function GetVarifyCode(call, callback) {
console.log("email is ", call.request.email)
try{
uniqueId = uuidv4();
console.log("uniqueId is ", uniqueId)
let text_str = '您的验证码为'+ uniqueId +'请三分钟内完成注册'
//发送邮件
let mailOptions = {
from: 'secondtonone1@163.com',
to: call.request.email,
subject: '验证码',
text: text_str,
};
let send_res = await emailModule.SendMail(mailOptions);
console.log("send res is ", send_res)
callback(null, { email: call.request.email,
error:const_module.Errors.Success
});
}catch(error){
console.log("catch error is ", error)
callback(null, { email: call.request.email,
error:const_module.Errors.Exception
});
}
}
function main() {
var server = new grpc.Server()
server.addService(message_proto.VarifyService.service, { GetVarifyCode: GetVarifyCode })
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
server.start()
console.log('grpc server started')
})
}
main()
GetVarifyCode声明为async是为了能在内部调用await。
提升GateServer并发
添加ASIO IOContext Pool 结构,让多个iocontext跑在不同的线程中
#include <vector>
#include <boost/asio.hpp>
#include "Singleton.h"
class AsioIOServicePool:public Singleton<AsioIOServicePool>
{
friend Singleton<AsioIOServicePool>;
public:
using IOService = boost::asio::io_context;
using Work = boost::asio::io_context::work;
using WorkPtr = std::unique_ptr<Work>;
~AsioIOServicePool();
AsioIOServicePool(const AsioIOServicePool&) = delete;
AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
// 使用 round-robin 的方式返回一个 io_service
boost::asio::io_context& GetIOService();
void Stop();
private:
AsioIOServicePool(std::size_t size = 2/*std::thread::hardware_concurrency()*/);
std::vector<IOService> _ioServices;
std::vector<WorkPtr> _works;
std::vector<std::thread> _threads;
std::size_t _nextIOService;
};
实现
#include "AsioIOServicePool.h"
#include <iostream>
using namespace std;
AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
_works(size), _nextIOService(0){
for (std::size_t i = 0; i < size; ++i) {
_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
}
//遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
for (std::size_t i = 0; i < _ioServices.size(); ++i) {
_threads.emplace_back([this, i]() {
_ioServices[i].run();
});
}
}
AsioIOServicePool::~AsioIOServicePool() {
Stop();
std::cout << "AsioIOServicePool destruct" << endl;
}
boost::asio::io_context& AsioIOServicePool::GetIOService() {
auto& service = _ioServices[_nextIOService++];
if (_nextIOService == _ioServices.size()) {
_nextIOService = 0;
}
return service;
}
void AsioIOServicePool::Stop(){
//因为仅仅执行work.reset并不能让iocontext从run的状态中退出
//当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。
for (auto& work : _works) {
//把服务先停止
work->get_io_context().stop();
work.reset();
}
for (auto& t : _threads) {
t.join();
}
}
修改CServer处Start逻辑, 改为每次从IOServicePool连接池中获取连接
void CServer::Start()
{
auto self = shared_from_this();
auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();
std::shared_ptr<HttpConnection> new_con = std::make_shared<HttpConnection>(io_context);
_acceptor.async_accept(new_con->GetSocket(), [self, new_con](beast::error_code ec) {
try {
//出错则放弃这个连接,继续监听新链接
if (ec) {
self->Start();
return;
}
//处理新链接,创建HpptConnection类管理新连接
new_con->Start();
//继续监听
self->Start();
}
catch (std::exception& exp) {
std::cout << "exception is " << exp.what() << std::endl;
self->Start();
}
});
}
为了方便读取配置文件,将ConfigMgr改为单例, 将构造函数变成私有,添加Inst函数
static ConfigMgr& Inst() {
static ConfigMgr cfg_mgr;
return cfg_mgr;
}
VerifyGrpcClient.cpp中添加
class RPConPool {
public:
RPConPool(size_t poolSize, std::string host, std::string port)
: poolSize_(poolSize), host_(host), port_(port), b_stop_(false) {
for (size_t i = 0; i < poolSize_; ++i) {
std::shared_ptr<Channel> channel = grpc::CreateChannel(host+":"+port,
grpc::InsecureChannelCredentials());
connections_.push(VarifyService::NewStub(channel));
}
}
~RPConPool() {
std::lock_guard<std::mutex> lock(mutex_);
Close();
while (!connections_.empty()) {
connections_.pop();
}
}
std::unique_ptr<VarifyService::Stub> getConnection() {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] {
if (b_stop_) {
return true;
}
return !connections_.empty();
});
//如果停止则直接返回空指针
if (b_stop_) {
return nullptr;
}
auto context = std::move(connections_.front());
connections_.pop();
return context;
}
void returnConnection(std::unique_ptr<VarifyService::Stub> context) {
std::lock_guard<std::mutex> lock(mutex_);
if (b_stop_) {
return;
}
connections_.push(std::move(context));
cond_.notify_one();
}
void Close() {
b_stop_ = true;
cond_.notify_all();
}
private:
atomic<bool> b_stop_;
size_t poolSize_;
std::string host_;
std::string port_;
std::queue<std::unique_ptr<VarifyService::Stub>> connections_;
std::mutex mutex_;
std::condition_variable cond_;
};
我们在VerifyGrpcClient类中添加成员
std::unique_ptr<RPConPool> pool_;
修改构造函数
VerifyGrpcClient::VerifyGrpcClient() {
auto& gCfgMgr = ConfigMgr::Inst();
std::string host = gCfgMgr["VarifyServer"]["Host"];
std::string port = gCfgMgr["VarifyServer"]["Port"];
pool_.reset(new RPConPool(5, host, port));
}
当我们想连接grpc server端时,可以通过池子获取连接,用完之后再返回连接给池子
GetVarifyRsp GetVarifyCode(std::string email) {
ClientContext context;
GetVarifyRsp reply;
GetVarifyReq request;
request.set_email(email);
auto stub = pool_->getConnection();
Status status = stub->GetVarifyCode(&context, request, &reply);
if (status.ok()) {
pool_->returnConnection(std::move(stub));
return reply;
}
else {
pool_->returnConnection(std::move(stub));
reply.set_error(ErrorCodes::RPCFailed);
return reply;
}
}
总结
到本节为止我们完成nodejs搭建的grpc server, 修改package.json中的脚本
"scripts": {
"serve": "node server.js"
},
接着命令行执行 npm run serve即可启动grpc 服务。