1. asio的通信流程 首先是在应用层调用async_read()函数,就相当于是往io_context里面注册读事件,并且注册读回调函数。然后io_context会把对应的读事件、socket和回调都写到epoll模型或iocp模型里,即注册给这两个模型。而在应用层调用io_context.run的时候,实际上是一个死循环,它会调用linux的epoll模型或windows的iocp模型,以死循环的方式不断的轮询,不断的去检测我们注册的那些socket那些就绪了
如果有socket就绪了,比如说我们注册了一个socket监听对端的一个发送事件,对端发送过来了,我们这个读事件就会就绪了。读事件就绪的话,就会把读事件对应的回调函数写到就绪的队列里。如果是单线程的话,这些事件的回调函数放到就绪的队列里,系统直接就会把就绪队列里的回调函数一个一个取出来,按顺序来给我们回调,顺序就是我们在底层去轮询,发现哪个socket回调函数先就绪了,就先放到就绪队列里。如果是写的事件,它也会被放到该就绪队列里,最后由asio统一派发。
2. 字节序处理 2.1 字节序的问题 在计算机网络中,由于不同的计算机使用的 CPU 架构和字节顺序可能不同,因此在传输数据时需要对数据的字节序进行统一,以保证数据能够正常传输和解析。具体来说,计算机内部存储数据的方式有两种:大端序和小端序。在大端序中,高位字节存储在低地址处,而低位字节存储在高地址处;在小端序中,高位字节存储在高地址处,而低位字节存储在低地址处。
在网络通信过程中,通常使用的是大端序。这是因为早期的网络硬件大多采用了 Motorola 处理器,而 Motorola 处理器使用的是大端序。此外,大多数网络协议规定了网络字节序必须为大端序。因此,在进行网络编程时,需要将主机字节序转换为网络字节序,也就是将数据从本地字节序转换为大端序。可以使用诸如 htonl、htons、ntohl 和 ntohs 等函数来实现字节序转换操作。
综上所述,网络字节序的主要作用是统一不同计算机间的数据表示方式,以保证数据在网络中的正确传输和解析。
判断当前本机的字节序:
1 2 3 4 5 6 7 8 9 10 11 bool is_big_endian () { int num = 1 ; if (*(char *)&num == 1 ) { return false ; } else { return true ; } }
2.2 服务器中使用字节序 1.主机字节序转换为网络字节序
2.网络字节序转换为主机字节序
在异步处理的服务器中,当接受客户端发来的数据时,需要将其进行转换,从网络字节序转换为本地字节序:
1 2 3 4 5 short data_len = 0 ; memcpy (&data_len, _recv_head_node->_data, HEAD_LENGTH); data_len=boost::asio::detail::socket_ops::network_to_host_short (data_len); cout << "data_len is " << data_len << endl;
在服务器的发送数据时会构造消息节点,构造消息节点时,将发送长度由本地字节序转化为网络字节序:
1 2 3 4 5 6 7 8 MsgNode (char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0 ){ _data = new char [_total_len+1 ](); int max_len_host = boost::asio::detail::socket_ops::host_to_network_short (max_len); memcpy (_data, &max_len_host, HEAD_LENGTH); memcpy (_data+ HEAD_LENGTH, msg, max_len); _data[_total_len] = '\0' ; }
3. asio处理粘包的另一种方式 我们之前使用的一种方式是通过async_read_some函数监听读事件,并且绑定了读事件的回调函数HandleRead。async_read_some 这个函数的特点是只要对端发数据,服务器接收到数据,即使没有收全对端发送的数据也会触发HandleRead函数,所以我们会在HandleRead回调函数里判断接收的字节数,接收的数据可能不满足头部长度,可能大于头部长度但小于消息体的长度,可能大于消息体的长度,还可能大于多个消息体的长度,所以要切包等,这些逻辑写起来很复杂。
所以我们可以通过读取指定字节数,直到读完这些字节才触发回调函数,那么可以采用async_read函数,这个函数指定读取指定字节数,只有完全读完才会触发回调函数。
3.1 获取头部数据 当服务器与客户端建立完毕,进入通信工作,先就绪读头部数据
1 2 3 4 5 6 void CSession::Start () { _recv_head_node->Clear (); boost::asio::async_read (_socket, boost::asio::buffer (_recv_head_node->_data, HEAD_LENGTH), std::bind (&CSession::HandleReadHead, this , std::placeholders::_1, std::placeholders::_2, SharedSelf ())); }
当对端发来指定大小HEAD_LENGTH数据,并存到_recv_head_node->_data时,触发回调函数HandleReadHead。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void CSession::HandleReadHead (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) { if (!error) { if (bytes_transferred < HEAD_LENGTH) { cout << "read head length error" ; Close (); _server->ClearSession (_uuid); return ; } short data_len = 0 ; memcpy (&data_len, _recv_head_node->_data, HEAD_LENGTH); cout << "data len is" << data_len << endl; if (data_len > MAX_LENGTH) { std::cout << "invalid data length is" << data_len << endl; _server->ClearSession (_uuid); return ; } _recv_msg_node = make_shared <MsgNode>(data_len); boost::asio::async_read (_socket, boost::asio::buffer (_recv_msg_node->_data,_recv_msg_node->_total_len), std::bind (&CSession::HandleReadMsg, this , std::placeholders::_1, std::placeholders::_2, SharedSelf ())); } else { cout << "handle read head failed,error is" << error.what () << endl; Close (); _server->ClearSession (_uuid); } }
3.2 获取信息体 当获取完头部数据后,就可以在其回调函数里执行异步读async_read,直接获取信息数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void CSession::HandleReadMsg (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) { if (!error) { _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_data << endl; Send (_recv_msg_node->_data, _recv_msg_node->_total_len); _recv_head_node->Clear (); boost::asio::async_read (_socket, boost::asio::buffer (_recv_head_node->_data, HEAD_LENGTH), std::bind (&CSession::HandleReadHead, this , std::placeholders::_1, std::placeholders::_2, SharedSelf ())); } else { cout << "handle read Msg failed,error is" << error.what () << endl; Close (); _server->ClearSession (_uuid); } }
4. 结合Json实现粘包处理tlv 我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。结构图如下:
为了减少耦合和歧义,这里重新设计消息节点。
MsgNode
表示消息节点的基类,头部的消息用这个结构存储
RecvNode
表示接收消息的节点
SendNode
表示发送消息的节点
4.1 信息节点类 MsgNode.h头文件定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class MsgNode { public : MsgNode (short max_len) :_total_len(max_len), _cur_len(0 ) { _data = new char [_total_len + 1 ](); _data[_total_len] = '\0' ; } ~MsgNode () { std::cout << "destruct MsgNode" << endl; delete [] _data; } void Clear () { ::memset (_data, 0 , _total_len); _cur_len = 0 ; } short _cur_len; short _total_len; char * _data; }; class RecvNode :public MsgNode {public : RecvNode (short max_len, short msg_id); private : short _msg_id; }; class SendNode :public MsgNode {public : SendNode (const char * msg,short max_len, short msg_id); private : short _msg_id; };
接受数据节点和发送数据节点初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 RecvNode::RecvNode (short max_len, short msg_id):MsgNode (max_len),_msg_id(msg_id){ } SendNode::SendNode (const char * msg, short max_len, short msg_id):MsgNode (max_len + HEAD_TOTAL_LEN), _msg_id(msg_id){ short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short (msg_id); memcpy (_data, &msg_id_host, HEAD_ID_LEN); short max_len_host = boost::asio::detail::socket_ops::host_to_network_short (max_len); memcpy (_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN); memcpy (_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len); }
4.2 Session类 Session类头节点如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class CSession : public std::enable_shared_from_this<CSession>{ public : CSession (boost::asio::io_context& io_context, CServer* server); ~CSession (); tcp::socket& GetSocket () ; std::string& GetUuid () ; void Start () ; void Send (char * msg, short max_length, short msgid) ; void Send (std::string msg, short msgid) ; void Close () ; std::shared_ptr<CSession> SharedSelf () ; private : void HandleRead (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) ; void HandleWrite (const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) ; tcp::socket _socket; std::string _uuid; char _data[MAX_LENGTH]; CServer* _server; bool _b_close; std::queue<shared_ptr<MsgNode> > _send_que; std::mutex _send_lock; std::shared_ptr<MsgNode> _recv_msg_node; bool _b_head_parse; std::shared_ptr<MsgNode> _recv_head_node; };
Session类实现功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 CSession::CSession (boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false ),_b_head_parse(false ){ boost::uuids::uuid a_uuid = boost::uuids::random_generator ()(); _uuid = boost::uuids::to_string (a_uuid); _recv_head_node = make_shared <MsgNode>(HEAD_TOTAL_LEN); } CSession::~CSession () { std::cout << "~CSession destruct" << endl; } tcp::socket& CSession::GetSocket () { return _socket; } std::string& CSession::GetUuid () { return _uuid; } void CSession::Start () { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, SharedSelf ())); } void CSession::Send (std::string msg, short msgid) { std::lock_guard<std::mutex> lock (_send_lock) ; int send_que_size = _send_que.size (); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl; return ; } _send_que.push (make_shared <SendNode>(msg.c_str (), msg.length (), msgid)); if (send_que_size > 0 ) { return ; } auto & msgnode = _send_que.front (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, SharedSelf ())); } void CSession::Send (char * msg, short max_length, short msgid) { std::lock_guard<std::mutex> lock (_send_lock) ; int send_que_size = _send_que.size (); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl; return ; } _send_que.push (make_shared <SendNode>(msg, max_length, msgid)); if (send_que_size>0 ) { return ; } auto & msgnode = _send_que.front (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, SharedSelf ())); } void CSession::Close () { _socket.close (); _b_close = true ; } std::shared_ptr<CSession>CSession::SharedSelf () { return shared_from_this (); } void CSession::HandleWrite (const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) { try { if (!error) { std::lock_guard<std::mutex> lock (_send_lock) ; _send_que.pop (); if (!_send_que.empty ()) { auto & msgnode = _send_que.front (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, shared_self)); } } else { std::cout << "handle write failed, error is " << error.what () << endl; Close (); _server->ClearSession (_uuid); } } catch (std::exception& e) { std::cerr << "Exception code : " << e.what () << endl; } } void CSession::HandleRead (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) { try { if (!error) { int copy_len = 0 ; while (bytes_transferred > 0 ) { if (!_b_head_parse) { if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) { memcpy (_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); _recv_head_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, shared_self)); return ; } int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; memcpy (_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain); copy_len += head_remain; bytes_transferred -= head_remain; short msg_id = 0 ; memcpy (&msg_id, _recv_head_node->_data, HEAD_ID_LEN); msg_id = boost::asio::detail::socket_ops::network_to_host_short (msg_id); std::cout << "msg_id is " << msg_id << endl; if (msg_id > MAX_LENGTH) { std::cout << "invalid msg_id is " << msg_id << endl; _server->ClearSession (_uuid); return ; } short msg_len = 0 ; memcpy (&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN); msg_len = boost::asio::detail::socket_ops::network_to_host_short (msg_len); std::cout << "msg_len is " << msg_len << endl; if (msg_len > MAX_LENGTH) { std::cout << "invalid data length is " << msg_len << endl; _server->ClearSession (_uuid); return ; } _recv_msg_node = make_shared <RecvNode>(msg_len, msg_id); if (bytes_transferred < msg_len) { memcpy (_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, shared_self)); _b_head_parse = true ; return ; } memcpy (_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len); _recv_msg_node->_cur_len += msg_len; copy_len += msg_len; bytes_transferred -= msg_len; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; Json::Reader reader; Json::Value root; reader.parse (std::string (_recv_msg_node->_data, _recv_msg_node->_total_len), root); std::cout << "recevie msg id is " << root["id" ].asInt () << " msg data is " << root["data" ].asString () << endl; root["data" ] = "server has received msg, msg data is " + root["data" ].asString (); std::string return_str = root.toStyledString (); Send (return_str, root["id" ].asInt ()); _b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, shared_self)); return ; } continue ; } int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len; if (bytes_transferred < remain_msg) { memcpy (_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, shared_self)); return ; } memcpy (_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg); _recv_msg_node->_cur_len += remain_msg; bytes_transferred -= remain_msg; copy_len += remain_msg; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; Json::Reader reader; Json::Value root; reader.parse (std::string (_recv_msg_node->_data, _recv_msg_node->_total_len), root); std::cout << "recevie msg id is " << root["id" ].asInt () << " msg data is " << root["data" ].asString () << endl; root["data" ] = "server has received msg, msg data is " + root["data" ].asString (); std::string return_str = root.toStyledString (); Send (return_str, root["id" ].asInt ()); _b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, shared_self)); return ; } continue ; } } else { std::cout << "handle read failed, error is " << error.what () << endl; Close (); _server->ClearSession (_uuid); } } catch (std::exception& e) { std::cout << "Exception code is " << e.what () << endl; } }
对于server类,没有出现其它变化,与之前代码差不多。
5. asio多线程模型IOServicePool 前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两种多线程模型,第一个是启动多个线程,每个线程管理一个iocontext;第二种是只启动一个iocontext,被多个线程共享。这里主要使用第一种模式,多个线程,每个线程管理独立的iocontext服务。
下面先介绍多线程的第一种模式:一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。为避免线程安全问题,我们将网络数据封装为逻辑包投递给逻辑系统,逻辑系统有一个单独线程处理,这样将网络IO和逻辑处理解耦合,极大的提高了服务器IO层面的吞吐率。
5.1 单线程与多线程 之前使用的单线程模型如下:
IOServicePool类型的多线程模型如下:
IOServicePool多线程模式特点:
每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。
多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是串行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。
5.2 服务端实现 主函数:先初始化服务池,创建一个负责监听的io_context,通过asio提供的异步等待函数和线程来完成信号的捕捉,当触发SIGINT和SIGTERM信号时,停止io服务,即不在监听处理客户端发来的请求连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int main () { try { auto pool = AsioIOServicePool::GetInstance (); boost::asio::io_context io_context; boost::asio::signal_set signals (io_context, SIGINT, SIGTERM) ; signals.async_wait ([&io_context](auto , auto ) { io_context.stop (); }); CServer s (io_context, 10086 ) ; io_context.run (); } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << endl; } }
CServer类实现:这部分负责实现监听客户端的连接和获取服务池的io_context。每与客户端建立成功一次,就从服务池中获取一个io_context来负责处理与连接的客户端的一些业务,实现了一个线程一个io_context的多线程模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 CServer::CServer (boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port), _acceptor(io_context, tcp::endpoint (tcp::v4 (),port)) { cout << "Server start success, listen on port : " << _port << endl; StartAccept (); } void CServer::StartAccept () { auto & io_context = AsioIOServicePool::GetInstance ()->GetIOService (); shared_ptr<CSession> new_session = make_shared <CSession>(io_context, this ); _acceptor.async_accept (new_session->GetSocket (), std::bind (&CServer::HandleAccept, this , new_session, placeholders::_1)); } void CServer::HandleAccept (shared_ptr<CSession> new_session, const boost::system::error_code& error) { if (!error) { new_session->Start (); _sessions.insert (make_pair (new_session->GetUuid (), new_session)); } else { cout << "session accept failed, error is " << error.what () << endl; } StartAccept (); } void CServer::ClearSession (std::string uuid) { _sessions.erase (uuid); }
AsioIOServicePool类服务池的头文件实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class AsioIOServicePool :public Singleton<AsioIOServicePool>{ friend Singleton<AsioIOServicePool>; public : using IOService = boost::asio::io_context; using Work = boost::asio::io_context::work; using WorkPtr = std::unique_ptr<Work>; ~AsioIOServicePool (); AsioIOServicePool (const AsioIOServicePool&) = delete ; AsioIOServicePool& operator = (const AsioIOServicePool&) = delete ; boost::asio::io_context& GetIOService () ; void Stop () ; private : AsioIOServicePool (std::size_t size = std::thread::hardware_concurrency ()); std::vector<IOService>_ioServices; std::vector<WorkPtr>_works; std::vector<std::thread>_threads; std::size_t _nextIOService; };
AsioIOServicePool类服务池的功能实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 AsioIOServicePool::AsioIOServicePool (std::size_t size) :_ioServices(size), _works(size), _nextIOService(0 ) { for (std::size_t i = 0 ; i < size; i++) { _works[i] = std::unique_ptr <Work>(new Work (_ioServices[i])); } for (std::size_t i = 0 ; i < _ioServices.size (); i++) { _threads.emplace_back ([this , i]() { _ioServices[i].run (); }); } } AsioIOServicePool::~AsioIOServicePool () { std::cout << "AsioIOServicePool destruct" << endl; } boost::asio::io_context& AsioIOServicePool::GetIOService () { auto & service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size ()) { _nextIOService = 0 ; } return service; } void AsioIOServicePool::Stop () { for (auto & Work : _works) { Work.reset (); } for (auto & t : _threads) { t.join (); } }
由于监听客户端是否发来连接用的是刚开始初始化的io_context,后面与客户端的读写事件监听用的是服务池里面的io_context,所以当与客户端建立成功后,之后处理监听读写事件的io_context就需要从服务池里面去取。
1 2 3 4 5 6 void CServer::StartAccept () { auto & io_context = AsioIOServicePool::GetInstance ()->GetIOService (); shared_ptr<CSession> new_session = make_shared <CSession>(io_context, this ); _acceptor.async_accept (new_session->GetSocket (), std::bind (&CServer::HandleAccept, this , new_session, placeholders::_1)); }
5.3 客户端实现 客户端实现的是创建100个线程,每个线程都通过json序列化一个信息包发送给服务端,并将从服务器接收的信息包反序列化,这样重复500次。测试一下等所有线程都执行完,并且都被回收所需要花费的时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 std::vector<thread> vec_threads; int main () { auto start = std::chrono::high_resolution_clock::now (); for (int i = 0 ; i < 100 ; i++) { vec_threads.emplace_back ([]() { try { boost::asio::io_context ioc; tcp::endpoint remote_ep (address::from_string ("127.0.0.1" ), 10086 ); tcp::socket sock (ioc); boost::system::error_code error = boost::asio::error::host_not_found; sock.connect (remote_ep, error); if (error) { cout << "connect failed, code is " << error.value () << " error msg is " << error.message (); return 0 ; } int i = 0 ; while (i < 500 ) { Json::Value root; root["id" ] = 1001 ; root["data" ] = "hello world" ; std::string request = root.toStyledString (); size_t request_length = request.length (); char send_data[MAX_LENGTH] = { 0 }; int msgid = 1001 ; int msgid_host = boost::asio::detail::socket_ops::host_to_network_short (msgid); memcpy (send_data, &msgid_host, 2 ); int request_host_length = boost::asio::detail::socket_ops::host_to_network_short (request_length); memcpy (send_data + 2 , &request_host_length, 2 ); memcpy (send_data + 4 , request.c_str (), request_length); boost::asio::write (sock, boost::asio::buffer (send_data, request_length + 4 )); cout << "begin to receive..." << endl; char reply_head[HEAD_TOTAL]; size_t reply_length = boost::asio::read (sock, boost::asio::buffer (reply_head, HEAD_TOTAL)); msgid = 0 ; memcpy (&msgid, reply_head, HEAD_LENGTH); short msglen = 0 ; memcpy (&msglen, reply_head + 2 , HEAD_LENGTH); msglen = boost::asio::detail::socket_ops::network_to_host_short (msglen); msgid = boost::asio::detail::socket_ops::network_to_host_short (msgid); char msg[MAX_LENGTH] = { 0 }; size_t msg_length = boost::asio::read (sock, boost::asio::buffer (msg, msglen)); Json::Reader reader; reader.parse (std::string (msg, msg_length), root); std::cout << "msg id is " << root["id" ] << " msg is " << root["data" ] << endl; i++; } } catch (std::exception& e) { std::cerr << "Exception:" << e.what () << endl; } }); std::this_thread::sleep_for (std::chrono::seconds (1 )); } for (auto & t : vec_threads) { t.join (); } auto end = std::chrono::high_resolution_clock::now (); auto duration = std::chrono::duration_cast <std::chrono::microseconds>(end - start); std::cout << "Time spent:" << duration.count () << "microseconds" << std::endl; return 0 ; }
6. asio多线程模式IOThreadPool 6.1 介绍 下面介绍多线程实现的第二种模式,即多线程模式IOThreadPool,我们只初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run
在多个线程中调用,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。
线程池模式的多线程模型调度结构图:
构造函数中实现了一个线程池,线程池里每个线程都会运行_service.run
函数,_service.run
函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数,进而调用回调函数,因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。_service.run
内部在Linux环境下调用的是epoll_wait
返回所有就绪的描述符列表,在windows上会循环调用GetQueuedCompletionStatus
函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。
iocp的流程:
1 2 3 4 5 IOCP的使用主要分为以下几步: 1 创建完成端口(iocp)对象 2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求 3 Socket关联iocp对象,在Socket上投递网络事件 4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理
epoll的流程:
1 2 3 4 1 调用epoll_creat在内核中创建一张epoll表 2 开辟一片包含n个epoll_event大小的连续空间 3 将要监听的socket注册到epoll表里 4 调用epoll_wait,传入之前我们开辟的连续空间,epoll_wait返回就绪的epoll_event列表,epoll会将就绪的socket信息写入我们之前开辟的连续空间
所以IOThreadPool模式有一个隐患,同一个socket的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱。
6.2 利用strand改进 对于多线程触发回调函数的情况,我们可以利用asio提供的串行类strand封装一下,这样就可以被串行调用了,其基本原理就是在线程各自调用函数时取消了直接调用的方式,而是利用一个strand类型的对象将要调用的函数投递到strand管理的队列中,再由一个统一的线程调用回调函数,调用是串行的,解决了线程并发带来的安全问题。
上面结构图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数,而是将回调函数投递给strand管理的队列,再由strand统一调度派发。
6.3 相关代码 主函数实现:通过signals.async_wait异步等待,创建了一个子线程负责监听退出信号,当退出信号产生,就会触发该子线程执行,它会停止io_context和线程池,并唤醒阻塞在主线程的锁。主线程就是负责也客户端的一系列监听事件,并阻塞在条件变量cond_quit。等待退出子线程唤醒后,主线程退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 bool bstop = false ;std::condition_variable cond_quit; std::mutex mutex_quit; int main () { try { auto pool = AsioThreadPool::GetInstance (); boost::asio::io_context io_context; boost::asio::signal_set signals (io_context, SIGINT, SIGTERM) ; signals.async_wait ([pool, &io_context](auto , auto ) { io_context.stop (); pool->Stop (); std::unique_lock<std::mutex>lock (mutex_quit); bstop = true ; cond_quit.notify_one (); }); CServer s (pool->GetIOService(), 10086 ) ; { std::unique_lock<std::mutex>lock (mutex_quit); while (!bstop) { cond_quit.wait (lock); } } } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << endl; } }
AsioThreadPool类头文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class AsioThreadPool :public Singleton<AsioThreadPool>{ public : friend class Singleton <AsioThreadPool>; ~AsioThreadPool () {} AsioThreadPool& operator = (const AsioThreadPool&) = delete ; AsioThreadPool (const AsioThreadPool&) = delete ; boost::asio::io_context& GetIOService () ; void Stop () ; private : AsioThreadPool (int threadNum = std::thread::hardware_concurrency ()); boost::asio::io_context _service; std::unique_ptr<boost::asio::io_context::work>_work; std::vector<std::thread>_threads; };
AsioThreadPool类初始化:用c11特性,直接定义了线程,线程就可以执行。则将线程放到threads容器,线程就可以开始执行了。通过emplace_back插入一个回调函数,表示线程启动后要执行的内容。emplace_back是直接减少一层构造函数的开销,直接调用thread原始的构造函数,即直接传入一个回调函数即可,继而构造成一个线程,插入到_threads里面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 AsioThreadPool::AsioThreadPool (int threadNum):_work(new boost::asio::io_context::work (_service)) { for (int i = 0 ; i < threadNum; i++) { _threads.emplace_back ([this ]() { _service.run (); }); } } boost::asio::io_context& AsioThreadPool::GetIOService () { return _service; } void AsioThreadPool::Stop () { _work.reset (); for (auto & t : _threads) { t.join (); } }
建立连接函数:由于监听客户端是否发来连接用的是刚开始初始化的io_context,后面与客户端的读写事件监听也是用的同一个io_context,所以当与客户端建立成功后,之后的监听读写事件读由该io_context来处理。
1 2 3 4 5 void CServer::StartAccept () { shared_ptr<CSession> new_session = make_shared <CSession>(_io_context, this ); _acceptor.async_accept (new_session->GetSocket (), std::bind (&CServer::HandleAccept, this , new_session, placeholders::_1)); }
在Session类中添加一个成员变量_strand,如下所示。strand是个模板,需要声明它的类型。strand他有一个执行类型,要求这个执行类型是上下文的执行类型,也就是跟io_context匹配的执行类型。
1 boost::asio::strand<boost::asio::io_context::executor_type>_strand;
为了让回调函数被派发到strand的队列,我们只需要在注册回调函数时加一层strand的包装即可。
因为在asio中无论iocontext还是strand,底层都是通过executor调度的,可以将他理解为调度器,如果多个iocontext和strand的调度器是一个,那他们的消息派发统一由这个调度器执行。所以我们利用iocontext的调度器构造strand,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可。
比如我们在Start函数里添加读事件绑定 ,将回调函数的调用者绑定为_strand。
1 2 3 4 5 6 7 void CSession::Start () { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), boost::asio::bind_executor (_strand, std::bind (&CSession::HandleRead, this , std::placeholders::_1, std::placeholders::_2, SharedSelf ()))); }
写事件绑定:
1 boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), boost::asio::bind_executor (_strand, std::bind (&CSession::HandleWrite, this , std::placeholders::_1, shared_self)));
如果不绑定执行器bind_executor,这个回调函数会默认让线程来调用。而绑定到_strand上,就由_strand来调用。底层是通过_strand的独立线程来调用。其次还有好处就是,各个线程的回调函数就绪的时候,会自动投递到stran队列里。
6.4 多线程模式使用建议 实际的生产和开发中,我们尽可能利用C++特性,使用多核的优势,将iocontext分布在不同的线程中效率更可取一点,但也要防止线程过多导致cpu切换带来的时间片开销,所以尽量让开辟的线程数小于或等于cpu的核数,从而利用多核优势。
7. asio协程实现并发服务器 7.1 介绍 ASIO协程 是一种编程技术,它允许程序员以更简洁和直观的方式编写异步代码,特别是在处理网络编程时。ASIO协程通过提供一种无栈协程(stackless coroutine)的方式,简化了异步编程的复杂性,尤其是在内存管理方面。这种技术通过提供一种更高级别的抽象,使得程序员可以更加专注于业务逻辑的实现,而不是陷入底层细节的处理中。
利用协程实现并发程序有两个好处:
将回调函数改写为顺序调用,提高开发效率。
协程调度比线程调度更轻量化,因为协程是运行在用户空间的,线程切换需要在用户空间和内核空间切换。
7.2 协程的简单实现 在下面的协程实现中:
awaitable表示声明了一个函数,那么这个函数就变为可等待的函数了,比如listener
被添加awaitable<void>
之后,就可以被协程调用和等待了。
co_spawn
表示启动一个协程,参数分别为调度器,执行的函数,以及启动方式, 比如我们启动了一个协程,deatched表示将协程对象分离出来,这种启动方式可以启动多个协程,他们都是独立的,如何调度取决于调度器,在用户的感知上更像是线程调度的模式,类似于并发运行,其实底层都是串行的。
当acceptor
接收到连接后,继续调用co_spawn
启动一个协程,用来执行echo逻辑。echo逻辑里也是通过co_wait的方式接收和发送数据的,如果对端不发数据,执行echo的协程就会挂起,另一个协程启动,继续接收新的连接。当没有连接到来,接收新连接的协程挂起,如果所有协程都挂起,则等待新的就绪事件(对端发数据,或者新连接)到来唤醒。
服务端实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 using boost::asio::ip::tcp;using boost::asio::awaitable; using boost::asio::co_spawn;using boost::asio::detached; using boost::asio::use_awaitable;namespace this_coro = boost::asio::this_coro; awaitable <void >echo (tcp::socket socket) { try { char data[1024 ]; for (;;) { std::size_t n = co_await socket.async_read_some (boost::asio::buffer (data), use_awaitable); co_await async_write (socket, boost::asio::buffer(data, n), use_awaitable) ; } } catch (std::exception& e) { std::cout << "echo exception is " << e.what () << std::endl; } } awaitable <void > listener () { auto executor = co_await this_coro::executor; tcp::acceptor acceptor (executor, { tcp::v4(),10086 }) ; for (;;) { tcp::socket socket = co_await acceptor.async_accept (use_awaitable); std::cout<<"connect is successful" <<std::endl; co_spawn (executor, echo (std::move (socket)), detached); } } int main () { try { boost::asio::io_context io_context (1 ) ; boost::asio::signal_set signals (io_context, SIGINT, SIGTERM) ; signals.async_wait ([&](auto , auto ) { io_context.stop (); }); co_spawn (io_context, listener (), detached); io_context.run (); } catch (std::exception& e) { std::cout << "Exception is " << e.what () << std::endl; } }
客户端实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 int main () { try { boost::asio::io_context ioc; tcp::endpoint remote_ep (address::from_string("127.0.0.1" ), 10086 ) ; tcp::socket sock (ioc) ; boost::system::error_code error = boost::asio::error::host_not_found; sock.connect (remote_ep, error); if (error) { cout << "connect failed, code is " << error.value () << "error msg is" << error.message () << std::endl; return 0 ; } std::cout << "Enter message:" ; char request[MAX_LENGTH]; std::cin.getline (request, MAX_LENGTH); size_t request_length = strlen (request); boost::asio::write (sock, boost::asio::buffer (request, request_length)); char reply[MAX_LENGTH]; size_t reply_length = boost::asio::read (sock, boost::asio::buffer (reply, request_length)); cout << "reply is " << string (reply, reply_length) << endl; getchar (); } catch (std::exception& e) { std::cerr << "EXception is" << e.what () << std::endl; } return 0 ; }
7.3 协程改进服务器 我们可以通过协程改进服务器编码流程,用一个iocontext管理绑定acceptor用来接收对端新的连接,再从服务池IOServicePool里取其它的io_contex来管理连接的收发操作,将之前服务器中每个连接的接收数据操作改为启动一个协程,通过顺序的方式读取收到的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 void CSession::Start () { auto shared_this = shared_from_this (); boost::asio::co_spawn (_io_context, [=]()->boost::asio::awaitable<void > { try { for (; !_b_close;) { _recv_head_node->Clear (); std::size_t n = co_await boost::asio::async_read (_socket, boost::asio::buffer (_recv_head_node->_data, HEAD_DATA_LEN), boost::asio::use_awaitable); if (n == 0 ) { std::cout << "receive peer closed" << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } short msg_id = 0 ; memcpy (&msg_id, _recv_head_node->_data, HEAD_ID_LEN); msg_id = boost::asio::detail::socket_ops::network_to_host_short (msg_id); std::cout << "msg_id is " << msg_id << std::endl; if (msg_id > MAX_LENGTH) { std::cout << "invalid msg id is " << msg_id << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } short msg_len = 0 ; memcpy (&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN); msg_len = boost::asio::detail::socket_ops::network_to_host_short (msg_len); std::cout << "msg len is " << msg_id << std::endl; if (msg_len > MAX_LENGTH) { std::cout << "invalid msg len is " << msg_len << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } n == co_await boost::asio::async_read (_socket, boost::asio::buffer (_recv_msg_node->_data, _recv_msg_node->_total_len), boost::asio::use_awaitable); if (n == 0 ) { std::cout << "receive peer closed" << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; std::cout << "receive data is " << _recv_msg_node->_data << std::endl; LogicSystem::GetInstance ().PostMsgToQue (std::make_shared <LogicNode>(shared_from_this (), _recv_msg_node)); } } catch (std::exception& e) { std::cout << "exception is " << e.what () << std::endl; Close (); _server->ClearSession (_uuid); } }, boost::asio::detached); }
对于其它部分,和之前的服务器逻辑一样,出于性能的考虑,没有把服务器的接收数据和发送数据都通过协程来完成,而只是将接收数据通过协程来完成,发送数据还是通过异步写的形式来完成的。