独立网络线程
独立前
独立后
槽函数连接方式
直接连接
connect(发送者, 信号, [](信号参数){});
这种槽函数在发送者所在线程触发。
增加接收者
connect(发送者,信号,接收者,槽函数)如果发送者和接收者在同一个线程,则槽函数调用的线程就是发送者所在的线程。
发送者和接收者不在一个线程,connect默认采用队列连接方式
connect(发送者,信号,接收者,槽函数)
槽函数在接收者所在的线程触发。好处就是解耦合。
元对象系统
- 信号和槽
- 反射
- 动态增加函数和属性
当我们信号和槽连接方式采用队列连接,那么信号的参数会被封装为元对象,投递到队列中。
要想支持元对象有两种方式
- 继承于QObject,并且类内填写Q_OBJECT宏
- 声明并且注册元对象类
为了支持高并发情况下断点续传,考虑将目前项目中TcpMgr中网络模块独立到独立线程
封装TcpThread类
利用RAII思想封装线程启动和回收
class TcpThread:public std::enable_shared_from_this<TcpThread> {
public:
TcpThread();
~TcpThread();
private:
QThread* _tcp_thread;
};
具体实现
TcpThread::TcpThread()
{
_tcp_thread = new QThread();
TcpMgr::GetInstance()->moveToThread(_tcp_thread);
QObject::connect(_tcp_thread, &QThread::finished, _tcp_thread, &QObject::deleteLater);
_tcp_thread->start();
}
TcpThread::~TcpThread()
{
_tcp_thread->quit();
}
主函数启动时记得提前启动线程,将TcpMgr转移到独立线程中
//启动tcp线程
TcpThread tcpthread;
MainWindow w;
w.show();
return a.exec();
测试发现,登录卡住,检测是信号sig_connect_tcp发送了,槽函数slot_tcp_connect没触发。
//连接tcp连接请求的信号和槽函数
connect(this, &LoginDialog::sig_connect_tcp, TcpMgr::GetInstance().get(), &TcpMgr::slot_tcp_connect);
为了测试
先在TcpMgr中添加测试槽函数
void slot_test() {
qDebug() << "receve thread is " << QThread::currentThread();
qDebug() << "slot test......";
}
在LoginDialog中连接信号
connect(this, &LoginDialog::sig_test, TcpMgr::GetInstance().get(), &TcpMgr::slot_test);
在发送sig_connect_tcp处发送sig_test
void LoginDialog::initHttpHandlers()
{
//注册获取登录回包逻辑
_handlers.insert(ReqId::ID_LOGIN_USER, [this](QJsonObject jsonObj){
int error = jsonObj["error"].toInt();
if(error != ErrorCodes::SUCCESS){
showTip(tr("参数错误"),false);
enableBtn(true);
return;
}
auto email = jsonObj["email"].toString();
//发送信号通知tcpMgr发送长链接
ServerInfo si;
si.Uid = jsonObj["uid"].toInt();
si.Host = jsonObj["host"].toString();
si.Port = jsonObj["port"].toString();
si.Token = jsonObj["token"].toString();
_uid = si.Uid;
_token = si.Token;
qDebug()<< "email is " << email << " uid is " << si.Uid <<" host is "
<< si.Host << " Port is " << si.Port << " Token is " << si.Token;
emit sig_connect_tcp(si);
emit sig_test();
});
}
测试,是可以看到能触发slot_test函数得,而且线程id显示是子线程中触发得槽函数。
那么sig_connect_tcp信号发出,没有触发槽函数,就是因为信号得参数类型不支持元对象系统。
为了支持元对象系统,我们需要在信号的参数ServerInfo类实现默认构造,同时声明为元对象类型
struct ServerInfo{
public:
ServerInfo() = default;
ServerInfo(const ServerInfo& other):Host(other.Host),Port(other.Port),Token(other.Token),Uid(other.Uid){}
QString Host;
QString Port;
QString Token;
int Uid;
};
Q_DECLARE_METATYPE(ServerInfo)
在TcpMgr中注册这个元对象类型
qRegisterMetaType<ServerInfo>("ServerInfo");
再次测试就通过登录了,但是在发送后续得消息时,又遇到了自定义类型作为参数得情况,我们需要和上面一样,依次声明元对象类型并且注册。
如下列举一个,还有很多,不再详细列举
class SearchInfo {
public:
SearchInfo(int uid, QString name, QString nick, QString desc, int sex, QString icon);
SearchInfo() = default;
int _uid;
QString _name;
QString _nick;
QString _desc;
int _sex;
QString _icon;
};
Q_DECLARE_METATYPE(SearchInfo)
Q_DECLARE_METATYPE(std::shared_ptr<SearchInfo>)
TcpMgr封装注册元对象函数
void TcpMgr::registerMetaType() {
// 注册所有自定义类型
qRegisterMetaType<ServerInfo>("ServerInfo");
qRegisterMetaType<SearchInfo>("SearchInfo");
qRegisterMetaType<std::shared_ptr<SearchInfo>>("std::shared_ptr<SearchInfo>");
qRegisterMetaType<AddFriendApply>("AddFriendApply");
qRegisterMetaType<std::shared_ptr<AddFriendApply>>("std::shared_ptr<AddFriendApply>");
qRegisterMetaType<ApplyInfo>("ApplyInfo");
qRegisterMetaType<std::shared_ptr<AuthInfo>>("std::shared_ptr<AuthInfo>");
qRegisterMetaType<AuthRsp>("AuthRsp");
qRegisterMetaType<std::shared_ptr<AuthRsp>>("std::shared_ptr<AuthRsp>");
qRegisterMetaType<UserInfo>("UserInfo");
qRegisterMetaType<std::vector<std::shared_ptr<TextChatData>>>("std::vector<std::shared_ptr<TextChatData>>");
qRegisterMetaType<std::vector<std::shared_ptr<ChatThreadInfo>>>("std::vector<std::shared_ptr<ChatThreadInfo>>");
qRegisterMetaType<std::shared_ptr<ChatThreadData>>("std::shared_ptr<ChatThreadData>");
qRegisterMetaType<ReqId>("ReqId");
}
在构造函数中调用
TcpMgr::TcpMgr():_host(""),_port(0),_b_recv_pending(false),_message_id(0),_message_len(0)
{
registerMetaType();
//...
}
再次测试就通过了
这里给大家讲讲为什么单线程情况下,信号可以携带自定义类型作为参数,不用设定元对象就可以传输,而跨线程不可以。
在 Qt 的信号/槽机制中,信号参数的传递方式取决于连接(connect)的类型,而连接类型又由发信号对象和接收槽对象所在的线程决定:
同线程(Direct Connection)
- 如果信号和槽都在同一个线程里,默认使用 Direct Connection。
- Direct Connection 本质上就是一个普通的 C++ 函数调用,参数直接按值或按引用传递,编译时就已经知道了类型,不需要任何额外的元类型信息。
- 因此,即使你没有把
SearchInfo注册为QMetaType,编译器也能直接生成函数调用代码,信号里就可以直接传递SearchInfo。
跨线程(Queued Connection)
如果信号发送者和接收者不在同一个线程,Qt 会自动把连接转成 Queued Connection。
Queued Connection 的实现是:当信号发出时,Qt 会把信号参数打包成一个事件(
QEvent),然后把事件放到目标线程的事件队列里;目标线程的事件循环(QCoreApplication::processEvents())再把这个事件取出来,调用槽函数。这里的“打包”与“解包”就需要运行时才能确定参数类型,以及如何拷贝或序列化这个类型——这正是 Qt 元对象系统(
QMetaType)要干的事情。如果没有把
SearchInfo声明成一个元类型,Qt 就不知道如何在内部把它从一个线程“打包”到事件里,又如何在另一线程里还原。因此,跨线程传递自定义类型,必须在类型定义后加上:
Q_DECLARE_METATYPE(SearchInfo)并在运行时注册(通常在
main()里调用一次):qRegisterMetaType<SearchInfo>("SearchInfo");
小结
- 同线程:Direct Connection,编译时直接调用,不需要
Q_DECLARE_METATYPE。 - 跨线程:Queued Connection,需要运行时打包/解包参数,必须用
Q_DECLARE_METATYPE(以及qRegisterMetaType)来注册你的自定义类型。
添加发送队列
UserMgr线程安全
为了保证多线程情况下访问数据的安全性,对UserMgr类的操作加锁
std::mutex _mtx;
在获取数据和设置数据的地方都进行加锁, 比如
std::shared_ptr<UserInfo> UserMgr::GetUserInfo()
{
std::lock_guard<std::mutex> lock(_mtx);
return _user_info;
}
还有很多不再赘述
设置发送队列
默认情况下qt的socket都是非阻塞的。
所以调用socket.write(数据)可能会直接返回-1
返回-1表示网络出错,一般都是EWOULD_BLOCK/EAGAIN造成的。表示发送缓冲区已经满了,无法继续发送。
而我们之前的逻辑,无论在哪个线程,想要发送数据,统一发送信号
void sig_send_data(ReqId reqId, QByteArray data);
会触发TcpMgr的槽函数
void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes)
{
uint16_t id = reqId;
// 计算长度(使用网络字节序转换)
quint16 len = static_cast<quint16>(dataBytes.length());
// 创建一个QByteArray用于存储要发送的所有数据
QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
// 设置数据流使用网络字节序
out.setByteOrder(QDataStream::BigEndian);
// 写入ID和长度
out << id << len;
// 添加字符串数据
block.append(dataBytes);
qint64 written = _socket.write(block);
qDebug() << "tcp mgr send byte data is" << _current_block
<< ", write() returned" << written;
}
上述函数在网络情况良好的时候不会产生问题,但是如果网络发送情况频繁的时候,就容易出现written为-1的情况。
也就是发送缓冲区满了,导致发送失败。
对于这种情况,我们可以模仿我们的服务器写法,添加一个发送队列,然后将要发送的数据投递到发送队列
//发送队列
QQueue<QByteArray> _send_queue;
//正在发送的包
QByteArray _current_block;
//当前已发送的字节数
qint64 _bytes_sent;
//是否正在发送
bool _pending;
修改发送逻辑
void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes)
{
uint16_t id = reqId;
// 计算长度(使用网络字节序转换)
quint16 len = static_cast<quint16>(dataBytes.length());
// 创建一个QByteArray用于存储要发送的所有数据
QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
// 设置数据流使用网络字节序
out.setByteOrder(QDataStream::BigEndian);
// 写入ID和长度
out << id << len;
// 添加字符串数据
block.append(dataBytes);
//判断是否正在发送
if (_pending) {
//放入队列直接返回,因为目前有数据正在发送
_send_queue.enqueue(block);
return;
}
// 没有正在发送,把这包设为“当前块”,重置计数,并写出去
_current_block = block; // ← 保存当前正在发送的 block
_bytes_sent = 0; // ← 归零
_pending = true; // ← 标记正在发送
qint64 written = _socket.write(_current_block);
qDebug() << "tcp mgr send byte data is" << _current_block
<< ", write() returned" << written;
}
我们需要监听发送返回的数据,QT也提供了类似于asio的异步回调功能,只是在发送完成后返回一个信号void bytesWritten(qint64 bytes);
我们连接这个信号
QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {
//更新发送数据
_bytes_sent += bytes;
//未发送完整
if (_bytes_sent < _current_block.size()) {
//继续发送
auto data_to_send = _current_block.mid(_bytes_sent);
_socket.write(data_to_send);
return;
}
//发送完全,则查看队列是否为空
if (_send_queue.isEmpty()) {
//队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送
_current_block.clear();
_pending = false;
_bytes_sent = 0;
return;
}
//队列不为空,则取出队首元素
_current_block = _send_queue.dequeue();
_bytes_sent = 0;
_pending = true;
qint64 w2 = _socket.write(_current_block);
qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;
});
_pending控制发送还是放入队列。
断点续传思路
思路图
修改上传逻辑
原来的传输逻辑,采用的是循环上传,就是将一个文件拆分成多个报文段,循环上传,而不等待服务器每次回复
void MainWindow::on_uploadBtn_clicked()
{
ui->uploadBtn->setEnabled(false);
// 打开文件
QFile file(_file_name);
if (!file.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open file:" << file.errorString();
return;
}
// 保存当前文件指针位置
qint64 originalPos = file.pos();
QCryptographicHash hash(QCryptographicHash::Md5);
if (!hash.addData(&file)) {
qWarning() << "Failed to read data from file:" << _file_name;
return ;
}
_file_md5 = hash.result().toHex(); // 返回十六进制字符串
// 读取文件内容并发送
QByteArray buffer;
int seq = 0;
QFileInfo fileInfo(_file_name); // 创建 QFileInfo 对象
QString fileName = fileInfo.fileName(); // 获取文件名
qDebug() << "文件名是:" << fileName; // 输出文件名
int total_size = fileInfo.size();
int last_seq = 0;
if(total_size % MAX_FILE_LEN){
last_seq = (total_size/MAX_FILE_LEN)+1;
}else{
last_seq = total_size/MAX_FILE_LEN;
}
// 恢复文件指针到原来的位置
file.seek(originalPos);
while (!file.atEnd()) {
//每次读取2048字节发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject jsonObj;
// 将文件内容转换为 Base64 编码(可选)
QString base64Data = buffer.toBase64();
//qDebug() << "send data is " << base64Data;
++seq;
jsonObj["md5"] = _file_md5;
jsonObj["name"] = fileName;
jsonObj["seq"] = seq;
jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
jsonObj["total_size"] = total_size;
if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
jsonObj["last"] = 1;
}else{
jsonObj["last"] = 0;
}
jsonObj["data"]= base64Data;
jsonObj["last_seq"] = last_seq;
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
//startDelay(500);
}
//关闭文件
file.close();
}
现在需要改为分段上传,每次上传后,等待服务器返回响应后再上传下一个
void MainWindow::on_uploadBtn_clicked()
{
ui->uploadBtn->setEnabled(false);
ui->pauseBtn->setEnabled(true);
// 打开文件
QFile file(_file_name);
if (!file.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open file:" << file.errorString();
return;
}
// 保存当前文件指针位置
qint64 originalPos = file.pos();
QCryptographicHash hash(QCryptographicHash::Md5);
if (!hash.addData(&file)) {
qWarning() << "Failed to read data from file:" << _file_name;
return ;
}
_file_md5 = hash.result().toHex(); // 返回十六进制字符串
// 读取文件内容并发送
QByteArray buffer;
int seq = 0;
// 创建 QFileInfo 对象
auto fileInfo = std::make_shared<QFileInfo>(_file_name);
QString fileName = fileInfo->fileName(); // 获取文件名
qDebug() << "文件名是:" << fileName; // 输出文件名
int total_size = fileInfo->size();
int last_seq = 0;
if(total_size % MAX_FILE_LEN){
last_seq = (total_size/MAX_FILE_LEN)+1;
}else{
last_seq = total_size/MAX_FILE_LEN;
}
// 恢复文件指针到原来的位置
file.seek(originalPos);
//改为读取第一块并发送
//每次读取2048字节发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject jsonObj;
// 将文件内容转换为 Base64 编码(可选)
QString base64Data = buffer.toBase64();
//qDebug() << "send data is " << base64Data;
++seq;
jsonObj["md5"] = _file_md5;
jsonObj["name"] = fileName;
jsonObj["seq"] = seq;
jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
jsonObj["total_size"] = total_size;
if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
jsonObj["last"] = 1;
}else{
jsonObj["last"] = 0;
}
jsonObj["data"]= base64Data;
jsonObj["last_seq"] = last_seq;
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
LogicMgr::Inst()->AddMD5File(_file_md5, fileInfo);
//关闭文件
file.close();
}
收到响应后续传
当客户端收到服务器的回包后,解析后传递给LogicMgr, LogicMgr中需要将后续的报文段发送给服务器。我们封装如下逻辑
void LogicWorker::InitHandlers()
{
//注册上传消息
_handlers[ID_UPLOAD_FILE_RSP] = [this](QJsonObject obj){
auto err = obj["error"].toInt();
if(err != RSP_SUCCESS){
qDebug() << "upload msg rsp err is " << err;
return;
}
auto name = obj["name"].toString();
auto total_size = obj["total_size"].toInt();
auto trans_size = obj["trans_size"].toInt();
auto md5 = obj["md5"].toString();
auto seq = obj["seq"].toInt();
qDebug() << "recv : " << name << " file trans_size is " << trans_size;
emit sig_trans_size(trans_size);
//判断trans_size是否和total_size相等
if(total_size == trans_size){
return;
}
auto file_info = LogicMgr::Inst()->GetFileInfo(md5);
if(!file_info){
return;
}
//再次组织数据发送
QFile file(file_info->filePath());
if (!file.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open file:" << file.errorString();
return;
}
//文件偏移到已经发送的位置,继续读取发送
file.seek(trans_size);
if(LogicMgr::Inst()->Pause()){
return ;
}
QByteArray buffer;
seq++;
//每次读取2048字节发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject jsonObj;
// 将文件内容转换为 Base64 编码(可选)
QString base64Data = buffer.toBase64();
jsonObj["md5"] = md5;
jsonObj["name"] = file_info->fileName();
jsonObj["seq"] = seq;
jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
jsonObj["total_size"] = total_size;
if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
jsonObj["last"] = 1;
}else{
jsonObj["last"] = 0;
}
jsonObj["data"]= base64Data;
jsonObj["last_seq"] = obj["last_seq"].toInt();
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
file.close();
};
}
其中sig_trans_size为信号,通知主界面显示进度
暂停和续传
客户端需增加暂停和续传按钮,支持传说过程中暂停,点击后再继续上传等功能
void MainWindow::slot_pause_continue()
{
//续传状态或者初始状态,按下暂停按钮
if(_cur_state == INIT || _cur_state == CONTINUE){
//设置当前状态为暂停状态
_b_pause = true;
ui->pauseBtn->setText("继续");
_cur_state = PAUSE;
LogicMgr::Inst()->SetPause(true);
return;
}
//判断当前为暂停状态,则点击后开启续传
if(_cur_state == PAUSE){
_b_pause = false;
ui->pauseBtn->setText("暂停");
_cur_state = CONTINUE ;
LogicMgr::Inst()->SetPause(false);
//发送请求获取文件信息,继续上传
auto file_info = LogicMgr::Inst()->GetFileInfo(_file_md5);
QJsonObject jsonObj;
jsonObj["md5"] = _file_md5;
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
TcpClient::Inst().sendMsg(ID_SYNC_FILE_REQ, send_data);
return;
}
}
这里继续上传需要请求一下服务器,同步之前的上传进度。
我们添加了新的协议ID_SYNC_FILE_REQ, 服务器收到后将状态和进度返回,客户端响应
_handlers[ID_SYNC_FILE_RSP] = [this](QJsonObject obj){
auto err = obj["error"].toInt();
if(err != RSP_SUCCESS){
qDebug() << " msg rsp err is " << err;
return;
}
auto md5 = obj["md5"].toString();
auto seq = obj["seq"].toInt();
auto total_size = obj["total_size"].toInt();
auto file_info = LogicMgr::Inst()->GetFileInfo(md5);
if(!file_info){
qDebug() << "not found file" ;
return;
}
//再次组织数据发送
QFile file(file_info->filePath());
if (!file.open(QIODevice::ReadOnly)) {
qWarning() << "Could not open file:" << file.errorString();
return;
}
auto trans_size = obj["trans_size"].toInt();
//文件偏移到已经发送的位置,继续读取发送
file.seek(trans_size);
if(LogicMgr::Inst()->Pause()){
return ;
}
QByteArray buffer;
seq++;
//每次读取2048字节发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject jsonObj;
// 将文件内容转换为 Base64 编码(可选)
QString base64Data = buffer.toBase64();
jsonObj["md5"] = md5;
jsonObj["name"] = file_info->fileName();
jsonObj["seq"] = seq;
jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
jsonObj["total_size"] = total_size;
if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
jsonObj["last"] = 1;
}else{
jsonObj["last"] = 0;
}
jsonObj["data"]= base64Data;
jsonObj["last_seq"] = obj["last_seq"].toInt();
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
file.close();
};
客户端根据返回的进度,按照偏移量读取指定文件,并且继续上报。
如果健壮一点,可以判断服务器返回的错误信息,根据错误,提示主界面做出交互显示等。这里不再赘述。
到此客户端设计完成。
单线程服务器改造
单线程服务器改造不大,只需要增加同步文件进度信息的处理逻辑,以及优化之前的上传处理逻辑即可
_fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto data = root["data"].asString();
//std::cout << "recv file data is " << data << std::endl;
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_FILE_RSP);
});
// 解码
std::string decoded = base64_decode(data);
auto md5 = root["md5"].asString();
auto seq = root["seq"].asInt();
auto name = root["name"].asString();
auto total_size = root["total_size"].asInt();
auto trans_size = root["trans_size"].asInt();
auto file_path = ConfigMgr::Inst().GetFileOutPath();
auto file_path_str = (file_path / name).string();
std::cout << "file_path_str is " << file_path_str << std::endl;
if (seq != 1) {
auto iter = _map_md5_files.find(md5);
if (iter == _map_md5_files.end()) {
rtvalue["error"] = ErrorCodes::FileNotExists;
return;
}
}
std::ofstream outfile;
//第一个包
if (seq == 1) {
// 打开文件,如果存在则清空,不存在则创建
outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
//构造数据存储
auto file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = name;
file_info->_seq = seq;
file_info->_total_size = total_size;
file_info->_trans_size = trans_size;
std::lock_guard<std::mutex> lock(_file_mtx);
_map_md5_files[md5] = file_info;
}
else {
// 保存为文件
outfile.open(file_path_str, std::ios::binary | std::ios::app);
std::lock_guard<std::mutex> lock(_file_mtx);
auto file_info = _map_md5_files[md5];
file_info->_seq = seq;
file_info->_trans_size = trans_size;
}
if (!outfile) {
std::cerr << "无法打开文件进行写入。" << std::endl;
return ;
}
outfile.write(decoded.data(), decoded.size());
if (!outfile) {
std::cerr << "写入文件失败。" << std::endl;
return ;
}
outfile.close();
std::cout << "文件已成功保存为: " << name << std::endl;
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = total_size;
rtvalue["seq"] = seq;
rtvalue["name"] = name;
rtvalue["trans_size"] = trans_size;
rtvalue["md5"] = md5;
};
_fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_SYNC_FILE_RSP);
});
auto md5 = root["md5"].asString();
auto iter = _map_md5_files.find(md5);
if (iter == _map_md5_files.end()) {
rtvalue["error"] = ErrorCodes::FileNotExists;
return;
}
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = iter->second->_total_size;
rtvalue["seq"] = iter->second->_seq;
rtvalue["name"] = iter->second->_name;
rtvalue["trans_size"] = iter->second->_trans_size;
rtvalue["md5"] = md5;
};
多线程服务器
多线程服务器改造和单线程类似
只不过将处理逻辑放入LogicWorker中
void LogicWorker::RegisterCallBacks()
{
_fun_callbacks[ID_TEST_MSG_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto data = root["data"].asString();
std::cout << "recv test data is " << data << std::endl;
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_TEST_MSG_RSP);
});
rtvalue["error"] = ErrorCodes::Success;
rtvalue["data"] = data;
};
_fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto md5 = root["md5"].asString();
auto seq = root["seq"].asInt();
auto name = root["name"].asString();
auto total_size = root["total_size"].asInt();
auto trans_size = root["trans_size"].asInt();
auto last = root["last"].asInt();
auto file_data = root["data"].asString();
auto file_path = ConfigMgr::Inst().GetFileOutPath();
auto file_path_str = (file_path / name).string();
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_FILE_RSP);
});
// 使用 std::hash 对字符串进行哈希
std::hash<std::string> hash_fn;
size_t hash_value = hash_fn(name); // 生成哈希值
int index = hash_value % FILE_WORKER_COUNT;
std::cout << "Hash value: " << hash_value << std::endl;
//第一个包
if (seq == 1) {
//构造数据存储
auto file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = name;
file_info->_seq = seq;
file_info->_total_size = total_size;
file_info->_trans_size = trans_size;
LogicSystem::GetInstance()->AddMD5File(md5, file_info);
}
else {
auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
if (file_info == nullptr) {
rtvalue["error"] = ErrorCodes::FileNotExists;
return;
}
file_info->_seq = seq;
file_info->_trans_size = trans_size;
}
FileSystem::GetInstance()->PostMsgToQue(
std::make_shared<FileTask>(session, name, seq, total_size,
trans_size, last, file_data),
index
);
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = total_size;
rtvalue["seq"] = seq;
rtvalue["name"] = name;
rtvalue["trans_size"] = trans_size;
rtvalue["last"] = last;
rtvalue["md5"] = md5;
};
_fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_SYNC_FILE_RSP);
});
auto md5 = root["md5"].asString();
auto file = LogicSystem::GetInstance()->GetFileInfo(md5);
if (file == nullptr) {
rtvalue["error"] = ErrorCodes::FileNotExists;
return;
}
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = file->_total_size;
rtvalue["seq"] = file->_seq;
rtvalue["name"] = file->_name;
rtvalue["trans_size"] = file->_trans_size;
rtvalue["md5"] = md5;
};
}
将进度信息存储在LogicSystem中,后续可参考填写入redis,方便后续分布式扩展,注意如果填写了多个资源服务器,还有写入服务器信息,这个不再赘述和进阶,我们只用一个资源服务器做演示,后续读者可自己进阶分布式设计。
void LogicSystem::AddMD5File(std::string md5, std::shared_ptr<FileInfo> fileinfo) {
std::lock_guard<std::mutex> lock(_file_mtx);
_map_md5_files[md5] = fileinfo;
}
std::shared_ptr<FileInfo> LogicSystem::GetFileInfo(std::string md5) {
std::lock_guard<std::mutex> lock(_file_mtx);
auto iter = _map_md5_files.find(md5);
if (iter == _map_md5_files.end()) {
return nullptr;
}
return iter->second;
}
集成资源服务器
新架构形式
集成资源服务器后的架构为
将上述多线程服务器,整合到项目目录,同时设置资源属性表,复用之前的就可以了。
注意资源服务器配置要稍作修改
[GateServer]
Port = 8080
[VarifyServer]
Host = 127.0.0.1
Port = 50051
[StatusServer]
Host = 127.0.0.1
Port = 50052
[SelfServer]
Name = reserver
Host = 0.0.0.0
Port = 9090
RPCPort = 51055
[Mysql]
Host = 81.68.86.146
Port = 3308
User = root
Passwd = 123456.
Schema = llfc
[Redis]
Host = 81.68.86.146
Port = 6380
Passwd = 123456
[Static]
Path = static
[Output]
Path = bin
客户端新增资源网络类
构造函数解析
因为客户端需要长连接资源服务器,采用TCP方式上传文件,所以需要封装一个单例的FileTcpMgr类,用于上传资源。
class FileTcpMgr : public QObject, public Singleton<FileTcpMgr>,
public std::enable_shared_from_this<FileTcpMgr>
{
Q_OBJECT
public:
friend class Singleton<FileTcpMgr>;
~FileTcpMgr();
void SendData(ReqId reqId, QByteArray data);
void CloseConnection();
private:
void initHandlers();
explicit FileTcpMgr(QObject *parent = nullptr);
void registerMetaType();
void handleMsg(ReqId id, int len, QByteArray data);
QTcpSocket _socket;
QString _host;
uint16_t _port;
QByteArray _buffer;
bool _b_recv_pending;
quint16 _message_id;
quint32 _message_len;
QMap<ReqId, std::function<void(ReqId id, int len, QByteArray data)>> _handlers;
//发送队列
QQueue<QByteArray> _send_queue;
//正在发送的包
QByteArray _current_block;
//当前已发送的字节数
qint64 _bytes_sent;
//是否正在发送
bool _pending;
signals:
void sig_send_data(ReqId reqId, QByteArray data);
void sig_con_success(bool bsuccess);
void sig_connection_closed();
public slots:
void slot_send_data(ReqId reqId, QByteArray data);
void slot_tcp_connect(std::shared_ptr<ServerInfo> si);
};
构造函数具体实现
FileTcpMgr::FileTcpMgr(QObject *parent) : QObject(parent),
_host(""), _port(0), _b_recv_pending(false), _message_id(0), _message_len(0), _bytes_sent(0), _pending(false)
{
registerMetaType();
QObject::connect(&_socket, &QTcpSocket::connected, this, [&]() {
qDebug() << "Connected to server!";
emit sig_con_success(true);
});
QObject::connect(&_socket, &QTcpSocket::readyRead, this, [&]() {
// 当有数据可读时,读取所有数据
// 读取所有数据并追加到缓冲区
_buffer.append(_socket.readAll());
QDataStream stream(&_buffer, QIODevice::ReadOnly);
stream.setVersion(QDataStream::Qt_5_0);
forever {
//先解析头部
if(!_b_recv_pending){
// 检查缓冲区中的数据是否足够解析出一个消息头(消息ID + 消息长度)
if (_buffer.size() < FILE_UPLOAD_HEAD_LEN) {
return; // 数据不够,等待更多数据
}
// 预读取消息ID和消息长度,但不从缓冲区中移除
stream >> _message_id >> _message_len;
//将buffer 中的前六个字节移除
_buffer = _buffer.mid(FILE_UPLOAD_HEAD_LEN);
// 输出读取的数据
qDebug() << "Message ID:" << _message_id << ", Length:" << _message_len;
}
//buffer剩余长读是否满足消息体长度,不满足则退出继续等待接受
if(_buffer.size() < _message_len){
_b_recv_pending = true;
return;
}
_b_recv_pending = false;
// 读取消息体
QByteArray messageBody = _buffer.mid(0, _message_len);
qDebug() << "receive body msg is " << messageBody ;
_buffer = _buffer.mid(_message_len);
handleMsg(ReqId(_message_id),_message_len, messageBody);
}
});
//5.15 之后版本
// QObject::connect(&_socket, QOverload<QAbstractSocket::SocketError>::of(&QTcpSocket::errorOccurred), [&](QAbstractSocket::SocketError socketError) {
// Q_UNUSED(socketError)
// qDebug() << "Error:" << _socket.errorString();
// });
// 处理错误(适用于Qt 5.15之前的版本)
QObject::connect(&_socket, static_cast<void (QTcpSocket::*)(QTcpSocket::SocketError)>(&QTcpSocket::error),
this,
[&](QTcpSocket::SocketError socketError) {
qDebug() << "Error:" << _socket.errorString() ;
//todo... 根据错误类型做不同的处理
switch (socketError) {
case QTcpSocket::ConnectionRefusedError:
qDebug() << "Connection Refused!";
emit sig_con_success(false);
break;
case QTcpSocket::RemoteHostClosedError:
qDebug() << "Remote Host Closed Connection!";
break;
case QTcpSocket::HostNotFoundError:
qDebug() << "Host Not Found!";
emit sig_con_success(false);
break;
case QTcpSocket::SocketTimeoutError:
qDebug() << "Connection Timeout!";
emit sig_con_success(false);
break;
case QTcpSocket::NetworkError:
//qDebug() << "Network Error!";
break;
default:
//qDebug() << "Other Error!";
break;
}
});
// 处理连接断开
QObject::connect(&_socket, &QTcpSocket::disconnected, this,[&]() {
qDebug() << "Disconnected from server.";
emit sig_connection_closed();
});
//连接发送信号用来发送数据
QObject::connect(this, &FileTcpMgr::sig_send_data, this, &FileTcpMgr::slot_send_data);
//连接发送信号
QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {
//更新发送数据
_bytes_sent += bytes;
//未发送完整
if (_bytes_sent < _current_block.size()) {
//继续发送
auto data_to_send = _current_block.mid(_bytes_sent);
_socket.write(data_to_send);
return;
}
//发送完全,则查看队列是否为空
if (_send_queue.isEmpty()) {
//队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送
_current_block.clear();
_pending = false;
_bytes_sent = 0;
return;
}
//队列不为空,则取出队首元素
_current_block = _send_queue.dequeue();
_bytes_sent = 0;
_pending = true;
qint64 w2 = _socket.write(_current_block);
qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;
});
//连接
QObject::connect(this, &FileTcpMgr::sig_close, this, &FileTcpMgr::slot_tcp_close);
//注册消息
initHandlers();
}
简单描述下上述构造函数做的事情:
- 成功连接服务器后,会触发
QTcpSocket::connected信号,从而回调lambda表达式,发送sig_con_success信号 - 接收服务器传输的数据,会触发
QTcpSocket::readyRead信号,从而回调lambda表达式,在这里处理头部信息和包体信息。进行TLV协议解析后回调handleMsg。 - 捕获
QTcpSocket::SocketError信号,当出错后回调lambda表达式发送信号通知主界面错误。 - 捕获连接断开信号
QTcpSocket::disconnected,回调lambda表达式,通知主界面连接断开。 - 连接发送信号
sig_send_data,因为socket在独立线程,不能直接调用发送,所以改为异步发送,触发槽函数slot_send_data - 因为异步发送,可能存在未发送完全的情况,所以我们用
QTcpSocket::bytesWritten来检测发送了多少字节,通过lambda表达式回调处理,继续发送数据。 - 因为
socket被独立为单独线程,所以关闭也不能直接调用close(socket), 需要统一在槽函数中处理。
注册元对象系统的逻辑不再赘述。
连接槽函数
我们实现槽函数slot_tcp_connect用来创建客户端到资源服务器的连接
void FileTcpMgr::slot_tcp_connect(std::shared_ptr<ServerInfo> si)
{
qDebug()<< "receive tcp connect signal";
// 尝试连接到服务器
qDebug() << "Connecting to server...";
_host = si->_res_host;
_port = static_cast<uint16_t>(si->_res_port.toUInt());
_socket.connectToHost(_host, _port);
}
注册处理流程
注册上传头像回调逻辑
void FileTcpMgr::initHandlers()
{
//todo 接收上传用户头像回复
_handlers.insert(ID_UPLOAD_HEAD_ICON_RSP, [this](ReqId id, int len, QByteArray data){
Q_UNUSED(len);
qDebug()<< "handle id is "<< id ;
// 将QByteArray转换为QJsonDocument
QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
// 检查转换是否成功
if(jsonDoc.isNull()){
qDebug() << "Failed to create QJsonDocument.";
return;
}
QJsonObject recvObj = jsonDoc.object();
qDebug()<< "data jsonobj is " << recvObj ;
if(!recvObj.contains("error")){
int err = ErrorCodes::ERR_JSON;
qDebug() << "icon upload_failed, err is Json Parse Err" << err ;
//todo ... 提示上传失败
//emit upload_failed();
return;
}
int err = recvObj["error"].toInt();
if(err != ErrorCodes::SUCCESS){
qDebug() << "Login Failed, err is " << err ;
//emit upload_failed();
return;
}
auto md5 = recvObj["md5"].toString();
auto seq = recvObj["seq"].toInt();
auto trans_size = recvObj["trans_size"].toInt();
auto uid = recvObj["uid"].toInt();
auto total_size = recvObj["total_size"].toInt();
auto name = recvObj["name"].toString();
qDebug() << "recv : " << name << "file trans_size is " << trans_size;
//判断trans_size和total_size相等
if(total_size == trans_size){
return;
}
auto file_info = UserMgr::GetInstance()->GetFileInfoByMD5(md5);
if(!file_info){
return;
}
//再次组织数据发送
QFile file(file_info->filePath());
if(!file.open(QIODevice::ReadOnly)){
qWarning() << "Could not open file: " << file.errorString();
return;
}
//文件偏移到已经发送的位置,继续读取发送
file.seek(trans_size);
QByteArray buffer;
seq ++;
//每次读取2048字节发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject sendObj;
//将文件内容转换为base64编码
QString base64Data = buffer.toBase64();
sendObj["md5"] = md5;
sendObj["name"] = file_info->fileName();
sendObj["seq"] = seq;
sendObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
sendObj["total_size"] = total_size;
if(buffer.size() + (seq-1)*MAX_FILE_LEN >= total_size){
sendObj["last"] = 1;
}else{
sendObj["last"] = 0;
}
sendObj["data"] = base64Data;
sendObj["last_seq"] = recvObj["last_seq"].toInt();
sendObj["uid"] = uid;
QJsonDocument doc(sendObj);
auto send_data = doc.toJson();
SendData(ID_UPLOAD_HEAD_ICON_REQ, send_data);
file.close();
});
}
独立文件线程
对于上传我们独立到文件上报线程中
class FileTcpThread: public std::enable_shared_from_this<FileTcpThread>{
public:
FileTcpThread();
~FileTcpThread();
private:
QThread * _file_tcp_thread;
};
具体实现
FileTcpThread::FileTcpThread()
{
_file_tcp_thread = new QThread();
FileTcpMgr::GetInstance()->moveToThread(_file_tcp_thread);
QObject::connect(_file_tcp_thread, &QThread::finished, _file_tcp_thread, &QObject::deleteLater);
_file_tcp_thread->start();
}
FileTcpThread::~FileTcpThread()
{
_file_tcp_thread->quit();
}
主函数调用
#include "mainwindow.h"
#include <QApplication>
#include <QFile>
#include "global.h"
#include "tcpmgr.h"
#include "filetcpmgr.h"
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
QFile qss(":/style/stylesheet.qss");
if( qss.open(QFile::ReadOnly))
{
qDebug("open success");
QString style = QLatin1String(qss.readAll());
a.setStyleSheet(style);
qss.close();
}else{
qDebug("Open failed");
}
// 获取当前应用程序的路径
QString app_path = QCoreApplication::applicationDirPath();
// 拼接文件名
QString fileName = "config.ini";
QString config_path = QDir::toNativeSeparators(app_path +
QDir::separator() + fileName);
QSettings settings(config_path, QSettings::IniFormat);
QString gate_host = settings.value("GateServer/host").toString();
QString gate_port = settings.value("GateServer/port").toString();
gate_url_prefix = "http://"+gate_host+":"+gate_port;
//启动tcp线程
TcpThread tcpthread;
//启动资源网络线程
FileTcpThread file_tcp_thread;
MainWindow w;
w.show();
return a.exec();
}
原来的登录流程稍作修改,连接好ChatServer后,连接ResourceServer, 最后再让用户登录。
服务器逻辑
服务器新增文件上报逻辑处理, 在LogicWorker::RegisterCallBacks中添加
_fun_callbacks[ID_UPLOAD_HEAD_ICON_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto md5 = root["md5"].asString();
auto seq = root["seq"].asInt();
auto name = root["name"].asString();
auto total_size = root["total_size"].asInt();
auto trans_size = root["trans_size"].asInt();
auto last = root["last"].asInt();
auto file_data = root["data"].asString();
auto uid = root["uid"].asInt();
auto token = root["token"].asString();
auto last_seq = root["last_seq"].asInt();
//转化为字符串
auto uid_str = std::to_string(uid);
auto file_path = ConfigMgr::Inst().GetFileOutPath();
auto file_path_str = (file_path / uid_str / name).string();
Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
});
//第一个包校验一下token是否合理
if (seq == 1) {
//从redis获取用户token是否正确
std::string uid_str = std::to_string(uid);
std::string token_key = USERTOKENPREFIX + uid_str;
std::string token_value = "";
bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
if (!success) {
rtvalue["error"] = ErrorCodes::UidInvalid;
return;
}
if (token_value != token) {
rtvalue["error"] = ErrorCodes::TokenInvalid;
return;
}
}
// 使用 std::hash 对字符串进行哈希
std::hash<std::string> hash_fn;
size_t hash_value = hash_fn(name); // 生成哈希值
int index = hash_value % FILE_WORKER_COUNT;
std::cout << "Hash value: " << hash_value << std::endl;
//第一个包
if (seq == 1) {
//构造数据存储
auto file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = name;
file_info->_seq = seq;
file_info->_total_size = total_size;
file_info->_trans_size = trans_size;
LogicSystem::GetInstance()->AddMD5File(md5, file_info);
}
else {
auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
if (file_info == nullptr) {
rtvalue["error"] = ErrorCodes::FileNotExists;
return;
}
file_info->_seq = seq;
file_info->_trans_size = trans_size;
}
FileSystem::GetInstance()->PostMsgToQue(
std::make_shared<FileTask>(session, file_path_str, name, seq, total_size,
trans_size, last, file_data),
index
);
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = total_size;
rtvalue["seq"] = seq;
rtvalue["name"] = name;
rtvalue["trans_size"] = trans_size;
rtvalue["last"] = last;
rtvalue["md5"] = md5;
rtvalue["uid"] = uid;
rtvalue["last_seq"] = last_seq;
};
源码
https://gitee.com/secondtonone1/llfcchat
效果展示:
上传前
上传后
服务器存储成功
续传信息持久化
增加redis接口
bool RedisMgr::SetFileInfo(const std::string& md5, std::shared_ptr<FileInfo> file_info)
{
Json::Reader reader;
Json::Value root;
root["file_path_str"] = file_info->_file_path_str;
root["name"] = file_info->_name;
root["seq"] = file_info->_seq;
root["total_size"] = file_info->_total_size;
root["trans_size"] = file_info->_trans_size;
auto file_info_str = root.toStyledString();
auto redis_key = "file_upload_" + md5;
bool success = SetExp(redis_key, file_info_str, 3600);
return success;
}
新增超时设置
bool RedisMgr::SetExp(const std::string& key, const std::string& value, int expire_seconds) {
//执行redis命令行
auto connect = _con_pool->getConnection();
if (connect == nullptr) {
return false;
}
auto reply = (redisReply*)redisCommand(connect, "SETEX %s %d %s", key.c_str(),
expire_seconds,
value.c_str());
if (NULL == reply) {
std::cout << "Execute command [ SETEX " << key << " " << expire_seconds
<< " " << value << " ] failure ! " << std::endl;
_con_pool->returnConnection(connect);
return false;
}
if (!(reply->type == REDIS_REPLY_STATUS &&
(strcmp(reply->str, "OK") == 0 || strcmp(reply->str, "ok") == 0))) {
std::cout << "Execute command [ SETEX " << key << " " << expire_seconds
<< " " << value << " ] failure ! " << std::endl;
freeReplyObject(reply);
_con_pool->returnConnection(connect);
return false;
}
freeReplyObject(reply);
std::cout << "Execute command [ SETEX " << key << " " << expire_seconds
<< " " << value << " ] success ! " << std::endl;
_con_pool->returnConnection(connect);
return true;
}
每次收到上传信息后,更新上传进度到redis中
_fun_callbacks[ID_UPLOAD_HEAD_ICON_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto md5 = root["md5"].asString();
auto seq = root["seq"].asInt();
auto name = root["name"].asString();
auto total_size = root["total_size"].asInt();
auto trans_size = root["trans_size"].asInt();
auto last = root["last"].asInt();
auto file_data = root["data"].asString();
auto uid = root["uid"].asInt();
auto token = root["token"].asString();
auto last_seq = root["last_seq"].asInt();
//转化为字符串
auto uid_str = std::to_string(uid);
auto file_path = ConfigMgr::Inst().GetFileOutPath();
auto file_path_str = (file_path / uid_str / name).string();
Json::Value rtvalue;
auto callback = [=](const Json::Value& result) {
// 在异步任务完成后调用
Json::Value rtvalue = result;
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = total_size;
rtvalue["seq"] = seq;
rtvalue["name"] = name;
rtvalue["trans_size"] = trans_size;
rtvalue["last"] = last;
rtvalue["md5"] = md5;
rtvalue["uid"] = uid;
rtvalue["last_seq"] = last_seq;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
};
//第一个包校验一下token是否合理
if (seq == 1) {
//从redis获取用户token是否正确
std::string uid_str = std::to_string(uid);
std::string token_key = USERTOKENPREFIX + uid_str;
std::string token_value = "";
bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
if (!success) {
rtvalue["error"] = ErrorCodes::UidInvalid;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
if (token_value != token) {
rtvalue["error"] = ErrorCodes::TokenInvalid;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
// 使用 std::hash 对字符串进行哈希
std::hash<std::string> hash_fn;
size_t hash_value = hash_fn(name); // 生成哈希值
int index = hash_value % FILE_WORKER_COUNT;
std::cout << "Hash value: " << hash_value << std::endl;
//第一个包
if (seq == 1) {
//构造数据存储
auto file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = name;
file_info->_seq = seq;
file_info->_total_size = total_size;
file_info->_trans_size = trans_size;
//LogicSystem::GetInstance()->AddMD5File(md5, file_info);
//改为用redis存储
bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
if (!success) {
rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
else {
//auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
//改为从redis中加载
auto file_info = RedisMgr::GetInstance()->GetFileInfo(name);
if (file_info == nullptr) {
rtvalue["error"] = ErrorCodes::FileNotExists;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
file_info->_seq = seq;
file_info->_trans_size = trans_size;
bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
if (!success) {
rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
FileSystem::GetInstance()->PostMsgToQue(
std::make_shared<FileTask>(session, uid, file_path_str, name, seq, total_size,
trans_size, last, file_data, callback),
index
);
};
资源url更新
因为上头像传资源后,要将资源的路径存储到mysql数据库中,所以我们新增MysqlMgr,这个直接从ChatServer拷贝一份即可。
但是要注意,添加如下函数
bool MysqlMgr::UpdateUserIcon(int uid, const std::string& icon) {
return _dao.UpdateHeadInfo(uid, icon);
}
Dao层面实现更新头像逻辑
bool MysqlDao::UpdateHeadInfo(int uid, const std::string& icon)
{
auto con = pool_->getConnection();
if (!con) {
return false;
}
Defer defer([this, &con]() {
pool_->returnConnection(std::move(con));
});
auto& conn = con->_con;
try {
std::string update_sql =
"UPDATE user SET icon = ? WHERE uid = ?;";
std::unique_ptr<sql::PreparedStatement> pstmt(conn->prepareStatement(update_sql));
pstmt->setString(1, icon);
pstmt->setInt64(2, uid);
int affected_rows = pstmt->executeUpdate();
// 检查是否有行被更新(可选)
if (affected_rows == 0) {
std::cerr << "No user found with uid: " << uid << std::endl;
return false;
}
return true;
}
catch (sql::SQLException& e) {
std::cerr << "SQLException in UpdateHeadInfo: " << e.what() << std::endl;
return false;
}
return false;
}
封装异步回调
之前我们处理文件上传是异步方式,将要保存的文件投递给消息队列,交给独立线程后台保存。我们没有等待处理完成就直接将消息回传给客户端,这么做不是很好,所以改为异步方式,简单的方式就是通过回调函数处理,或者包装一个future等待。这里考虑保留异步结构,所以还是用回调处理
struct FileTask {
FileTask(std::shared_ptr<CSession> session, int uid, std::string path, std::string name,
int seq, int total_size, int trans_size, int last,
std::string file_data,
std::function<void(const Json::Value&)> callback) :_session(session), _uid(uid),
_seq(seq), _path(path), _name(name), _total_size(total_size),
_trans_size(trans_size), _last(last), _file_data(file_data), _callback(callback)
{}
~FileTask(){}
std::shared_ptr<CSession> _session;
int _uid;
int _seq ;
std::string _path;
std::string _name ;
int _total_size ;
int _trans_size ;
int _last ;
std::string _file_data;
std::function<void(const Json::Value&)> _callback; //添加回调函数
};
改进后的处理
_fun_callbacks[ID_UPLOAD_HEAD_ICON_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
const string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto md5 = root["md5"].asString();
auto seq = root["seq"].asInt();
auto name = root["name"].asString();
auto total_size = root["total_size"].asInt();
auto trans_size = root["trans_size"].asInt();
auto last = root["last"].asInt();
auto file_data = root["data"].asString();
auto uid = root["uid"].asInt();
auto token = root["token"].asString();
auto last_seq = root["last_seq"].asInt();
//转化为字符串
auto uid_str = std::to_string(uid);
auto file_path = ConfigMgr::Inst().GetFileOutPath();
auto file_path_str = (file_path / uid_str / name).string();
Json::Value rtvalue;
auto callback = [=](const Json::Value& result) {
// 在异步任务完成后调用
Json::Value rtvalue = result;
rtvalue["error"] = ErrorCodes::Success;
rtvalue["total_size"] = total_size;
rtvalue["seq"] = seq;
rtvalue["name"] = name;
rtvalue["trans_size"] = trans_size;
rtvalue["last"] = last;
rtvalue["md5"] = md5;
rtvalue["uid"] = uid;
rtvalue["last_seq"] = last_seq;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
};
//第一个包校验一下token是否合理
if (seq == 1) {
//从redis获取用户token是否正确
std::string uid_str = std::to_string(uid);
std::string token_key = USERTOKENPREFIX + uid_str;
std::string token_value = "";
bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
if (!success) {
rtvalue["error"] = ErrorCodes::UidInvalid;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
if (token_value != token) {
rtvalue["error"] = ErrorCodes::TokenInvalid;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
// 使用 std::hash 对字符串进行哈希
std::hash<std::string> hash_fn;
size_t hash_value = hash_fn(name); // 生成哈希值
int index = hash_value % FILE_WORKER_COUNT;
std::cout << "Hash value: " << hash_value << std::endl;
//第一个包
if (seq == 1) {
//构造数据存储
auto file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = name;
file_info->_seq = seq;
file_info->_total_size = total_size;
file_info->_trans_size = trans_size;
//LogicSystem::GetInstance()->AddMD5File(md5, file_info);
//改为用redis存储
bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
if (!success) {
rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
else {
//auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
//改为从redis中加载
auto file_info = RedisMgr::GetInstance()->GetFileInfo(name);
if (file_info == nullptr) {
rtvalue["error"] = ErrorCodes::FileNotExists;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
file_info->_seq = seq;
file_info->_trans_size = trans_size;
bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
if (!success) {
rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
return;
}
}
FileSystem::GetInstance()->PostMsgToQue(
std::make_shared<FileTask>(session, uid, file_path_str, name, seq, total_size,
trans_size, last, file_data, callback),
index
);
};
callback是我们封装的回调函数,投递给FileTask, 将来在后台线程处理FileTask时回调。
void FileWorker::task_callback(std::shared_ptr<FileTask> task)
{
// 解码
std::string decoded = base64_decode(task->_file_data);
auto file_path_str = task->_path;
auto last = task->_last;
//std::cout << "file_path_str is " << file_path_str << std::endl;
boost::filesystem::path file_path(file_path_str);
boost::filesystem::path dir_path = file_path.parent_path();
// 获取完整文件名(包含扩展名)
std::string filename = file_path.filename().string();
Json::Value result;
// Check if directory exists, if not, create it
if (!boost::filesystem::exists(dir_path)) {
if (!boost::filesystem::create_directories(dir_path)) {
std::cerr << "Failed to create directory: " << dir_path.string() << std::endl;
result["error"] = ErrorCodes::FileNotExists;
task->_callback(result);
return;
}
}
std::ofstream outfile;
//第一个包
if (task->_seq == 1) {
// 打开文件,如果存在则清空,不存在则创建
outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
}
else {
// 保存为文件
outfile.open(file_path_str, std::ios::binary | std::ios::app);
}
if (!outfile) {
std::cerr << "无法打开文件进行写入。" << std::endl;
result["error"] = ErrorCodes::FileWritePermissionFailed;
task->_callback(result);
return ;
}
outfile.write(decoded.data(), decoded.size());
if (!outfile) {
std::cerr << "写入文件失败。" << std::endl;
result["error"] = ErrorCodes::FileWritePermissionFailed;
task->_callback(result);
return ;
}
outfile.close();
if (last) {
std::cout << "文件已成功保存为: " << task->_name << std::endl;
//更新头像
MysqlMgr::GetInstance()->UpdateUserIcon(task->_uid, filename);
//获取用户信息
auto user_info = MysqlMgr::GetInstance()->GetUser(task->_uid);
if (user_info == nullptr) {
return ;
}
//将数据库内容写入redis缓存
Json::Value redis_root;
redis_root["uid"] = task->_uid;
redis_root["pwd"] = user_info->pwd;
redis_root["name"] = user_info->name;
redis_root["email"] = user_info->email;
redis_root["nick"] = user_info->nick;
redis_root["desc"] = user_info->desc;
redis_root["sex"] = user_info->sex;
redis_root["icon"] = user_info->icon;
std::string base_key = USER_BASE_INFO + std::to_string(task->_uid);
RedisMgr::GetInstance()->Set(base_key, redis_root.toStyledString());
}
if (task->_callback) {
task->_callback(result);
}
}
客户端上传逻辑修改
//上传头像
void UserInfoPage::slot_up_load()
{
// 1. 让对话框也能选 *.webp
QString filename = QFileDialog::getOpenFileName(
this,
tr("选择图片"),
QString(),
tr("图片文件 (*.png *.jpg *.jpeg *.bmp *.webp)")
);
if (filename.isEmpty())
return;
// 2. 直接用 QPixmap::load() 加载,无需手动区分格式
QPixmap inputImage;
if (!inputImage.load(filename)) {
QMessageBox::critical(
this,
tr("错误"),
tr("加载图片失败!请确认已部署 WebP 插件。"),
QMessageBox::Ok
);
return;
}
QPixmap image = ImageCropperDialog::getCroppedImage(filename, 600, 400, CropperShape::CIRCLE);
if (image.isNull())
return;
QPixmap scaledPixmap = image.scaled( ui->head_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
QString storageDir = QStandardPaths::writableLocation(
QStandardPaths::AppDataLocation);
// 2. 在其下再建一个 avatars 子目录
QDir dir(storageDir);
if (!dir.exists("avatars")) {
if (!dir.mkpath("avatars")) {
qWarning() << "无法创建 avatars 目录:" << dir.filePath("avatars");
QMessageBox::warning(
this,
tr("错误"),
tr("无法创建存储目录,请检查权限或磁盘空间。")
);
return;
}
}
// 3. 拼接最终的文件名 head.png
QString file_name = generateUniqueIconName();
QString filePath = dir.filePath("avatars" +
QString(QDir::separator()) + file_name);
// 4. 保存 scaledPixmap 为 PNG(无损、最高质量)
if (!scaledPixmap.save(filePath, "PNG")) {
QMessageBox::warning(
this,
tr("保存失败"),
tr("头像保存失败,请检查权限或磁盘空间。")
);
} else {
qDebug() << "头像已保存到:" << filePath;
// 以后读取直接用同一路径:storageDir/avatars/head.png
}
//实现头像上传
QFile file(filePath);
if(!file.open(QIODevice::ReadOnly)){
qWarning() << "Could not open file:" << file.errorString();
return;
}
//保存当前文件位置指针
qint64 originalPos = file.pos();
QCryptographicHash hash(QCryptographicHash::Md5);
if (!hash.addData(&file)) {
qWarning() << "Failed to read data from file:" << filePath;
return ;
}
// 5. 转化为16进制字符串
QString file_md5 = hash.result().toHex(); // 返回十六进制字符串
//读取文件内容并发送
QByteArray buffer;
int seq = 0;
//创建QFileInfo 对象
auto fileInfo = std::make_shared<QFileInfo>(filePath);
//获取文件名
QString fileName = fileInfo->fileName();
//文件名
qDebug() << "文件名是: " << fileName;
//获取文件大小
int total_size = fileInfo->size();
//最后一个发送序列
int last_seq = 0;
//获取最后一个发送序列
if(total_size % MAX_FILE_LEN){
last_seq = (total_size / MAX_FILE_LEN) +1;
}else{
last_seq = total_size / MAX_FILE_LEN;
}
// 恢复文件指针到原来的位置
file.seek(originalPos);
//每次读取MAX_FILE_LEN字节并发送
buffer = file.read(MAX_FILE_LEN);
QJsonObject jsonObj;
//将文件内容转化为Base64 编码(可选)
QString base64Data = buffer.toBase64();
++seq;
jsonObj["md5"] = file_md5;
jsonObj["name"] = file_name;
jsonObj["seq"] = seq;
jsonObj["trans_size"] = buffer.size() + (seq - 1) * MAX_FILE_LEN;
jsonObj["total_size"] = total_size;
jsonObj["token"] = UserMgr::GetInstance()->GetToken();
jsonObj["uid"] = UserMgr::GetInstance()->GetUid();
if (buffer.size() + (seq - 1) * MAX_FILE_LEN == total_size) {
jsonObj["last"] = 1;
} else {
jsonObj["last"] = 0;
}
jsonObj["data"] = base64Data;
jsonObj["last_seq"] = last_seq;
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
//将md5信息和文件信息关联存储
UserMgr::GetInstance()->AddNameFile(file_name, fileInfo);
//发送消息
FileTcpMgr::GetInstance()->SendData(ID_UPLOAD_HEAD_ICON_REQ, send_data);
file.close();
}
客户端加载头像
服务器将上传的头像信息保存为url更新到mysql中,接下来客户端登录需要加载新的头像
在ChatDialog的构造函数中将头像加载逻辑修改为
//模拟加载自己头像
QString head_icon = UserMgr::GetInstance()->GetIcon();
//使用正则表达式检查是否使用默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(head_icon);
if (match.hasMatch()) {
// 如果是默认头像(:/res/head_X.jpg 格式)
QPixmap pixmap(head_icon); // 加载默认头像图片
QPixmap scaledPixmap = pixmap.scaled(ui->side_head_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation);
ui->side_head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->side_head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
QString avatarPath = avatarsDir.filePath(QFileInfo(head_icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
QPixmap scaledPixmap = pixmap.scaled(ui->side_head_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation);
ui->side_head_lb->setPixmap(scaledPixmap);
ui->side_head_lb->setScaledContents(true);
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
聊天页面也需要修改头像加载逻辑
void ChatPage::AppendChatMsg(std::shared_ptr<ChatDataBase> msg)
{
auto self_info = UserMgr::GetInstance()->GetUserInfo();
ChatRole role;
if (msg->GetSendUid() == self_info->_uid) {
role = ChatRole::Self;
ChatItemBase* pChatItem = new ChatItemBase(role);
pChatItem->setUserName(self_info->_name);
// 使用正则表达式检查是否是默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(self_info->_icon);
if (match.hasMatch()) {
pChatItem->setUserIcon(QPixmap(self_info->_icon));
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
QString avatarPath = avatarsDir.filePath(QFileInfo(self_info->_icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
pChatItem->setUserIcon(pixmap);
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
QWidget* pBubble = nullptr;
if (msg->GetMsgType() == ChatMsgType::TEXT) {
pBubble = new TextBubble(role, msg->GetMsgContent());
}
pChatItem->setWidget(pBubble);
auto status = msg->GetStatus();
pChatItem->setStatus(status);
ui->chat_data_list->appendChatItem(pChatItem);
if (status == 0) {
_unrsp_item_map[msg->GetUniqueId()] = pChatItem;
}
}
else {
role = ChatRole::Other;
ChatItemBase* pChatItem = new ChatItemBase(role);
auto friend_info = UserMgr::GetInstance()->GetFriendById(msg->GetSendUid());
if (friend_info == nullptr) {
return;
}
pChatItem->setUserName(friend_info->_name);
// 使用正则表达式检查是否是默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(friend_info->_icon);
if (match.hasMatch()) {
pChatItem->setUserIcon(QPixmap(friend_info->_icon));
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
QString avatarPath = avatarsDir.filePath(QFileInfo(friend_info->_icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
pChatItem->setUserIcon(pixmap);
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
QWidget* pBubble = nullptr;
if (msg->GetMsgType() == ChatMsgType::TEXT) {
pBubble = new TextBubble(role, msg->GetMsgContent());
}
pChatItem->setWidget(pBubble);
auto status = msg->GetStatus();
pChatItem->setStatus(status);
ui->chat_data_list->appendChatItem(pChatItem);
if (status == 0) {
_unrsp_item_map[msg->GetUniqueId()] = pChatItem;
}
}
}
聊天列表中加载头像逻辑修改一下
void ChatUserWid::SetChatData(std::shared_ptr<ChatThreadData> chat_data) {
_chat_data = chat_data;
auto other_id = _chat_data->GetOtherId();
auto other_info = UserMgr::GetInstance()->GetFriendById(other_id);
// 加载图片
QString head_icon = UserMgr::GetInstance()->GetIcon();
// 使用正则表达式检查是否是默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(other_info->_icon);
if (match.hasMatch()) {
// 如果是默认头像(:/res/head_X.jpg 格式)
QPixmap pixmap(other_info->_icon); // 加载默认头像图片
QPixmap scaledPixmap = pixmap.scaled(ui->icon_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation);
ui->icon_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->icon_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
QString avatarPath = avatarsDir.filePath(QFileInfo(other_info->_icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
QPixmap scaledPixmap = pixmap.scaled(ui->icon_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation);
ui->icon_lb->setPixmap(scaledPixmap);
ui->icon_lb->setScaledContents(true);
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
ui->user_name_lb->setText(other_info->_name);
ui->user_chat_lb->setText(chat_data->GetLastMsg());
}
用户信息加载头像
UserInfoPage::UserInfoPage(QWidget *parent) :
QWidget(parent),
ui(new Ui::UserInfoPage)
{
ui->setupUi(this);
auto icon = UserMgr::GetInstance()->GetIcon();
qDebug() << "icon is " << icon ;
//使用正则表达式检查是否使用默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(icon);
if (match.hasMatch()) {
QPixmap pixmap(icon);
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
QString avatarPath = avatarsDir.filePath(QFileInfo(icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
//获取nick
auto nick = UserMgr::GetInstance()->GetNick();
//获取name
auto name = UserMgr::GetInstance()->GetName();
//描述
auto desc = UserMgr::GetInstance()->GetDesc();
ui->nick_ed->setText(nick);
ui->name_ed->setText(name);
ui->desc_ed->setText(desc);
//连接上
connect(ui->up_btn, &QPushButton::clicked, this, &UserInfoPage::slot_up_load);
}
测试效果
客户端断点下载资源
客户端请求下载
在客户端加载本地资源发现不存在的时候,需要请求服务器,获取资源。
如果资源比较大,需要分批下载,也就是支持断点下载。
我们先拿UserInfoPage举例
UserInfoPage::UserInfoPage(QWidget *parent) :
QWidget(parent),
ui(new Ui::UserInfoPage)
{
ui->setupUi(this);
auto icon = UserMgr::GetInstance()->GetIcon();
qDebug() << "icon is " << icon ;
//使用正则表达式检查是否使用默认头像
QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
QRegularExpressionMatch match = regex.match(icon);
if (match.hasMatch()) {
QPixmap pixmap(icon);
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
// 如果是用户上传的头像,获取存储目录
QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir avatarsDir(storageDir + "/avatars");
// 确保目录存在
if (avatarsDir.exists()) {
auto file_name = QFileInfo(icon).fileName();
QString avatarPath = avatarsDir.filePath(QFileInfo(icon).fileName()); // 获取上传头像的完整路径
QPixmap pixmap(avatarPath); // 加载上传的头像图片
if (!pixmap.isNull()) {
//判断是否正在下载
bool is_loading = UserMgr::GetInstance()->IsDownLoading(file_name);
if (is_loading) {
qWarning() << "正在下载: " << file_name;
//先加载默认的
QPixmap pixmap(":/res/head_1.jpg");
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
return;
}
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
}
else {
qWarning() << "无法加载上传的头像:" << avatarPath;
UserMgr::GetInstance()->AddLabelToReset(avatarPath, ui->head_lb);
//先加载默认的
QPixmap pixmap(":/res/head_1.jpg");
QPixmap scaledPixmap = pixmap.scaled(ui->head_lb->size(),
Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
ui->head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
ui->head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
//判断是否正在下载
bool is_loading = UserMgr::GetInstance()->IsDownLoading(file_name);
if (is_loading) {
qWarning() << "正在下载: " << file_name;
return;
}
//发送请求获取资源
auto download_info = std::make_shared<DownloadInfo>();
download_info->_name = file_name;
download_info->_current_size = 0;
download_info->_seq = 1;
download_info->_total_size = 0;
download_info->_client_path = avatarPath;
//添加文件到管理者
UserMgr::GetInstance()->AddDownloadFile(file_name, download_info);
//发送消息
FileTcpMgr::GetInstance()->SendDownloadInfo(download_info);
}
}
else {
qWarning() << "头像存储目录不存在:" << avatarsDir.path();
}
}
//获取nick
auto nick = UserMgr::GetInstance()->GetNick();
//获取name
auto name = UserMgr::GetInstance()->GetName();
//描述
auto desc = UserMgr::GetInstance()->GetDesc();
ui->nick_ed->setText(nick);
ui->name_ed->setText(name);
ui->desc_ed->setText(desc);
//连接上
connect(ui->up_btn, &QPushButton::clicked, this, &UserInfoPage::slot_up_load);
}
如果本地资源不存在,则需要向服务器请求。
封装请求资源接口
判断资源是否正在下载
bool UserMgr::IsDownLoading(QString name) {
std::lock_guard<std::mutex> lock(_down_load_mtx);
auto iter = _name_to_download_info.find(name);
if (iter == _name_to_download_info.end()) {
return false;
}
return true;
}
如果资源加载成功,很可能处于正在下载,所以也要判断一下。
如果资源未加载成功,则需要向服务器下载资源。先讲要下载的资源和要加载资源的空间缓存起来。
void UserMgr::AddLabelToReset(QString path, QLabel* label)
{
auto iter = _name_to_reset_labels.find(path);
if (iter == _name_to_reset_labels.end()) {
QList<QLabel*> list;
list.append(label);
_name_to_reset_labels.insert(path, list);
return;
}
iter->append(label);
}
结构如下图
更新正在下载资源
void UserMgr::AddDownloadFile(QString name,
std::shared_ptr<DownloadInfo> file_info) {
std::lock_guard<std::mutex> lock(_down_load_mtx);
_name_to_download_info[name] = file_info;
}
发送下载请求
void FileTcpMgr::SendDownloadInfo(std::shared_ptr<DownloadInfo> download) {
QJsonObject jsonObj;
jsonObj["name"] = download->_name;
jsonObj["seq"] = download->_seq;
jsonObj["trans_size"] = 0;
jsonObj["total_size"] = 0;
jsonObj["token"] = UserMgr::GetInstance()->GetToken();
jsonObj["uid"] = UserMgr::GetInstance()->GetUid();
jsonObj["client_path"] = download->_client_path;
QJsonDocument doc(jsonObj);
auto send_data = doc.toJson();
SendData(ID_DOWN_LOAD_FILE_REQ, send_data);
}
接收服务器回传
_handlers.insert(ID_DOWN_LOAD_FILE_RSP, [this](ReqId id, int len, QByteArray data) {
Q_UNUSED(len);
qDebug() << "handle id is " << id << " data is " << data;
// 将QByteArray转换为QJsonDocument
QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
// 检查转换是否成功
if (jsonDoc.isNull()) {
qDebug() << "Failed to create QJsonDocument.";
return;
}
QJsonObject jsonObj = jsonDoc.object();
if (!jsonObj.contains("error")) {
int err = ErrorCodes::ERR_JSON;
qDebug() << "parse create private chat json parse failed " << err;
return;
}
int err = jsonObj["error"].toInt();
if (err != ErrorCodes::SUCCESS) {
qDebug() << "get create private chat failed, error is " << err;
return;
}
qDebug() << "Receive download file info rsp success";
QString base64Data = jsonObj["data"].toString();
QString clientPath = jsonObj["client_path"].toString();
int seq = jsonObj["seq"].toInt();
bool is_last = jsonObj["is_last"].toBool();
QString total_size_str = jsonObj["total_size"].toString();
qint64 total_size = total_size_str.toLongLong(nullptr);
QString current_size_str = jsonObj["current_size"].toString();
qint64 current_size = current_size_str.toLongLong(nullptr);
QString name = jsonObj["name"].toString();
auto file_info = UserMgr::GetInstance()->GetDownloadInfo(name);
if (file_info == nullptr) {
qDebug() << "file: " << name << " not found";
return;
}
file_info->_current_size = current_size;
file_info->_total_size = total_size;
//Base64解码
QByteArray decodedData = QByteArray::fromBase64(base64Data.toUtf8());
QFile file(clientPath);
// 根据 seq 决定打开模式
QIODevice::OpenMode mode;
if (seq == 1) {
// 第一个包,覆盖写入
mode = QIODevice::WriteOnly;
}
else {
// 后续包,追加写入
mode = QIODevice::WriteOnly | QIODevice::Append;
}
if (!file.open(mode)) {
qDebug() << "Failed to open file for writing:" << clientPath;
qDebug() << "Error:" << file.errorString();
return;
}
qint64 bytesWritten = file.write(decodedData);
if (bytesWritten != decodedData.size()) {
qDebug() << "Failed to write all data. Written:" << bytesWritten
<< "Expected:" << decodedData.size();
}
file.close();
qDebug() << "Successfully wrote" << bytesWritten << "bytes to file";
qDebug() << "Progress:" << current_size << "/" << total_size
<< "(" << (current_size * 100 / total_size) << "%)";
if (is_last) {
qDebug() << "File download completed:" << clientPath;
UserMgr::GetInstance()->RmvDownloadFile(name);
//发送信号通知主界面重新加载label
emit sig_reset_label_icon(clientPath);
}
else {
//继续请求
file_info->_seq = seq+1;
FileTcpMgr::GetInstance()->SendDownloadInfo(file_info);
}
});
- 判断seq是否为1,如果为1则说明新的文件,需要创建并保存
- 如果seq不为1,则说明是续传文件,更新追加就可以了
- 如果
is_last字段为true,说明是最后一个包,那么移除缓存的下载信息,同时将信息发送到主界面更新图标
更新页面逻辑
在ChatDialog界面中响应这个信号
//重置label icon
connect(FileTcpMgr::GetInstance().get(), &FileTcpMgr::sig_reset_label_icon, this, &ChatDialog::slot_reset_icon);
槽函数处理
void ChatDialog::slot_reset_icon(QString path) {
UserMgr::GetInstance()->ResetLabelIcon(path);
}
UserMgr中封装重置icon逻辑
void UserMgr::ResetLabelIcon(QString path)
{
auto iter = _name_to_reset_labels.find(path);
if (iter == _name_to_reset_labels.end()) {
return;
}
for (auto ele_iter = iter.value().begin(); ele_iter != iter.value().end(); ele_iter++) {
QPixmap pixmap(path); // 加载上传的头像图片
if (!pixmap.isNull()) {
QPixmap scaledPixmap = pixmap.scaled((*ele_iter)->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation);
(*ele_iter)->setPixmap(scaledPixmap);
(*ele_iter)->setScaledContents(true);
}
else {
qWarning() << "无法加载上传的头像:" << path;
}
}
_name_to_reset_labels.erase(iter);
}
测试效果
服务器断点传输逻辑
增加下载worker
class DownloadWorker {
public:
DownloadWorker();
~DownloadWorker();
void PostTask(std::shared_ptr<DownloadTask> task);
private:
void task_callback(std::shared_ptr<DownloadTask>);
std::thread _work_thread;
std::queue<std::shared_ptr<DownloadTask>> _task_que;
std::atomic<bool> _b_stop;
std::mutex _mtx;
std::condition_variable _cv;
};
DownloadWorker处理逻辑和之前的FileWorker类似
DownloadWorker::DownloadWorker() :_b_stop(false)
{
_work_thread = std::thread([this]() {
while (!_b_stop) {
std::unique_lock<std::mutex> lock(_mtx);
_cv.wait(lock, [this]() {
if (_b_stop) {
return true;
}
if (_task_que.empty()) {
return false;
}
return true;
});
if (_b_stop) {
break;
}
auto task = _task_que.front();
_task_que.pop();
task_callback(task);
}
});
}
DownloadWorker::~DownloadWorker()
{
_b_stop = true;
_cv.notify_one();
_work_thread.join();
}
void DownloadWorker::PostTask(std::shared_ptr<DownloadTask> task)
{
{
std::lock_guard<std::mutex> lock(_mtx);
_task_que.push(task);
}
_cv.notify_one();
}
void DownloadWorker::task_callback(std::shared_ptr<DownloadTask> task)
{
// 解码
auto file_path_str = task->_file_path;
//std::cout << "file_path_str is " << file_path_str << std::endl;
boost::filesystem::path file_path(file_path_str);
// 获取完整文件名(包含扩展名)
std::string filename = file_path.filename().string();
Json::Value result;
result["error"] = ErrorCodes::Success;
if (!boost::filesystem::exists(file_path)) {
std::cerr << "文件不存在: " << file_path_str << std::endl;
result["error"] = ErrorCodes::FileNotExists;
task->_callback(result);
return;
}
std::ifstream infile(file_path_str, std::ios::binary);
if (!infile) {
std::cerr << "无法打开文件进行读取。" << std::endl;
result["error"] = ErrorCodes::FileReadPermissionFailed;
task->_callback(result);
return;
}
std::shared_ptr<FileInfo> file_info = nullptr;
if (task->_seq == 1) {
// 获取文件大小
infile.seekg(0, std::ios::end);
std::streamsize file_size = infile.tellg();
infile.seekg(0, std::ios::beg);
//如果为空,则创建FileInfo 构造数据存储
file_info = std::make_shared<FileInfo>();
file_info->_file_path_str = file_path_str;
file_info->_name = filename;
file_info->_seq = 1;
file_info->_total_size = file_size;
file_info->_trans_size = 0;
// 立即保存到 Redis,覆盖旧数据,设置过期时间
RedisMgr::GetInstance()->SetDownLoadInfo(filename, file_info);
std::cout << "[新下载] 文件: " << filename
<< ", 大小: " << file_size << " 字节" << std::endl;
}
else {
//断点续传,从 Redis 获取历史信息
file_info = RedisMgr::GetInstance()->GetDownloadInfo(filename);
if (file_info == nullptr) {
// Redis 中没有信息(可能过期了)
std::cerr << "断点续传失败,Redis 中无下载信息: " << filename << std::endl;
result["error"] = ErrorCodes::RedisReadErr;
task->_callback(result);
infile.close();
return;
}
// 验证序列号是否匹配
if (task->_seq != file_info->_seq) {
std::cerr << "序列号不匹配,期望: " << file_info->_seq
<< ", 实际: " << task->_seq << std::endl;
result["error"] = ErrorCodes::FileSeqInvalid;
task->_callback(result);
infile.close();
return;
}
std::cout << "[续传] 文件: " << filename
<< ", seq: " << task->_seq
<< ", 进度: " << file_info->_trans_size
<< "/" << file_info->_total_size << std::endl;
}
// 计算当前偏移量
std::streamsize offset = ((std::streamsize)task->_seq - 1) * MAX_FILE_LEN;
if (offset >= file_info->_total_size) {
std::cerr << "偏移量超出文件大小。" << std::endl;
result["error"] = ErrorCodes::FileOffsetInvalid;
task->_callback(result);
infile.close();
return;
}
// 定位到指定偏移量
infile.seekg(offset);
// 读取最多2048字节
char buffer[MAX_FILE_LEN];
infile.read(buffer, MAX_FILE_LEN);
//获取read实际读取多少字节
std::streamsize bytes_read = infile.gcount();
if (bytes_read <= 0) {
std::cerr << "读取文件失败。" << std::endl;
result["error"] = ErrorCodes::FileReadFailed;
task->_callback(result);
infile.close();
return;
}
// 将读取的数据进行base64编码
std::string data_to_encode(buffer, bytes_read);
std::string encoded_data = base64_encode(data_to_encode);
// 检查是否是最后一个包
std::streamsize current_pos = offset + bytes_read;
bool is_last = (current_pos >= file_info->_total_size);
// 设置返回结果
result["data"] = encoded_data;
result["seq"] = task->_seq;
result["total_size"] = std::to_string(file_info->_total_size);
result["current_size"] = std::to_string(current_pos);
result["is_last"] = is_last;
infile.close();
if (is_last) {
std::cout << "文件读取完成: " << file_path_str << std::endl;
RedisMgr::GetInstance()->DelDownLoadInfo(filename);
}
else {
//更新信息
file_info->_seq++;
file_info->_trans_size = offset + bytes_read;
//更新redis
RedisMgr::GetInstance()->SetDownLoadInfo(filename, file_info);
}
if (task->_callback) {
task->_callback(result);
}
}
在FileSystem中创建worker
FileSystem::FileSystem()
{
for (int i = 0; i < FILE_WORKER_COUNT; i++) {
_file_workers.push_back(std::make_shared<FileWorker>());
}
for (int i = 0; i < DOWN_LOAD_WORKER_COUNT; i++) {
_down_load_worker.push_back(std::make_shared<DownloadWorker>());
}
}
测试效果
将头像资源从本地删除后,重新登录时或者切换页面会引发资源重新加载,向服务器请求资源后再设置到界面显示。










