23 March 2020

前言

谈起epoll大家应该都不陌生,可以算是当前linux系统下支持高性能网络IO的底层基石了,我们常用的支持高并发的网络服务,比如Redis、 Nginx、Kafka、Netty,Swoole等等,底层无一例外使用epoll,所以我觉得每个程序员都应该对epoll有非常透彻的理解,借着疫情待岗期间, 我把我对epoll的理解从新梳理了一下,用C语言把几种常见的socket网络通信方式验证了一遍,包括同步阻塞方式,select方式和epoll方式, 从中查漏补缺,并且整理成一篇文章,以便日后温习,希望对大家也有些帮助。

我觉得理解一个东西至少要分四个层次,首先是看懂或听懂别人的讲解,这一步是被动的接受,表面上貌似看懂了,其实 还有大量细节不清楚,并且抓不住其中的重点,还很容易遗忘。

第二步需要自己尝试,必须亲自写一些demo验证,这时你会发现之前 的理解还很肤浅,有大量遗漏的知识点,很多疑问会自己涌现出来,然后再去查找资料,这样理解才能深刻也记得更牢。

第三步是讲给别人听或者自己写一篇技术博客,这一步的要求就更高了,因为你要避免出丑露怯或被人问住,更需要对一些边边角角的细节都能了解并准确描述, 还要自己组织语言,让别人能听懂。

最后一步就是正式应用于生产实践中了,这一步是可遇而不可求的,因为越是底层的技术(例如epoll)越不容易用于生产实践,而一旦用于生产实践 那一定是非常核心并且通用的服务。但是这并不意味着学习底层技术是没有意义的,正相反,理解计算机的底层逻辑对日常开发具有非常积极的意义, 这一点不必细说了,相信大家都能理解。

epoll并不是一项新技术,已经诞生了十几年,网上教程、博客、技术资料多如牛毛,水平也是参差不齐, 你要是再写,总得有点特别的地方,对此,我发现网上很多文章都讲了某一部分知识点,全面完整的描述不多,我的定位是尽可能的把一些知识点串起来, 加入我个人的理解,特别是有些细节在其他技术文章中容易忽略但又容易造成迷惑的地方,就重点聊一下,还有一些知识点,虽然也很重要, 但在网上或书中能够很容易找到答案,就不做过多介绍了。另外,前人为了便于理解画过很多优秀的图,我就直接拿来主义了(会给出原文出处), 在此对前人的工作表示感谢。我也只能说,个人水平有限,如果出现一些纰漏,欢迎大家指正。

硬件原理

首先,从硬件上说,A、B两台主机通信,数据通过网络从A主机传输到B主机时,必先经过B主机网卡,然后再从网卡写入到内存中。

注意这里写入的内存叫做内核缓冲区,属于内核空间,只有操作系统能够访问。

网卡把数据写到内存以后,会向CPU发出一个中断信号,CPU收到这个信号,会中断正在执行的程序,开始执行网卡中断处理函数。

网卡中断处理函数主要干两件事,首先要通知网卡数据收到了,第二是要把数据从网卡缓冲区中尽快写入内核缓冲区,这件事需要尽快完成 (及时处理中断),不然网卡缓冲区排在前面的数据就被覆盖了。

数据从网卡写入内核缓冲区这一步是需要花时间的,CPU不会干等着什么都不做,操作系统会控制当前进程出让CPU,让CPU去做别的事, 等数据都写好了,再唤醒当前进程继续执行。

此时这个进程就会处于阻塞等待状态,等数据拷贝完成后,系统会唤醒这个进程继续执行。

那么阻塞和唤醒的本质是什么呢?

首先,当进程执行到创建socket语句时,操作系统会创建一个socket对象,这个对象属于文件系统的一员(注意理解linux系统一切皆文件, 后面会提到 epfd 对象也是文件系统的一员),返回一个int型文件句柄 fd,这个socket对象主要包含3部分:

  • 发送缓冲区
  • 接收缓冲区
  • 等待列表 - 注意这个等待列表是接下来要重点提及的

操作系统还维护了一个工作队列,凡是处于运行状态的进程都会加入到这个工作队列中,CPU通过轮询分时执行,同一时刻只会处理一个进程, 因为进程切换很快,给人的感觉就像多个进程在同时执行。

当进程需要阻塞时,操作系统就把这个进程从工作队列中删除,这样下次轮询就不会轮到这个进程了。

同时,操作系统会把这个进程加到这个进程监视的所有socket对象的等待列表中。

注意等待列表可能不是一个,而是多个,每个socket都有自己的等待列表,一个进程A可能同时监视了10个socket,进程进入阻塞状态时, 就得把进程A的指针插入到这10个socket的等待列表里(轮询一遍)。

当数据准备好以后(从网卡拷贝到内存中完成),就可以唤醒进程了,唤醒的具体操作是什么呢?

操作系统会再次轮询所有监视的socket,把进程A从它们各自的等待队列中删除,再把进程A加入当前工作队列中,下次CPU时间片切换时就会 继续执行这个进程了。

下面介绍一下几种常见的网络I/O模型,并贴出C语言版demo代码,代码都经过测试可以跑通。

网络I/O模型之阻塞模式

这是比较原始的I/O模型,也是最好理解的。

图,待补充。。。

客户端代码:(简单起见我后面都用这个客户端测试了)

#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#define SERVER_PORT 6666

/*
连接到服务器后,会不停循环,等待输入,
输入quit后,断开与服务器的连接
*/
int main() {
    //客户端只需要一个套接字文件描述符,用于和服务器通信
    int clientSocket;
    //描述服务器的socket
    struct sockaddr_in serverAddr;
    char sendbuf[200];
    char recvbuf[200];
    int iDataNum;
    if((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0){
        perror("socket");
        return 1;
    }
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);

    //指定服务器端的ip,本地测试:127.0.0.1
    //inet_addr()函数,将点分十进制IP转换成网络字节序IP
    serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if(connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
        perror("connect");
        return 1;
    } else {
        printf("connect ok\n");
    }
    printf("连接到主机...\n");
    while(1) {
        printf("发送消息:");
        scanf("%s", sendbuf);
        printf("\n");
        send(clientSocket, sendbuf, strlen(sendbuf), 0);
        if(strcmp(sendbuf, "quit") == 0) break;
        printf("读取消息:");
        recvbuf[0] = '\0';
        iDataNum = recv(clientSocket, recvbuf, 200, 0);
        recvbuf[iDataNum] = '\0';
        printf("%s\n", recvbuf);
    }
    close(clientSocket);
    return 0;
}

    

服务器端代码:

#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#define SERVER_PORT 6666

/*
监听后,一直处于accept阻塞状态,
直到有客户端连接,
当客户端如数quit后,断开与客户端的连接
*/
int main() {
    //调用socket函数返回的文件描述符
    int serverSocket;
    //声明两个套接字sockaddr_in结构体变量,分别表示客户端和服务器
    struct sockaddr_in server_addr;
    struct sockaddr_in clientAddr;
    int addr_len = sizeof(clientAddr);
    int client;
    char buffer[200];
    int iDataNum;

    //socket函数,失败返回-1
    //int socket(int domain, int type, int protocol);
    //第一个参数表示使用的地址类型,一般都是ipv4,AF_INET
    //第二个参数表示套接字类型:tcp:面向连接的稳定数据传输SOCK_STREAM
    //第三个参数设置为0
    if((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        return 1;
    }
    bzero(&server_addr, sizeof(server_addr));
    //初始化服务器端的套接字,并用htons和htonl将端口和地址转成网络字节序
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    //ip可是是本服务器的ip,也可以用宏INADDR_ANY代替,代表0.0.0.0,表明所有地址
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    //对于bind,accept之类的函数,里面套接字参数都是需要强制转换成(struct sockaddr *)
    //bind三个参数:服务器端的套接字的文件描述符,
    if(bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("connect");
        return 1;
    }

    //设置服务器上的socket为监听状态
    if(listen(serverSocket, 5) < 0) {
        perror("listen");
        return 1;
    }

    while(1)
    {
        printf("监听端口: %d\n", SERVER_PORT);
        //调用accept函数后,会进入阻塞状态
        //accept返回一个套接字的文件描述符,这样服务器端便有两个套接字的文件描述符,
        //serverSocket和client。
        //serverSocket仍然继续在监听状态,client则负责接收和发送数据
        //clientAddr是一个传出参数,accept返回时,传出客户端的地址和端口号
        //addr_len是一个传入-传出参数,传入的是调用者提供的缓冲区的clientAddr的长度,以避免缓冲区溢出。
        //传出的是客户端地址结构体的实际长度。
        //出错返回-1
        client = accept(serverSocket, (struct sockaddr*)&clientAddr, (socklen_t*)&addr_len);
        if(client < 0) {
            perror("accept");
            continue;
        } else {
            printf("accept %d\n", htons(clientAddr.sin_port));
        }

        printf("等待消息...\n");
        //inet_ntoa ip地址转换函数,将网络字节序IP转换为点分十进制IP
        //表达式:char *inet_ntoa (struct in_addr);
        printf("IP is %s\n", inet_ntoa(clientAddr.sin_addr));
        printf("Port is %d\n", htons(clientAddr.sin_port));
        while(1) {
            printf("读取消息:");
            buffer[0] = '\0';
            iDataNum = recv(client, buffer, 1024, 0);
            if(iDataNum < 0) {
                perror("recv null");
                continue;
            }

            buffer[iDataNum] = '\0';
            if(strcmp(buffer, "quit") == 0) {
                break;
            }
            printf("%s\n", buffer);
            printf("发送消息:");
            scanf("%s", buffer);
            printf("\n");
            send(client, buffer, strlen(buffer), 0);
            if(strcmp(buffer, "quit") == 0)
            break;
        }
    }
    close(serverSocket);
    return 0;
}
    

以上阻塞模式要想实现并发响应用户请求,只能使用多线程或多进程,每一个socket都fork出一个进程出来,开销非常大,记得很久以前的web服务器Apche就是 这么干的,在高并发的场景下肯定不行,所以就出现了I/O多路复用技术,I/O多路复用又分3种,select、poll和epoll,select和poll差不多,本质上没有太大区别, epoll是I/O多路复用的终极版本,适用于高并发场景,下面重点介绍一下select和epoll。

网络I/O模型之select

服务器端代码:

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

int main()
{
	int server_sockfd, client_sockfd;
	int server_len, client_len;
	struct sockaddr_in server_address;
	struct sockaddr_in client_address;
	int nready;
	fd_set readfds, testfds;
	server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket
	server_address.sin_family = AF_INET;
	server_address.sin_addr.s_addr = htonl(INADDR_ANY);
	server_address.sin_port = htons(6666);
	server_len = sizeof(server_address);
	bind(server_sockfd, (struct sockaddr *)&server_address, server_len);
	listen(server_sockfd, 5);
	FD_ZERO(&readfds);
	FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中
    printf("server_sockfd=%d\n", server_sockfd);

    int maxfd = server_sockfd;
	while(1) {
        char buffer[200];
        int iDataNum;

		int fd;
		int nread;
		testfds = readfds;
		printf("server waiting\n");

		/*无限期阻塞,并测试文件描述符变动 */
		nready = select(maxfd+1, &testfds, (fd_set *)0, (fd_set *)0, (struct timeval *) 0);
		if(nready < 1) {
			perror("server");
			exit(1);
		}

		/*扫描所有的文件描述符*/
		for(fd = 0; fd < FD_SETSIZE; fd++) {
			/*找到相关文件描述符*/
			if(FD_ISSET(fd, &testfds))
			{
		     	/*判断是否为服务器套接字,是则表示为客户请求连接。*/
				if(fd == server_sockfd)
				{
					client_len = sizeof(client_address);
					client_sockfd = accept(server_sockfd, (struct sockaddr *)&client_address, (socklen_t*)&client_len);
					FD_SET(client_sockfd, &readfds);//将客户端socket加入到集合中
                    if (client_sockfd > maxfd) maxfd = client_sockfd;
					printf("adding client on fd %d\n", client_sockfd);
				} else {
                    /*客户端socket中有数据请求时*/
					ioctl(fd, FIONREAD, &nread);//取得数据量交给nread
					/*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */
					if(nread == 0) {
						close(fd);
						FD_CLR(fd, &readfds);
						printf("removing client on fd %d\n", fd);
					} else {
                        /*处理客户数据请求*/
                        buffer[0] = '\0';
                        iDataNum = recv(fd, buffer, 1024, 0);
                        if(iDataNum < 0) {
                            perror("recv null");
                            continue;
                        }
                        buffer[iDataNum] = '\0';
                        printf("serving client on fd %d\n", fd);
                        printf("%s\n", buffer);

                        send(fd, buffer, strlen(buffer), 0);
					}
				}
			}
		}
	}
}
    

网络I/O模型之epoll

服务器端代码:

#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

#define LISTENQ 20
#define BUFF_SIZE 1024

void setnonblocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

typedef struct _ev_data
{
    int fd; //socket
    char * buffer; //缓冲区
} ev_data;


int main(int argc, char* argv[])
{
    int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
    ssize_t n;
    char buffer[BUFF_SIZE];
    socklen_t clilen;

    if ( 2 == argc )
    {
        if( (portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr,"Usage:%s portnumber\a\n",argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr,"Usage:%s portnumber\a\n",argv[0]);
        return 1;
    }



    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev, events[20];

    //生成用于处理accept的epoll专用的文件描述符
    epfd = epoll_create(256);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    //把socket设置为非阻塞方式
    setnonblocking(listenfd);

    //设置与要处理的事件相关的文件描述符
    ev_data * data = malloc(sizeof(ev_data));
    data->fd = listenfd;
    data->buffer = NULL;
    ev.data.ptr = data;
    //设置要处理的事件类型
    ev.events=EPOLLIN|EPOLLET;

    //注册epoll事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr = "127.0.0.1";
    inet_aton(local_addr, &(serveraddr.sin_addr));//htons(portnumber);

    serveraddr.sin_port=htons(portnumber);
    bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    printf("server_sockfd=%d\n", listenfd);

    maxi = 0;
    while (1) {
        //等待epoll事件的发生

        nfds = epoll_wait(epfd, events, 20, 500);
        //处理所发生的所有事件

        for(i=0;i<nfds;++i)
        {
            if(((ev_data * )(events[i].data.ptr))->fd ==listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
            {
                connfd = accept(listenfd,(struct sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }
                setnonblocking(connfd);

                //char *str = inet_ntoa(clientaddr.sin_addr);
                //printf("accapt a connection from %s", str);
                printf("adding client on fd %d\n", connfd);
                //设置用于读操作的文件描述符

                ev_data * data = malloc(sizeof(ev_data));
                data->fd = connfd;
                data->buffer = malloc(BUFF_SIZE);

                //ev.data.fd=connfd;
                ev.data.ptr = data;
                //设置用于注测的读操作事件
                ev.events=EPOLLIN|EPOLLET;

                //注册ev
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }
            else if(events[i].events & EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。
            {
                ev_data *  pData = (ev_data * )(events[i].data.ptr);

                printf("EPOLLIN\n");
                if ( (sockfd = pData->fd) < 0)
                    continue;
                if ( (n = read(sockfd, pData->buffer, BUFF_SIZE)) < 0) {
                    if (errno == ECONNRESET) {
                        printf("ECONNRESET close fd %d\n", sockfd);
                        close(sockfd);
                        pData->fd = -1;
                    } else
                        printf("read error\n");
                } else if (n == 0) {
                    printf("read len is 0 on fd %d, close fd\n", sockfd);
                    close(sockfd);
                    pData->fd = -1;
                }
                if (pData->fd > 0) {
                    pData->buffer[n] = '\0';
                    printf("read %s\n", pData->buffer);
                    //设置用于写操作的文件描述符

                    ev.data.ptr = pData;
                    //设置用于注测的写操作事件
                    ev.events = EPOLLOUT|EPOLLET;
                    //修改sockfd上要处理的事件为EPOLLOUT
                    epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
                }
                printf("\n");
            }
            else if(events[i].events&EPOLLOUT) // 如果有数据发送
            {
                printf("EPOLLOUT\n");
                ev_data *  pData = (ev_data * )(events[i].data.ptr);
                write(pData->fd, pData->buffer, n);
                //设置用于读操作的文件描述符

                ev.data.ptr = pData;
                //设置用于注测的读操作事件

                ev.events = EPOLLIN|EPOLLET;
                //修改sockfd上要处理的事件为EPOLIN

                epoll_ctl(epfd, EPOLL_CTL_MOD, pData->fd, &ev);

                printf("\n");
            }
        }
    }
    return 0;
}