C++实现集群聊天服务器_c++集群聊天服务器-程序员宅基地

技术标签: c++  服务器  开发语言  

写在前面

所有代码加起来两千行,里面用到了语言、数据结构、计网、操作系统、数据库、redis等知识,可谓是麻雀虽小五脏俱全。

在这里插入图片描述

1、项目需求

  1. 客户端新用户注册
  2. 客户端用户登录
  3. 添加好友和添加群组
  4. 好友聊天
  5. 群组聊天
  6. 离线消息
  7. nginx配置tcp负载均衡
  8. 集群聊天系统支持客户端跨服务器通信

2、Json

Json是一种轻量级的数据交换格式。独立于编程语言、宜上手等特点使Json能够有效地提高网路传输效率。

接下来介绍Json的使用

#include "json.hpp"  
using json = nlohmann::json;

#include <iostream>
#include <string>
using namespace std;

int main()
{
    
    json js;
    js["id"] = 1;
    js["name"] = "zhang san";
    cout << "js: " << js << endl;

    string s = js.dump();           // 将json转为string
    cout << "s: " << s << endl;

    json js2 = json::parse(s);      // 将string转为json
    cout << "js2: " << js2 << endl;

    int id = js["id"].get<int>();   // 处理json里面的int
    cout << id << endl;
    return 0;
}

3、muduo

muduo 是一个网络库,给用户提供了两个主要的类:

  1. TcpServer: 用于编写服务端程序
  2. TcpClient: 用于编写客户端程序

muduo 的使用可以暂时理解为直接套板子即可。

它的搭板子流程大致如下:

3. 组合TcpServer对象
4. 创建eventloop事件循环对象的指针
5. 明确TcpServer构造函数需要什么参数,输出ChatServer的构造函数
6. 在当前服务器类的构造函数当中,注册处理连接的回调函数和处理读写的回调函数
7. 设置合适的服务端线程数量,muduo库会自己划分I/O线程和worker线程

下面提供一个测试代码

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <functional>
#include <string>

using namespace std;
using namespace muduo;
using namespace muduo::net;
using namespace placeholders;

class ChatServer
{
    
public:
    ChatServer(EventLoop* loop,                 //事件循环
            const InetAddress& listenAddr,      //IP + Port
            const string& nameArg)              //服务器名字
            :_server(loop, listenAddr, nameArg)
            ,_loop(loop)
    {
    
        //给服务器注册用户连接的创建和断开回调
        _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));

        //给服务器注册用户读写事件回调
        _server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
        
        //设置服务端的线程数量 1个I/O线程 3个worker线程
        _server.setThreadNum(4);
    }

    //开启事件循环
    void start()
    {
    
        _server.start();
    }
private:
    //专门处理用户的连接创建和断开
    void onConnection(const TcpConnectionPtr&conn)
    {
    
        if (conn->connected()) {
    
            cout << conn->peerAddress().toIpPort() << "->" <<
                conn->localAddress().toIpPort() << " state: online" << endl;
        } else {
    
            cout << conn->peerAddress().toIpPort() << "->" <<
                conn->localAddress().toIpPort() << " state: offline" << endl;
            conn->shutdown();
        }
    }

    //专门处理用户的读写事件
    void onMessage(const TcpConnectionPtr&conn, //连接
                    Buffer *buffer,                //缓冲区
                    Timestamp time)  
    {
    
        string buf = buffer->retrieveAllAsString();
        cout << "recv data:" << buf << "time: " << time.toString() << endl;
        string sendbuf = "来自服务端的消息: " + buf;
        conn->send(sendbuf);    //发送给客户端
    }   

    TcpServer _server; 
    EventLoop *_loop;
};

int main()
{
    
    EventLoop loop;     //epoll
    InetAddress addr("127.0.0.1", 6000);
    ChatServer server(&loop, addr, "ChatServer");

    server.start();
    loop.loop();

    return 0;
}

编译方式

g++ -o testmuduo my_muduo_server.cpp -lmuduo_net -lmuduo_base -lpthread

运行结果

在这里插入图片描述


分析上面的测试代码可知有几个是在写死了的板子以外的API

  • conn->connected() 是否连接
  • conn->peerAddress().toIpPort()conn->localAddress().toIpPort() 来源和本地的ip+端口
  • string buf = buffer->retrieveAllAsString() 服务端接收客户端传过来的数据
  • conn->send(sendbuf) 服务端将sendbuf发给客户端

4、CMake

CMake 相比于手写 Makefile 友好太多了。手写 Makefile 是一场噩梦。

CMake 使用起来就是指下编译器去哪个文件夹找文件,对于我做的这个项目来说还是挺容易的。

没有太大的难度,用的时候查下就行了,贴两个具有代表性的上来。

# 主入口
cmake_minimum_required(VERSION 3.0)
project(chat)

# 配置编译选项
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g)

# 配置最终的可执行文件输出的路径
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)

# 配置头文件的搜索路径
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_SOURCE_DIR}/include/server)
include_directories(${PROJECT_SOURCE_DIR}/include/server/db)
include_directories(${PROJECT_SOURCE_DIR}/include/server/model)
include_directories(${PROJECT_SOURCE_DIR}/include/server/redis)
include_directories(${PROJECT_SOURCE_DIR}/thirdparty)

# 加载子目录
add_subdirectory(src)
# src/server
# 定义一个SRC_LIST变量,包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
aux_source_directory(./db DB_LIST)
aux_source_directory(./model MODEL_LIST)
aux_source_directory(./redis REDIS_LIST)

# 指定生成可执行文件
add_executable(ChatServer ${SRC_LIST} ${DB_LIST} ${MODEL_LIST} ${REDIS_LIST})
# 指定可执行文件链接时需要依赖的库文件
target_link_libraries(ChatServer muduo_net muduo_base mysqlclient hiredis pthread)

5、MySQL

MySQL 模块只用关心调用数据库的 API 实现。

这里我用的是 数据库连接池 实现数据库的连接

可以点击上面的链接查看详细介绍。

6、网络模块

chatserver.hppchatserver.cpp。和testmuduo的代码几乎一样,主要处理连接事件和读写事件的成功接收发送。

连接:如果客户端断开了连接,从map表删除用户的连接信息、将用户更新为下线。
消息:接收所有消息后反序列化,通过解析js["msgid"]来获得一个业务处理器handler,再调用相应的函数。

//chatserver.cpp
#include "chatserver.hpp"
#include "json.hpp"
#include "chatservice.hpp"

#include <iostream>
#include <functional>
#include <string>
using namespace std;
using namespace placeholders;
using json = nlohmann::json;

ChatServer::ChatServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg)
            :_server(loop, listenAddr, nameArg) ,_loop(loop)
{
    
    //注册连接回调
    _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
    
    //注册消息回调
    _server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
    
    //设置线程数量
    _server.setThreadNum(4);
}

//启动服务
void ChatServer::start()
{
    
    _server.start();
}

// 上报连接相关信息的回调函数
void ChatServer::onConnection(const TcpConnectionPtr& conn)
{
    
    // 客户端断开连接
    if (!conn->connected()) 
	{
    
        ChatService::instance()->clientCloseException(conn);
        conn->shutdown();
    }
}

// 上报读写事件相关信息的回调函数
void ChatServer::onMessage(const TcpConnectionPtr& conn,
                        Buffer* buffer,
                        Timestamp time)
{
    
    string buf = buffer->retrieveAllAsString();

    cout << buf << endl;
    // 数据的反序列化
    json js = json::parse(buf);
    // 目的: 完全解耦网络模块的代码和业务模块的代码
    // 通过js["msgid"] 获取一个业务处理器handler 
    auto msgHandler = ChatService::instance()->getHandler(js["msgid"].get<int>()); //json类型转成int
    // 回调消息绑定好的事件处理器, 来执行相应的业务
    msgHandler(conn, js, time);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
}

7、业务模块

前面的数据库模块和网络模块解决了我们调用的API,接下来就是业务逻辑了!直接上代码说明下业务模块需要解决的问题

//chatservice.hpp
#ifndef CHATSERVICE_H
#define CHATSERVICE_H

#include <muduo/net/TcpConnection.h>
#include <unordered_map>
#include <functional>
#include <mutex>
using namespace std;
using namespace muduo;
using namespace muduo::net;

#include "redis.hpp"
#include "groupmodel.hpp"
#include "friendmodel.hpp"
#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "json.hpp"

using json = nlohmann::json;
//处理消息的事件回调方法类型
using MsgHandler = std::function<void(const TcpConnectionPtr &conn, json &js, Timestamp)>;

//聊天服务器业务类
class ChatService
{
    
public:
    // 获取单例对象的接口函数
    static ChatService* instance();
    // 处理登录业务
    void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 处理注册业务
    void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 一对一聊天业务
    void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 添加好友业务
    void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 创建群组业务
    void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 加入群组业务
    void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 群组聊天业务
    void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 处理客户端异常退出
    void clientCloseException(const TcpConnectionPtr &conn);
    // 服务器异常,业务重置方法
    void reset();
	// 获取消息对应的处理器
    MsgHandler getHandler(int msgid);
    // 从redis消息队列中获取订阅的消息
    void handleRedisSubscribeMessage(int, string);

private:
    ChatService()
    // 存储消息id和其对应的业务处理方法
    unordered_map<int, MsgHandler> _msgHandlerMap;
    // 存储在线用户的通信连接 线程安全
    unordered_map<int, TcpConnectionPtr> _userConnMap;
    // 定义互斥锁,保证_userConnMap的线程安全
    mutex _connMutex;
    // 数据操作类对象
    UserModel _userModel;
    OfflineMsgModel _offlineMsgModel;
    FriendModel _friendModel;
    GroupModel _groupModel;
    // redis操作对象
    Redis _redis;
};

#endif

再处理单个业务时先解析json字符串得到数据,再通过这些数据进行相应的处理。需要注意的是STL本身是线程不安全的,所以在处理STL时需要加锁。


我这里具体分析下一对一聊天业务,其它的业务处理流程大同小异。

在构造函数里会对业务相关的事件处理注册回调函数

_msgHandlerMap.insert({
    ONE_CHAT_MSG, std::bind(&ChatService::oneChat, this, _1, _2, _3)});
// 还有很多,这里不一一列举

服务器接收到json服务端和客户端会相互约定好发送格式)后会解析出信息。比如一对一聊天的json格式为:

json js;
js["msgid"] = ONE_CHAT_MSG;
js["id"] = g_currentUser.getId();
js["name"] = g_currentUser.getName();
js["toid"] = friendid;
js["msg"] = message;
js["time"] = getCurrentTime();
string buffer = js.dump();

然后再根据toid是否在线选择及时发送消息还是存储到离线消息里面。

// 一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
    
    int toid = js["toid"].get<int>();
    {
    
        lock_guard<mutex> lock(_connMutex);
        auto it = _userConnMap.find(toid);
        if (it != _userConnMap.end()) 
        {
    
            // toid 在线,转发消息  服务器主动推送消息给toid用户
            it->second->send(js.dump());
            return;
        }
    }
    // 查询toid是否在线 
    User user = _userModel.query(toid);
    if (user.getState() == "online")
    {
    
        _redis.publish(toid, js.dump());
        return;
    }
    // toid 不在线,存储离线消息
    _offlineMsgModel.insert(toid, js.dump());
}

8、ngnix

ngnix 是个负载均衡器,用于服务器集群。需要自行配置tcp负载均衡。

root 用户下进行如下配置:
在这里插入图片描述

# ngnix tcp loadbalance config
stream {
    
	upstream MyServer {
    
		server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
		server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
	}

	server {
    
		proxy_connect_timeout 1s;
		#proxy_timeout 3s;
		listen 8000;
		proxy_pass MyServer;
		tcp_nodelay on;
	}
}

再重启(ngnix支持平滑重启,可是我没试成功)下就ok了。

效果如下
在这里插入图片描述

9、redis

ngnix 实现了集群,可是如果有两个用户登录在了不同的服务器,他们应该怎样通信呢?最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统
松耦合,提高服务器的响应能力,节省服务器的带宽资源。
在这里插入图片描述

redis 采用的 发布-订阅 模式,本质上是一个存储 键值对 的缓存数据库。

redis 的简易使用
在这里插入图片描述
在这里插入图片描述


redis 要实现的功能如下

//redis.hpp
#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;

/*
redis作为集群服务器通信的基于发布-订阅消息队列时,会遇到两个难搞的bug问题
https://blog.csdn.net/QIANGWEIYUAN/article/details/97895611
*/
class Redis
{
    
public:
    Redis();
    ~Redis();
    // 连接redis服务器 
    bool connect();
    // 向redis指定的通道channel发布消息
    bool publish(int channel, string message);
    // 向redis指定的通道subscribe订阅消息
    bool subscribe(int channel);
    // 向redis指定的通道unsubscribe取消订阅消息
    bool unsubscribe(int channel);
    // 在独立线程中接收订阅通道中的消息
    void observer_channel_message();
    // 初始化向业务层上报通道消息的回调对象
    void init_notify_handler(function<void(int, string)> fn);
private:
    // hiredis同步上下文对象,负责publish消息
    redisContext *_publish_context;
    // hiredis同步上下文对象,负责subscribe消息
    redisContext *_subcribe_context;
    // 回调操作,收到订阅的消息,给service层上报
    function<void(int, string)> _notify_message_handler;
};
#endif

连接redis服务器: 发布消息和订阅消息绑定ip+端口;开个单独的线程用于监听通道上的事件。
发布消息: 直接调用 redisCommand。这个api包含三步:

#1 redisAppendCommand 把消息写到本地缓存
#2 redisBufferWrite 发送给服务器
#3 redisGetReply 阻塞等待消息

订阅消息: 等待消息是阻塞的,所以不要在这个函数里面阻塞等待。只进行前两步。
取消订阅: 和订阅的大致步骤一样
接收订阅的消息: 独立线程。调用ChatService在构造函数传过来的函数名handleRedisSubscribeMessage

10、表设计

User

字段名称 字段类型 字段说明 约束
id INT 用户id PRIMARY KEY、AUTO_INCREMENT
name VARCHAR(50) 用户名 NOT NULL, UNIQUE
password VARCHAR(50) 用户密码 NOT NULL
state ENUM(‘online’, ‘offline’) 当前登录状态 DEFAULT ‘offline’

Friend

字段名称 字段类型 字段说明 约束
userid INT 用户id NOT NULL、联合主键
friendid INT 好友id NOT NULL、联合主键

AllGroup

字段名称 字段类型 字段说明 约束
id INT 组id PRIMARY KEY、AUTO_INCREMENT
groupname VARCHAR(50) 组名称 NOT NULL,UNIQUE
groupdesc VARCHAR(200) 组功能描述 DEFAULT ‘’

GroupUser

字段名称 字段类型 字段说明 约束
groupid INT 组id NOT NULL、联合主键
userid INT 组员id NOT NULL、联合主键
grouprole ENUM(‘creator’, ‘normal’) 组内角色 DEFAULT ‘normal’

OfflineMessage

字段名称 字段类型 字段说明 约束
userid INT 用户id NOT NULL
message VARCHAR(500) 离线消息(存储Json字符串) NOT NULL
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_51368613/article/details/128719758

智能推荐

在Vue项目中引入 ECharts 3D 路径图 Flights GL(需安装echarts_echart flights gl-程序员宅基地

文章浏览阅读264次,点赞5次,收藏2次。(3)已全局引入,单页面中无需再引。(1)注意此处有坑,如果我们直接使用 npm 命令下载 echarts-gl 是无法下载的,会报错(名字冲突,拒绝下载),所以我们下载 echarts-gl 的低版本;(2)然后,同样在 src 下 main.js 中全局引入 echarts-gl;(3)已全局引入,单页面中无需再引。(1)通过 npm 安装 jQuery;(2)在 src 下 main.js 中引入 jQuery;(3)在需要使用到 jQuery 的页面中引入;_echart flights gl

MIMO技术中的各种增益分析_mimo增益两-3db 三-5db-程序员宅基地

文章浏览阅读2.1w次,点赞6次,收藏42次。1.阵列增益由于接收机对接收信号做的相干合并而获取的平均接收信噪比的提高。在发送端不知道信道信息的情况下,MIMO信道可以获得的阵列增益与接收天线数成正比。2.分集增益接收分集:被用于SIMO信道,分集阶数的最大值等于接收天线的数目。发射分集:常用于MISO信道,可以在发射机已知或未知下行信道状态信息的情况下进行。时空编码技术是一种特殊的发射分集,依靠特定编码方案,在无下行信道信息情..._mimo增益两-3db 三-5db

activiti实战之springboot示例及在线编辑器集成(三)_spring boot serviceimpl在线编辑-程序员宅基地

文章浏览阅读3k次。写在前面:从《库表与服务》这篇中的‘服务’可以看出来,processEngine是核心关键 通过流程引擎创建出对应需要的服务。从使用RepositoryService部署流程 可以创建模型并转化成部署文件进行部署,部署完成启动后 可以使用RuntimeService查看运行状态的示例,接下来任务的流程可以使用TaskService进行任务的签收 办理 指派。最后 可以使用HistoryServic..._spring boot serviceimpl在线编辑

tdd 测试_让我们用TDD,Mocha,Chai和jsdom测试React组件-程序员宅基地

文章浏览阅读312次。tdd 测试by Anthony Ng 由Anthony Ng 让我们用TDD,Mocha,Chai和jsdom测试React组件 (Let’s test React components with TDD, Mocha, Chai, and jsdom)In this tutorial, we’ll learn how to write tests for React Components..._mocha tdd

dockerfile volume 添加多个卷踩坑_dockerfile 多个volume-程序员宅基地

文章浏览阅读1.7k次。dockerfile 在使用volume添加多个卷时分隔符直接’,'不能加空格例如:VOLUME ['/web/xxx/conf','/web/xxx/log']不能是:VOLUME ['/web/xxx/conf', '/web/xxx/log']多加的空格会导致卷目录下出现名称为 “]” 的文件夹(或者是其他名称奇怪的文件夹)当使用单引号(’)而不是双引号(")将卷包含起来时,docker run -v /web/xxx:/web/xxx --name test image-name:i_dockerfile 多个volume

即学即用的 30 段 Python 实用代码-程序员宅基地

文章浏览阅读95次。图片来自 Jantine Doornbos on Unsplash原标题 |30 Helpful Python Snippets That You Can Learn in 30 Se..._python 将字符串转换为小写,然后从中删除非字母数字字符。

随便推点

java常用数据结构有哪些_java数据结构有哪些-程序员宅基地

文章浏览阅读9.5k次,点赞12次,收藏88次。java数据结构有:1、数组;2、链表,一种递归的数据结构;3、栈,按照“后进先出”、“先进后出”的原则来存储数据;4、队列;5、树,是由 n(n>0)个有限节点组成的一个具有层次关系的集合;6、堆;7、图;8、哈希表。本教程操作环境:windows7系统、java8版、DELL G3电脑。Java常见数据结构这 8 种数据结构有什么区别呢?①、数组优点:按照索引查询元素的速度很快;按照索引遍历数组也很方便。缺点:数组的大小在创建后就确定了,无法扩容;数组只能存储一._java数据结构有哪些

Django之通用类视图DetailView_django detailview-程序员宅基地

文章浏览阅读1.3k次,点赞2次,收藏5次。本文参考于Django2.2文档视图函数(或简称视图)只是一个Python函数,它接受Web请求并返回Web响应。该响应可以是网页的HTML内容,重定向,404错误,XML文档或图像。。。真的。视图本身包含返回该响应所需的任何任意逻辑。该代码可以存在于您想要的任何地方,只要它在Python路径上即可。可以说,没有其他要求-没有“魔术”。为了将代码放在某处,约定是将视图放在一个名为的文件中vie..._django detailview

世界国家及中国各省市级地图ArcGIS MXD/SHP/QGIS/JSON/SQL数据文件【免费下载】_行政区划图mxd-程序员宅基地

文章浏览阅读7.4w次,点赞14次,收藏101次。参考来自:http://www.ourd3js.com/世界地图和主要国家的 JSON 文件世界地图: world.json美洲:美国:USA.json加拿大:Canada.json巴西:Brazil.json大洋洲:澳大利亚:Australia.json新西兰:NewZealand.json亚洲:印度:Ind_行政区划图mxd

Cannot connect to the Maven process_cannot connect to the maven process. try again lat-程序员宅基地

文章浏览阅读1w次,点赞2次,收藏2次。maven配置的原因修改settings文件的mirrors <mirrors> <!-- mirror | Specifies a repository mirror site to use instead of a given repository. The repository that | this mirror serves has an ID that matches the mirrorOf element of this mirror_cannot connect to the maven process. try again later. if the problem persist

Matlab自带图库及添加自己的图像_matlab自带的图像库-程序员宅基地

文章浏览阅读1.5w次,点赞11次,收藏36次。matlab自带图像存在在安装目录里面:D:\Program Files\MATLAB\R2019b\toolbox\images\imdata(视安装路径不同而不同):可以在matlab中直接使用:>> imshow(imread('coins.png'));可以通过自己往改目录中添加图像实现直接使用。例如lena图像已从matlab自带库中删除了:>> imshow(imread('lena.png'));则报错:Error using imread>_matlab自带的图像库

不用u盘 装deepin系统_安装Deepin国产操作系统?只需一个U盘就够了,超简单-程序员宅基地

文章浏览阅读7.3k次。距离Win7系统停止服务已经3个多月马上4个月了,与此同时,人们对国产操作系统的热度和期望值也是越来越高。在诸多国产操作系统中,Deepin系统是目前知名度最高的一个,而且无论在使用人数还是系统的性能上都是比较有优势的。为了让一些喜爱国产操作的小伙伴早日体验Deepin,接下来小雨就教大家如何安装Deepin国产操作系统?只需一个U盘就够了!Deepin系统对硬件的要求要使系统能够流畅地运行,就必..._win安装deepin无u盘

推荐文章

热门文章

相关标签