1. 简介

该项目的架构是B/S架构,也就是说客户端是浏览器,在本地随便一个浏览器即可;服务器是在linux上搭建的一个应用程序,该程序可以是多线程版的,也可以是单反应堆模型或者是多反应堆模型,反正最终都可以给客户端提供服务。

过程:

客户端(浏览器)向服务器发送一个Http请求,服务器就能把指定目录下的资源给到客户端(浏览器),浏览器就能将得到的数据展示到界面上。如果解析不了该文件,就会自动下载到本地,如果能够展示,它在本地指定的下载目录是不会有对应文件的,因为它下载到本地之后,就直接将内容显示到窗口上了,可以理解为它下载下来的只是一个缓存,该缓存能够支撑它在浏览器上把内容显示出来,当数据显示完成之后,浏览器会定期地清除缓存,那么该资源也就不存在了。

2. 单反应堆的服务器模型

在网络编程中,为了提高访问通信的效率,就可以使用单反应堆的服务器模型,它是IO多路复用与多线程相结合的一种技术。

设计思路:

在主线程里面,服务器绑定本地的ip和端口来得到一个通信的套接字,因为通信的套接字需要监听它的读事件,所以就将它的读事件注册给Reactor,也就是反应堆模型。该反应堆模型底层用到的是poll、epoll或select,这三种IO多路转接技术不是同时用,而是选择其中的某一种。在选择了底层的IO模型之后,把用于监听的文件描述符的读事件注册给该模型,然后内核就可以帮助我们检测这用于监听的文件描述符的读事件是否被激活了。如果被激活了,说明有新连接到达,那么我们就需要调用用于监听的文件描述符对应的处理函数,该处理函数其实就是负责与对端建立新连接,主要的处理核心动作核就是调用accept()函数,因此就能得到一个用于通信的文件描述符,然后对该通信描述符进行一个封装。在封装的时候,给该用于通信的文件描述符指定了读回调和写回调,至于为什么要指定读写回调,是因为反应堆模型就是基于回调的。TcpConnection模型当检测到了对应的文件描述符它的读事件或写事件被触发了之后,该框架就会自动的调用这个事件的处理函数,这种机制就称为反应堆机制。其实本质就是回调函数。

3. 多反应堆的服务器模型

对于多反应堆模型,反应堆的数量取决于线程的数量。

在主线程里面,它是有一个用于监听的套接字Listener,我们需要将它注册给主线程的MainReactor反应堆模型。在主线程的反应堆模型里面可以选择poll、epoll或select,它们就能帮助我们检测对应的监听文件描述符的读事件是否被触发。如果被触发了,就需要建立一个新连接,调用accept()函数得到一个新的用于通信的文件描述符connfd。然后对它进行封装,封装时为它指定一个读回调和写回调。又因为这是一个多反应堆模型,在得到了用于通信的套接字之后,就不要在主线程里面做通信了,把通信交给主线程的线程池,在主线程的线程池里面有多个子线程,每个子线程里面也有一个反应堆模型,该反应堆模型里面主要就是指定的IO多路转接poll、epoll或select。

然后我们只需要把用于通信的文件描述符的事件注册给到对应的子线程的反应堆模型(随机给的),因为这个过程是做了注册的,所以当用于通信的套接字,它的读事件或写事件触发了,对应的回调函数就会知道。当真正的事件被触发之后,子线程里的反应堆模型就会调用响应的处理函数。

4. Http协议

4.1 http请求

属于应用层协议,位于最上层,通过这个协议就可以对通信的数据进行封装。如果网络模型是B/S结构,就必须使用http协议,https协议是在http协议的基础上做了加密。

在数据发送之前,使用协议对数据进行封装,接收到数据之后,按照协议格式解析接收到的数据。

Http协议分为两部分:

1.http请求

  • 客户端给服务器发送数据,叫http请求,有两种请求方式

    • get请求

    • post请求

2.http响应

  • 服务器给客户端回复数据叫做http响应

http协议封装好数据之后是一个数据块,得到若干行数据,使用的换行符是\r\n。通过这个\r\r进行判断,就知道这行是否结束了。

http请求消息分为四部分:

1.请求行

  • 不管get请求还是post请求,请求行是分为三部分

2.请求头

3.空行

4.客户端向服务器提交的数据

如果使用get方式提交数据,第四部分是空的。

get与post分析比较:

  1. 从给服务器发送的数据的量上来分析

    • get:主要是向服务器索取数据,提交的数据量比较少
    • post:上传文件一般都会使用post,可以提交的数据量是非常大的
  2. 从上传的数据安全性来分析

    • get:提交的数据不安全,提交的数据会显示到地址栏中,数据容易被泄露
    • post:提交的数据并不会显示到地址栏中,完全不可见的,因此更安全

4.2 get请求

下面是浏览器对用户的请求数据进行了封装之后,得到的原始的http请求数据:

1
2
3
4
5
6
7
8
9
GET / HTTP/1.1
Host: 192.168.88.93:9393
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
\r\n

第1行是请求行:分为3部分,

  • 第1部分是请求的方式,主要有两种,一种是get,另一种是post。如果客户端给服务器发送的是get请求,说明请求的是服务器上的静态文件,即就是在服务器上,这些文件已经被提供好了的(就是在服务器上已经存在的文件),我们通过浏览器向服务器发起一个访问某个文件的请求,这就是get请求。

  • 第2部分是请求的资源,/代表服务器提供的资源目录(不代表是服务器的根目录),该资源目录可以是服务上的任意一个目录,只要存在即可。

  • 第3部分是http的版本,现在一般用的都是http的1.1版本

第2-8行是请求头:由若干个键值对组成:

  • Host表示要连接的服务器是192.168.88.93,绑定的端口是9393;

  • Connection的keep-alive表示客户端想要与服务器一直保持连接

  • User-Agent表示浏览器的版本,指的是内核版本

  • Accept-Encoding设置压缩的方式使用的是gzip

  • Accept-Language设置默认的语言

注意:如果我们使用的是get请求,并且get请求里面携带了一些动态数据,这些数据会出现在浏览器的地址栏里面,而该地址栏它的缓存是有上限的,因此如果携带的数据量很多,后面的数据就会丢失。

第9行是空行,空行完之后,就是通过这个请求协议给服务器提交的数据。如果是get请求,这部分内容是空的。因为如果通过get请求向服务器提交动态数据,这个动态数据也不会出现在请求协议的第4部分,而是出现在请求行的第2部分,即:

4.3 post请求

下面是post请求数据的格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST / HTTP/1.1
Host: 192.168.88.93:9393
Connection: keep-alive
Content-Length: 98
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
Origin: null
Content-Type: application/x-www-form-urlencoded
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8

username=subwen%40qq.com&phon=111111&email=sub%40qq.com&date=2020-01-01&sex=male&class=1&rule=on

如果我们通过post向服务器提交数据,那么这个数据肯定是动态的,即请求的数据在服务器上是不存在的。如注册账号,向服务器提供用户名和密码等信息。

第1行是请求行:分为3部分,

  • 第1部分是请求的方式,主要有两种,一种是get,另一种是post。如果客户端给服务器发送的是get请求,说明请求的是服务器上的静态文件,即就是在服务器上,这些文件已经被提供好了的(就是在服务器上已经存在的文件),我们通过浏览器向服务器发起一个访问某个文件的请求,这就是get请求。

  • 第2部分是请求的资源,/代表服务器提供的资源目录(不代表是服务器的根目录),该资源目录可以是服务上的任意一个目录,只要存在即可。

  • 第3部分是http的版本,现在一般用的都是http的1.1版本

第2-12行是请求头:由若干个键值对组成:

  • Content-Length表示提交的内容长度
  • Content-Type表示客户端向服务器提交的数据块的格式(还有其它很多种格式)

第13行是空行\r\n

第14行是客户端向服务器提交的数据块

4.4 http响应

服务器给客户端回复数据,称之为http响应,协议的格式分为四部分::
1.状态行

2.消息报头/响应头

3.空行

4.回复给客户端的数据块

  • http响应消息也是一个数据块,若干行组成,换行是\r\n

响应消息(Response)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Http/1.1 200 Ok
Server: micro_httpd
Data: Fri, 18 Jul 2014 14:34:26 GMT
Content-Type: text/plain; charset=iso-8859-1 (必选项)
Content-Length: 32
Location: https://www.lxx93.online
Content-Language: zh-CN
Last-Modified: Fri, 18 Jul 2014 08:36:36 GMT
Connection: close

#include <stdio.h>
int main(void)
{
printf("hello world!\n");
return 0;
}

第1行是状态行,分为3部分:

  1. Http的版本

  2. 服务器对客户端请求的处理状态(状态码),200就代表处理成功了

  3. 是对状态码的描述

第2到9行是响应头,由一系列的键值对组成:

  • Content-Type:表示的是http响应消息,响应的数据块的格式,text/plain代表的是一个纯文本,charset表示使用的字符编码
  • Content-Length:表示服务器给客户端回复的数据块的大小(要准确,不确定的话就不写)

第10行是空行

第11到16行是http响应给客户端的信息

http状态码类别:

状态码有三位数字组成,第一个数字定义了响应的类别,共分5种类别:

  • 1xx:指示信息–表示请求已经被接收,没有处理完,还正在处理
  • 2xx:成功–表示请求已被成功接收、理解、接受
  • 3xx:重定向–要完成请求必须进行更进一步的操作(网络地址的重新访问)
  • 4xx:客户端错误–请求有语法错误或请求无法实现
  • 5xx:服务器端错误–服务器未能实现合法的请求

常见的状态码:

状态码 状态描述 文字描述
200 OK 客户端请求成功
400 Bad Request 客户端请求有语法错误,不能被服务器所理解
401 Unauthorized 请求未经授权,这个状态码必须和WWW-Authenticate报头域一起使用
403 Forbidden 服务器收到请求,但是拒绝提供服务
404 Not Found 请求资源不存在,如:输入了错误的URL
500 Internal Server Error 服务器发送不可预期的错误
503 Server Unavailable 服务器当前不能处理客户端的请求,一段时间后可能恢复正常

整个处理流程:

首先调用threadPoolInit()函数,得到线程池的实例,得到该实例后,调用threadPoolRun()把线程池启动起来,即把线程池里面的子线程启动起来。然后就可以通过takeWorkerEventLoop()从线程池里面取出某一个子线程,得到子线程就能够得到对应的反应堆evLoop实例,将其返回给函数的调用者。调用者就可以通过这个evLoop实例往它的任务队列里面添加任务,当任务添加到evLoop对应的任务队列过后,就可以开始处理这个任务队列了,再根据这个任务队列节点的类型来处理这个dispatcher的检测集合。有3种情况,第1种是往检测集合里面添加新的节点;第2种是往检测集合里面删除节点;第3种情况就是修改检测集合里面某个文件描述符对应的事件。dispatcher这个检测集合处理完毕之后,对应的反应堆模型就开始进行循环了,它需要循环的调用底层的poll、epoll_wait或select来检测这个集合里面有没有激活的文件描述符。如果有激活的文件描述符,那么就通过这个文件描述符找到对应的channel,找到chennel后,然后再基于激活的事件,调用事件对应的回调函数,该回调函数调用完之后,对应的事件也就处理完毕了。

5. 多反应堆+线程池高并发服务器

该项目的结构如下:

在main()函数中,接收外部传来的两个参数,即监听的端口资源目录,通过传入端口和指定线程池中子线程的个数,得到一个TcpServer服务器实例,并对其进行启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
int main(int argc, char* argv[])
{
if (argc < 3) {
printf("./a.out port path\n");
return -1;
}
unsigned short port = atoi(argv[1]); //将端口进行类型转换
chdir(argv[2]); //切换服务器的工作路径,切换为资源文件
//启动服务器
struct TcpServer* server = tcpServerInit(port, 4); //得到一个tcp服务器实例
tcpServerRun(server);
return 0;
}

5.1反应堆模型

文件描述符封装类channel

头文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//定义函数指针
typedef int(*handleFunc)(void* arg);

enum FDEvent {
TimeOut = 0x01, //二进制:1
ReadEvent = 0x02, //二进制:10
WriteEvent = 0x04 //二进制:100
};

struct Channel {
//文件描述符
int fd;
//事件
int events;
//回调函数
handleFunc readCallback;
handleFunc writeCallback;
handleFunc destroyCallback; //回收资源的回调函数(释放tcpConnection)
//回调函数的参数
void* arg;
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//实例化一个channnel
struct Channel* channelInit(int fd, int events, handleFunc readFunc, handleFunc writeFunc, handleFunc destroyFunc, void* arg)
{
struct Channel* channel = (struct Channel*)malloc(sizeof(struct Channel));
channel->arg = arg;
channel->fd = fd;
channel->events = events;
channel->readCallback = readFunc;
channel->writeCallback = writeFunc;
channel->destroyCallback = destroyFunc;
return channel;
}
//对channel封装的描述符是否添加监听写事件功能
void writeEventEnable(struct Channel* channel, bool flag)
{
if (flag) {
channel->events |= WriteEvent; //往events里面追加写事件
}
else {
channel->events = channel->events & ~WriteEvent; //将第3位标志位清0,这样就不做写事件检测了
}
}
//返回channel封装fd是否有监听写事件
bool isWriteEventEnable(struct Channel* channel)
{
return channel->events & WriteEvent;
}

封装channel的容器类ChannelMap

头文件:

1
2
3
4
5
6
//该结构体主要就是想通过list下标,即fd来找到对应的channel
struct ChannelMap {
int size; //记录指针指向的数组的元素总个数
//list是一个指针数组(数组里面全部是指针),数组的下标对应的是文件描述符的值,如果文件描述符为9,那么其数组下标就为9,一一对应的
struct Channel** list; //相当于是一个数组list,里面存的是struct Channel*类型的指针,即struct Channel* list[]
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//实例化channelmap容器
struct ChannelMap* channelMapInit(int size)
{
struct ChannelMap* map = (struct ChannelMap*)malloc(sizeof(struct ChannelMap));
map->size = size;
map->list = (struct Channel**)malloc(size * sizeof(struct Channel*));
return map;
}
//清空channelmap容器
void ChannelMapClear(struct ChannelMap* map)
{
if (map != NULL) { //判断map如果不为空,就清理map数组
for (int i = 0; i < map->size; i++) {
if (map->list[i] != NULL) {
free(map->list[i]); //数组里面的元素指向的内存不为空就释放掉
}
}
free(map->list); //数组对应的内存地址也要释放掉
map->list = NULL;
}
map->size = 0; //所有的释放掉后,map里面的大小就变为了0
}
//参数:fd-channel数组;需要插入的fd;每个元素占的大小
bool makeMapRoom(struct ChannelMap* map, int newSize, int unitSize)
{
if (map->size < newSize) { //如果map中的size比newsize小,才做扩容操作
int curSize = map->size; //取出当前map的size容量
//容量每次扩大原来的1倍
while (curSize < newSize) { //当cursize扩容到比newSize大时,就退出循环
curSize *= 2;
}
//扩容realloc ----->参1是需要扩容的起始地址;参2是要扩容的大小
struct Channel ** temp = realloc(map->list, curSize * unitSize); //返回的是扩容成功之后,对应的起始地址
if(temp == NULL){ //如果为空,说明没有扩容成功,就返回false
return false;
}
map->list = temp; //更新将list指向起始地址
//将参1所指的内存前参3的字节设置为参2(参1指向的是原来空间个数的大小所对应的地址,并将其后面的地址空间初始化,即扩容的部分)
memset(&map->list[map->size], 0, (curSize - map->size) * unitSize); //将扩展的地址进行初始化
map->size = curSize; //更新将size变为扩容的大小
}
return true;
}

IO复用技术类dispatcher

结构体成员都是函数指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct EventLoop;    //不管该结构体现在是否被定义出来,先告诉编译器有这种类型,下面就可以用这种类型去定义变量(Dispatcher和eventloop互包含了)
struct Dispatcher {
// init --- 初始化epoll、poll或者select需要的数据块
void* (*init)(); //返回值为什么是泛型void*,是为了兼容epoll、poll和select对应的不同的数据块
// 添加
int (*add)(struct Channel* channel, struct EventLoop* evLoop);
// 删除
int (*remove)(struct Channel* channel, struct EventLoop* evLoop);
// 修改
int (*modify)(struct Channel* channel, struct EventLoop* evLoop);
// 事件检测
int (*dispatch)(struct EventLoop* evLoop, int timeout);
// 清除数据(关闭fd或者释放内存)
int (*clear)(struct EventLoop* evLoop);
};

epoll实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#define Max 520

struct EpollData {
int epfd; //epoll树根节点
struct epoll_event* events; //epoll_wait在工作时需要的传出参数,是一个结构体数组
};

//加上前缀static表示当前的函数为局部函数,作用域为当前的源文件
static void* epollInit();
static int epollAdd(struct Channel* channel, struct EventLoop* evLoop);
static int epollRemove(struct Channel* channel, struct EventLoop* evLoop);
static int epollModify(struct Channel* channel, struct EventLoop* evLoop);
static int epollDispatch(struct EventLoop* evLoop, int timeout);
static int epollClear(struct EventLoop* evLoop);
static int epollCtl(struct Channel* channel, struct EventLoop* evLoop, int op);

//初始化结构体Dispatcher的变量EpollDispatcher
struct Dispatcher EpollDispatcher = { //该结构体是全局变量,在要使用的文件中,添加extern即可
epollInit,
epollAdd,
epollRemove,
epollModify,
epollDispatch,
epollClear
};

//初始化epoll需要的数据
static void* epollInit() {
struct EpollData* data = (struct EpollData*)malloc(sizeof(struct EpollData)); //开辟一个结构体的空间
data->epfd = epoll_create(10);
if (data->epfd == -1) {
perror("epoll_create");
exit(0);
}
//开辟空间,参1是元素的个数;参2是每个元素占多大的内存
data->events = (struct epoll_event*)calloc(Max, sizeof(struct epoll_event)); //calloc相较于malloc,会将开辟的空间初始化为0
return data;
}

static int epollCtl(struct Channel* channel, struct EventLoop* evLoop, int op) {
//epfd和events都存储在EventLoop里面的Dispatcherdata里面
struct EpollData* data = (struct EpollData*)evLoop->dispatcherData;
struct epoll_event ev;
ev.data.fd = channel->fd; //要添加的文件描述符
int events = 0;
if (channel->events & ReadEvent) { //如果不为0,就保存读事件
events |= EPOLLIN;
}
if (channel->events & WriteEvent) {
events |= EPOLLOUT; //保存写事件
}
ev.events = events;
int ret = epoll_ctl(data->epfd, op, channel->fd, &ev); //参数:树的根节点;添加操作;对应的文件描述符;epoll_event结构体
return ret;
}

static int epollAdd(struct Channel* channel, struct EventLoop* evLoop) {
int ret = epollCtl(channel, evLoop, EPOLL_CTL_ADD);
if (ret == -1) {
perror("epoll_ctl add");
exit(0);
}
return ret;
}
static int epollRemove(struct Channel* channel, struct EventLoop* evLoop) {
int ret = epollCtl(channel, evLoop, EPOLL_CTL_DEL);
if (ret == -1) {
perror("epoll_ctl delete");
exit(0);
}
//通过channel释放对应的TcpConnection资源
channel->destroyCallback(channel->arg); //arg对应的是调用channelInit()的时候的conn,即TcpConnection
return ret;
}
static int epollModify(struct Channel* channel, struct EventLoop* evLoop) {
int ret = epollCtl(channel, evLoop, EPOLL_CTL_MOD);
if (ret == -1) {
perror("epoll_ctl modify");
exit(0);
}
return ret;
}
static int epollDispatch(struct EventLoop* evLoop, int timeout) {
struct EpollData* data = (struct EpollData*)evLoop->dispatcherData;
int count = epoll_wait(data->epfd, data->events, Max, timeout * 1000); //检测满足就绪的事件,timeout是秒,需要转为毫秒
for (int i = 0; i < count; i++) {
int events = data->events[i].events;
int fd = data->events[i].data.fd;
//对端断开连接后,就会产生EPOLLERR事件;对端断开连接后,还在发数据给对端就会产生EPOLLHUP事件
if (events & EPOLLERR || events & EPOLLHUP) { //如果对端断开连接
//对方断开了连接,删除fd
//epollRemove(Channel, evLoop);
continue; //退出本轮循环
}
if (events & EPOLLIN) { //如果读事件触发
//epoll、poll、select只要触发了读写事件,调用的都是该函数
eventActivate(evLoop, fd, ReadEvent);
}
if (events & EPOLLOUT) { //如果写事件触发
eventActivate(evLoop, fd, WriteEvent);
}
}
return 0;
}
static int epollClear(struct EventLoop* evLoop) {
struct EpollData* data = (struct EpollData*)evLoop->dispatcherData;
free(data->events); //释放
close(data->epfd); //关闭树根节点
free(data);
return 0;
}

select实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#define Max 1024

struct SelectData {
fd_set readSet;
fd_set writeSet;
};

//加上前缀static表示当前的函数为局部函数,作用域为当前的源文件
static void* selectInit();
static int selectAdd(struct Channel* channel, struct EventLoop* evLoop);
static int selectRemove(struct Channel* channel, struct EventLoop* evLoop);
static int selectModify(struct Channel* channel, struct EventLoop* evLoop);
static int selectDispatch(struct EventLoop* evLoop, int timeout);
static int selectClear(struct EventLoop* evLoop);
static void setFdSet(struct Channel* channel, struct SelectData* data);
static void clearFdSet(struct Channel* channel, struct SelectData* data);


//初始化结构体
struct Dispatcher SelectDispatcher = { //该结构体是全局变量,在要使用的文件中,添加extern即可
selectInit,
selectAdd,
selectRemove,
selectModify,
selectDispatch,
selectClear
};

//初始化epoll需要的数据
static void* selectInit() {
struct SelectData* data = (struct SelectData*)malloc(sizeof(struct SelectData)); //开辟一个结构体的空间
FD_ZERO(&data->readSet); //读集合清空
FD_ZERO(&data->writeSet); //写集合清空
return data;
}

static void setFdSet(struct Channel* channel, struct SelectData* data){
if (channel->events & ReadEvent) {
FD_SET(channel->fd, &data->readSet); //将fd添加到读事件中
}
if (channel->events & WriteEvent) {
FD_SET(channel->fd, &data->writeSet); //将fd添加到写事件中
}
}
static void clearFdSet(struct Channel* channel, struct SelectData* data) {
if (channel->events & ReadEvent) {
FD_CLR(channel->fd, &data->readSet); //从读集合中移除fd,在读集合中对应的fd标志位被设为了0
}
if (channel->events & WriteEvent) {
FD_CLR(channel->fd, &data->writeSet); //写集合中移除fd
}
}

static int selectAdd(struct Channel* channel, struct EventLoop* evLoop) {
struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
if (channel->fd >= Max) { //如果channel的fd是大于最大值的,就没有必须执行下面了
return -1;
}
setFdSet(channel, data);
return 0;
}
static int selectRemove(struct Channel* channel, struct EventLoop* evLoop) {
struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
clearFdSet(channel, data);
//通过channel释放对应的TcpConnection资源
channel->destroyCallback(channel->arg); //arg对应的是调用channelInit()的时候的conn,即TcpConnection
return 0;
}
static int selectModify(struct Channel* channel, struct EventLoop* evLoop) {
struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
//睡先调用都可以
setFdSet(channel, data);
clearFdSet(channel, data);
return 0;
}
static int selectDispatch(struct EventLoop* evLoop, int timeout) {
struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
struct timeval val;
val.tv_sec = timeout;
val.tv_usec = 0;
fd_set rdtmp = data->readSet;
fd_set wrtmp = data->writeSet;
int count = select(Max, &rdtmp, &wrtmp, NULL, &val); //检测满足就绪的事件,timeout是秒,需要转为毫秒
if (count == -1) {
perror("select");
exit(0);
}
//变量读集合和写集合,看是哪个集合里面的文件描述符被激活了,如果被激活,在集合中对应标志位为1,否则为0
for (int i = 0; i < Max; i++) {
if (FD_ISSET(i, &rdtmp)) { //如果i在rdtmp里面,说明对应的文件描述符i的读数据就绪
eventActivate(evLoop, i, ReadEvent); //调用对应的回调函数
}
if (FD_ISSET(i, &wrtmp)) { //如果i在rdtmp里面,说明对应的文件描述符i的写数据就绪
eventActivate(evLoop, i, WriteEvent);
}
}
return 0;
}
static int selectClear(struct EventLoop* evLoop) {
struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
free(data);
return 0;
}

poll实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#define Max 1024
struct PollData
{
int maxfd;
struct pollfd fds[Max];
};

//加上前缀static表示当前的函数为局部函数,作用域为当前的源文件
static void* pollInit();
static int pollAdd(struct Channel* channel, struct EventLoop* evLoop);
static int pollRemove(struct Channel* channel, struct EventLoop* evLoop);
static int pollModify(struct Channel* channel, struct EventLoop* evLoop);
static int pollDispatch(struct EventLoop* evLoop, int timeout); // 单位: s
static int pollClear(struct EventLoop* evLoop);

//初始化结构体
struct Dispatcher PollDispatcher = { //该结构体是全局变量,在要使用的文件中,添加extern即可
pollInit,
pollAdd,
pollRemove,
pollModify,
pollDispatch,
pollClear
};

//初始化epoll需要的数据
static void* pollInit()
{
struct PollData* data = (struct PollData*)malloc(sizeof(struct PollData)); //开辟一个结构体的空间
data->maxfd = 0; //初始化为0
for (int i = 0; i < Max; ++i)
{
data->fds[i].fd = -1; //指定为-1,为无效文件描述符
data->fds[i].events = 0; //事件都指定为0
data->fds[i].revents = 0; //事件都指定为0
}

return data;
}

static int pollAdd(struct Channel* channel, struct EventLoop* evLoop)
{
struct PollData* data = (struct PollData*)evLoop->dispatcherData;
int events = 0;
if (channel->events & ReadEvent) //如果不为0,就保存读事件
{
events |= POLLIN;
}
if (channel->events & WriteEvent)
{
events |= POLLOUT; //保存写事件
}
//找空位,将events写进去
int i = 0;
for (; i < Max; ++i)
{
if (data->fds[i].fd == -1) //找到空位了
{
data->fds[i].events = events;
data->fds[i].fd = channel->fd;
data->maxfd = i > data->maxfd ? i : data->maxfd; //maxfd记录的是最大文件描述符
break; //找到空位存储后,就可以退出循环了
}
}
if (i >= Max) //如果i>Max了,说明没有找到,就直接退出返回-1
{
return -1;
}
return 0;
}

static int pollRemove(struct Channel* channel, struct EventLoop* evLoop)
{
struct PollData* data = (struct PollData*)evLoop->dispatcherData;
int i = 0;
for (; i < Max; ++i)
{
if (data->fds[i].fd == channel->fd)
{
data->fds[i].events = 0;
data->fds[i].revents = 0;
data->fds[i].fd = -1;
break; //找到要删除的fd后,进行重置(从检测集合中删除),就可以退出循环了
}
}
// 通过 channel 释放对应的 TcpConnection 资源(给channel的回收资源回调函数指向销毁TcpConnection的函数即可)
channel->destroyCallback(channel->arg); //arg对应的是调用channelInit()的时候的conn,即TcpConnection
if (i >= Max) //如果i>Max了,说明没有找到,就直接退出返回-1
{
return -1;
}
return 0;
}

static int pollModify(struct Channel* channel, struct EventLoop* evLoop)
{
//channel里面有要更改为什么样的事件
struct PollData* data = (struct PollData*)evLoop->dispatcherData;
int events = 0;
if (channel->events & ReadEvent) //如果不为0,就保存读事件
{
events |= POLLIN;
}
if (channel->events & WriteEvent)
{
events |= POLLOUT; //保存写事件
}
int i = 0;
for (; i < Max; ++i)
{
if (data->fds[i].fd == channel->fd)
{
data->fds[i].events = events; //得到更改好的events
break; //找到空位存储后,就可以退出循环了
}
}
if (i >= Max) //如果i>Max了,说明没有找到,就直接退出返回-1
{
return -1;
}
return 0;
}

static int pollDispatch(struct EventLoop* evLoop, int timeout)
{
struct PollData* data = (struct PollData*)evLoop->dispatcherData;
int count = poll(data->fds, data->maxfd + 1, timeout * 1000); //检测满足就绪的事件,timeout是秒,需要转为毫秒
if (count == -1)
{
perror("poll");
exit(0);
}
int i;
for (i = 0; i <= data->maxfd; ++i)
{
if (data->fds[i].fd == -1) //如果为-1,说明是没有用的一块空间,无效元素
{
continue; //退出本轮循环
}
//如果是有效,判断它的revents是读事件还是写事件
if (data->fds[i].revents & POLLIN) //如果读事件触发,调用对应的读回调
{
//通过fd找到对应的channel
eventActivate(evLoop, data->fds[i].fd, ReadEvent);
}
if (data->fds[i].revents & POLLOUT) //如果写事件触发,调用对应的写回调
{
eventActivate(evLoop, data->fds[i].fd, WriteEvent);
}
}
return 0;
}

static int pollClear(struct EventLoop* evLoop)
{
struct PollData* data = (struct PollData*)evLoop->dispatcherData;
free(data);
return 0;
}

事件循环类EventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
extern struct Dispatcher EpollDispatcher;           //如果在某个文件中要使用其它文件里定义的全局变量时,需要添加关键字extern
extern struct Dispatcher PollDispatcher;
extern struct Dispatcher SelectDispatcher;

//处理该节点中的channel的方式
enum Elemtype {ADD, DELETE, MODIFY};

//定义任务队列的节点
struct ChannelElement {
int type; //如何处理该节点中的channel
struct Channel* channel;
struct ChanenlElenmet* next;
};

struct Dispatcher; //不管该结构体现在是否被定义出来,先告诉编译器有这种类型,下面就可以用这种类型去定义变量(Dispatcher和eventloop互包含了)
struct EventLoop {
bool isQuit; //EventLoop是否在工作
struct Dispatcher* dispatcher; //使用的io多路转接技术结构体,指向的是三个中其中一个
void* dispatcherData; //dispatcher指向哪种io技术,这就指向对应的数据块
// 任务队列
struct ChannelElement* head;
struct ChannelElement* tail;
// map
struct ChannelMap* channelMap;
//线程id,name,mutex
pthread_t threadID;
char threadName[32];
pthread_mutex_t mutex; //互斥锁,保护任务队列的
int socketPair[2]; //存储本地通信的fd,通过socketpair初始化(用于激活阻塞的io复用)
};

//初始化
struct EventLoop* eventLoopInit(); //如果eventloop是主线程,就使用这个
struct EventLoop* eventLoopInitEx(const char* threadName); //如果创建的eventloop属于子线程,就使用这个
//启动反应堆模型
int eventLoopRun(struct EventLoop* evLoop); //参数为启动的实例
//处理被激活的文件fd(在evloop模型里面通过fd找到了对应的channel,然后就可以通过传进来的event来判断执行读回调或写回调)
int eventActivate(struct EventLoop* evLoop, int fd, int event); //参数:evloop;要激活的fd;对应的事件
//添加任务到任务队列
int eventLoopAddTask(struct EventLoop* evLoop, struct Channel* channel, int type);
//子线程处理任务队列中的任务
int eventLoopProcessTask(struct EventLoop* evLoop);
//处理dispatcher中的节点
int eventLoopAdd(struct EventLoop* evLoop, struct Channel* channel);
int eventLoopRemove(struct EventLoop* evLoop, struct Channel* channel);
int eventLoopModify(struct EventLoop* evLoop, struct Channel* channel);
//释放channel(断开连接,不需要用fd时调用)
int destroyChannel(struct EventLoop* evLoop, struct Channel* channel);

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
struct EventLoop* eventLoopInit()
{
return eventLoopInitEx(NULL); //使用子线程的初始化函数即可
}

//写数据(这是主线程用来唤醒阻塞子线程的调用函数,在主线程执行eventLoopAddTask时会调用)
void taskWakeup(struct EventLoop* evLoop) {
const char* msg = "我是要成为海贼王的男人!!!";
write(evLoop->socketPair[0], msg, strlen(msg));
}
//读数据(主线程用来唤醒的读回调函数)
int readLocalMessage(void* arg) {
struct EventLoop* evloop = (struct EventLoop*)arg;
char buf[256];
read(evloop->socketPair[1], buf, sizeof(buf)); //主要目的不是读数据处理,而是触发一次读回调,解除阻塞
return 0;
}
struct EventLoop* eventLoopInitEx(const char* threadName)
{
struct EventLoop* evLoop = (struct EventLoop*)malloc(sizeof(struct EventLoop)); //先创建出一个反应堆结构体
evLoop->isQuit = false; //刚开始还没有运行,赋值为false
evLoop->threadID = pthread_self(); //哪个线程调用该函数,pthread_self()就会是该线程的id
pthread_mutex_init(&evLoop->mutex, NULL);
strcpy(evLoop->threadName, threadName == NULL ? "MainThread" : threadName); //如果是主线程,就固定名字,是子线程就设置为指定的名字
evLoop->dispatcher = &EpollDispatcher; //使用epoll模型
//evLoop->dispatcher = &PollDispatcher; //使用poll模型
//evLoop->dispatcher = &SelectDispatcher; //使用select模型
evLoop->dispatcherData = evLoop->dispatcher->init(); //通过函数指针init得到epoll模型需要的数据(根节点、检测集合)
//链表
evLoop->head = evLoop->tail = NULL;
// map
evLoop->channelMap = channelMapInit(128);

//解决子线程在epoll、poll或select中阻塞的情况
int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, evLoop->socketPair); //调用成功后,evloop结构体里的socketPair数组里面就有2个可用的文件描述符,可进行本地的网络通信
if (ret == -1) {
perror("socketpair");
exit(0);
}
//指定规则:evLoop->socketPair[0]发送数据;evLoop->socketPair[1]接收数据
// 1.当子线程发送阻塞时,通过socketPair[0]发送数据,socketPair[1]就会接收数据,读事件被激活
// 2.底层epoll、poll或select阻塞时,它们检测对应的读集合里面没有处于激活状态的,然后在该读激活里面添加了socketPair[1],当从socketPair[0]
// 发送数据时,它们就可以检测到socketPair[1]被激活了,然后它们就解除阻塞了
//初始化一个channel,当socketPair[0]发送数据后,socketPair[1]就会被激活,然后执行读回调readLocalMessage
struct Channel* channel = channelInit(evLoop->socketPair[1], ReadEvent,
readLocalMessage, NULL, NULL, evLoop); //参数:通信描述符;事件;读回调;写回调;回调函数的参数
//channel添加到任务队列
eventLoopAddTask(evLoop, channel, ADD); //ADD添加到检测集合里面(但还没有开始进行监听)
return evLoop;
}

int eventLoopRun(struct EventLoop* evLoop)
{
assert(evLoop != NULL);
//取出事件分发和检测模型
struct Dispatcher* dispatcher = evLoop->dispatcher;
//比较线程id是否正常
if (evLoop->threadID != pthread_self()) {
return -1; //异常情况就退出返回-1
}
//循环进行事件处理
while (!evLoop->isQuit) {
////超时时长2s,如果evLoop->dispatcher分别可以指向三张种io多路的其中一个,指向的不同,该行执行的也不同
dispatcher->dispatch(evLoop, 2); //如果发生阻塞,就是阻塞在该行程序,里面执行的是就绪的读和写回调函数
//dispatch函数里面只是监听fd,对有事件的fd调用对应的执行函数(所以每次执行完一系列fd的回调函数后,就查看处理一下任务队列eventLoopProcessTask)
eventLoopProcessTask(evLoop); //子线程处理任务队列中的任务(dispatch没有对fd中的读和写事件进行修改的模块)
}
return 0;
}

int eventActivate(struct EventLoop* evLoop, int fd, int event)
{
if (fd < 0 || evLoop == NULL) { //如果出现异常情况,直接退出
return -1;
}
//取出channel
struct Channel* channel = evLoop->channelMap->list[fd];
assert(channel->fd == fd); //理论上是相等的,因为list指向的数组下标与fd相等,且一一对应
//初始化channel的时候,可能没有指定回调函数,所以当为空的时候,也不执行回调函数
if (event & ReadEvent && channel->readCallback) {
channel->readCallback(channel->arg); //如果是读事件,就调用对应的读回调函数
}
if (event & WriteEvent && channel->writeCallback) {
channel->writeCallback(channel->arg); //如果是写事件,就调用对应的写回调函数
}
return 0;
}

//添加任务到任务队列(主线程和子线程都可能执行该函数)
//2条路径:(1)子线程往任务队列里面添加任务,然后自己处理该任务;(2)主线程给任务队列添加入任务了,因为它是不能处理的,而它又不清楚子线程目前
// 是在工作还在阻塞,就默认是阻塞,因此主线程就调用一个唤醒函数,这样就能保证子线程是在工作了,即子线程都从run()函数的dispatch函数中解除阻塞
// 解除阻塞了,就可以执行任务队列中的任务了。
int eventLoopAddTask(struct EventLoop* evLoop, struct Channel* channel, int type)
{
//添加链表节点的时候,有可能是主线程添加,也有可能是子线程添加,所以得加互斥锁
pthread_mutex_lock(&evLoop->mutex); //加锁,保护共享资源
//创建新节点
struct ChannelElement* node = (struct ChannelElement*)malloc(sizeof(struct ChannelElement));
//对节点的成员做初始化
node->channel = channel;
node->type = type;
node->next = NULL; //先让其指向空
//插入链表
if (evLoop->head == NULL) {
evLoop->head = evLoop->tail = node; //如果链表还为空,就将头指针和尾指针都指向node
}
else { //不为空,就后插法
evLoop->tail->next = node; //添加
evLoop->tail = node; //后移
}
pthread_mutex_unlock(&evLoop->mutex); //解锁,防止死锁
//处理节点(主线程只负责监听有无客户端连接,子线程负责通信任务)
/*
细节:
1.对于链表节点的添加:可能是当前线程也可能是其它线程(主线程)--->前提条件是当前的eventloop反应堆属于子线程
(1)修改fd事件,当前子线程发起,当前子线程处理
(2)添加新的fd,添加任务节点的操作是由主线程发起的
2.不能让主线程处理任务队列,需要由当前的子线程取消处理(因为每个线程都有一个eventloop反应堆,底层都有一个dispatcher,每个dispatcher都
有epoll、poll或select,给它们提供服务的就是任务队列,都是一一对应的关系,对应错了,程序也就错了)
*/
if (evLoop->threadID == pthread_self()) { //如果evloop是主线程发反应堆模型,而目前执行该函数的线程也是主线程,就可以直接执行下面
//当作为子线程,直接处理任务(遍历任务链表执行)
eventLoopProcessTask(evLoop);
}
else {
//当前为主线程 --->告诉子线程处理任务队列的任务
// 情况:主线程监听到客户端连接后,执行它的读回调函数(取子反应堆与cfd,将channel添加到evloop的任务队列里)
//子线程此时在:1.子线程在工作
// 2.子线程被阻塞了:select,poll,epoll(子线程被阻塞(没有激活事件),如果主线程与客户端又来连接,主线程将任务添加到任务队列,子线程因为阻塞不会去执行)
taskWakeup(evLoop); //通过socketPair[0]向socketPair[1]写数据
}
return 0;
}

//处理任务队列中的任务
int eventLoopProcessTask(struct EventLoop* evLoop)
{
pthread_mutex_lock(&evLoop->mutex); //加锁
//取出头节点
struct ChannelElement* head = evLoop->head;
while (head != NULL) {
struct Channel* channel = head->channel;
if (head->type == ADD) {
//添加
eventLoopAdd(evLoop, channel);
}
else if (head->type == DELETE) {
//删除(将channel里面的fd从evloop里面dispatch检测集合删除)
eventLoopRemove(evLoop, channel);
}
else if (head->type == MODIFY) {
//修改
eventLoopModify(evLoop, channel);
}
struct ChannelElement* tmp = head;
head = head->next; //后移
free(tmp); //释放刚刚处理的任务节点
}
evLoop->head = evLoop->tail = NULL; //退出循环后,说明任务队列里面的任务已经处理完了,将头指针和尾指针都指向空
pthread_mutex_unlock(&evLoop->mutex); //解锁
return 0;
}

//将channel添加到map集合中和dispatcher的检测集合
int eventLoopAdd(struct EventLoop* evLoop, struct Channel* channel)
{
int fd = channel->fd; //取出文件描述符(要添加的)
struct ChannelMap* channelMap = evLoop->channelMap;
//文件描述符对应的就是channelMap数组的下标
if (fd >= channelMap->size) { //说明channelMap容量不够,没有位置来存储与channel(每个channel有一个fd)的关系
//没有足够空间存储键值对 fd - channel ==》扩容
if (!makeMapRoom(channelMap, fd, sizeof(struct Channel*))) {
return -1; //如果没有成功就退出
}
}
//找到fd对应的数组元素位置,并存储
if (channelMap->list[fd] == NULL) {
channelMap->list[fd] = channel; //存储fd与channel对应关系
//把channel的文件描述符fd添加到对应的文件描述符检测集合中
evLoop->dispatcher->add(channel, evLoop); //选择的dispatcher模型(epoll、poll、select)不一样,该指针指向的函数add处理动作也就不一样
}

return 0;
}

int eventLoopRemove(struct EventLoop* evLoop, struct Channel* channel)
{
int fd = channel->fd; //取出文件描述符(要删除的)
struct ChannelMap* channelMap = evLoop->channelMap;
//文件描述符对应的就是channelMap数组的下标
if (fd >= channelMap->size) { //说明要删除的文件描述符不在channelMap容器里存储,即要删除的描述符不在检测的集合里面
return -1; //不在就直接退出即可
}
int ret = evLoop->dispatcher->remove(channel, evLoop);
return ret;
}

int eventLoopModify(struct EventLoop* evLoop, struct Channel* channel)
{
int fd = channel->fd; //取出文件描述符
struct ChannelMap* channelMap = evLoop->channelMap;
//文件描述符对应的就是channelMap数组的下标
if (fd>=channelMap->size || channelMap->list[fd] == NULL) { //说明要修改的文件描述符不在channelMap容器里存储(相比视频多了一个判断,更严谨)
return -1; //不在就直接退出即可
}
int ret = evLoop->dispatcher->modify(channel, evLoop);
return ret;
}

int destroyChannel(struct EventLoop* evLoop, struct Channel* channel)
{
//删除 channel 和 fd 的对应关系
evLoop->channelMap->list[channel->fd] = NULL; //将该反应堆里面channelMap对应fd的位置置为null
//关闭fd
close(channel->fd);
//释放
free(channel);
return 0;
}

5.2 多线程

线程池类ThreadPool

1
2
3
4
5
6
7
8
9
//定义线程池
struct ThreadPool {
//主线程的反应堆模型
struct EventLoop* mainLoop; //主要做备份用,只负责与客户端建立连接(只有在线程池没有子线程的情况下,它才负责出来与客户端的连接通信)
bool isStart; //当前线程池是否启动
int threadNum; //线程池中子线程的数量
struct WorkerThread* workerThreads; //指向子线程数组的指针
int index; //编号
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
struct ThreadPool* threadPoolInit(struct EventLoop* mainLoop, int count)
{
struct ThreadPool* pool = (struct ThreadPool*)malloc(sizeof(struct ThreadPool));//给线程池申请一块堆内存
pool->index = 0; //下标先置为0
pool->isStart = false; //线程池默认情况下没有启动的,置为false
pool->mainLoop = mainLoop; //主线程的反应堆模型(当线程池没有子线程时,也就没有子反应堆,可以使用主反应堆模型)
pool->threadNum = count; //线程池中子线程的个数
pool->workerThreads = (struct WorkerThread*)malloc(sizeof(struct WorkerThread) * count); //存储子线程实例的数组
return pool;
}

void threadPoolRun(struct ThreadPool* pool)
{
assert(pool && !pool->isStart); //如果pool还为空(说明还没有创建线程池) 或线程池已经启动了都是有问题的
//线程池里面存的是主线程反应堆,如果里面的线程号与当前的线程不一样,说明调用该函数的不是主线程,就退出
if (pool->mainLoop->threadID != pthread_self()) { //如果现在要启动线程池的不是主线程,就退出
exit(0);
}
//以上都没有出错,就开始启动线程池
pool->isStart = true; //是否启动置为true
if (pool->threadNum) { //如果线程池里面的子线程大于0的。就初始化它们,再启动
for (int i = 0; i < pool->threadNum; i++) {
workerThreadInit(&pool->workerThreads[i], i); //初始化。参数:子线程的地址,第i个
//启动子线程(创建子反应堆-->启动子反应堆-->evloop的dispatcher开始一直检测)
workerThreadRun(&pool->workerThreads[i]); //启动,参数:子线程实例化后的地址
}
}
}

struct EventLoop* takeWorkerEventLoop(struct ThreadPool* pool) //该函数的调用由主线程来完成
{
assert(pool->isStart); //此时如果线程池pool还没有运行,说明是错误的
if (pool->mainLoop->threadID != pthread_self()) { //如果调用该函数的线程的不是主线程,是错误的,退出
exit(0);
}
//从线程池中找到一个子线程,然后取出里边的反应堆实例
struct EventLoop* evLoop = pool->mainLoop; //赋值主线程反应堆实例的地址
if (pool->threadNum > 0) { //判断线程池里面的线程数量是否大于0
evLoop = pool->workerThreads[pool->index].evLoop; //取出子线程的反应堆模型
pool->index = ++pool->index % pool->threadNum; //使得每个子线程的反应堆模型都可以获取到
}
return evLoop; //如果线程池里面的子线程<=0,那么返回的就是主线程的反应堆模型
}

工作线程类WorkerThread

1
2
3
4
5
6
7
8
//定义子线程对应的结构体
struct WorkerThread {
pthread_t threadID; //ID
char name[24]; //线程名
pthread_mutex_t mutex; //互斥锁
pthread_cond_t cond; //条件变量
struct EventLoop* evLoop; //反应堆模型
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int workerThreadInit(struct WorkerThread* thread, int index)    //参1是在外部创建好的一块结构体空间,然后传入的是指向该结构体的指针
{
thread->evLoop = NULL; //默认情况下,子线程的反应堆模型是没有的,所以指为空
thread->threadID = 0; //线程还没有启动起来,id也指为0
sprintf(thread->name, "SubThread-%d", index); //根据编号起名字
pthread_mutex_init(&thread->mutex, NULL); //初始化互斥锁
pthread_cond_init(&thread->cond, NULL); //初始化条件变量
return 0;
}

//子线程的回调函数(创建子反应堆模型)
void* subThreadRunning(void* arg) {
struct WorkerThread* thread = (struct WorkerThread*)arg;
pthread_mutex_lock(&thread->mutex); //加锁
thread->evLoop = eventLoopInitEx(thread->name); //给当前的子线程实例化一个反应堆模型
pthread_mutex_unlock(&thread->mutex); //解锁
pthread_cond_signal(&thread->cond); //唤醒主线程,说明子线程创建反应堆成功
//底层就是启动dispatcher,开始检测
eventLoopRun(thread->evLoop); //将该反应堆运行起来
return NULL;
}
void workerThreadRun(struct WorkerThread* thread)
{
//创建子线程
pthread_create(&thread->threadID, NULL, subThreadRunning, thread);
//阻塞主线程,让当前函数不会直接结束(如果主线程执行该函数结束了,而子线程的回调函数还没有结束,即反应堆还没有创建出来,那当我们使用该
// 子线程的反应堆模型时,就会出现错误)
pthread_mutex_lock(&thread->mutex); //因为子线程的反应堆evLoop是共享资源(主线程和子线程都在访问,这里就是主线程访问),所以需要加锁
while (thread->evLoop == NULL) { //如果没有创建好,就阻塞在互斥锁上
pthread_cond_wait(&thread->cond, &thread->mutex);
}
pthread_mutex_unlock(&thread->mutex); //退出循环,说明子线程执行完了,就解锁退出
}

5.3 IO模型

读写缓存区类Buffer

1
2
3
4
5
6
struct Buffer {
char* data; //指向内存的指针
int capacity; //buffer内存块的总大小
int readPos; //读数据的位置
int writePos; //写数据的位置
};

功能实现函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//要使用memmem()函数不仅要有头文件<strings.h>,还需要定义宏_GNU_SOURCE

struct Buffer* bufferInit(int size)
{
struct Buffer* buffer = (struct Buffer*)malloc(sizeof(struct Buffer)); //申请出一块Buffer堆内存
if (buffer != NULL) { //如果开辟成功
buffer->data = (char*)malloc(size); //让buffer里面的data指针指向char型的数组,长度为size
buffer->capacity = size;
buffer->writePos = buffer->readPos = 0; //初始情况下,读位置和写位置都为0
memset(buffer->data, 0, size); //将data指针指向的大小为size的内存块都置为0
}
return buffer;
}

void bufferDestroy(struct Buffer* buf)
{
if (buf != NULL) { //首先判断buf是否为空
if (buf->data != NULL) { //再判断buf里面的data成员指向的内存是否为空
free(buf->data); //如果都不为空,说明它是指向了一块有效的堆内存,然后就释放该data堆内存
}
}
free(buf); //释放buf堆内存
}

void bufferExtendRoom(struct Buffer* buffer, int size)
{
//1.内存够用 - 不需要扩容
if (bufferWriteableSize(buffer) >= size) {
return;
}
//2.内存需要合并才够用 - 不需要扩容
// 已读的内存 + 剩余的可写的内存 >= size
else if (buffer->readPos + bufferWriteableSize(buffer) >= size) {
//得到未读的内存大小
int readable = bufferReadableSize(buffer);
//移动内存(将未读的数据移动到初始点)
memcpy(buffer->data, buffer->data + buffer->readPos, readable); //将未读数据移动到初始点data,移动的长度为未读数据长度
//更新位置
buffer->readPos = 0; //将读数据位置设为起点
buffer->writePos = readable; //将写数据位置设为未读数据的长度
}
// 3,内存不够用 - 扩容
else {
void* temp = realloc(buffer->data, buffer->capacity + size); //参数:扩容内存块的起始地址;新的内存块需要多大
if (temp == NULL) {
return; //失败了直接返回
}
memset(temp + buffer->capacity, 0, size); //将新扩充的内存初始为0
//更新数据
buffer->data = temp; //更新起点
buffer->capacity += size; //更新容量大小
}
}

int bufferWriteableSize(struct Buffer* buffer)
{
return buffer->capacity - buffer->writePos;
}

int bufferReadableSize(struct Buffer* buffer)
{
return buffer->writePos - buffer->readPos;
}

int bufferAppendData(struct Buffer* buffer, const char* data, int size)
{
if (buffer == NULL || data == NULL || data <= 0) { //这三种情况任何一种发送都是异常的
return -1;
}
//扩容
bufferExtendRoom(buffer, size); //这个函数里面不一定真的会扩容
//数据拷贝
memcpy(buffer->data + buffer->writePos, data, size); //从可写位置开始拷贝数据data,长度为size
buffer->writePos += size; //更新写数据位置
return 0;
}

//当data数据的中间没有\0的时候,可以使用该函数
int bufferAppendString(struct Buffer* buffer, const char* data)
{
int size = strlen(data); //strlen是遇到\0就结束统计
int ret = bufferAppendData(buffer, data, size);
return ret;
}
//将客户端发来的信息存到readbuffer里面
int bufferSocketRead(struct Buffer* buffer, int fd)
{
// read/recv/readv都可以(readv接收数据时可以往多个数组里面放,第一个用完,就用第二个,以此类推)
struct iovec vec[2]; //定义结构体数组,大小为2
//初始化数组元素
int writeable = bufferWriteableSize(buffer); //接收buffer还有多长的能写区域
vec[0].iov_base = buffer->data + buffer->writePos; //成员base指向buffer的写数据位置
vec[0].iov_len = writeable; //成员len赋值为能写区域的大小
char* tmpbuf = (char*)malloc(40960); //重新申请一块空间,大小为4k
vec[1].iov_base = tmpbuf; //第2个结构体的base指向刚才申请的空间
vec[1].iov_len = 40960;
int result = readv(fd, vec, 2); //接收数据,返回值是接收的字节数。参数:文件描述符;vec地址;vec里面元素的个数
if (result == -1) {
return -1; //出现错误,退出
}
else if (result <= writeable) { //说明全部数据都写到了vec[0]这个结构体里面
buffer->writePos += result; //直接更新写数据位置
}
else { //这种情况就是vec[0]的内存不够写,写了一部分数据到刚申请的空间vec[1]结构体里面
buffer->writePos = buffer->capacity; //更新buffer里面的写数据位置(因为这张情况是写满了,所以就等于它的容量)
//将vec[1]里面的数据拷贝到buffer的data里面(会进行扩容)
bufferAppendData(buffer, tmpbuf, result - writeable); //参数:buffer;要写入的数据;写入的长度
}
free(tmpbuf); //释放为vec[1]开辟的内存
return result; //将接受到的字节返回出去
}

char* bufferFindCRLF(struct Buffer* buffer)
{
//strstr --->大字符串中匹配子字符串(遇到\0结束)
//memmem --->大数据块中匹配子数据块(需要指定数据块大小)
char* ptr = memmem(buffer->data + buffer->readPos, bufferReadableSize(buffer), "\r\n", 2); //读数据起始位置;数据块大小;子数据块;子数据块大小
return ptr;
}

int bufferSendData(struct Buffer* buffer, int socket)
{
//判断有无数据
int readable = bufferReadableSize(buffer); //buffer中还有多少待处理(未读)的数据 --->待发送的数据
if (readable > 0) { //如果大于0的,就把数据发送出去
//将未读的数据都发送出去(为了防止管道破裂,需要指定参4为非0)
int count = send(socket, buffer->data + buffer->readPos, readable, MSG_NOSIGNAL); //参4指定为一个信号,表示忽略底层发来的该信号
if (count > 0) { //说明发送成功了
buffer->readPos += count; //移动buffer里面的读数据位置
usleep(1); //睡眠一会,让接收端休息一下
}
return count; //返回发送数据的长度
}
return 0;
}

Tcp通信类TcpConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//注释该宏,就是边读边写模式
#define MSG_SEND_AUTO

//TcpConnection是运行在某个子线程里面的,其在哪个子线程,evloop就属于哪个子线程,即TcpConnection是属于evloop的
struct TcpConnection {
struct EventLoop* evLoop;
struct Channel* channel;
struct Buffer* readBuf;
struct Buffer* writeBuf;
char name[32];
//协议
struct HttpRequest* request;
struct HttpResponse* response;
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//子线程反应堆对应的读回调函数(处理读到的数据,即客户端给服务器发生的数据。将要发送的数据写到writebuf)
// 细节:
// 1.当使用的是一起发模式,读回调函数执行完后,所有要发送的数据都写到了wirtebuf,然后通过写回调函数进行一起发送
// 2.当使用的是边读边发模式,读回调函数执行完后,所有数据也都发送完了(因此在读回调函数末尾,就可以断开连接了)
int processRead(void* arg) {
struct TcpConnection* conn = (struct TcpConnection*)arg;
//接收数据,存到readBuffer里面
int count = bufferSocketRead(conn->readBuf, conn->channel->fd);

Debug("接收到的http请求数据:%s", conn->readBuf->data + conn->readBuf->readPos);
if (count > 0) { //读到数据了
//接收到了http请求,解析http请求
int socket = conn->channel->fd; //初始化时,channel记录的只有检测文件描述符的读事件,而没有写事件(所以需要在检测集合对fd添加写事件)
#ifdef MSG_SEND_AUTO //如果定义了该宏,就执行下面代码
//由于写回调和读回调都是同一个线程来完成,所以执行完下来两行代码后,写事件不会立即生效,要等该读回调执行完后,线程继续执行下一轮的
//dispatcher检测,这时就会检测到fd的写事件触发,就调用写回调函数,这时writebuf里面是肯定有数据的,因为写读调执行parseHttpRequest时,
//就对http协议进行解析、处理、组织响应块,并存储到了writebuf里面。那么写回调函数就可以把writebuf里面的数据发送给客户端了,并断开连接
//这种方法的弊端:只有将所有响应数据放到writebuf过后,才能把数据发送出去,如果要发送的数据很大,writebuf或许装不下(不能将所有发送的数据
// 放到writebuf,就不能进行数据的发送)
//解决方法:写一部分,发一部分
writeEventEnable(conn->channel, true); //参数2为true,会往channel的检测添加写事件,这时channel记录的就是检测文件描述符的读写事件了
eventLoopAddTask(conn->evLoop, conn->channel, MODIFY); //在eventLoop里面修改节点,从原来的检测读事件变为了检测读写事件(只要能检测写事件,就会触发其写回调函数)
#endif
//接收到的数据在readbuf,通过readbuf处理响应体,将响应数据写到writebuf
bool flag = parseHttpRequest(conn->request, conn->readBuf, conn->response, conn->writeBuf, socket);
if (!flag) { //如果为false就是解析失败,为true就是成功了,说明要发送的数据已经写到了writeBuf
//解析失败,回复一个简单的html
char* errMsg = "Http/1.1 400 Bad Request\r\n\r\n";
bufferAppendString(conn->writeBuf, errMsg); //将errMsg写入到writeBuf
}
}
else {
#ifdef MSG_SEND_AUTO
//断开连接,把用于通信的文件描述符conn对应的channel里面的fd从反应堆模型的dispatcher检测集合中删除
eventLoopAddTask(conn->evLoop, conn->channel, DELETE); //没有读到数据,断开连接
#endif
}
#ifndef MSG_SEND_AUTO //如果没有定义了该宏,就执行下面代码(边读边发送模式)-->读回调执行完,数据也就发完了,直接断开连接
//断开连接,把用于通信的文件描述符conn对应的channel里面的fd从反应堆模型的dispatcher检测集合中删除
eventLoopAddTask(conn->evLoop, conn->channel, DELETE); //这里直接进行了统一断开连接,因为上面多种情况都会断开连接
#endif
return 0;
}

//处理写(写回调函数)--->当没有注释宏MSG_SEND_AUTO时,下面的写回调函数是永远不会执行的(因为没有在检测集合里面对fd进行修改事件)
int processWrite(void* arg) {
Debug("开始发送数据了(基于写回调发送).....");
struct TcpConnection* conn = (struct TcpConnection*)arg;
//发送数据
int count = bufferSendData(conn->writeBuf, conn->channel->fd); //返回发送数据长度
if (count > 0) {
//判断数据是否被全部发送出去了
if (bufferReadableSize(conn->writeBuf) == 0) { //满足条件(可读的长度为0),说明数据发送完了
//数据全部发送出去,就不检测写事件了(修改检测集合)-->但由于B/S架构是 建立连接-请求-响应-断开 这一过程,所以发送完数据说明响应结束
//1. 不再检测事件 -- 修改channel中保存的事件(可忽略)
writeEventEnable(conn->channel, false); //参1是要修改的channel;参2未false,表示不检测写事件了
//2. 修改dispatcher检测的集合 -- 添加任务节点(可忽略)
eventLoopAddTask(conn->evLoop, conn->channel, MODIFY); //上面对channel从检测读写事件,改为了检测读事件,这里才是真正修改节点数据
//3. 删除这个节点(上面的1和2步都可以不要,只留下这一句即可)
eventLoopAddTask(conn->evLoop, conn->channel, DELETE); //删除这个channel,客户端与服务端就断开连接了
//数据发送完,断开连接:就将当前的文件描述符从eventLoop检测集合中删除就行,这样客户端与服务器的连接就断开了
}
}
}

struct TcpConnection* tcpConnectionInit(int fd, struct EventLoop* evloop)
{
struct TcpConnection* conn = (struct TcpConnection*)malloc(sizeof(struct TcpConnection)); //给TcpConnection申请一块堆内存
conn->evLoop = evloop; //赋值为传进来的反应堆模型(子线程的反应堆)
conn->readBuf = bufferInit(10240); //初始化一块内存
conn->writeBuf = bufferInit(10240); //初始化一块内存
//得到实例
conn->request = httpRequestInit();
conn->response = httpResponseInit();
sprintf(conn->name, "Connection-%d", fd); //初始化名字
//对通信描述符fd进行封装(参数:通信描述符;检测读事件;读回调函数;写回调函数;回调函数参数)
conn->channel = channelInit(fd, ReadEvent, processRead, processWrite, tcpConnectionDestroy, conn); //初始只定义为检测读事件,当需要发的时候再改
eventLoopAddTask(evloop, conn->channel, ADD); //将channel添加到事件循环的任务队列里面(子反应堆模型evloop的任务队列)
Debug("和客户端建立连接,threadName: %s, threadID: %s, connName: %s", evloop->threadName, evloop->threadID, conn->name);
return conn;
}

//释放内存tcpConnection(在服务器和客户端断开连接过后调用,即在epoll、poll、select的remove函数中调用)--->channel的销毁回调函数
int tcpConnectionDestroy(void* arg)
{
struct TcpConnection* conn = (struct TcpConnection*)arg;
if (conn != NULL) { //conn不为空,就可以执行下面的销毁操作
//如果readBuf和wirteBuf都指向有效内存,且在这两块内存里面没有可处理的数据了
if (conn->readBuf && bufferReadableSize(conn->readBuf) == 0 && conn->writeBuf && bufferReadableSize(conn->writeBuf) == 0) {
//就可以把TcpConnection里面保存的所有的资源释放掉
destroyChannel(conn->evLoop, conn->channel); //释放channel(把属于哪个反应堆模型evLoop的channel删除)
bufferDestroy(conn->readBuf); //释放readBuf
bufferDestroy(conn->writeBuf); //释放writeBuf
httpRequestDestroy(conn->request); //释放request
httpResponseDestroy(conn->response); //释放response
free(conn);
}
}
Debug("连接断开,释放资源, gameover, connName: %s", conn->name);
return 0;
}

5.4 服务器

TcpServer类

1
2
3
4
5
6
7
8
9
10
11
struct Listener {
int lfd; //监听的描述符
unsigned short port; //端口
};

struct TcpServer {
int threadNum; //线程池里面子线程的个数
struct EventLoop* mainLoop; //反应堆模型(主线程)
struct ThreadPool* threadPool; //线程池
struct Listener* listener;
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//对结构体TcpServer进行初始化
struct TcpServer* tcpServerInit(unsigned short port, int threadNum)
{
struct TcpServer* tcp = (struct TcpServer*)malloc(sizeof(struct TcpServer)); //给tcpserver申请一块内存
tcp->listener = listenerInit(port); //传入端口,得到通信fd,进行监听
tcp->mainLoop = eventLoopInit(); //初始化主线程的反应堆模型
tcp->threadNum = threadNum; //子线程的个数
//实例化线程池
tcp->threadPool = threadPoolInit(tcp->mainLoop, threadNum); //参数:主线程的反应堆实例;线程池里面子线程的个数
return tcp;
}
//通过端口,创建监听fd
struct Listener* listenerInit(unsigned short port)
{
struct Listener* listener = (struct Listener*)malloc(sizeof(struct Listener)); //创建一块Listener结构体
//1.创建监听的fd(基于tcp的套接字) --->参三写0即可,因为这里是用流式协议,写0就代表使用的式流式协议的tcp;如果使用的是报文协议,写0就代表使用的是报文协议的udp
int lfd = socket(AF_INET, SOCK_STREAM, 0); //参数:指定使用的ip协议是ipv4还是ipv6;指定使用的套接字是基于流式协议还是报文协议
if (lfd == -1) {
perror("socket");
return NULL;
}
//2.设置端口复用
int opt = 1;
int ret = setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);
if (ret == -1) {
perror("setsockopt");
return NULL;
}
//3.绑定
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port); //将转进来的端口转为网络字节序再进行存储
//该0地址就表示这里绑定本地任意一个ip地址,假如本地计算机有多个网卡,0地址就可以监听所有网卡上面指定的port端口(INADDR_ANY没有大小端之分,所以直接使用即可)
addr.sin_addr.s_addr = INADDR_ANY; //如果有客户端将连接请求发给了网卡a或网卡b对应的port端口,在本地都可以接收到该请求
ret = bind(lfd, (struct sockaddr*)&addr, sizeof addr);
if (ret == -1) {
perror("bind");
return NULL;
}
//4.设置监听
ret = listen(lfd, 128); //一次性最多接收128个请求
if (ret == -1) {
perror("listen");
return NULL;
}
//返回fd
listener->lfd = lfd;
listener->port = port;
return listener;
}

//主线程反应堆模型中监听描述符的读回调函数(与客户端连接)
int acceptConnection(void* arg) {
struct TcpServer* server = (struct TcpServer*)arg;
//和客户端建立连接
int cfd = accept(server->listener->lfd, NULL, NULL); //参数:监听描述符;存储对端信息结构体;参2的大小
//从线程池中取出一个子线程的反应堆实例。去处理这个cfd通信
struct EventLoop* evLoop = takeWorkerEventLoop(server->threadPool); //从线程池中取出一个子子线程,得到该子线程的反应堆模型
//将cfd放到TcpConnection中处理
tcpConnectionInit(cfd, evLoop); //对fd进行封装,封装好后放到子线程的反应堆模型里面
return 0;
}
void tcpServerRun(struct TcpServer* server)
{
Debug("服务器程序已经启动了..........");
//启动线程池
threadPoolRun(server->threadPool);
//给主线程的反应堆模型添加检测的任务(监听连接)
//初始化一个channel实例。参数:监听的文件描述符;检测什么事件;事件触发处理的动作(读回调);写回调;前面回调函数的参数
struct Channel* channel = channelInit(server->listener->lfd,
ReadEvent, acceptConnection, NULL, NULL, server); //因为是监听描述符,所以不需要写回调,同时运行期间不应该释放lfd,所以销毁回调也会为空
//下面先将任务添加到任务队列进行处理,最后添加到对应检测集合中,再启动反应堆模型
eventLoopAddTask(server->mainLoop, channel, ADD); //参数:主线程的反应堆模型;channel(里面封装了文件描述符);
//启动主线程的反应堆模型
eventLoopRun(server->mainLoop);
}

5.5 Http

接收请求类HttpRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//请求头键值对
struct RequestHeader {
char* key;
char* value;
};

//当前的解析状态(用来记录)
enum HttpRequestState {
ParseReqLine, //请求行
ParseReqHeaders, //请求头
ParseReqBody, //请求的数据块(get请求没有该状态)
ParseReqDone //表示当前的请求协议已经解析完了
};

//定义http请求结构体
struct HttpRequest {
//请求行
char* method; //请求方式
char* url; //请求资源
char* version; //使用的版本
//请求头(有若干个键值对组成)
struct RequestHeader* reqHeaders; //比如说01234下的键值对已经添加,reqHeadersNum就为5,下次继续添加键值对时,直接从5下标开始
int reqHeadersNum; //键值对的个数
enum HttpRequestState curState; //记录当前的解析状态
};

//初始化
struct HttpRequest* httpRequestInit();
//重置
void httpRequestReset(struct HttpRequest* req);
void httpRequestResetEx(struct HttpRequest* req);
void httpRequestDestroy(struct HttpRequest* req); //内存释放函数
//获取处理状态
enum HttpRequestState httpRequestState(struct HttpRequest* request);
//添加请求头
void httpRequestAddHeader(struct HttpRequest* request, const char* key, const char* value);
//根据key得到请求头的value
char* httpRequestGetHeader(struct HttpRequest* request, const char* key);
//解析请求行
bool parseHttpRequestLine(struct HttpRequest* request, struct Buffer* readBuf);

//解析请求头
bool parseHttpRequestHeader(struct HttpRequest* request, struct Buffer* readBuf);
//解析http请求协议(readbuf里面存了有待解析的数据)
bool parseHttpRequest(struct HttpRequest* request, struct Buffer* readBuf,
struct HttpResponse* response, struct Buffer* sendBuf, int socket);

//处理http请求协议(处理前需要先解析http请求)
bool processHttpRequest(struct HttpRequest* request, struct HttpResponse* response);
//解码字符串
void decodeMsg(char* to, char* from);
//响应数据类型
const char* getFileType(const char* name);
//发送目录
void sendDir(const char* dirName, struct Buffer* sendBuf, int cfd);
//发送文件
void sendFile(const char* filename, struct Buffer* sendBuf, int cfd);

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
#define HeaderSize 12

struct HttpRequest* httpRequestInit()
{
struct HttpRequest* request = (struct HttpRequest*)malloc(sizeof(struct HttpRequest)); //为HttpRequest结构体申请一块空间
httpRequestReset(request);
request->reqHeaders = (struct RequestHeader*)malloc(sizeof(struct RequestHeader) * HeaderSize); //给请求头开辟一块空间
return request;
}

//该函数主要是用来数据重置的(每发完一次,回到初始设置)
void httpRequestReset(struct HttpRequest* req)
{
req->curState = ParseReqLine; //刚开始默认状态是处理请求行
req->method = NULL;
req->url = NULL;
req->version = NULL;
req->reqHeadersNum = 0; //放数据时从该下标开始放
}

void httpRequestResetEx(struct HttpRequest* req)
{
free(req->url);
free(req->method);
free(req->version);
if (req->reqHeaders != NULL) { //先判断指向结构体数值的指针是否为空
for (int i = 0; i < req->reqHeadersNum; i++) { //遍历数组释放元素
free(req->reqHeaders[i].key);
free(req->reqHeaders[i].value);
}
free(req->reqHeaders); //释放完元素,就释放组数
}
httpRequestReset(req);
}

//内存释放
void httpRequestDestroy(struct HttpRequest* req)
{
if (req != NULL) { //先判断req是否为空
httpRequestResetEx(req);
free(req); //释放结构体地址
}
}

enum HttpRequestState httpRequestState(struct HttpRequest* request)
{
return request->curState;
}

void httpRequestAddHeader(struct HttpRequest* request, const char* key, const char* value)
{
//外面是创建好的一块key和value的结构体,这里面直接赋值即可,不需要申请内存
request->reqHeaders[request->reqHeadersNum].key = (char*)key;
request->reqHeaders[request->reqHeadersNum].value = (char*)value;
request->reqHeadersNum++;
}

char* httpRequestGetHeader(struct HttpRequest* request, const char* key)
{
if (request != NULL) { //如果request不指向空
for (int i = 0; i < request->reqHeadersNum; i++) { //遍历键值对
//strncasecmp是指比较n个不区分大小写的字符串
if (strncasecmp(request->reqHeaders[i].key, key, strlen(key)) == 0) {
return request->reqHeaders[i].value;
}
}
}
return NULL; //为空或没有找到对应的键值对,就直接返回空
}

//拆分请求行(解决下面函数parseHttpRequestLine的冗余问题) 参数:起始位置;结束位置;子数据块
char* splitRequestLine(const char* start, const char* end, const char* sub, char** ptr) {
char* space = end;
if (sub != NULL) { //第一个和第二个需要用到,获取版本时传入sub==NULL即可
space = memmem(start, end - start, sub, strlen(sub));
assert(space != NULL); //等于空就报错(NULL不等于空格)
}
int length = space - start; //没有进if(),space就是end
char* tmp = (char*)malloc(length + 1); //开辟一块空间
strncpy(tmp, start, length);
tmp[length] = '\0';
*ptr = tmp;
return space + 1;
}

//解析请求行(将发来的http请求信息(在readbuf里面)解析后存到requeset结构体里面)
bool parseHttpRequestLine(struct HttpRequest* request, struct Buffer* readBuf)
{
//保存字符串结束地址\r\n的位置(该函数并没有对buffer进行读取操作,即readpos的位置没有变)
char* end = bufferFindCRLF(readBuf);
//保存字符串起始地址
char* start = readBuf->data + readBuf->readPos;
//请求行总长度
int lineSize = end - start;
if (lineSize) { //如果请求行总长度大于0
start = splitRequestLine(start, end, " ", &request->method); //调用辅助函数,完成method的截取
start = splitRequestLine(start, end, " ", &request->url); //静态资源的截取
splitRequestLine(start, end, NULL, &request->version); //版本号的截取

#if 0
// get /xxx/xxx.txt http/1.1\r\n --->end指向的是\r
//请求方式
char* space = memmem(start, lineSize, " ", 1); //大数据块起始位置;大数据块长度;子数据块;子数据块长度
assert(space != NULL); //等于空就报错(NULL不等于空格)
int methodSize = space - start; //获取method长度
request->method = (char*)malloc(methodSize + 1); //多加一个字节是为了在尾部加\0
strncpy(request->method, start, methodSize); //从起始位置读methodSize长度(get)放到request->method里面
request->method[methodSize] = "\0"; //在get后面加\0

//请求的静态资源
start = space + 1; //重新设置起始位置,原来是get后面的空格+1即可
char* space = memmem(start, end-start, " ", 1); //大数据块起始位置;大数据块长度;子数据块;子数据块长度
assert(space != NULL); //等于空就报错(NULL不等于空格)
int urlSize = space - start; //获取静态资源长度
request->url = (char*)malloc(urlSize + 1); //多加一个字节是为了在尾部加\0
strncpy(request->url, start, urlSize); //从起始位置读urlSize长度(静态资源)放到request->url里面
request->url[urlSize] = "\0"; //在静态资源后面加\0

//http版本
start = space + 1; //重新设置起始位置,原来是静态资源后面的空格+1即可
request->version = (char*)malloc(end - start + 1); //多加一个字节是为了在尾部加\0
strncpy(request->version, start, end - start); //从起始位置读相应长度(静态资源)放到request->version里面
request->version[end - start] = "\0";
#endif

//为解析请求头做准备
readBuf->readPos += lineSize; //令读数据位置为请求行的\r
readBuf->readPos += 2; //再加2个位置(\r\n),读数据位置就为请求头行了
//修改状态
request->curState = ParseReqHeaders; //当前状态为请求头状态
return true;
}
return false;
}

//该函数处理请求头中的一行(如果要处理请求头中的多行,需要循环调用)
bool parseHttpRequestHeader(struct HttpRequest* request, struct Buffer* readBuf) //传进来的readBuf已经处理完请求头,读数据位置在请求头行
{
char* end = bufferFindCRLF(readBuf); //正常情况下end指向\r
if (end != NULL) {
char* start = readBuf->data + readBuf->readPos; //请求头的起始位置
int lineSize = end - start; //获取长度
//基于: 搜索字符串
char* middle = memmem(start, lineSize, ": ", 2); //在请求头的某一行搜索子数据块": ",标准的http请求key和value之间是": "
if (middle != NULL) { //如果不等于NULL,说明middle指向了:
char* key = malloc(middle - start + 1); //为key开辟一块空间,加1是为了最后存\0
strncpy(key, start, middle - start);
key[middle - start] = '\0';
//middle+2的位置就是value值的初始位置
char* value = malloc(end - middle - 2 + 1); //为value开辟一块空间
strncpy(value, middle+2, end - middle - 2);
key[end - middle - 2] = '\0';
//httpRequestAddHeader函数里面是没有对key和value申请内存的,所以在本函数为它们开辟有效内存
httpRequestAddHeader(request, key, value); //将得到key和value存放到结构体数组中
//移动读数据的位置
readBuf->readPos += lineSize;
readBuf->readPos += 2; //readBuf里面的读数据位置移动到下一行
}
else { //否则说明请求头已经的内容已经获取完了(请求头的下面是空行\r\n,里面是没有": "的)
//请求头被解析完了,跳过空行
readBuf->readPos += 2;
//修改解析状态
//忽略post请求(空行下面是有数据的),按照get请求处理(空行下面是没有数据的)
request->curState = ParseReqDone;
}
return true;
}
return false;
}

//解析http协议(先解析请求行,再解析请求头),调用辅助函数
bool parseHttpRequest(struct HttpRequest* request, struct Buffer* readBuf, struct HttpResponse* response, struct Buffer* sendBuf, int socket)
{
bool flag = true;
while (request->curState != ParseReqDone) { //当前状态不是结束状态就循环
switch (request->curState) {
case ParseReqLine:
flag = parseHttpRequestLine(request, readBuf); //解析请求行存到request,将状态改为解析请求头
break;
case ParseReqHeaders:
flag = parseHttpRequestHeader(request, readBuf); //解析请求头存到request,将状态改为解析完毕ParseReqDone
break;
case ParseReqBody: //只有post请求才会用到这种状态
break;
default:
break;
}
if (!flag) { //当解析请求行或请求头出现问题时,会使flag为false,这种情况就直接退出
return flag;
}
//判断是否解析完毕了,如果完毕了,需要准备回复的数据
if (request->curState == ParseReqDone) {
// 1. 根据解析出的原始数据,对客户端的请求做出处理(主要完成http协议响应的格式)
processHttpRequest(request,response); //该函数调用后,response指针指向的结构体地址被写入了要发送的数据
// 2. 组织相应数据写到sendBuf,并发送给客户端
httpResponsePrepareMsg(response, sendBuf, socket); //根据response里面写入的结构体数据,组织了一个发送给客户端的响应数据块存到sendBuf
}
}
request->curState = ParseReqLine; //还原状态,保证还能继续处理第二条以及以后的请求(从客户端读来的数据都存到buffer里面的)
return flag;
}

//处理http请求(http协议的响应格式发送)--->没有发送,只是进行了存储,存储在了response结构体中
bool processHttpRequest(struct HttpRequest* request, struct HttpResponse* response)
{
if (strcasecmp(request->method, "get") != 0) { //若干客户端的请求不是get请求,就不处理,直接返回-1
return -1;
}
decodeMsg(request->url, request->url); //将参2url解码后又存到参1url(中文的utf-8形式转纯中文)
//处理客户端请求的静态资源
char* file = NULL;
if (strcmp(request->url, "/") == 0) { //若干客户端的请求路径是/
file = "./"; //将目录改为资源文件的根目录(因为有操作会将当前目录切换为资源目录,所以才是./)
}
else {
file = request->url + 1; //若不是,就移动位置,取客户端想要的文件
}
//获取文件的属性
struct stat st;
int ret = stat(file, &st);
if (ret == -1) { //出错
//文件不存在--回复404
strcpy(response->fileName, "404.html"); //文件名
response->statusCode = NotFound; //状态码
strcpy(response->statusMsg, "Not Found"); //状态描述
//响应头(有许多键值对的,这里只发一个键值对)
httpResponseAddHeader(response, "Content-type", getFileType(".html"));
response->sendDataFunc = sendFile; //发送数据的函数指针指向发送文件数据函数
return 0; //出错了退出
}
strcpy(response->fileName, file);
response->statusCode = OK;
strcpy(response->statusMsg, "OK");
//判断文件类型(看是目录还是文件)
if (S_ISDIR(st.st_mode)) { //如果是目录
//响应头(有许多键值对的,这里只法一个键值对)
httpResponseAddHeader(response, "Content-type", getFileType(".html")); //因为是目录,它也是通过html文件显示的
response->sendDataFunc = sendDir; //发送数据的函数指针指向发送目录数据函数
}
else {
//响应头
char tmp[12] = { 0 };
sprintf(tmp,"%ld", st.st_size);
httpResponseAddHeader(response, "Content-type", getFileType(file)); //文件类型
httpResponseAddHeader(response, "Content-length", tmp); //文件内容长度
response->sendDataFunc = sendFile; //发送数据的函数指针指向发送数据函数
}
return false;
}

// 将字符转换为整形数
int hexToDec(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;

return 0;
}

// 解码
// to 存储解码之后的数据, 传出参数, from被解码的数据, 传入参数
void decodeMsg(char* to, char* from)
{
for (; *from != '\0'; ++to, ++from)
{
// isxdigit -> 判断字符是不是16进制格式, 取值在 0-f
// Linux%E5%86%85%E6%A0%B8.jpg
if (from[0] == '%' && isxdigit(from[1]) && isxdigit(from[2]))
{
// 将16进制的数 -> 十进制 将这个数值赋值给了字符 int -> char
// B2 == 178
// 将3个字符, 变成了一个字符, 这个字符就是原始数据
*to = hexToDec(from[1]) * 16 + hexToDec(from[2]);

// 跳过 from[1] 和 from[2] 因此在当前循环中已经处理过了
from += 2;
}
else
{
// 字符拷贝, 赋值
*to = *from;
}

}
*to = '\0';
}

//响应数据类型
const char* getFileType(const char* name)
{
// a.jpg a.mp4 a.html
// 自右向左查找‘.’字符, 如不存在返回NULL
const char* dot = strrchr(name, '.'); //从右往左找,找到点,就停止,读后面内容也就是后缀了
if (dot == NULL)
return "text/plain; charset=utf-8"; // 纯文本
if (strcmp(dot, ".html") == 0 || strcmp(dot, ".htm") == 0)
return "text/html; charset=utf-8";
if (strcmp(dot, ".jpg") == 0 || strcmp(dot, ".jpeg") == 0)
return "image/jpeg";
if (strcmp(dot, ".gif") == 0)
return "image/gif";
if (strcmp(dot, ".png") == 0)
return "image/png";
if (strcmp(dot, ".css") == 0)
return "text/css";
if (strcmp(dot, ".au") == 0)
return "audio/basic";
if (strcmp(dot, ".wav") == 0)
return "audio/wav";
if (strcmp(dot, ".avi") == 0)
return "video/x-msvideo";
if (strcmp(dot, ".mov") == 0 || strcmp(dot, ".qt") == 0)
return "video/quicktime";
if (strcmp(dot, ".mpeg") == 0 || strcmp(dot, ".mpe") == 0)
return "video/mpeg";
if (strcmp(dot, ".vrml") == 0 || strcmp(dot, ".wrl") == 0)
return "model/vrml";
if (strcmp(dot, ".midi") == 0 || strcmp(dot, ".mid") == 0)
return "audio/midi";
if (strcmp(dot, ".mp3") == 0)
return "audio/mpeg";
if (strcmp(dot, ".ogg") == 0)
return "application/ogg";
if (strcmp(dot, ".pac") == 0)
return "application/x-ns-proxy-autoconfig";

return "text/plain; charset=utf-8"; //如果没有找到对应的文件格式,就返回纯文本,符号编码是utf-8
}

//将拼好的html数据块添加到readbuf里面
void sendDir(const char* dirName, struct Buffer* sendBuf, int cfd)
{
char buf[4096] = { 0 };
sprintf(buf, "<html><head><title>%s</title></head><body><table>", dirName);
struct dirent** namelist;
//遍历目录,返回值是该目录下文件的数量
//scandir 函数会动态分配内存,将每个目录项的指针存储到一个指针数组namelist中。这个数组中的每个元素都是指向struct dirent结构体的指针,这些结构体包含了文件名等目录项信息。
int num = scandir(dirName, &namelist, NULL, alphasort); //参1:要遍历的目录的名字;参2:传出参数;参3:回调函数(遍历的规则);参4:排序的方式
int i;
for (i = 0; i < num; i++) {
//取出文件名。namelist指向的是一个指针数组 struct dirent* tmp[] ----->数组tmp里面的元素都是struct dirent*类型的指针
char* name = namelist[i]->d_name;
struct stat st;
char subPath[1024] = { 0 };
sprintf(subPath, "%s/%s", dirName, name); //拼接成绝对路径
stat(subPath, &st); //判断该绝对路径是目录还是文件
if (S_ISDIR(st.st_mode)) {
//注意:如果是%s/有斜线,就代表要跳转到某个目录里面去,如果没有斜线,就代表是访问某个文件
sprintf(buf + strlen(buf), "<tr><td><a href=\"%s/\">%s</a></td><td>%ld</td></tr>", name, name, st.st_size);
}
else {
sprintf(buf + strlen(buf), "<tr><td><a href=\"%s\">%s</a></td><td>%ld</td></tr>", name, name, st.st_size);
}
bufferAppendString(sendBuf, buf); //将buf里面的数据写到sendBuf
#ifndef MSG_SEND_AUTO //如果定义了该宏(说明使用一起发模式,即将所有数据写到sendbuf里面,最后在写回调函数一起发),下面代码就无效
bufferSendData(sendBuf, cfd); //sendbuf又有数据了,发送数据(边读边发的模式)
#endif
memset(buf, 0, sizeof(buf)); //然后将容器buf清0
free(namelist[i]); //用完了释放空间
}
sprintf(buf, "</table></body></html>"); //拼接结束标签
bufferAppendString(sendBuf, buf); //将buf里面的数据写到sendBuf
#ifndef MSG_SEND_AUTO //如果定义了该宏,下面代码就无效
bufferSendData(sendBuf, cfd); //发送数据
#endif
free(namelist); //释放二级指针
}

//将要发送的数据写到readbuf
void sendFile(const char* filename, struct Buffer* sendBuf, int cfd) //发送文件函数
{
//1.打开文件
int fd = open(filename, O_RDONLY); //以只读的方式打开文件filename
if (fd == -1) {
perror("open");
}
//assert(fd > 0); //设置断言(如果fd<=0,就会报错)
//发送文件的两种方式(没有改,这里只能用第一种方法)
#if 1
while (1) {
char buf[1024];
int len = read(fd, buf, sizeof buf); //从文件中读出数据存到buf
if (len > 0) { //每次循环后,如果数据还没有读完,len就会大于0
bufferAppendData(sendBuf, buf, len); //将数据buf长度为len的数据写入到sendBuf(buf结尾不是\0,所以得用该函数)
#ifndef MSG_SEND_AUTO //如果定义了该宏(说明使用一起发模式,即将所有数据写到sendbuf里面,最后在写回调函数一起发),下面代码就无效
bufferSendData(sendBuf, cfd); //sendBuf又有数据了,发送数据(边读边发的模式)
#endif
//usleep(10); //重要,但已经在bufferSendData()函数里面睡眠了(无论是 一起发模式 还是 边写边发模式 都会调用该函数)
}
else if (len == 0) { //文件读完了,直接退出循环
break;
}
else { //其它情况就是出现问题了,打印错误信息
close(fd); //关闭fd
perror("read");
}
}
#else
//该方法只适用于发送文件的场景
//这种方式是把文件打开,把文件描述符传给sendfile函数,函数内部就会将数据发给客户端(效率更高)--->但如果发送的是目录,该方法还是不行
off_t offset = 0;
int size = lseek(fd, 0, SEEK_END); //获取文件的大小(指针被移动到尾部了)
lseek(fd, 0, SEEK_SET); //将指针移回头部
while (offset < size) { //当偏移量到达要发送文件大小后(说明发送完了),就直接退出循环
int ret = sendfile(cfd, fd, &offset, size - offset); //参1是通信描述符;参2的文件描述符;参3是参2的偏移量;参4是open打开文件的大小
//sendfile的第3个参数offset:
// 1.发送数据之前,根据该偏移量开始读文件数据
// 2.发送数据之后,更新该偏移量(不需要我们手动设置,系统自动更新偏移量)
//printf("ret value: %d\n", ret);
if (ret == -1 && errno == EAGAIN) {
printf("没有数据.....\n");
}
}

#endif
close(fd);
}

组织响应体类HttpResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//定义状态码枚举类
enum HttpStatusCode {
Unknown,
OK = 200,
MovedPermanently = 301,
MovedTemporarily = 302,
BadRequest = 400,
NotFound = 404
};

//定义响应的结构体(一个结构体对应一个key-value)
struct ResponseHeader {
char key[32];
char value[128];
};

//定义一个函数指针,用来组织要回复给客户端的数据块
typedef void (*responseBody)(const char* fileName, struct Buffer* sendBuf, int socket);

//定义结构体
struct HttpResponse {
//状态行:状态码,状态描述
enum HttpStatusCode statusCode; //状态码
char statusMsg[128]; //状态描述
char fileName[128]; //文件名
//响应头:键值对
struct ResponseHeader* headers; //通过malloc多开辟几个该结构体空间,即形成一个数组,headers指向数组头
int headerNum; //响应头里面有效的元素个数
responseBody sendDataFunc; //回复给客户端的数据块(函数指针类型),也可以理解为回调函数
};

功能实现文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
struct HttpResponse* httpResponseInit()
{
struct HttpResponse* response = (struct HttpResponse*)malloc(sizeof(struct HttpResponse));
response->headerNum = 0; //初始化响应头的个数为0
int size = sizeof(struct ResponseHeader) * ResHeaderSize;
response->headers = (struct ResponseHeader*)malloc(size); //开辟存放响应头的空间
response->statusCode = Unknown; //初始化状态码为未知
//初始化数组
bzero(response->headers, size);
bzero(response->statusMsg, sizeof(response->statusMsg)); //状态描述
bzero(response->fileName, sizeof(response->fileName)); //文件名
//函数指针
response->sendDataFunc = NULL;
return response;
}

void httpResponseDestroy(struct HttpResponse* response)
{
if (response != NULL) {
free(response->headers); //释放响应头数组
free(response);
}
}

void httpResponseAddHeader(struct HttpResponse* response, const char* key, const char* value)
{
if (response == NULL || key == NULL || value == NULL) { //当着三个1其中一个出现空时,都是错误的
return;
}
strcpy(response->headers[response->headerNum].key, key); //c语言不能直接赋值,需要进行拷贝
strcpy(response->headers[response->headerNum].value, value); //拷贝value
response->headerNum++; //元素个数++

}

//组织http响应数据(将之前添加到response里面的数据写入到sendBuf里面)
void httpResponsePrepareMsg(struct HttpResponse* response, struct Buffer* sendBuf, int socket)
{
//状态行
char tmp[1024] = { 0 };
sprintf(tmp, "HTTP/1.1 %d %s\r\n", response->statusCode, response->statusMsg); //拼接状态行到tmp
bufferAppendString(sendBuf, tmp); //将数据tmp写到sendBuf
//响应头
for (int i = 0; i < response->headerNum; i++) {
sprintf(tmp, "%s: %s\r\n", response->headers[i].key, response->headers[i].value);
bufferAppendString(sendBuf, tmp); //将拼接的响应头中的每一行都依次存入到sendBuf
}
//空行
bufferAppendString(sendBuf, "\r\n");
//目前,http响应体(状态行、状态头、空行)已经组织好放在了sendbuf(如果采用边发边发送的模式,就可以先发送了)
#ifndef MSG_SEND_AUTO //如果定义了该宏(说明使用一起发的模式),下面代码就无效
bufferSendData(sendBuf, socket); //发送数据(先将sendbuf里面的数据先发送)
#endif

//回复的数据(该函数指针指向的是两个目录,一个是发送目录,一个是发送文件),在processHttpRequest函数里面就让该函数指针指向对应的函数地址
response->sendDataFunc(response->fileName, sendBuf, socket);
}

流程:

从main()函数开始,先通过函数tcpServerInit()创建一个服务器的实例,并设置了主线程启动后,它线程池里面的子线程个数为4,这样就得到了一个服务器的实例对象server,随后就可以调用它的一个Run方法了。

在启动服务器的时候,就是把线程池启动起来,并且把用于监听的套接字用于封装,然后把它放到了当前主线程对应的反应堆模型里面,之后主线程的反应堆模型就运行起来了,那么它底层的pool、epool或select也就运行起来了。它就可以检测监听描述符lfd里面的事件,这里是指读事件。如果有新的客户端连接,读事件就触发了,然后读回调函数acceptConnection就被调用了。

在读回调函数acceptConnection里,它第一件事就是和客户端建立连接得到了一个通信的文件描述符,然后从主线程里面取出了一个子线程,并且把子线程的反应堆模型evLoop取出来,然后把用于通信的文件描述符放到了evLoop里面。在tcpConnectionInit()函数里面,其实就是把cfd进行了封装,最终得到了一个新的channel,然后把这个channel放到了evLoop子线程的反应堆模型里面。

总的来说,就是当主线程建立连接之后,它并不会去处理与客户端的通信,和客户端的通信全都是在子线程里面处理的。