1. boost库

1.1 概述

网络编程是现代软件开发中无可替代的一环,无论是构建庞大的分布式系统还是小型的桌面应用,都离不开网络的支持。Boost.Asio起源于Boost库,是一款专为网络I/O、定时器、串行端口通信设计的库,提供了同步和异步的编程模型,用以简化网络和低级I/O的操作。它的设计初衷是提供一套简洁、一致且功能全面的接口,以满足开发者在多样化网络编程场景下的需求。

1.2 库的特性与优势

Boost.Asio是一款功能全面的库,其主要特性与优势如下:

  • 异步编程模型:它通过异步操作和回调机制,允许程序在等待I/O操作完成时继续执行其他任务,从而提高了程序的效率和响应速度

  • 多协议支持: 它支持TCP、UDP、SSL等多种协议,可以帮助开发者快速实现各种网络应用

  • 跨平台兼容性: Boost.Asio可以运行在Windows、Linux、macOS等多个平台上,保证了代码的可移植性和可维护性

  • 可扩展性: 开发者可以基于Boost.Asio轻松实现自定义协议和服务,实现特定的业务逻辑

  • 高性能: Boost.Asio的设计充分考虑了性能因素,尤其在高并发环境下表现出色

2. 主要函数的创建

2.1 终端节点的创建

所谓终端节点就是用来通信的端对端的节点,可以通过ip地址和端口构造,其它节点可以连接这个终端节点做通信。

客户端构造终端节点:通过对端的ip和端口构造一个endpoint,用这个endpoint和其通信

1
2
3
4
5
6
7
8
9
10
11
12
13
int client_end_point() {
std::string raw_ip_address = "192.168.88.93"; //对端(服务端)的地址
unsigned short port_num = 9527; //对端(服务端)的端口号

boost::system::error_code ec; //定义一个错误关键字
asio::ip::address ip_address = asio::ip::address::from_string(raw_ip_address, ec); //将字符串形式的IP地址转换为 asio::ip::address 类型的对象
if (ec.value() != 0) { //如果转换失败,打印错误码和错误描述
std::cout << "Failed to parse the IP address.Error code = " << ec.value() << ".Message is" << ec.message();
return ec.value(); //转换失败直接退出返回错误码
}
asio::ip::tcp::endpoint ep(ip_address, port_num); //转换成功就生成端点
return 0;
}

服务端构造终端端点:则只需根据本地地址绑定就可以生成endpoint

1
2
3
4
5
6
int server_end_point() {
unsigned short port_num = 9527; //用于指定网络服务监听的端口
asio::ip::address ip_address = asio::ip::address_v6::any(); //表示它将接受发送到服务器上任何 IPv6 地址的数据。
asio::ip::tcp::endpoint ep(ip_address, port_num); //生成服务器的端点节点
return 0;
}

2.2 创建socket

客户端创建socket分为4步,创建上下文iocontext、选择协议、生成socket、打开socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//创建和配置一个 TCP 套接字
int create_tcp_socket() {
//创建了一个 asio::io_context 对象 ioc,它是 Asio 库中的核心组件,用于管理 I/O 操作和执行异步任务(上下文服务)
asio::io_context ioc;
//创建了一个 asio::ip::tcp 协议的实例 protocol,使用 v4() 指定使用 IPv4 协议
asio::ip::tcp protocol = asio::ip::tcp::v4();
//创建了一个asio::ip::tcp::socket 对象 sock,它是一个 TCP 套接字,与 ioc 关联以进行 I/O 操作
asio::ip::tcp::socket sock(ioc);
//声明了一个 boost::system::error_code 对象 ec,用于接收可能发生的错误代码
boost::system::error_code ec;
//调用open方法打开套接字,使用之前创建的protocol对象指定协议,如果操作成功,ec将不会被设置;如果发生错误,ec将包含错误代码
sock.open(protocol, ec);
if (ec.value() != 0) { //检查 ec 是否包含错误代码。如果 ec.value() 不等于 0,表示发生了错误。
std::cout<< "Failed to parse the IP address.Error code = " << ec.value() << ".Message:" << ec.message();
return ec.value(); //发生错误,返回错误码
}
return 0;
}

服务端创建socket,需要生成一个acceptor的socket,用来接收新的连接

1
2
3
4
5
6
int create_acceptor_socket() {
asio::io_context ios;
//下面表示服务端接收连接,只接收ipv4的连接,并且是发往本地ip,端口3333的
asio::ip::tcp::acceptor a(ios, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 3333)); //生成一个asio::ip::tcp::acceptor对象a,是v4的协议
return 0;
}

2.3 绑定acceptor

服务器要将其绑定到指定的断点,所有连接这个端点的连接都可以被接收到。

1
2
3
4
5
6
7
8
9
10
11
12
13
int bind_accept_socket() {
unsigned short port_num = 9527; //定义端口号
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num); //生成端点,接收任何地址的连接
asio::io_context ios; //创建服务,让服务器知道accepter是绑定在哪个服务上的
asio::ip::tcp::acceptor acceptor(ios, ep.protocol()); //生成accepter
boost::system::error_code ec;
acceptor.bind(ep, ec); //绑定(生成的acceptor,绑定了一个端点,端点是本地的任何一个地址和端口号9527)
if (ec.value() != 0) { //检查 ec 是否包含错误代码。如果 ec.value() 不等于 0,表示发生了错误。
std::cout << "Failed to parse the IP address.Error code = " << ec.value() << ".Message:" << ec.message();
return ec.value(); //发生错误,返回错误码
}
return 0;
}

连接指定的端点,作为客户端可以连接服务器指定的端点进行连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int connect_to_end() {
std::string raw_ip_address = "192.168.88.93"; //服务器的地址
unsigned short port_num = 9527; //服务器的端口
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); //生成了一个端点
asio::io_context ios;
asio::ip::tcp::socket sock(ios, ep.protocol()); //创建了个socket,这个sock是绑定在ios这个服务上的,并且它的协议是ep.protocol()
sock.connect(ep); //连接到端点,这样就连接到服务器了
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

服务器接收连接,当有客户端连接时,服务器需要接收连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int accept_new_connection() {
const int BACKLOG_SIZE = 30; //监听队列的大小为30
unsigned short port_num = 9527;
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num); //创建一个端点,可以接收任意ipv4的地址,自己绑定的本地地址
asio::io_context ios; //创建一个服务
try {
asio::ip::tcp::acceptor acceptor(ios, ep.protocol()); //生成一个socketor接收器,第一个参数是服务,第二个参数是服务器要处理的协议(ipv4)
acceptor.bind(ep); //接收器进行一个绑定,绑定这个端口
acceptor.listen(BACKLOG_SIZE); //服务器进行监听操作
asio::ip::tcp::socket sock(ios); //再创建一个socket,这个是与客户端通信用的
acceptor.accept(sock); //接收到的新连接,交给sock来处理
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

3. 同步读写

3.1 同步写write_some

boost::asio提供了几种同步写的api,write_some可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写这一部分,返回写入的字节数。

1
2
3
4
5
6
7
8
9
void write_to_socket(asio::ip::tcp::socket& sock) {
std::string buf = "Hello World"; //定义要发送的字符串
std::size_t total_bytes_written = 0; //记录成功发送数据的字节数
//循环发送
//write_som返回每次写入的字节数
while (total_bytes_written != buf.length()) { //write_some()函数不能一次性传完
total_bytes_written += sock.write_some(asio::buffer(buf.c_str() + total_bytes_written, buf.length() - total_bytes_written)); //write_some的第一个参数是buffer类型首地址,第二个参数是未发送的字节数
}
}

3.2 同步写send

write_some使用起来比较麻烦,需要多次调用,asio提供了send函数。send函数会一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int send_data_by_send() {
std::string raw_ip_address = "192.168.88.93";
unsigned short port_num = 9527;
try {
//创建一个服务端终端节点
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
//asio::io_service ios; //老版本
asio::io_context ioc;
asio::ip::tcp::socket sock(ioc, ep.protocol()); //生成一个tcp类型的socket
sock.connect(ep); //客户端请求连接对端
std::string buf = "HEllo world";
int send_length = sock.send(asio::buffer(buf.c_str(), buf.length())); //send函数是tcp没有发完指定数据就阻塞在这里,直到发完
if (send_length <= 0) { //表示发送失败
return 0;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

3.3 同步写write

类似send方法,asio还提供了一个write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int send_data_by_write() {
std::string raw_ip_address = "192.168.88.93";
unsigned short port_num = 9527;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ioc;
asio::ip::tcp::socket sock(ioc, ep.protocol()); //生成一个tcp类型的socket
sock.connect(ep); //客户端请求连接对端
std::string buf = "HEllo world";
int send_length = asio::write(sock, asio::buffer(buf.c_str(), buf.length())); //write函数是tcp没有发完指定数据就阻塞在这里,直到发完
if (send_length <= 0) { //表示发送失败
return 0;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

3.4 同步读read_some

同步读和同步写类似,提供了读取指定字节数的接口read_some,需要多次调用

1
2
3
4
5
6
7
8
9
std::string read_from_socket(asio::ip::tcp::socket& sock) {
const unsigned char MESSAGE_SIZE = 7; //要求读的长度
char buf[MESSAGE_SIZE];
std::size_t total_bytes_read = 0;
while (total_bytes_read != MESSAGE_SIZE) {
total_bytes_read += sock.read_some(asio::buffer(buf + total_bytes_read, MESSAGE_SIZE - total_bytes_read));
}
return std::string(buf, total_bytes_read);
}

3.5 同步读receive

可以一次性同步接收对方发送的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int read_data_by_receive() {
std::string raw_ip_address = "192.168.88.93";
unsigned short port_num = 9527;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ioc;
asio::ip::tcp::socket sock(ioc, ep.protocol()); //生成一个tcp类型的socket
sock.connect(ep); //客户端请求连接到对端上
const unsigned char BUFF_SIZE = 7;
char buffer_receive[BUFF_SIZE];
char receive_length = sock.receive(asio::buffer(buffer_receive, BUFF_SIZE)); //receive函数是接收完指定数据,不然tcp不返回,阻塞
if (receive_length <= 0) {
std::cout << "receive failed" << std::endl;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

3.6 同步读read

可以一次性同步读取对方发送的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int read_data_by_read() {
std::string raw_ip_address = "192.168.88.93";
unsigned short port_num = 9527;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ioc;
asio::ip::tcp::socket sock(ioc, ep.protocol()); //生成一个tcp类型的socket
sock.connect(ep); //客户端连接到对端上
const unsigned char BUFF_SIZE = 7;
char buffer_receive[BUFF_SIZE];
char receive_length = asio::read(sock,asio::buffer(buffer_receive, BUFF_SIZE)); //read函数是接收完指定数据,不然tcp不返回,阻塞
if (receive_length <= 0) {
std::cout << "receive failed" << std::endl;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ".Message:" << e.what();
return e.code().value();
}
}

4. 同步读写客户端和服务端

4.1 客户端设计

基本思路:根据服务器对端的ip和端口创建一个endpoint,然后创建socket连接这个endpoint,之后就可以用同步读写的方式发送和接收数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
using namespace boost::asio::ip;
int main()
{
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造endpoint
tcp::endpoint remote_ep(address::from_string("192.168.100.100"), 9527); //构造服务端的端点
//创建socket
tcp::socket sock(ioc); //第一个参数是服务ioc,第二个参数可以不写
boost::system::error_code error = boost::asio::error::host_not_found; //创建一个错误码,先赋值一个初值
//请求连接对端
sock.connect(remote_ep, error);
if (error) { //连接失败执行下面
cout << "connect failed, code is" << error.value() << "error msg is" << error.message() << endl;
return 0;
}
//连接成功
cout << "Enter message:";
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH); //每输入完一次按回车,就将信息存到request里面
size_t request_length = strlen(request); //获取request里面的长度
boost::asio::write(sock, boost::asio::buffer(request, request_length)); //向服务端发送request里面的信息(转换成buffer类型才能发送)
char reply[MAX_LENGTH]; //用来存放服务端传过来的信息
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length)); //接收从服务端发来的信息(转换成buffer类型才能接收)
cout << "Reply is:";
cout.write(reply, reply_length); //输出到终端
cout << "\n";
}
catch (std::exception& e) {
std::cerr << "Exception:" << e.what() << endl;
}
return 0;
}

4.2 服务端设计

server函数作用:根据服务器ip和端口创建服务器acceptor用来接收数据,用socket接收新的连接,然后为这个socket创建session

session函数作用:该函数为服务器处理客户端请求,每当我们获取客户端连接后就调用该函数,类似于回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
typedef std::shared_ptr<tcp::socket>socket_ptr;      //定义指向socket的智能指针类型重命名为socket_ptr
std::set<std::shared_ptr<std::thread>>thread_set; //线程集合,元素是指向线程的智能指针
void session(socket_ptr sock) { //参数是与客户端之间进行通信的通信描述符(sock智能指针)
try {
for (;;) {
char data[max_length]; //接收数据的数组
memset(data, '\0', max_length); //将该数组清空为0
boost::system::error_code error; //创建一个错误码
//size_t length = boost::asio::read(sockaddr_size, boost::asio::buffer(data, max_length), error); //读客户端发来的数据,要读到max_length才返回
size_t length = sock->read_some(boost::asio::buffer(data, max_length), error); //读到多少,返回多少
if (error == boost::asio::error::eof) { //eof表示对端关闭
std::cout << "connection closed by peer" << endl;
break;
}
else if (error) { //其它错误较严重,抛出异常
throw boost::system::system_error(error);
}

//打印对端的ip地址(转为字符串打印出来)
cout << "receive from " << sock->remote_endpoint().address().to_string() << endl; //sock->remote_endpoint()是对端的端点
cout << "receive message is" << data << endl; //打印对端发来的数据
//这里简单处理,直接将接收到的数据传给对端
boost::asio::write(*sock, boost::asio::buffer(data, length));
}
}
catch (exception& e) {
std::cerr << "Exception in thread:" << e.what() << "\n" << std::endl;
}
}

void server(boost::asio::io_context& io_context, unsigned short port) {
//定义一个accepter,是用来服务器接收客户端的连接的
tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port)); //第一个参数是上下文,第二个参数是用ipv4的地址绑定本地服务器的地址--->最终是端点形式
for (;;) {
socket_ptr socket(new tcp::socket(io_context)); //创建了一个socket,智能指针指向它,并且初始化好了
//调用了 tcp::acceptor 对象 a 的 accept 方法,这个方法会阻塞等待直到有一个新的TCP连接请求到达。一旦有客户端发起连接请求,accept 方法就会接受这个连接,并将新连接的套接字信息赋给 *socket,即 socket_ptr 智能指针所指向的 tcp::socket 对象
a.accept(*socket); //连接成功后,此时的socket智能指针就相当于是通信描述符了
auto t = std::make_shared<std::thread>(session, socket); //创建一个线程,并指向session函数
thread_set.insert(t); //将新创建的线程 t 插入到一个线程集合 thread_set 中,为了后续对线程进行管理或监控
}
}
int main()
{
try {
boost::asio::io_context ioc; //定义一个服务,用于处理I/O操作,如网络连接和文件操作
server(ioc, 10086); //将服务和端口传递给server函数
for (auto& t : thread_set) { //遍历线程回收,这样主线程就是最后退出的
t->join();
}
}
catch (std::exception& e) {
std::cerr << "Exception" << e.what() << "\n";
}
return 0;
}

4.3 同步读写的优劣

1.缺点:

  • 同步读写的缺陷在于读写是阻塞的,如果客户端对端不发送数据服务器的read操作是阻塞的,这将导致服务器处于阻塞等待状态
  • 可以通过开辟新的线程为新生成的连接处理读写,但是一个进程开辟的线程是有限的,约为2048个线程,在Linux环境可以通unlimit增加一个进程开辟的线程数,但是线程过多也会导致切换消耗的时间片较多
  • 该服务器和客户端为应答式,实际场景为全双工通信模式,发送和接收要独立分开
  • 该服务器和客户端未考虑粘包处理

2.优点:当客户端连接数不多,而且服务器并发性不高的场景,可以使用同步读写的方式,这样能简化编码难度

5. 异步读写

这里先封装一个Node信息结构体,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MsgNode {
public:
//发送结点时的构造方式
MsgNode(const char* msg, int total_len):_total_len(total_len), _cur_len(0) { //参数:一个字符串的首地址(字符串);长度。这里初始化了发送数据的总长度,以及当前发送的字节数为0
_msg = new char[total_len]; //开辟了一块要发送数据长度的空间
memcpy(_msg, msg, total_len); //直接将原来的内容copy到_msg中,现在_msg里面解释要发送的数据了
}
//接受结点时的构造方式
MsgNode(int total_len) :_total_len(total_len), _cur_len(0) { //参数就是读取数据的长度。这里初始化了要读取数据的总长度,以及当前读取的字节数为0
_msg = new char[total_len]; ////开辟了一块要读取数据长度的空间
}
~MsgNode() {
delete[] _msg; //析构掉手动创建的空间
}
int _total_len; //消息总长度
int _cur_len; //当前接收或发送的字节数
char* _msg; //消息的首地址
};

而Session类负责完成异步读和异步写的工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Session
{
public:
Session(std::shared_ptr<asio::ip::tcp::socket>socket);
void Connect(const asio::ip::tcp::endpoint& ep);

void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void WriteToSocket(const std::string& buf);

void WriteAllToSocket(const std::string& buf);
void WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

void ReadFromSocket();
void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

void ReadAllFromSocket();
void ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

private:
std::queue<std::shared_ptr<MsgNode>> _send_queue; //定义一个队列,里面的元素是指向信息结构体的智能指针
std::shared_ptr<asio::ip::tcp::socket>_socket; //定义一个指向socket智能指针
std::shared_ptr<MsgNode>_send_node; //定义一个指向信息结构体的智能指针(发送数据类型的)
bool _send_pending; //是否还有未发送完的数据

std::shared_ptr<MsgNode> _recv_node; //定义一个指向信息结构体的智能指针(接收数据类型的)
bool _recv_pending; //是否还有未接收完的数据
};

5.1 异步写async_write_some

该方法可能会多次调用回调函数,比如说我们要求这个_total_len发送12个字节数据,那么调用的回调函数为什么一直返回5个字节勒?这个是因为我们要求它发多长,但是tcp它有一个发送缓冲区,跟我们用户缓冲区是不一致的,tcp的发送缓冲区实际上空闲的空间比我们要求发送的总长度要小,所以它实际发送的长度要比我们要求的少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void Session::WriteToSocket(const std::string& buf) {      //参数是要发送的数据
//构造一个发送数据结构体结点,存放buf数据(要发送的),插入到队列里面(就算因为还有数据未发送完,但也已经插入到队列,这些数据迟早也会发送的)
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
if (_send_pending) { //如果有未发送完的数据,直接返回,就不要再调用异步发送了,因为多次调用异步发送会出问题
return;
}
//没有数据,就调用异步的发送队列
this->_socket->async_write_some(asio::buffer(buf),std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); //发送完不确定数据长度后,才会调用回调函数
_send_pending = true; //设为true,表示数据没有发送完
}

//参数是错误码和才发送完的数据长度(WriteCallBack函数是已经发送完bytes_transferred长度数据后才被调用的)
void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (ec.value() != 0) { //有错误,打印错误码和错误信息,然后直接退出
std::cout << "Error,code is " << ec.value() << ".Message is " << ec.message();
return;
}
auto& send_data = _send_queue.front(); //取出第一个元素(发送消息结构体)出来,因为第一个是我们正在发送的数据
send_data->_cur_len += bytes_transferred; //正在发送的数据,它的当前发送长度+传回来这次已经发送的长度--->更新成当前长度了
if (send_data->_cur_len < send_data->_total_len) { //更新后当前已发送的长度 < 要求发送的长度,就继续发送
//buffer里面要做偏移,接下来要发送的位置是数据首地址+已经发送完的数据量;剩余数据量是总的数据量-已经发送数据量
this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//如果没有执行上面if,就说明队列首元素消息结构体数据已经发送完毕了
_send_queue.pop(); //就从队列中抛出这个结点(每个结点都有它的任务,即要发送的数据)
if (_send_queue.empty()) { //如果这个结点队列为空了,说明发送完了
_send_pending = false; //将这个变量置为假,以后有数据要发送,就可以直接放入队列,马上执行异步发送的代码
}
if (!_send_queue.empty()) { //如果结点队列还有,就继续发送队列当前首元素的信息结构体里面的信息
auto& send_data = _send_queue.front(); //再次取出队列的第一个元素发出
//第一次发送send_data->_cur_len为0,但也要写上好一点,因为该变量是会更新的,后面调用就不一定是0了
this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
}
}

5.2 异步写async_send

socket里面还有另一个异步发送的函数async_send,它是boost::asio帮我们封装的,它是把数据全部发送完毕,才会调用回调函数。表面上是只做了一次回调,但它底层会多次调用async_write_some函数来帮我们完成数据的读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void Session::WriteAllToSocket(const std::string& buf) {       //参数是要发送的数据
//构造一个发送消息结构体结点,存放buf数据(要发送的),插入到队列里面
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
if (_send_pending) { //如果有未发送完的数据,直接返回,就不要再调用异步发送了,因为多次调用异步发送会出问题
return;
}
//保证只调用一次回调,这次回调出来的长度就是我们要求发送的长度
this->_socket->async_send(asio::buffer(buf), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); //参数:我们要发送的数据,绑定函数(回调函数,该函数所属的对象,占位符)
_send_pending = true; //置为真,表示有数据在发送
}
//当发送完指定的数据后,就会调用回调函数WriteAllCallBack(不会多次回调,只会发送完指定数据,才执行回调函数)
void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (ec.value() != 0) { //有错误,直接退出
std::cout << "Error,code is " << ec.value() << ".Message is " << ec.message();
return;
}
//如果没有错误,就说明整条数据发送完毕,因为async_send调用一次回调函数就可以发送完要求发送的数据
_send_queue.pop(); //把队首元素抛出来
if (_send_queue.empty()) { //如果队列为空,说明数据都发送完了,把判断的状态置为空
_send_pending = false;
}
if (!_send_queue.empty()) { //说明还有结点的数据可以发送
auto& send_data = _send_queue.front(); //取出对头元素,继续发送
this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); //参数:我们要发送的数据,绑定函数(回调函数,该函数所属的对象,占位符)
}
}

5.3 异步读async_read_some

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Session::ReadFromSocket() {
if (_recv_pending) { //服务器端处于接受状态,其它的应用层调用该函数,就直接返回
return;
}
//创建一个接收结点
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
//异步的读函数,读完后,如果数据读好后,会调用传递给它的回调函数,_total_len定义的长度是1024
_socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
_recv_pending = true; //置为true,只要回调被执行完后,给它置为false
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
_recv_node->_cur_len += bytes_transferred; //对读取的数据长度进行更新
if (_recv_node->_cur_len < _recv_node->_total_len) { //如果当前读完的长度 < 要求长度
_socket->async_read_some(asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len), bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//没有执行上面if,说明读完了
_recv_pending = false;
_recv_node = nullptr; //不写这行也可以
}

5.4 异步读async_receive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Session::ReadAllFromSocket() {
if (_recv_pending) { //服务器端处于接受状态,其它的应用层调用该函数,就直接返回
return;
}
_recv_node = std::make_shared<MsgNode>(RECVSIZE); //创建一个接收结点
//async_receive是异步的接收,能一次性保证数据接收到我们要求的长度,只触发一次回调函数
_socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
_recv_pending = true; //判断状态只为true。虽然是异步,但通过同步的方式去控制它
}
//因为async_receive是读完所有数据才执行回调,所以执行回调的时候,数据已经读取完了
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
_recv_node->_cur_len += bytes_transferred;
_recv_node = nullptr; //不置为空也可以
_recv_pending = false; //表示这个数据已经接收完,不存在阻塞状态,已经触发回调函数,回调函数已经处理完,下一次又可以创建新的结点进行接收了
}

注意:读和写都是一样,发送数据推荐async_send,读数据推荐async_read_some。总结如下:
async_receive和async_read_some不能混着用,因为async_receive内部也是多次调用async_read_some,那么它也会多次触发
ReadCallBack回调函数,也会多次去发送我们要发送的数据。如果混着用,在boost::asio底层就不会去考虑顺序性,读到的数据就可能是乱的了。

6. 异步服务端实现

6.1 头文件

Session类是用于处理与客户端通信的工作;Server类是用于处理与客户端建立连接的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Session
{
public:
Session(boost::asio::io_context& ioc) :_socket(ioc) {
}
tcp::socket& Socket() { //获取socket变量的函数
return _socket;
}
void Start();
private:
void handle_read(const boost::system::error_code& error, size_t bytes_transferred); //错误码;读到的数据数
void handle_write(const boost::system::error_code& error);
tcp::socket _socket;
enum {max_length = 1024};
char _data[max_length]; //存放数据的
};
class Server {
public:
Server(boost::asio::io_context& ioc, short port); //初始化时,传入ioc和监听的端口号
private:
void start_accept();
void handle_accept(Session* new_session, const boost::system::error_code& error);
boost::asio::io_context& _ioc;
tcp::acceptor _acceptor;
};

6.2 主要函数

当有客户端发数据过来的时候,因为这里通过async_read_some监听了读事件,当读事件就绪的时候,会触发读的回调函数,在start函数里,为session绑定了一个读事件,_socket.async_read_some在boost::asio底层就会把这个_socket的读事件添加到epoll表里,这样当_socke有读事件就绪的时候(_socket它的tcp缓冲区由空变成有数据),就会触发回调函数handle_read,在回调函数里就可以把数据读出来。
至于为什么data会自动把新的数据拷贝到data里?因为asio帮我们做的,就是说我们把_data传给异步读函数async_read_some的时候,asio自动的把数据读到了_data里,所以handle_read函数里就直接读出_data的数据即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int main()
{
try {
boost::asio::io_context ioc; //定义一个服务
using namespace std;
Server s(ioc, 9527);
ioc.run(); //底层是让模型在它的线程里去做一个循环的轮询操作
}
catch (std::exception& e) {
std::cerr << "Exception:" << e.what() << "\n";
}
return 0;
}
//构造函数,传了两个参数(上下文,端口),后面_ioc因为是类成员变量,所以进行了初始化,_acceptor是用来负责接收连接的描述符,给_acceptor传递的参数:上下文、端点(它要绑定的端口)--->匹配本机地址--->_acceptor监听本地地址的port端口号的,只要客户端往这个端口去连接,都会被_acceptor捕获
Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc), _acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
cout << "Server start success, on part:" << port << endl;
start_accept(); //服务端算是搭建成功了,等待客户端来连接
}
//先创建一个new_session,然后用_acceptor去接收连接,当有连接到来的时候,这个new_session就绑定到这个函数(handle_accept)里,然后这个new_session就专门处理对端的连接信息了,对端发数据、写数据都交给new_session处理
void Server::start_accept() {
Session* new_session = new Session(_ioc); //创建一个Session
//把这个Session的socket传进去,这样accept才能根据这个socket给我们返回一个新的连接
//参入的参数:socket(这个socket负责服务与对端的通信);绑定的函数(new_session是绑定函数的第一个参数,而占位符是占位error位置)
_acceptor.async_accept(new_session->Socket(), std::bind(&Server::handle_accept, this, new_session, placeholders::_1)); //如果与客户端建立成功,就会执行handle_accept回调函数
}

void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {
if (!error) { //是0的话,就表示成功了
new_session->Start(); //让它去接收客户端的收发信息了
}
else {
delete new_session; //失败就delete掉
}
//上面就处理完一个连接了,_acceptor还要去接收新的连接
start_accept();
}

void Session::Start() {
memset(_data, 0, max_length); //将存放数据的_data清空置0
//buffer参数是:首地址;长度
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2)); //因为handle_read有两个参数,需要两个占位符
}
//读的回调函数,就是客户端发数据过来,就调用该函数(调用该函数时,数据已经读到了_data中)
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
if (!error) {
cout << "server receive data is " << _data << endl; //如果没有错,打印客户端发来的数据
//同读一样,把_socket通过async_write绑定一个写事件,写的数据是_data,写的多少是bytes_transferred。如果异步调用了该写函数,而tcp的发送缓冲区现在又没有可用的空间,它是不会回调async_write函数的;只有当tcp的发送缓冲区给我们的数据发出去后,它就会调用async_write函数了,然后把刚刚收到的数据发送过去。发送数据时,asio就会知道,它会调用我们新绑定的函数对象,把参数传给placeholders::_1所占的位置
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), std::bind(&Session::handle_write, this, placeholders::_1)); //绑定发送的回调函数
}
else { //如果有错误(对端关闭也属于错误,会执行下面)
cout << "read error" << endl;
delete this; //有错误,直接销毁掉Session,就说明这个连接断开了
}
}
//当调用了async_write回调函数时,先将data清除一下(因为已经发送成功),接下来就直接往这个data里面读数据即可
void Session::handle_write(const boost::system::error_code& error) {
if (!error) {
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2)); //因为handle_read有两个参数,需要两个占位符
}
else { //如果有错误
cout << "write error" << error.value() << endl;
delete this; //有错误,直接销毁掉Session,就说明这个连接断开了
}
}

上面需要了解的是:用户态跟tcp的缓冲区的联系,什么能造成写就绪事件,什么能造成读就绪事件,也就是什么时候能触发写回调,什么时候能触发读回调。触发读回调是因为tcp的缓冲区有数据;触发写回调是因为tcp缓冲区有空闲空间,那么就可用从用户态将信息拷贝到tcp缓冲区,然后tcp再发出去,这时候就能触发写回调。

6.3 伪闭包延长连接生命周期

上面异步代码在某些情况下,会出现一些隐患,比如说这个服务器将要发送数据给客户端,也就是读到数据后,将要调用async_write去发数据,如果这个时候客户端断掉了,而写事件已经就绪,写事件在写的时候,会触发写回调handle_write,它也会发现对端断掉了,所以它在handle_write函数里面会执行错误处理代码段,因此就会回收数据(delete this)。但是因为对端关闭了,它还会触发一次读回调,而读回调也会捕获到对端关闭了,那么它也会去执行delete this代码。所以这样的话,两次的delete this就会造成内存的二次释放,系统就会崩溃。

上面的情况可以理解为:读到数据过后,在发送数据给对端之前,把连接断掉。

解决办法:因为c++里面没有闭包的机制,所以这里就用c++11的特性里面的智能指针。因为智能指针是有引用计数的,如果把智能指针传给一个函数对象,这个函数对象不被释放掉,那么这个智能指针就不会被释放掉。如果我们把智能指针传给回调函数(假设它会被放到一个回调的队列里),那么回调函数就是一个函数对象,这个函数对象没有被调用,没有被释放之前,智能指针也不会被释放。所以我们可以把智能指针作为参数传递给回调函数,函数内部再使用智能指针,智能指针就不会被释放掉了。

头文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Session:public std::enable_shared_from_this<Session>  //继承一个模板类,需要什么样的类型,传入什么
{
public:
Session(boost::asio::io_context& ioc, Server* server) :_socket(ioc), _server(server) {
//定义一个uuid(通过自定义生成器,它的构造函数生成一个函数对象,函数对象再调用它函数)
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid); //把生成的uuid经过转换就能生成唯一的_uuid了
}
tcp::socket& Socket() { //获取socket变量的函数
return _socket;
}
void Start();
std::string& GetUuid();
private:
void handle_read(const boost::system::error_code& error, size_t bytes_transferred, shared_ptr<Session> _self_shared); //错误码;读到的数据数;接收一个指向当前 Session 对象的shared_ptr
void handle_write(const boost::system::error_code& error, shared_ptr<Session> _self_shared);
tcp::socket _socket;
enum {max_length = 1024};
char _data[max_length];
Server* _server; //Server成员变量
std::string _uuid;
};
class Server {
public:
Server(boost::asio::io_context& ioc, short port);
void ClearSession(std::string uuid);
private:
void start_accept();
void handle_accept(shared_ptr<Session> new_session, const boost::system::error_code& error);
boost::asio::io_context& _ioc;
tcp::acceptor _acceptor;
std::map<std::string, shared_ptr<Session>>_sessions;
};

实现功能的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//传入参数是ioc和端口号,往这个端口去连接,都会被_acceptor捕获
Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc), _acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
cout << "Server start success, on part:" << port << endl;
start_accept();
}
void Server::start_accept() {
shared_ptr<Session> new_session = make_shared<Session>(_ioc, this); //用智能指针的形式创建
_acceptor.async_accept(new_session->Socket(), std::bind(&Server::handle_accept, this, new_session, placeholders::_1));
}
void Server::handle_accept(shared_ptr<Session> new_session, const boost::system::error_code& error) {
if (!error) { //是0的话,就表示成功了
new_session->Start(); //让它去接收客户端的收发信息了
_sessions.insert(make_pair(new_session->GetUuid(), new_session)); //_sessions就可以管理这些连接了
}
else {
cout << "session accept failed, error is " << error.what() << endl;
}
//上面就处理完一个连接了,_acceptor还要去接收新的连接
start_accept();
}
void Server::ClearSession(std::string uuid) {
_sessions.erase(uuid); //从_sessions里将它移除就可以了
}

void Session::Start() {
memset(_data, 0, max_length); //将存放数据的_data清空置0
//shared_from_this()是创建了一个指向当前Session对象的shared_ptr。这个shared_ptr被传递给异步操作的回调函数,以确保在异步操作完成时,Session 对象仍然有效。这通常用于确保在异步操作的回调函数执行期间,对象不会被销毁。
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2, shared_from_this()));
}
//读的回调函数,就是客户端发数据过来,就调用该函数
void Session::handle_read(const boost::system::error_code& error,
size_t bytes_transferred, shared_ptr<Session> _self_shared) {
if (!error) {
cout << "server receive data is " << _data << endl;
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), std::bind(&Session::handle_write, this, placeholders::_1, _self_shared)); //绑定发送的回调函数
}
else { //如果有错误(对端关闭也属于错误,会执行下面)
cout << "read error" << endl;
//如果handle_read函数进入了错误处理,调用它的ClearSession函数,传入自己_uuid即可,这样该session就会从server中移除
_server->ClearSession(_uuid);
}
}

std::string& Session::GetUuid() {
return _uuid;
}
//当调用了async_write回调函数时,先将data清除一下(因为已经发送成功),接下来就直接往这个data里面读数据即可
void Session::handle_write(const boost::system::error_code& error, shared_ptr<Session> _self_shared) {
if (!error) {
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2, _self_shared));
}
else { //如果有错误
cout << "write error" << error.value() << endl;
_server->ClearSession(_uuid);
}
}

伪闭包延长连接声明周期原理:把智能指针作为参数传递给bind函数,bind是按值的方式去绑定的,它绑定一个智能指针的值给新生成的一个函数对象,新生成的函数对象会使用这个智能指针(以值的方式来使用),也就增加了智能指针的引用计数。我们使用的智能指针引用计数不为0,所以智能指针就不会被释放,则session就不会被释放(智能指针是session类型的),所以说该智能指针session它的声明周期就和新生成的函数对象的声明周期延长了,延长到和这个新生成的函数对象声明周期一致。因为我们既绑定了读的回调,也绑定了写的回调,所以说这个session,它的引用计数在这里每绑定一次,就会加1。也就不会出现新生成的函数对象被调用之前,这个session就被释放掉的情况(避免了释放已经释放的内存)。

6.4 添加发送队列

上面介绍了通过智能指针实现伪闭包的方式延长了session的生命周期,而实际使用的服务器并不是应答式,而是全双工通信方式,服务器是一直监听写事件,接收对端数据的,那么当服务器发送数据给对端时,就不能保证数据的有序性。

解决办法:设计一个数据结点,首先在CSession类里新增一个队列存储要发送的数据,因为我们不能保证每次调用发送接口的时候上一次数据已经发送完(如果上一次没有发送完,而恰巧我们又调用了一次接口,那么boost.asio底层不知道是要发送上一次没有发送完的数据,还是发送新一次调用的数据。所以勒,这样就能造成数据的混乱),所以就要把要发送的数据放入队列中,通过回调函数不断地发送。而且我们不能保证发送的接口和回调函数的接口在一个线程,所以要增加一个锁保证发送队列安全性。

头文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class MsgNode
{
friend class CSession;
public:
MsgNode(char * msg, int max_len) { //参数:数据的首地址;数据的长度
_data = new char[max_len]; //通过长度new处理一片空间
memcpy(_data, msg, max_len); //把原数据拷贝到现在的成员变量里,保证数据是一个深拷贝,以后操作这个数据即可
}
~MsgNode() {
delete[] _data; //在析构的时候,直接delete掉
}

private:
int _cur_len; //表示当前已经处理的长度
int _max_len; //表示数据的总长度
char* _data; //数据的首地址
};

class CServer
{
public:
CServer(boost::asio::io_context& io_context, short port);
void ClearSession(std::string);
private:
void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
void StartAccept();
boost::asio::io_context &_io_context;
short _port;
tcp::acceptor _acceptor;
std::map<std::string, shared_ptr<CSession>> _sessions;
};

class CSession:public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession() {
std::cout << "Ssession destruct" << endl;
}
tcp::socket& GetSocket();
std::string& GetUuid();
void Start();
void Send(char* msg, int max_length);
private:
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, shared_ptr<CSession> _self_shared);
void HandleWrite(const boost::system::error_code& error, shared_ptr<CSession> _self_shared);
tcp::socket _socket;
std::string _uuid;
char _data[MAX_LENGTH];
CServer* _server;
std::queue<shared_ptr<MsgNode> > _send_que; //队列
std::mutex _send_lock; //锁
};

实现功能的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
_acceptor(io_context, tcp::endpoint(tcp::v4(),port))
{
StartAccept();
}
void CServer::StartAccept() { //建立连接的函数
shared_ptr<CSession> new_session = make_shared<CSession>(_io_context, this); //当前的server也传进去了
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}
//与客户端连接传给后执行的函数
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
if (!error) {
new_session->Start(); //没有问题,直接去执行读写操作
_sessions.insert(make_pair(new_session->GetUuid(), new_session)); //插入map,方便管理
}
else {
cout << "session accept failed, error is " << error.what() << endl;
}
StartAccept(); //连接成功一个后,继续去监听连接,等待其它客户端连接
}
void CServer::ClearSession(std::string uuid) {
_sessions.erase(uuid);
}

CSession::CSession(boost::asio::io_context& io_context, CServer* server):
_socket(io_context), _server(server){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
}

tcp::socket& CSession::GetSocket() {
return _socket;
}

std::string& CSession::GetUuid() {
return _uuid;
}
//-----------------下面是开始执行通信的操作---------------------
void CSession::Start(){
memset(_data, 0, MAX_LENGTH); //先将_data清0
//读取客户端发来的信息
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));
}
//当获取到到客户发送的数据后,会执行该回调函数
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, shared_ptr<CSession> _self_shared){
if (!error) {
//全双工的方式,收到数据后,就继续监听
cout << "read data is " << _data << endl; //收到数据会回调HandleRead函数,直接读出来
//发送数据
Send(_data, bytes_transferred); //把收到的数据给对端发回去
memset(_data, 0, MAX_LENGTH); //发完了把data清掉,方便下一次接收
//下面又绑定它的接收事件,继续读取客户端发来的数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
}
else {
std::cout << "handle read failed, error is end " << endl;
_server->ClearSession(_uuid); //如果出错,就直接通过_uuid清除掉对应的Session
}
}
void CSession::Send(char* msg, int max_length) { //接收到数据后,回应对端的函数,参数是发发送的数据和长度
bool pending = false; //先默认将该变量设为false(表示上一次的数据已经发完了),为true表示当前的发送列表里有数据,表示上一次的数据没有发完
std::lock_guard<std::mutex> lock(_send_lock); //加锁
if (_send_que.size() > 0) { //判断队列是否为空,不为空,说明上一次发送的数据没有发完,将该变量设为true
pending = true;
}
//创建一个MsgNode节点,把要发送的数据拷贝到该节点,并将该节点放到队列里(用了深拷贝,数据都存放在自己的空间了)
_send_que.push(make_shared<MsgNode>(msg, max_length));
if (pending) { //再一次判断是否为true,如果为true,说明有数据没有发完,直接返回(已经把要发送的数据放到队列了)
return;
}
//如果为false,说明之前队列是为空的,但刚刚将要发送的数据添加到了队列,所以这一次必须是调用要发送的接口了(因为之前的数据发完了,
// 必须手动的再调用一次接口,调用一下异步发送,这样我们通过回调函数来处理我们发送的结果)
boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_from_this()));
}
//该回调函数是指定数据都发送完了,才会被调用。当被调用的时候,也说明发送完一个节点的数据,所以下面代码可以直接取出一个节点
void CSession::HandleWrite(const boost::system::error_code& error, shared_ptr<CSession> _self_shared) {
if (!error) {
std::lock_guard<std::mutex> lock(_send_lock); //加锁
_send_que.pop(); //取出队列首结点,刚刚发送的就是该结点。该函数被调用时,就说明队列的首元素已经发送完了
//判断发送队列是否为空,为空则发送完,否则不断取出队列数据调用async_write发送,直到队列为空。
if (!_send_que.empty()) { //不为空,就取出队列的首元素,再次调用异步的发送函数
auto &msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, _self_shared));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
_server->ClearSession(_uuid);
}
}

6.5 粘包处理

1.粘包问题

粘包问题是服务器收发数据常遇到的一个现象,比如说当客户端发送两个Hello World!给服务器,服务器TCP接收缓冲区接收了两次,一次是Hello World!Hello, 第二次是World!

2.粘包原因

因为TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据(上次要发送的数据比如’loveu’)未发送完,那么此时只有5个字节空闲空间,我们调用发送接口发送hello world!其实就是只能发送Hello给服务器,那么服务器一次性读取到的数据就很可能是loveuhello。而剩余的world!只能留给下一次发送,下一次服务器接收到的就是world!

3.产生粘包问题其它原因

  • 客户端的发送频率远高于服务器的接收频率,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!
  • tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法
  • 再就是我们提到的最简单的情况,发送端缓冲区有上次未发送完的数据或者接收端的缓冲区里有未取出的数据导致数据粘连

解决办法:处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容)

头文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class MsgNode         //存放数据的节点
{
friend class CSession;
public:
//发送时使用的构造函数。参数:一个字符串的首地址;该字符串的长度。初始化数据的总长度(数据+头),初始化当前发送长度为0
MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
_data = new char[_total_len+1](); //自己开辟了一块空间存数据,空间大小是总长度+1 --->最后面多加一个\0
memcpy(_data, &max_len, HEAD_LENGTH); //头节点存有效字符串数据的长度max_len
memcpy(_data+ HEAD_LENGTH, msg, max_len); //偏移两个字节存有效数据字符串
_data[_total_len] = '\0'; //最后一个字节空间存\0
}
//接收时使用的构造函数。参数:接收数据的长度
MsgNode(short max_len):_total_len(max_len),_cur_len(0) { //初始化了总长度和当前长度为0
_data = new char[_total_len +1](); //开辟了一块空间,存数据
}
~MsgNode() { //析构函数
delete[] _data;
}
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
private:
short _cur_len; //当前发送或接收数据长度
short _total_len; //要发送或接受的数据长度
char* _data; //存放数据的首地址
};

class CSession: public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession();
tcp::socket& GetSocket();
std::string& GetUuid();
void Start();
void Send(char* msg, int max_length);
void Close();
std::shared_ptr<CSession> SharedSelf();
private:
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
tcp::socket _socket;
std::string _uuid;
char _data[MAX_LENGTH];
CServer* _server;
bool _b_close;
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node; //用来存储接收到的有效数据消息体
bool _b_head_parse; //表示头部是否解析完成
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node; //用来存储接收到头部消息体
};
class CServer
{
public:
CServer(boost::asio::io_context& io_context, short port);
void ClearSession(std::string);
private:
void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
void StartAccept();
boost::asio::io_context &_io_context;
short _port;
tcp::acceptor _acceptor;
std::map<std::string, shared_ptr<CSession>> _sessions;
};

实现功能的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
//主函数传入ioc和监听的端口号,该函数进行初始化成员变量和建立了服务端的终端端点(开始监听连接)
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port), _acceptor(io_context, tcp::endpoint(tcp::v4(),port))
{
cout << "Server start success, listen on port : " << _port << endl;
StartAccept();
}
void CServer::StartAccept() {
shared_ptr<CSession> new_session = make_shared<CSession>(_io_context, this); //创建一个session,并初始化
//异步连接,阻塞监听客户端,当与客户端建立连接成功后,执行绑定的回调函数HandleAccept
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
if (!error) {
new_session->Start(); //如果连接没有问题,开始通信
_sessions.insert(make_pair(new_session->GetUuid(), new_session)); //将该session添加到map里面
}
else {
cout << "session accept failed, error is " << error.what() << endl; //连接失败,打印错误信息
}
StartAccept(); //继续监听客户端释放发来连接
}
void CServer::ClearSession(std::string uuid) { //当出错或断开连接时,将对应的session移出map
_sessions.erase(uuid);
}
//-------------------------------下面是通信的代码------------------------------
CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_LENGTH);
}
CSession::~CSession() {
cout << "~CSession destruct" << endl;
}
tcp::socket& CSession::GetSocket() { //获取socket
return _socket;
}
std::string& CSession::GetUuid() { //获取uuid
return _uuid;
}
void CSession::Start(){ //开始通信函数
::memset(_data, 0, MAX_LENGTH); //先将_data清0
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())); //异步读数据,读成功执行回调函数
}
void CSession::Send(char* msg, int max_length) {
bool pending = false;
std::lock_guard<std::mutex> lock(_send_lock);
if (_send_que.size() > 0) {
pending = true;
}
_send_que.push(make_shared<MsgNode>(msg, max_length));
if (pending) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Close() {
_socket.close();
_b_close = true;
}

std::shared_ptr<CSession>CSession::SharedSelf() {
return shared_from_this();
}

void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
if (!error) {
std::lock_guard<std::mutex> lock(_send_lock);
cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;
_send_que.pop();
if (!_send_que.empty()) {
auto &msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}

void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){ //读了数据长度bytes_transferred,执行的该函数,数据放到_data中的
if (!error) { //如果读取数据没有出错
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred>0) { //只要读取的数据大于0,执行下面函数
if (!_b_head_parse) { //头部数据是否处理完,如果没有处理完,执行下面
//如果这次读的字节数+之前处理完的字节数 < 头部应该收到字节数 --->这种情况说明这次读到的数据全部头部数据,且头部数据还没有读完
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred); //直接将这次读的全部数据作为头部数据存到_recv_head_node节点
_recv_head_node->_cur_len += bytes_transferred; //更新头部数据读到的位置
::memset(_data, 0, MAX_LENGTH); //将_data清0,下一次继续往这里面存读到的数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //1.这种情况就退出,下次读回调触发,继续读头部数据。因为头部数据都没有接收到完毕,就没有必要继续执行下去
}
//收到的数据+已经处理的头部数据>头部数据,说明这次收到的数据一部分是头部数据,一部分是真实信息数据
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; //获取头部剩余未复制的长度
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain); //将头部剩余未处理的字节数全部存到_recv_head_node头节点
copy_len += head_remain; //更新已处理的长度
bytes_transferred -= head_remain; //更新剩余未处理的长度,剩余就是真实数据的内容一部分
//获取头部数据(也就是真实数据的长度)
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
cout << "data_len is " << data_len << endl; //打印真实数据的长度
if (data_len > MAX_LENGTH) { //判断真实数据的长度是否超出规定长度
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid); //如果超出规定长度,则直接将其移出map
return; //不合法直接返回
}
//如果合法,定义一个存放真实数据的MsgNode节点
_recv_msg_node = make_shared<MsgNode>(data_len);
//处理完头部数据后,剩余消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); //这种情况是剩余未处理的bytes_transferred全部是真实数据的一部分,所以全部copy
_recv_msg_node->_cur_len += bytes_transferred; //更新数据节点的当前处理长度
::memset(_data, 0, MAX_LENGTH); //将_data清0,继续存放下一次读取的数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
_b_head_parse = true; //虽然有效数据长度未处理完成,但头部结点处理完成,设未true
return; //2.这种情况退出后,下次继续执行HandleRead函数,读剩余真实数据(注意:此时_b_head_parse = true;)
}
//处理完头部数据后,剩余消息的长度大于等于头部规定的长度,说明真实数据收全 或 收全全还出现粘包
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len); //这种情况,直接将真实数据长度,全部copy到存放数据的节点_recv_msg_node
_recv_msg_node->_cur_len += data_len; //更新当前处理长度
copy_len += data_len; //更新读到的数据_data里面处理的长度
bytes_transferred -= data_len; //更新_data还剩未处理的数据长度
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //真实数据读完,后面加'\0'
cout << "receive data is " << _recv_msg_node->_data << endl; //打印真实数据内容(客户端真实数据的内容)
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len); //发送给客户端
//继续轮询剩余未处理数据
_b_head_parse = false; //处理完一个,继续下一个,将该变量设为false,表示头部未处理
_recv_head_node->Clear(); //把头部结点进行清除,下一次发送用
if (bytes_transferred <= 0) { //这种情况是没有粘包(刚刚好真实数据读完,_data就无数据要处理了)
::memset(_data, 0, MAX_LENGTH); //下面继续挂起读
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue; //还有数据,出现粘包情况,退出当前循环,执行下一次循环,继续读(包括从头部数据处理开始)
}
//如果是已经处理完头部(_b_head_parse为真,没有执行上面的if),处理上次未接受完的消息数据
//获取真实数据还未处理的长度
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) { //如果读到的数据仍小于剩余未处理的长度,说明下次还要继续读真实数据
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); //这次读的全部数据长度bytes_transferred都是真实数据的一部分
_recv_msg_node->_cur_len += bytes_transferred; //更新当前真实数据处理长度
::memset(_data, 0, MAX_LENGTH); //清0,存放下一次读的数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //退出,真实数据还没有读完
}
//如果读到的数据大于等于剩余未处理的长度,说明真实数据读完了 或 读完了并且粘包
//下面直接先将真实数据未处理的长度处理完
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg; //更新真实数据当前处理长度
bytes_transferred -= remain_msg; //更新_data处理好真实数据后,还剩余处理长度
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //真实数据处理完,加'\0'
cout << "receive data is " << _recv_msg_node->_data << endl; //打印客户端发来的内容
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len); //发送给客户端
//继续轮询剩余未处理数据
_b_head_parse = false; //处理完真实数据后,将该变量设未false,下一次继续从头节点开始处理
_recv_head_node->Clear();
if (bytes_transferred <= 0) { //这种情况是没有出现粘包
::memset(_data, 0, MAX_LENGTH); //清0,继续挂起读
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //直接退出,挂起了继续读
}
continue; //还有数据,出现粘包情况,退出当前循环,执行下一次循环,继续读(包括从头部数据处理开始)
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}