1. asio的通信流程

首先是在应用层调用async_read()函数,就相当于是往io_context里面注册读事件,并且注册读回调函数。然后io_context会把对应的读事件、socket和回调都写到epoll模型或iocp模型里,即注册给这两个模型。而在应用层调用io_context.run的时候,实际上是一个死循环,它会调用linux的epoll模型或windows的iocp模型,以死循环的方式不断的轮询,不断的去检测我们注册的那些socket那些就绪了

如果有socket就绪了,比如说我们注册了一个socket监听对端的一个发送事件,对端发送过来了,我们这个读事件就会就绪了。读事件就绪的话,就会把读事件对应的回调函数写到就绪的队列里。如果是单线程的话,这些事件的回调函数放到就绪的队列里,系统直接就会把就绪队列里的回调函数一个一个取出来,按顺序来给我们回调,顺序就是我们在底层去轮询,发现哪个socket回调函数先就绪了,就先放到就绪队列里。如果是写的事件,它也会被放到该就绪队列里,最后由asio统一派发。

2. 字节序处理

2.1 字节序的问题

在计算机网络中,由于不同的计算机使用的 CPU 架构和字节顺序可能不同,因此在传输数据时需要对数据的字节序进行统一,以保证数据能够正常传输和解析。具体来说,计算机内部存储数据的方式有两种:大端序和小端序。在大端序中,高位字节存储在低地址处,而低位字节存储在高地址处;在小端序中,高位字节存储在高地址处,而低位字节存储在低地址处。

在网络通信过程中,通常使用的是大端序。这是因为早期的网络硬件大多采用了 Motorola 处理器,而 Motorola 处理器使用的是大端序。此外,大多数网络协议规定了网络字节序必须为大端序。因此,在进行网络编程时,需要将主机字节序转换为网络字节序,也就是将数据从本地字节序转换为大端序。可以使用诸如 htonl、htons、ntohl 和 ntohs 等函数来实现字节序转换操作。

综上所述,网络字节序的主要作用是统一不同计算机间的数据表示方式,以保证数据在网络中的正确传输和解析。

判断当前本机的字节序:

1
2
3
4
5
6
7
8
9
10
11
// 判断当前系统的字节序是大端序还是小端序
bool is_big_endian() {
int num = 1;
if (*(char*)&num == 1) {
// 当前系统为小端序
return false;
} else {
// 当前系统为大端序
return true;
}
}

2.2 服务器中使用字节序

1.主机字节序转换为网络字节序

  • boost::asio::detail::socket_ops::host_to_network_long() :将一个32位无符号整数从主机字节序转换为网络字节序

  • boost::asio::detail::socket_ops::host_to_network_short() :将一个16位无符号整数从主机字节序转换为网络字节序

2.网络字节序转换为主机字节序

  • boost::asio::detail::socket_ops::network_to_host_short():将网络字节序转换为16位无符号整数主机字节序

  • boost::asio::detail::socket_ops::network_to_host_long():将网络字节序转换为32位无符号整数主机字节序

在异步处理的服务器中,当接受客户端发来的数据时,需要将其进行转换,从网络字节序转换为本地字节序:

1
2
3
4
5
short data_len = 0;            //存放转换后的结果
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH); //将头部数据内容copy到data_len,此时还是网络字节序
//网络字节序转化为本地字节序
data_len=boost::asio::detail::socket_ops::network_to_host_short(data_len);
cout << "data_len is " << data_len << endl;

在服务器的发送数据时会构造消息节点,构造消息节点时,将发送长度由本地字节序转化为网络字节序:

1
2
3
4
5
6
7
8
MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){  //发送数据节点
_data = new char[_total_len+1](); //开辟一块存发送数据的空间
//转为网络字节序
int max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
memcpy(_data, &max_len_host, HEAD_LENGTH); //将转换的结果存到_data头部(前面)
memcpy(_data+ HEAD_LENGTH, msg, max_len); //继续往_data后面存储真实数据
_data[_total_len] = '\0'; //最后加上'\0'
}

3. asio处理粘包的另一种方式

我们之前使用的一种方式是通过async_read_some函数监听读事件,并且绑定了读事件的回调函数HandleRead。async_read_some 这个函数的特点是只要对端发数据,服务器接收到数据,即使没有收全对端发送的数据也会触发HandleRead函数,所以我们会在HandleRead回调函数里判断接收的字节数,接收的数据可能不满足头部长度,可能大于头部长度但小于消息体的长度,可能大于消息体的长度,还可能大于多个消息体的长度,所以要切包等,这些逻辑写起来很复杂。

所以我们可以通过读取指定字节数,直到读完这些字节才触发回调函数,那么可以采用async_read函数,这个函数指定读取指定字节数,只有完全读完才会触发回调函数。

3.1 获取头部数据

当服务器与客户端建立完毕,进入通信工作,先就绪读头部数据

1
2
3
4
5
6
void CSession::Start(){
_recv_head_node->Clear(); //将存头部数据的节点_data清0,当前处理长度设为0
//先读头部的两个字节数据,一次性读完的,只有读完才会触发HandleReadHead回调函数
boost::asio::async_read(_socket, boost::asio::buffer(_recv_head_node->_data, HEAD_LENGTH), std::bind(&CSession::HandleReadHead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
//SharedSelf()是通过自己再生成一个智能指针,这个智能指针与其它的智能指针共享引用计数,保证Session不被异常释放
}

当对端发来指定大小HEAD_LENGTH数据,并存到_recv_head_node->_data时,触发回调函数HandleReadHead。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void CSession::HandleReadHead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self) {
if (!error) { //如果没有报错
if (bytes_transferred < HEAD_LENGTH) { //这种情况也是不正常的,因为指定读HEAD_LENGTH长度数据,却没有读完
cout << "read head length error";
Close(); //这里是把socket关闭掉,会关闭socket的事件处理机制。底层asio会把socket从它监听的epoll事件里面移出,socket就不会触发改回调函数
_server->ClearSession(_uuid); //将其session移出,减少它的引用计数
return;
}
//没有问题情况下,bytes_transferred一定是等于头部长度的
//头部收全,解析头部
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH); //获取真实数据的大小
cout << "data len is" << data_len << endl; //打印真实数据的大小
if (data_len > MAX_LENGTH) { //如果真实数据大于规定的最大值,说明不合法
std::cout << "invalid data length is" << data_len << endl;
_server->ClearSession(_uuid); //将其session移出map
return;
}
//如果正常,就读取真实数据
_recv_msg_node = make_shared<MsgNode>(data_len); //下面开始读真实数据,读真实数据总长度_total_len,存到该数据节点中
boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data,_recv_msg_node->_total_len), std::bind(&CSession::HandleReadMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}
else { //出错了
cout << "handle read head failed,error is" << error.what() << endl;
Close(); //关闭socket
_server->ClearSession(_uuid); //减少session的引用计数
}
}

3.2 获取信息体

当获取完头部数据后,就可以在其回调函数里执行异步读async_read,直接获取信息数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void CSession::HandleReadMsg(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self) {
if (!error) {
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //数据都读到了数据节点,再最后面加上'\0'
cout << "receive data is " << _recv_msg_node->_data << endl; //打印接收的数据
Send(_recv_msg_node->_data, _recv_msg_node->_total_len); //发送接收的数据给客户端
//再次接收头部数据
_recv_head_node->Clear();
boost::asio::async_read(_socket, boost::asio::buffer(_recv_head_node->_data, HEAD_LENGTH), std::bind(&CSession::HandleReadHead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}
else { //出错了
cout << "handle read Msg failed,error is" << error.what() << endl;
Close(); //关闭socket
_server->ClearSession(_uuid); //减少session的引用计数
}
}

4. 结合Json实现粘包处理tlv

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。结构图如下:

为了减少耦合和歧义,这里重新设计消息节点。

  • MsgNode表示消息节点的基类,头部的消息用这个结构存储
  • RecvNode表示接收消息的节点
  • SendNode表示发送消息的节点

4.1 信息节点类

MsgNode.h头文件定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MsgNode
{
public:
MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
_data = new char[_total_len + 1]();
_data[_total_len] = '\0';
}
~MsgNode() {
std::cout << "destruct MsgNode" << endl;
delete[] _data;
}
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
short _cur_len; //当前处理的长度(接收或发送)
short _total_len; //要处理的总长度
char* _data; //存放数据的首地址
};
//接收数据的节点类定义,继承了MsgNode类
class RecvNode :public MsgNode {
public:
RecvNode(short max_len, short msg_id); //传入数据的长度和信息id
private:
short _msg_id;
};
//发送数据的节点类定义,继承了MsgNode类
class SendNode:public MsgNode {
public:
SendNode(const char* msg,short max_len, short msg_id); //传入发送数据,数据长度和信息id
private:
short _msg_id;
};

接受数据节点和发送数据节点初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
//接收数据节点类初始化,将传进去的max_len用来给MsgNode类初始化,msg_id给id初始化
RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),_msg_id(msg_id){
}
//发送数据节点类初始化,数据长度+头节点长度=总长度
SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id){
//先发送id, 转为网络字节序
short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
memcpy(_data, &msg_id_host, HEAD_ID_LEN); //将转换好的id存到_data第一部分
//转为网络字节序
short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN); //将转换好的数据长度存到_data第二部分
memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len); //将真实数据存到_data第三部分
}

4.2 Session类

Session类头节点如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class CSession: public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession();
tcp::socket& GetSocket(); //获取socket函数
std::string& GetUuid(); //获取uuid函数
void Start(); //开始通信函数
void Send(char* msg, short max_length, short msgid); //发送数据send
void Send(std::string msg, short msgid); //接收数据send
void Close(); //关闭socket函数
std::shared_ptr<CSession> SharedSelf(); //添加智能指针引用计数的函数
private:
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
tcp::socket _socket;
std::string _uuid;
char _data[MAX_LENGTH];
CServer* _server;
bool _b_close;
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse; //头部节点释放读取完毕
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
};

Session类实现功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
CSession::~CSession() {
std::cout << "~CSession destruct" << endl;
}
tcp::socket& CSession::GetSocket() { //获取socket函数实现
return _socket;
}
std::string& CSession::GetUuid() { //获取uuid函数实现
return _uuid;
}
void CSession::Start(){ //与客户端通信开始函数
::memset(_data, 0, MAX_LENGTH); //先将_data清0
//异步读async_read_some,将读到的数据放到了_data,读完后,并触发回调函数HandleRead
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}

void CSession::Send(std::string msg, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}
_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
if (send_que_size > 0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Send(char* msg, short max_length, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}
_send_que.push(make_shared<SendNode>(msg, max_length, msgid));
if (send_que_size>0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Close() {
_socket.close();
_b_close = true;
}

std::shared_ptr<CSession>CSession::SharedSelf() {
return shared_from_this();
}

void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
//增加异常处理
try {
if (!error) {
std::lock_guard<std::mutex> lock(_send_lock);
//cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;
_send_que.pop();
if (!_send_que.empty()) {
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cerr << "Exception code : " << e.what() << endl;
}
}
//当异步读,读到数据后触发该回调函数
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
try {
if (!error) { //如果没有出错
int copy_len = 0; //已经移动的字符数
while (bytes_transferred > 0) { //如果bytes_transferred大于0
if (!_b_head_parse) { //如果头部数据还没有处理完
//这次收到的数据不足头部大小,则这次收到的bytes_transferred长度数据都是头部数据,HEAD_TOTAL_LEN=4(id占2,数据长度占2)
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); //将这次读到的全部数据全部存到头部节点
_recv_head_node->_cur_len += bytes_transferred; //更新头部节点的当前处理位置
::memset(_data, 0, MAX_LENGTH); //清0,继续读,存放下一次读到的数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //这种情况直接退出,继续读数据(头部)
}
//收到的数据比头部多(没有执行上面if),说明这次读到的数据一部分是头部数据,一部分是真实数据
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; //头部剩余未处理的长度
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain); //将头部剩余未处理的长度,全部处理完(存到头部节点)
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部MSGID数据
short msg_id = 0; //用来存放信息id
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN); //从头部数据获取id
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
if (msg_id > MAX_LENGTH) { //如果id非法
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid); //从map移出对应uuid,减少该session的引用计数
return;
}
//如果id合法,继续读取信息长度
short msg_len = 0; //用来存真实信息长度
memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;
if (msg_len > MAX_LENGTH) { //如果真实数据长度非法
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid); //从map移出对应uuid,减少该session的引用计数
return;
}
_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id); //存放真实数据内容节点
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
_b_head_parse = true; //头部处理完成,将该变量设为true
return;
}
//如果读到数据>=头部读到的真实数据长度,说明可能出现粘包情况
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len); //这时就处理头部节点读到的真实数据长度
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len; //先减去真实数据长度
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //结尾加结束符
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root; //用来存放反序列化的值
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is " << root["data"].asString() << endl; //反序列化后,读取客户端发来的值
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString(); //又将root的值序列化
Send(return_str, root["id"].asInt()); //发送给客户端
//继续轮询剩余未处理数据
_b_head_parse = false; //因为上面已经处理完一个节点数据,又要从头节点开始处理
_recv_head_node->Clear(); //清理头节点,为下一次读做准备
if (bytes_transferred <= 0) { //如果bytes_transferred已经<=0,说明没有出现粘包情况
::memset(_data, 0, MAX_LENGTH); //清理_data,继续挂起读
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue; //出现粘包情况,就不执行下面了,_b_head_parse已经为false,可以重新循环执行下一个节点
}
//已经处理完头部,处理上次未接受完的真实消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //这种情况是真实数据还未处理完,退出继续读
}
//这次接收的数据>=真实数据未处理的长度,可能出现粘包情况
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is " << root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());
//继续轮询剩余未处理数据
_b_head_parse = false; //处理完一个节点,将该变量设为false
_recv_head_node->Clear();
if (bytes_transferred <= 0) { //这种情况是没有出现粘包
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return; //挂起读
}
continue; //出现粘包,就继续循环,因为bytes_transferred还大于0
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cout << "Exception code is " << e.what() << endl;
}
}

对于server类,没有出现其它变化,与之前代码差不多。

5. asio多线程模型IOServicePool

前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两种多线程模型,第一个是启动多个线程,每个线程管理一个iocontext;第二种是只启动一个iocontext,被多个线程共享。这里主要使用第一种模式,多个线程,每个线程管理独立的iocontext服务。

下面先介绍多线程的第一种模式:一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。为避免线程安全问题,我们将网络数据封装为逻辑包投递给逻辑系统,逻辑系统有一个单独线程处理,这样将网络IO和逻辑处理解耦合,极大的提高了服务器IO层面的吞吐率。

5.1 单线程与多线程

之前使用的单线程模型如下:

IOServicePool类型的多线程模型如下:

IOServicePool多线程模式特点:

  • 每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。

  • 但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。

  • 多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是串行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

5.2 服务端实现

主函数:先初始化服务池,创建一个负责监听的io_context,通过asio提供的异步等待函数和线程来完成信号的捕捉,当触发SIGINT和SIGTERM信号时,停止io服务,即不在监听处理客户端发来的请求连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
try {
auto pool = AsioIOServicePool::GetInstance(); //先将服务池初始化好
boost::asio::io_context io_context; //该io_context只负责监听,监听客户端是否请求发来连接
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); //初始化一个信号集,将要捕获的信号SIGINT, SIGTERM注册到服务io里
//异步等待,等待信号被触发,触发了会执行回调函数(匿名函数表示),io_context是用引用方式来捕获,auto:几个参数,就写几个auto
signals.async_wait([&io_context](auto, auto) { //信号集异步等待,信号集里面的信号触发了,就会执行这个回调函数(匿名函数)
io_context.stop(); //停止io服务
});
//上面因为是异步等待,所以不会阻塞,如果回调函数没有触发,会继续执行后面的代码
CServer s(io_context, 10086);
io_context.run(); //轮询
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
}

CServer类实现:这部分负责实现监听客户端的连接和获取服务池的io_context。每与客户端建立成功一次,就从服务池中获取一个io_context来负责处理与连接的客户端的一些业务,实现了一个线程一个io_context的多线程模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
_acceptor(io_context, tcp::endpoint(tcp::v4(),port))
{
//该函数初始化了_acceptor,里面的参数io_context是单独创建的,就负责用来监听客户端
cout << "Server start success, listen on port : " << _port << endl;
StartAccept(); //处理连接的函数
}
void CServer::StartAccept() {
//当有客户端发来请求连接时,就从服务池里面取出一个io_context来负责这次连接的业务处理
//GetIOService()函数实现的是从下标为0开始获取io_context,获取完一次,下标进行+1,当达到服务池里面提供的io_context的个数时,又从下标为0的io_context开始获取,所以当客户端连接的数量过多时,可能是多个客户端同时拥有同一个io_context
auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();
shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1)); //异步连接,与客户端成功连接后,执行回调函数HandleAccept()
}
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
if (!error) {
new_session->Start(); //去session执行通信的业务
_sessions.insert(make_pair(new_session->GetUuid(), new_session)); //连接成功将对应的信息存入到map,方便管理
}
else {
cout << "session accept failed, error is " << error.what() << endl;
}
StartAccept();
}
void CServer::ClearSession(std::string uuid) {
_sessions.erase(uuid);
}

AsioIOServicePool类服务池的头文件实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//AsioIOServicePool就形成了单例模式,因为他继承了Singleton<AsioIOServicePool>模板
class AsioIOServicePool:public Singleton<AsioIOServicePool>
{
friend Singleton<AsioIOServicePool>; //这样Singleton<AsioIOServicePool>就可以访问AsioIOServicePool的构造函数(私有)
public:
using IOService = boost::asio::io_context; //IOServer是io_context的别名
using Work = boost::asio::io_context::work; //work是防止io_context在没有注册事件的时候退出(因为对于多线程我们需要先创建多个io_context)
using WorkPtr = std::unique_ptr<Work>; //Work是上面io_context::work的别名。这里是让Work不被拷贝,只能移动,从头用到尾
~AsioIOServicePool();
AsioIOServicePool(const AsioIOServicePool&) = delete; //取消拷贝构造,拷贝构造时,里面加了canst后要加&,不然会造成递归构造的危险
AsioIOServicePool& operator = (const AsioIOServicePool&) = delete; //赋值操作也取消掉
//使用round-robin的方式返回一个io_context
boost::asio::io_context& GetIOService(); //返回一个io_context
void Stop();
private:
AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); //hardware_concurrency()获取cpu的核数,一般是电脑有几个核,就开几个io_context
std::vector<IOService>_ioServices;
std::vector<WorkPtr>_works;
std::vector<std::thread>_threads;
std::size_t _nextIOService; //记录的下标索引
};

AsioIOServicePool类服务池的功能实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//AsioIOServicePool默认传的是cpu的核数,也可以自己指定,并初始化size个io_context,size个指针(默认是空指针),下标索初始化为0
AsioIOServicePool::AsioIOServicePool(std::size_t size) :_ioServices(size), _works(size), _nextIOService(0) {
//works初始化分配内存
for (std::size_t i = 0; i < size; i++) {
//将左值赋值给unique_ptr是不行的,而作为右值去赋值是可以的
_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i])); //work是绑定io_context的
}
//遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
for (std::size_t i = 0; i < _ioServices.size(); i++) { //装线程的_threads开始默认大小为0
//方法:通过遍历的方式,每次都为_threads插入一个线程。用push_back是以右值的形式将其挪进来,而用emplace_back是直接传入线程
//执行插入操作,就默认调用了线程的回调函数了,下面emplace_back相当于是给线程插入了一个回调函数,就直接跑起来了
_threads.emplace_back([this, i]() {//这是是快速构造,防止拷贝造成的开销。emplace_back传入的是匿名函数,第一个先绑定this(io_service),
_ioServices[i].run();
}); //这样,该线程就跑起来了。c11的标准,只要定义了一个线程,线程就跑起来了
/* //其它方式创建线程,但存在问题
thread t([this, i]() { //先初始化一个线程
_ioService[i].run();
});
_threads.push_back(t); //又拷贝了一个线程,这种情况是线程初始化时就跑起来了,现在又push进来,会造成开销,而且跑了两个线程
_threads.push_back(move(t)); //这种情况是相当于把上面t的生命周期进行一个接管,原来线程就失效了,但不能保证它能停下来,除非用stop销毁它 */
}
}
//析构函数
AsioIOServicePool::~AsioIOServicePool() {
std::cout << "AsioIOServicePool destruct" << endl;
}
//获取io_Service函数(io_Service是io_context的别名)
boost::asio::io_context& AsioIOServicePool::GetIOService() {
auto& service = _ioServices[_nextIOService++]; //从0下标开始取
if (_nextIOService == _ioServices.size()) {
_nextIOService = 0;
}
return service;
}
void AsioIOServicePool::Stop() { //要让每一个io_context都停止,就需要把绑定在它们身上的work reset掉
for (auto& Work : _works) {
Work.reset(); //这相当于把work智能指针释放掉,智能指针一旦释放掉,就会析构WorkPtr,而WorkPtr绑定的就是work(unique_ptr类型),
//所以就会调用work的析构,work就会把绑定的所有东西解除,从而保证io_context退出
}
//要等所有线程都结束了(io_service.run运行完),该函数才退出,所以调用了这个函数,不会立即执行完
for (auto& t : _threads) {
t.join();
}
}

由于监听客户端是否发来连接用的是刚开始初始化的io_context,后面与客户端的读写事件监听用的是服务池里面的io_context,所以当与客户端建立成功后,之后处理监听读写事件的io_context就需要从服务池里面去取。

1
2
3
4
5
6
void CServer::StartAccept() {
//多线程服务池模式:当有客户端发来请求连接时,就从服务池里面取出一个io_context来负责这次连接的业务处理
auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();
shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

5.3 客户端实现

客户端实现的是创建100个线程,每个线程都通过json序列化一个信息包发送给服务端,并将从服务器接收的信息包反序列化,这样重复500次。测试一下等所有线程都执行完,并且都被回收所需要花费的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
std::vector<thread> vec_threads;    //定义一个存放连接线程的容器
int main()
{
auto start = std::chrono::high_resolution_clock::now(); //获取现在开始时间
for (int i = 0; i < 100; i++) { //循环100次,可以理解为向服务器发送建立了100次连接
vec_threads.emplace_back([]() { //将回调函数(线程需要执行)通过emplace_back放入容器
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造服务器的endpoint信息
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
tcp::socket sock(ioc); //创建一个socket
boost::system::error_code error = boost::asio::error::host_not_found;
sock.connect(remote_ep, error); //向服务器发送连接请求
if (error) { //连接失效,直接退出
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}
//连接成功执行下面,向服务器发送和接收各500次
int i = 0;
while (i < 500) { //发500次内容
Json::Value root;
root["id"] = 1001; //消息id
root["data"] = "hello world";
std::string request = root.toStyledString(); //序列化为字符串形式
size_t request_length = request.length(); //获取序列化后的字符串长度
char send_data[MAX_LENGTH] = { 0 };
int msgid = 1001;
//将信息id转换为网络字节序
int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);
memcpy(send_data, &msgid_host, 2); //存放到发送节点
//序列化后的数据长度转为网络字节序
int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);
memcpy(send_data + 2, &request_host_length, 2); //存放到发送节点
memcpy(send_data + 4, request.c_str(), request_length); //最后就是存放序列化后的数据
//将信息序列化后,以包的形式发送给服务器
boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));
cout << "begin to receive..." << endl; //下面开始接收
char reply_head[HEAD_TOTAL];
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));
msgid = 0;
memcpy(&msgid, reply_head, HEAD_LENGTH);
short msglen = 0;
memcpy(&msglen, reply_head + 2, HEAD_LENGTH);
//转为本地字节序
msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);
msgid = boost::asio::detail::socket_ops::network_to_host_short(msgid);
char msg[MAX_LENGTH] = { 0 };
size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));
Json::Reader reader;
reader.parse(std::string(msg, msg_length), root); //将反序列化后的信息存到root
std::cout << "msg id is " << root["id"] << " msg is " << root["data"] << endl;
i++; //发送一个包和接收完一个包后,i++。说明一个线程要完成发送和接收,共1000个包
}
}
catch (std::exception& e) {
std::cerr << "Exception:" << e.what() << endl;
}
});
std::this_thread::sleep_for(std::chrono::seconds(1)); //不能一直创线程建立连接,每创建一个线程连接就睡一秒,不然就是一直向服务器发送请求连接
}
for (auto& t : vec_threads) {
t.join(); //等全部线程结束回收
}
auto end = std::chrono::high_resolution_clock::now(); //执行完后,获取结束时的时间
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start); //获取时间差,并转换为微妙
std::cout << "Time spent:" << duration.count() << "microseconds" << std::endl;
return 0;
}

6. asio多线程模式IOThreadPool

6.1 介绍

下面介绍多线程实现的第二种模式,即多线程模式IOThreadPool,我们只初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。

线程池模式的多线程模型调度结构图:

构造函数中实现了一个线程池,线程池里每个线程都会运行_service.run函数,_service.run函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数,进而调用回调函数,因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。
_service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表,在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。

iocp的流程:

1
2
3
4
5
IOCP的使用主要分为以下几步:
1 创建完成端口(iocp)对象
2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求
3 Socket关联iocp对象,在Socket上投递网络事件
4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理

epoll的流程:

1
2
3
4
1 调用epoll_creat在内核中创建一张epoll表
2 开辟一片包含n个epoll_event大小的连续空间
3 将要监听的socket注册到epoll表里
4 调用epoll_wait,传入之前我们开辟的连续空间,epoll_wait返回就绪的epoll_event列表,epoll会将就绪的socket信息写入我们之前开辟的连续空间

所以IOThreadPool模式有一个隐患,同一个socket的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱。

6.2 利用strand改进

对于多线程触发回调函数的情况,我们可以利用asio提供的串行类strand封装一下,这样就可以被串行调用了,其基本原理就是在线程各自调用函数时取消了直接调用的方式,而是利用一个strand类型的对象将要调用的函数投递到strand管理的队列中,再由一个统一的线程调用回调函数,调用是串行的,解决了线程并发带来的安全问题。

上面结构图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数,而是将回调函数投递给strand管理的队列,再由strand统一调度派发。

6.3 相关代码

主函数实现:通过signals.async_wait异步等待,创建了一个子线程负责监听退出信号,当退出信号产生,就会触发该子线程执行,它会停止io_context和线程池,并唤醒阻塞在主线程的锁。主线程就是负责也客户端的一系列监听事件,并阻塞在条件变量cond_quit。等待退出子线程唤醒后,主线程退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;
int main() {
try {
auto pool = AsioThreadPool::GetInstance(); //先将线程池初始化好
boost::asio::io_context io_context;
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); //初始化一个信号集,将要捕获的信号SIGINT, SIGTERM注册到服务io里
//异步等待,等待信号被触发,触发了会执行回调函数(匿名函数表示),io_context是用引用方式来捕获,auto是几个参数,就写几个auto
signals.async_wait([pool, &io_context](auto, auto) { //信号集异步等待,信号集里面的信号触发了,就会执行这个回调函数(匿名函数)
io_context.stop(); //停止io服务
pool->Stop(); //停止线程池
std::unique_lock<std::mutex>lock(mutex_quit); //加锁,对bstop进行修改
bstop = true;
cond_quit.notify_one(); //通知退出(唤醒)
});
//上面因为是异步等待,所以不会阻塞,如果回调函数没有触发,会继续执行后面的代码
CServer s(pool->GetIOService(), 10086);
{ //这个括号的代码是为了不让主线程直接退出,等子线程退出,主线程再退出
std::unique_lock<std::mutex>lock(mutex_quit);
while (!bstop) {
cond_quit.wait(lock); //没有结束之前,会阻塞在这里
}
}
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
}

AsioThreadPool类头文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class AsioThreadPool:public Singleton<AsioThreadPool>
{
public:
friend class Singleton<AsioThreadPool>; //使其基类Singleton<AsioThreadPool>可以访问它的私有函数
~AsioThreadPool() {}
AsioThreadPool& operator = (const AsioThreadPool&) = delete; //取消赋值构造
AsioThreadPool(const AsioThreadPool&) = delete; //取消拷贝构造
boost::asio::io_context& GetIOService(); //获取io_context
void Stop(); //停止该线程池类,友好退出
private:
AsioThreadPool(int threadNum = std::thread::hardware_concurrency()); //hardware_concurrency()是硬件允许的并行数
boost::asio::io_context _service;
std::unique_ptr<boost::asio::io_context::work>_work; //因为不注册读事件和写事件,_service调用.run就会退出,_work负责防止这种情况退出
std::vector<std::thread>_threads; //装线程的容器
};

AsioThreadPool类初始化:用c11特性,直接定义了线程,线程就可以执行。则将线程放到threads容器,线程就可以开始执行了。通过emplace_back插入一个回调函数,表示线程启动后要执行的内容。emplace_back是直接减少一层构造函数的开销,直接调用thread原始的构造函数,即直接传入一个回调函数即可,继而构造成一个线程,插入到_threads里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
AsioThreadPool::AsioThreadPool(int threadNum):_work(new boost::asio::io_context::work(_service)) {
//开辟threadNum个线程
for (int i = 0; i < threadNum; i++) {
_threads.emplace_back([this]() { //下面用到的_service是AsioThreadPool的成员,匿名函数只有对器进行捕获了,才能使用
_service.run(); //this->_service.run();也可以。_service.run()返回了就说明结束了,该线程也就会退出
});
}
}
boost::asio::io_context& AsioThreadPool::GetIOService() {
return _service;
}
void AsioThreadPool::Stop() {
_work.reset(); //表示把unique_ptr类型的指针work释放掉,则就会调用_work的析构函数,那么_work开辟的空间就会被回收掉,
//则_service就没有_work去管理它,这时候,如果没有事件到来,_service就会stop了,这种情况就是_service.run会直接返回,不在一直轮询
for (auto& t : _threads) { //等待所有线程退出,一个一个回收
t.join();
}
}

建立连接函数:由于监听客户端是否发来连接用的是刚开始初始化的io_context,后面与客户端的读写事件监听也是用的同一个io_context,所以当与客户端建立成功后,之后的监听读写事件读由该io_context来处理。

1
2
3
4
5
void CServer::StartAccept() {
//多线程线程池模式:不需要有多个io_context,统一使用刚开始传进来的io_context
shared_ptr<CSession> new_session = make_shared<CSession>(_io_context, this);
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

在Session类中添加一个成员变量_strand,如下所示。strand是个模板,需要声明它的类型。strand他有一个执行类型,要求这个执行类型是上下文的执行类型,也就是跟io_context匹配的执行类型。

1
boost::asio::strand<boost::asio::io_context::executor_type>_strand;

为了让回调函数被派发到strand的队列,我们只需要在注册回调函数时加一层strand的包装即可。

因为在asio中无论iocontext还是strand,底层都是通过executor调度的,可以将他理解为调度器,如果多个iocontext和strand的调度器是一个,那他们的消息派发统一由这个调度器执行。所以我们利用iocontext的调度器构造strand,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可。

比如我们在Start函数里添加读事件绑定 ,将回调函数的调用者绑定为_strand。

1
2
3
4
5
6
7
void CSession::Start() {
::memset(_data, 0, MAX_LENGTH);
//生成一个新的执行器,告诉asio底层,读事件就绪的时候,会调用执行器bind_executor来执行。该执行器会调用_strand队列里的线程,来派发该回调函数。
//也就是说将_strand和回调处理进行绑定生成一个新的执行者,该执行者来派发所有的消息。
//该执行者是唯一的,因为它是通过_stand来绑定的,而_strand只有一个调度器,这个调度器和对应的回调函数绑定起来
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())));
}

写事件绑定:

1
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self)));

如果不绑定执行器bind_executor,这个回调函数会默认让线程来调用。而绑定到_strand上,就由_strand来调用。底层是通过_strand的独立线程来调用。其次还有好处就是,各个线程的回调函数就绪的时候,会自动投递到stran队列里。

6.4 多线程模式使用建议

实际的生产和开发中,我们尽可能利用C++特性,使用多核的优势,将iocontext分布在不同的线程中效率更可取一点,但也要防止线程过多导致cpu切换带来的时间片开销,所以尽量让开辟的线程数小于或等于cpu的核数,从而利用多核优势。

7. asio协程实现并发服务器

7.1 介绍

ASIO协程是一种编程技术,‌它允许程序员以更简洁和直观的方式编写异步代码,‌特别是在处理网络编程时。‌ASIO协程通过提供一种无栈协程(stackless coroutine)的方式,‌简化了异步编程的复杂性,‌尤其是在内存管理方面。‌这种技术通过提供一种更高级别的抽象,‌使得程序员可以更加专注于业务逻辑的实现,‌而不是陷入底层细节的处理中。‌

利用协程实现并发程序有两个好处:

  • 将回调函数改写为顺序调用,提高开发效率。
  • 协程调度比线程调度更轻量化,因为协程是运行在用户空间的,线程切换需要在用户空间和内核空间切换。

7.2 协程的简单实现

在下面的协程实现中:

  • awaitable表示声明了一个函数,那么这个函数就变为可等待的函数了,比如listener被添加awaitable<void>之后,就可以被协程调用和等待了。
  • co_spawn表示启动一个协程,参数分别为调度器,执行的函数,以及启动方式, 比如我们启动了一个协程,deatched表示将协程对象分离出来,这种启动方式可以启动多个协程,他们都是独立的,如何调度取决于调度器,在用户的感知上更像是线程调度的模式,类似于并发运行,其实底层都是串行的。
  • acceptor接收到连接后,继续调用co_spawn启动一个协程,用来执行echo逻辑。echo逻辑里也是通过co_wait的方式接收和发送数据的,如果对端不发数据,执行echo的协程就会挂起,另一个协程启动,继续接收新的连接。当没有连接到来,接收新连接的协程挂起,如果所有协程都挂起,则等待新的就绪事件(对端发数据,或者新连接)到来唤醒。

服务端实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
using boost::asio::ip::tcp;
using boost::asio::awaitable; //异步等待
using boost::asio::co_spawn;
using boost::asio::detached; //让协程与主线程分离(协程独立启动)
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro; //返回当前协程可执行的环境
//负责通信协程的执行函数
awaitable <void>echo(tcp::socket socket) { //传过来与对端连接的socket
try {
char data[1024];
for (;;) {
//将异步变成同步,变成可等待的,通过协程的方式来使用。co_await表示阻塞时会释放占有的资源,交给其它协程和主线程
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable); //同步的发送给对端
}
}
catch (std::exception& e) {
std::cout << "echo exception is " << e.what() << std::endl;
}
}
//负责监听对端连接协程的执行函数
awaitable <void> listener() { //awaitable是可等待的。协程就可以直接执行这个函数了,返回一个可执行对象
//获取一个调度器
auto executor = co_await this_coro::executor; //返回一个协程的调度器。co_await表示异步的去查询调度器,一直找不到,就先挂起,把使用权交给主线程和其它协程
//监听对端的连接,acceptor调度是用的执行器executor,该执行器是协程的执行器,而协程执行器是主函数co_spawn里面的io_context
tcp::acceptor acceptor(executor, { tcp::v4(),10086 });
for (;;) {
//acceptor.async_accept是我们之前的异步处理,后面需要传递一个回调函数。因为这是异步的,不能阻塞的去等待,asio底层是通过回调函数来通知我们的。但使用协程,加上co_await关键字后,就可以写成这种同步式的代码,即可以阻塞等待接收,但这是协程内部的阻塞,他会通过co_await来释放使用权,把协程占用的资源都释放出来(看起来是阻塞,但不会影响主线程向下的执行)
tcp::socket socket = co_await acceptor.async_accept(use_awaitable); //当收到对端的连接,就返回一个socket。use_awaitable是在使用异步函数的时候,告诉asio,通过协程的方式来使用(即将异步函数变成可等待的函数)
std::cout<<"connect is successful"<<std::endl;
//建立连接成功,再启动一个协程,负责通信
co_spawn(executor, echo(std::move(socket)), detached); //执行器;要执行的函数;分离。socket通过移动操作传过去,这边的socket就不能用了
}
}
int main()
{
try {
boost::asio::io_context io_context(1); //将使用一个工作线程来执行所有的异步操作
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); //创建了一个信号集,绑定在上下文服务上,信号集处理的信号是 SIGINT、SIGTERM
//异步等待。捕获引用,把前面所有到的变量都用引用捕获。()里面是要处理的参数,几个信号,就传几个参数
signals.async_wait([&](auto, auto) {
io_context.stop(); //收到两个信号,就服务停止
});
//启动协程。将上下文里面的调度器与协程绑定起来(协程的调度是交给io_context内部的调度器来控制的),并且协程要执行listener()
co_spawn(io_context, listener(), detached); //detached可以理解为分离出来,严谨地说是先让协程独立运行,不会阻塞在这里
io_context.run(); //io_context在没有绑定任何事件的时候,会直接退出,这程序就会执行完(退出)
}
catch (std::exception& e) {
std::cout << "Exception is " << e.what() << std::endl;
}
}

客户端实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int main()
{
try {
boost::asio::io_context ioc;
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086); //传入回送地址,也是可以理解为本地地址
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found;
sock.connect(remote_ep, error);
if (error) { //不为0,就说明报错了
cout << "connect failed, code is " << error.value() << "error msg is" << error.message() << std::endl;
return 0;
}
std::cout << "Enter message:"; //终端打印提示信息
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH); //读取终端输入的信息,存到request
size_t request_length = strlen(request); //获取输入信息长度
boost::asio::write(sock, boost::asio::buffer(request, request_length)); //同步发送给服务器
char reply[MAX_LENGTH];
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length)); //同步接收服务器的信息
cout << "reply is " << string(reply, reply_length) << endl; //打印出服务端发来的信息
getchar();
}
catch (std::exception& e) {
std::cerr << "EXception is" << e.what() << std::endl;
}
return 0;
}

7.3 协程改进服务器

我们可以通过协程改进服务器编码流程,用一个iocontext管理绑定acceptor用来接收对端新的连接,再从服务池IOServicePool里取其它的io_contex来管理连接的收发操作,将之前服务器中每个连接的接收数据操作改为启动一个协程,通过顺序的方式读取收到的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void CSession::Start() {
//start开始,后面就需要结合协程来完成了
auto shared_this = shared_from_this(); //防止协程的智能指针被意外的释放,引用计数+1
//开启协程接收
//[=]表示以值的方式去捕获局部变量,捕获的局部变量引用计数可以加+;->awaitable<void>表示匿名函数要返回的类型
boost::asio::co_spawn(_io_context, [=]()->boost::asio::awaitable<void> {
try {
for (; !_b_close;) { //当变量_b_close置为true时,会退出该协程
_recv_head_node->Clear(); //先初始化头部节点(清0,方便存下一次的数据)
//先读头部数据,异步变成同步了。只有读到指定长度HEAD_DATA_LEN数据才返回,读不到该协程就挂起,让出使用权,给其它协程或主线程
std::size_t n = co_await boost::asio::async_read(_socket, boost::asio::buffer(_recv_head_node->_data, HEAD_DATA_LEN), boost::asio::use_awaitable);
if (n == 0) { //接收长度为0,说明对端关闭了
std::cout << "receive peer closed" << std::endl;
Close();
_server->ClearSession(_uuid);
co_return; //使用协程内部的一个返回函数
}
//获取头部MSGID数据
short msg_id = 0; //存的头部数据的id
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//将读到头部数据的前2个字节(id)存到变量里
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id); //将id转为本地字节序
std::cout << "msg_id is " << msg_id << std::endl;
if (msg_id > MAX_LENGTH) { //如果读到的id超过规定的id范围,则是异常
std::cout << "invalid msg id is " << msg_id << std::endl;
Close();
_server->ClearSession(_uuid);
co_return;
}
//开始读数据长度
short msg_len = 0; //存放数据长度变量
memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len); //将数据长度转为本地字节序
std::cout << "msg len is " << msg_id << std::endl;
if (msg_len > MAX_LENGTH) { //如果读到的数据长度超过规定的最大长度,则是异常
std::cout << "invalid msg len is " << msg_len << std::endl;
Close();
_server->ClearSession(_uuid);
co_return;
}
//异步读出包体
n == co_await boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data, _recv_msg_node->_total_len), boost::asio::use_awaitable);
if (n == 0) { //对端关闭
std::cout << "receive peer closed" << std::endl;
Close();
_server->ClearSession(_uuid);
co_return; //使用协程内部的一个返回函数
}
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //在数据最后面加上一个结尾符
std::cout << "receive data is " << _recv_msg_node->_data << std::endl;
//投递到逻辑线程处理
LogicSystem::GetInstance().PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
}
}
catch (std::exception& e) {
std::cout << "exception is " << e.what() << std::endl;
Close();
_server->ClearSession(_uuid);
}
}, boost::asio::detached);
}

对于其它部分,和之前的服务器逻辑一样,出于性能的考虑,没有把服务器的接收数据和发送数据都通过协程来完成,而只是将接收数据通过协程来完成,发送数据还是通过异步写的形式来完成的。