I/O多路复用 - select、poll、epoll讲解(epoll工作图解介绍+红黑树)_epoll 红黑树-程序员宅基地

技术标签: c语言  运维  linux  服务器  unix  UNIX环境高级编程  


当有多个多种事件同一段时间发生时,多进程/多线程模式可以解决,但是创建进程和创建线程都是需要时间开销的。在编写服务器客户端程序的时候,如果服务器性能一般,但是客户端连接的又太多的时候,这会造成很大的代价。

该篇要说的多路复用就是解决这中问题的办法之一。通过多路复用来监听,是否有事件发生,并把发生的事件是什么告诉服务器端,然后执行相关操作。这种方法叫做多路复用。

多路复用有三种,下面一一介绍:select、poll以及epoll多路复用。


1- select多路复用

(1)工作原理

我们先讲解select的工作原理,然后再用代码讲解进行实践。

可以理解为select监视并等待多个文件描述符的属性发生变化,它监视的属性分3类,分别是read-fds(文件描述符有数据到来可读)、write_fds(文件描述符可写)、和except_fds(文件描述符异常)。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间)发生函数才返回。当select()函数返回后,可以通过遍历 fdset,来找到究竟是哪些文件描述符就绪。

函数原型:

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *timeout);
  • nfds:
    指待测试的fd的总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到max_fd-1扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是8,9,10,那么Linux内核实际也要监测0-7,此时真正带测试的文件描述符是0-10总共11个,即max(8,9,10)+1,所以第一个参数是所有要监听的文件描述符中最大的+1;
  • read_fds、write_fds、except_fds:
    中间三个参数read_fds、write_fds和except_fds指定要让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL;
  • timeout:
    最后一个参数是设置select的超时时间,如果设置为NULL则永不超时。timeval结构体: struct timeval { long tv_sec; /* seconds 秒*/ long tv_usec; /* microseconds 微妙*/ };

操作文件描述符的几个函数:

在这里插入图片描述

注意:

在Linux内核有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数中,这也意味着select所用到的FD_SET是有限的,也正是这个原因select()默认只能同时处理1024个客户端的连接请求: /linux/posix_types.h: #define __FD_SETSIZE 1024

(2)select-服务器编程

socket_server_socket.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void msleep(unsigned long ms);
static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        fds_array[1024];
	fd_set   rdset;
	int        maxfd = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS- successful execution of a
				//program (程序的成功执行)   EXIT_FAILUEEunsuccessful execution of a program (程序的不成功执行) 

			default:
					break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", programname, server_port);

	if(daemon_run)
	{
    
		daemon(0, 0);
	}

	for( i = 0; i < ARRAY_SIZE(fds_array); i++)
	{
    
		fds_array[i] = -1;//将数组里面的内容都设为-1,为什么不设为0,因为文件描述符也可能为0
	}
	fds_array[0] = listenfd;//刚开始只有fds_array【0】有内容listenfd
	
	for( ; ; )
	{
    
		FD_ZERO(&rdset);//清空rdset集合中的值
		for( i = 0; i < ARRAY_SIZE(fds_array); i++)
		{
    
			if(fds_array[i] < 0)//初始值为-1,<0表示还未被占用
			{
    
				continue;
			}
			maxfd = fds_array[i] >maxfd ? fds_array[i] : maxfd;//三目运算符
			FD_SET(fds_array[i], &rdset);//将数组中的描述符全部放到rdset集合中去,第一次将listenfd放到集合中去
		}

		rv = select(maxfd+1, &rdset, NULL, NULL, NULL);//不关心写,异常和超时;
		if(rv < 0)
		{
    
			printf("select get timeout");
			continue;
		}

		if( FD_ISSET(listenfd, &rdset) )//判断集合中是否有listenfd
		{
    
			if( (connfd = accept( listenfd, (struct sockaddr*)NULL, NULL)) < 0)
			{
    
				printf("accept new client failure:%s\n", strerror(errno));
				continue;
			}
			found = 0;
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i] < 0)
				{
    
					printf("accept new client[%d] and add it into array\n", connfd);
					fds_array[i] = connfd;
					found =1;
					break;
				}
			}

			if( !found )
			{
    
				printf("accept new client[%d] but full, so refuse it\n", connfd);
				close(connfd);
			}
		}

		else/*data arrive from already connected client*/
		{
    
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i]<0 || !FD_ISSET(fds_array[i], &rdset) )
				{
    
					continue;
				}
                memset(buf, 0, sizeof(buf));
				if( (rv = read(fds_array[i], buf, sizeof(buf))) <= 0 )
				{
    
					printf("socket[%d] read failure or get disconnect.\n", fds_array[i]);
					close(fds_array[i]);
					fds_array[i] = -1;
				}
				else
				{
    
					printf("socket[%d] read get %d bytes data:%s\n", fds_array[i], rv, buf);
					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}

					if( (write(fds_array[i], buf, rv)) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", fds_array[i], strerror(errno));
						close(fds_array[i]);
						fds_array[i] = -1;
					}
				}
			}
		}
	}
Cleanup:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)//inline函数也称为内联函数或内嵌函数,_inline定义的类的内联函数,函数代码被放入符号调用表,使用时直接展开,不需要调用,即在编译期间将所调用的函数的代码直接嵌入到主调函数中,是一种以空间换时间的函数。如果不加static,则表示该函数有可能会被其他编译单元所调用,所以一定会产生函数本身的代码。所以加了static,一般可令可执行文件变小。
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
		return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}
Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

static inline void msleep(unsigned long ms)
{
    
	struct timeval   tv;
	tv.tv_sec = ms/1000;
	tv.tv_usec = (ms%1000)*1000;

	select(0, NULL, NULL, NULL, &tv);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

 ./socket_server_select -p 12345
 
socket_server_select server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data:I am 4 hello
socket[5] read get 12 bytes data:I am 5 hello

在这里插入图片描述


2- poll多路复用

(1)工作原理

我们先讲解poll的工作原理,然后再用代码讲解进行实践。

poll()的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

函数原型:

#include <poll.h>
struct pollfd{
    
 int fd; /* 文件描述符 */
 short events; /* 等待的事件 */
 short revents; /* 实际发生了的事件 */
} ;
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • nfds:
    指定数组中监听的元素个数;
  • timeout:
    指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。
  • fds:
    用来指向一个struct pollfd类型的数组,每一个pollfd结构体指定了一个被监视的文件描述符,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域,events域中请求的任何事件都可能在revents域中返回。下表列出指定 events 标志以及测试 revents 标志的一些常值:

在这里插入图片描述
返回值注意事项:
该函数成功调用时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:

  • EBADF:一个或多个结构体中指定的文件描述符无效。
  • EFAULTfds:指针指向的地址超出进程的地址空间。
  • EINTR:请求的事件之前产生一个信号,调用可以重新发起。
  • EINVALnfds:参数超出PLIMIT_NOFILE值。
  • ENOMEM:可用内存不足,无法完成请求。

(2)poll-服务器编程

socket_server_poll.c服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>

#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))

static inline void print_usage(char *progname);
int socket_server_init(char *listen_ip, int listen_port);

int main(int argc, char **argv)
{
    
    int listenfd, connfd;
    int serv_port = 0;
    int daemon_run = 0;
    char *progname = NULL;
    int opt;
    int rv;
    int i, j;
    int found;
    int max;
    char buf[1024];
    struct pollfd fds_array[1024];
    struct option long_options[] =
    {
     
        {
    "daemon", no_argument, NULL, 'b'},
        {
    "port", required_argument, NULL, 'p'},
        {
    "help", no_argument, NULL, 'h'},
        {
    NULL, 0, NULL, 0}
    }; 
    progname = basename(argv[0]);
    /* Parser the command line parameters */
    while ((opt = getopt_long(argc, argv, "bp:h", long_options, NULL)) != -1)
    {
     
        switch (opt)
        {
     
            case 'b':
                daemon_run=1;
                break;
            case 'p':
                serv_port = atoi(optarg);
                break;
            case 'h': /* Get help information */
                print_usage(progname);
                return EXIT_SUCCESS;
            default:
                break;
        } 
    } 
    if( !serv_port )
    {
     
        print_usage(progname);
        return -1;
    }
    if( (listenfd=socket_server_init(NULL, serv_port)) < 0 )
    {
    
        printf("ERROR: %s server listen on port %d failure\n", argv[0],serv_port);
        return -2;
    }
    printf("%s server start to listen on port %d\n", argv[0],serv_port);
    /* set program running on background */
    if( daemon_run )
    {
    
        daemon(0, 0);
    }
    for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
    {
    
        fds_array[i].fd=-1;
    }
    fds_array[0].fd = listenfd;
    fds_array[0].events = POLLIN;
    max = 0;
    for ( ; ; )
    {
    
        /* program will blocked here */
        rv = poll(fds_array, max+1, -1);
        if(rv < 0)
        {
    
            printf("select failure: %s\n", strerror(errno));
            break;
        }
        else if(rv == 0)
        {
    
            printf("select get timeout\n");
            continue;
        }
        /* listen socket get event means new client start connect now */
        if (fds_array[0].revents & POLLIN)
        {
    
            if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL)) < 0)
            {
    
                printf("accept new client failure: %s\n", strerror(errno));
                continue;
            }
            found = 0;
            for(i=1; i<ARRAY_SIZE(fds_array) ; i++)
            {
    
                if( fds_array[i].fd < 0 )
                {
    
                    printf("accept new client[%d] and add it into array\n", connfd );
                    fds_array[i].fd = connfd;
                    fds_array[i].events = POLLIN;
                    found = 1;
                    break;
                }
            }
            if( !found )
            {
    
                printf("accept new client[%d] but full, so refuse it\n", connfd);
                close(connfd);
                continue;
            }

            max = i>max ? i : max;
            if (--rv <= 0)
                continue;
        }
        else /* data arrive from already connected client */
        {
    
            for(i=1; i<ARRAY_SIZE(fds_array); i++)
            {
    
                memset(buf, 0, sizeof(buf));
                if( fds_array[i].fd < 0 )
                    continue;
                if( (rv=read(fds_array[i].fd, buf, sizeof(buf))) <= 0)
                {
    
                    printf("socket[%d] read failure or get disconncet.\n", fds_array[i].fd);
                    close(fds_array[i].fd);
                    fds_array[i].fd = -1;
                }
                else
                {
    
                    printf("socket[%d] read get %d bytes data: %s\n", fds_array[i].fd, rv, buf);
                    /* convert letter from lowercase to uppercase */
                    for(j=0; j<rv; j++)
                        buf[j]=toupper(buf[j]);
                    if( write(fds_array[i].fd, buf, rv) < 0 )
                    {
    
                        printf("socket[%d] write failure: %s\n", fds_array[i].fd, strerror(errno));
                        close(fds_array[i].fd);
                        fds_array[i].fd = -1;
                    }
                }
            }
        }
    }
CleanUp:
    close(listenfd);
    return 0;
}
static inline void print_usage(char *progname)
{
    
    printf("Usage: %s [OPTION]...\n", progname);

    printf(" %s is a socket server program, which used to verify client and echo back string from it\n",
            progname);
    printf("\nMandatory arguments to long options are mandatory for short options too:\n");

    printf(" -b[daemon ] set program running on background\n");
    printf(" -p[port ] Socket server port address\n");
    printf(" -h[help ] Display this help information\n");

    printf("\nExample: %s -b -p 8900\n", progname);
    return ;
}
int socket_server_init(char *listen_ip, int listen_port)
{
    
    struct sockaddr_in servaddr;
    int rv = 0;
    int on = 1;
    int listenfd;
    if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
    
        printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
        return -1;
    }
    /* Set socket port reuseable, fix 'Address already in use' bug when socket server restart */
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET; 
    servaddr.sin_port = htons(listen_port);
    if( !listen_ip ) /* Listen all the local IP address */
    {
    
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 
    }
    else /* listen the specified IP address */
    {
    
        if (inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
        {
    
            printf("inet_pton() set listen IP address failure.\n");
            rv = -2;
            goto CleanUp;
        }
    }
    if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -3;
        goto CleanUp;
    }
    if(listen(listenfd, 13) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -4;
        goto CleanUp;
    }
CleanUp:
    if(rv<0)
        close(listenfd);
    else
        rv = listenfd;
    return rv;
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_poll -p 12345

./socket_server_poll server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data: I am 4 hello
socket[5] read get 12 bytes data: I am 5 hello
socket[4] read get 18 bytes data: I am 4 hello hello
socket[5] read get 18 bytes data: I am 5 hello hello

在这里插入图片描述


3- epoll多路复用

(1)工作原理

我们先讲解epoll的工作原理,然后再用代码讲解进行实践。

在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等IO多路复用的方法来实现并发服务程序。自Linux 2.6内核正式引入epoll以来,epoll已经成为了目前实现高性能网络服务器的必备技术,在大数据、高并发、集群等一些名词唱得火热之年代,select和poll的用武之地越来越有限,风头已经被epoll占尽。

epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分(三个函数):
创建epoll实例:epoll_create()

#include <sys/epoll.h>
int epoll_create(int size);

系统调用epoll_create()创建了一个新的epoll实例,其对应的兴趣列表初始化为空。若成功返回文件描述符,若出错返回-1。参数size指定了我们想要通过epoll实例来检查的文件描述符个数。该参数并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。
作为函数返回值,epoll_create()返回了代表新创建的epoll实例的文件描述符。这个文件描述符在其他几个epoll系统调用中用
来表示epoll实例。当这个文件描述符不再需要时,应该通过close()来关闭。

修改epoll的兴趣列表:epoll_ctl()

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

系统调用epoll_ctl()能够修改由文件描述符epfd所代表的epoll实例中的兴趣列表。若成功返回0,若出错返回-1。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • op:
    第二个参数op用来指定需要执行的操作,它可以是如下几种值:
    在这里插入图片描述

  • fd:
    第三个参数fd指明了要修改兴趣列表中的哪一个文件描述符的设定。

  • ev:
    第四个参数ev是指向结构体epoll_event的指针,结构体的定义如下:

typedef union epoll_data{
    
 void *ptr; /* Pointer to user-defind data */
 int fd; /* File descriptor */
 uint32_t u32; /* 32-bit integer */
 uint64_t u64; /* 64-bit integer */
} epoll_data_t;

struct epoll_event{
    
 uint32_t events; /* epoll events(bit mask) */
 epoll_data_t data; /* User data */
};

事件等待:epoll_wait()

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

系统调用epoll_wait()返回epoll实例中处于就绪态的文件描述符信息,单个epoll_wait()调用能够返回多个就绪态文件描述符的信息。调用成功后epoll_wait()返回数组evlist中的元素个数,如果在timeout超时间隔内没有任何文件描述符处于就绪态的话就返回0,出错时返回-1并在errno中设定错误码以表示错误原因。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • evlist:
    第二个参数evlist所指向的结构体数组中返回的是有关就绪态文件描述符的信息,数组evlist的空间由调用者负责申请;

  • maxevents:
    第三个参数maxevents指定所evlist数组里包含的元素个数;

  • timeout
    第四个参数timeout用来确定epoll_wait()的阻塞行为,有如下几种:

    timeout = -1:调用将一直阻塞,直到兴趣列表中的文件描述符上有事件产生或者直到捕获到一个信号为止。
    timeout =  0:执行一次非阻塞式地检查,看兴趣列表中的描述符上产生了哪个事件。
    timeout >  0:调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止。  
    

(2)select-服务器编程

socket_server_epoll.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/epoll.h>
#include<sys/resource.h>

#define MAX_EVENTS         512
#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);
void set_socket_rlimit(void);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	int       epollfd;
	struct  epoll_event    event;
	struct  epoll_event    event_array[MAX_EVENTS];
	int       events;

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS  successful execution of a
						    //program (程序的成功执行)   EXIT_FAILUEE  unsuccessful execution of a program (程序的不成功执行) 

			default:
				break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	set_socket_rlimit();

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", argv[0], server_port);

	if(daemon_run < 0)
	{
    
		daemon(0, 0);
	}

	if( (epollfd = epoll_create(MAX_EVENTS)) < 0 )
			/*listen socket get event means new client start connect now*/

	{
    
		printf("epoll_create() failure: %s\n", strerror(errno));
		return -3;
	}
	event.events = EPOLLIN;
	event.data.fd = listenfd;

	if( epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0 )
	{
    
		printf("epoll:%d,listenfd:%d\n", epollfd, listenfd);
		printf("epoll add listen socket failure: %s\n", strerror(errno));
		return -4;
	}

	for( ; ; )
	{
    
		/*program will broked here*/
		events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1);
		if( events < 0 )
		{
    
			printf("epoll failure: %s\n", strerror(errno));
			break;
		}
		else if( events == 0)
		{
    
			printf("epoll get timeout\n");
			continue;
		}

		for( i=0; i<events; i++)
		{
    
			if( (event_array[i].events&EPOLLERR) || (event_array[i].events&EPOLLHUP) )
			{
    
				printf("epoll_wait get error on fd[%d]: %s\n", event_array[i].data.fd, strerror(errno) );
				epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
				close(event_array[i].data.fd);
			}

			/*listen socket get event means new client start connect now*/
			if( event_array[i].data.fd == listenfd )
			{
    
				if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL ) )< 0 )
				{
    
					printf("accept new client failure:%s\n", strerror(errno) );
					continue;
				}
				event.data.fd = connfd;
				event.events = EPOLLIN;
				if( epoll_ctl( epollfd, EPOLL_CTL_ADD, connfd, &event) < 0 )
				{
    
					printf("epoll add client socket failure: %s\n", strerror(errno) );
					close(event_array[i].data.fd);
					continue;
				}
				printf("epoll add new client socket[%d] ok.\n",connfd);
			}

			else
			{
    
                memset(buf, 0, sizeof(buf));
				if( (rv = read(event_array[i].data.fd, buf, sizeof(buf))) < 0 )
				{
    
					printf("socket[%d] read failure or get disconnected and will be removed.\n", event_array[i].data.fd);
					epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
					close(event_array[i].data.fd);
					continue;
				}

				else
				{
    
					printf("socket[%d] read get %d bytes data: %s\n", event_array[i].data.fd, rv, buf);

					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}
					if(write(event_array[i].data.fd, buf, rv) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", event_array[i].data.fd, strerror(errno) );
						epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
						close(event_array[i].data.fd);
					}
				}
			}
		}
	}	
CleanUp:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
	return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}

Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

void set_socket_rlimit(void)
{
    
	struct rlimit limit = {
    0};

	getrlimit(RLIMIT_NOFILE, &limit);
	limit.rlim_cur = limit.rlim_max;
	setrlimit(RLIMIT_NOFILE, &limit);

	printf("set socket open fd max count to %ld\n", limit.rlim_max);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_epoll -p 12345

set socket open fd max count to 1048576
./socket_server_epoll server start to listen on port 12345
epoll add new client socket[5] ok.
socket[5] read get 24 bytes data: I am 5 hello hello hello
epoll add new client socket[6] ok.
socket[6] read get 24 bytes data: I am 6 hello hello hello

在这里插入图片描述


4- epoll工作原理底层介绍

(1)工作机制

epoll的实现机制与select/poll机制完全不同。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

  • LT(level triggered)水平触发:是缺省的工作方式。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
  • ET (edge-triggered)边缘触发:是高速工作方式。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

(2)epoll工作图解

在这里插入图片描述

  1. epollfd = epoll_create()内核创建epoll实例(创建红黑树Red_Tree和就绪链表Read_List);
  2. epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event)对红黑树进行操作,添加listenfd节点;
  3. events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1)内核查找红黑树中就绪的socket,放入就绪列表;然后就绪列表将类容复制到struct epoll_event List数组中;
  4. 然后对数组中需要处理的socket直接进行判断读写操作。

至于红黑树具体是什么,怎么实现的,等我弄懂了再分享…好吧还是去看看吧,但是增删改查太麻烦了,有点难受,先理解一下浅层面的吧。

真心想了解的可以参考这篇文章:【数据结构】史上最好理解的红黑树讲解,让你彻底搞懂红黑树

(3)红黑树浅层理解

我们一步一步探索了解一下红黑树…

二叉搜索树
一棵空树或者满足以下性质的二叉树被称之为二叉搜索树:

  • 如果左子树不为空,则左子树所有结点值都小于根结点的值
  • 如果右子树不为空,则右子树所有结点值都大于或等于根结点的值
  • 任意一棵子树也是一棵二叉搜索树

在这里插入图片描述
但是极端情况下的搜索效率很低,于是就有了平衡二叉树(AVL数)

平衡二叉树(AVL数)
平衡树:任意节点的子树的高度差都小于等于1;

平衡二叉树:满足二叉树以及平衡树的特点,就是两者的结合体嘛。

平衡二叉树可以有效的减少二叉树的深度,从而提高了查询。但是效率严格平衡,代价高,还是不推荐。

红黑树(Red Black Tree R-B Tree)
一种特化的平衡二叉树(AVL树),在插入和删除时通过特定操作保持二叉查找树的相对平衡,从而获得较高的查找性能。
在这里插入图片描述
符合二叉树的基本特征,同时还具备以下特点:

  • 节点非黑即红 (每个节点要么是黑色,要么是红色)
  • 其根节点是黑色
  • 叶子节点是黑色(为了简单期间,一般会省略该节点)
  • 相邻节点不同为红色(红色节点的子节点必是黑色)
  • 从一个节点到该节点的下叶子节点的所有路径上包含的黑节点数量相等(这一点是平衡的关键)

记忆方法:黑根黑叶红不邻,同祖等高只数黑


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_51429770/article/details/129364697

智能推荐

JWT(Json Web Token)实现无状态登录_无状态token登录-程序员宅基地

文章浏览阅读685次。1.1.什么是有状态?有状态服务,即服务端需要记录每次会话的客户端信息,从而识别客户端身份,根据用户身份进行请求的处理,典型的设计如tomcat中的session。例如登录:用户登录后,我们把登录者的信息保存在服务端session中,并且给用户一个cookie值,记录对应的session。然后下次请求,用户携带cookie值来,我们就能识别到对应session,从而找到用户的信息。缺点是什么?服务端保存大量数据,增加服务端压力 服务端保存用户状态,无法进行水平扩展 客户端请求依赖服务.._无状态token登录

SDUT OJ逆置正整数-程序员宅基地

文章浏览阅读293次。SDUT OnlineJudge#include<iostream>using namespace std;int main(){int a,b,c,d;cin>>a;b=a%10;c=a/10%10;d=a/100%10;int key[3];key[0]=b;key[1]=c;key[2]=d;for(int i = 0;i<3;i++){ if(key[i]!=0) { cout<<key[i.

年终奖盲区_年终奖盲区表-程序员宅基地

文章浏览阅读2.2k次。年终奖采用的平均每月的收入来评定缴税级数的,速算扣除数也按照月份计算出来,但是最终减去的也是一个月的速算扣除数。为什么这么做呢,这样的收的税更多啊,年终也是一个月的收入,凭什么减去12*速算扣除数了?这个霸道(不要脸)的说法,我们只能合理避免的这些跨级的区域了,那具体是那些区域呢?可以参考下面的表格:年终奖一列标红的一对便是盲区的上下线,发放年终奖的数额一定一定要避免这个区域,不然公司多花了钱..._年终奖盲区表

matlab 提取struct结构体中某个字段所有变量的值_matlab读取struct类型数据中的值-程序员宅基地

文章浏览阅读7.5k次,点赞5次,收藏19次。matlab结构体struct字段变量值提取_matlab读取struct类型数据中的值

Android fragment的用法_android reader fragment-程序员宅基地

文章浏览阅读4.8k次。1,什么情况下使用fragment通常用来作为一个activity的用户界面的一部分例如, 一个新闻应用可以在屏幕左侧使用一个fragment来展示一个文章的列表,然后在屏幕右侧使用另一个fragment来展示一篇文章 – 2个fragment并排显示在相同的一个activity中,并且每一个fragment拥有它自己的一套生命周期回调方法,并且处理它们自己的用户输_android reader fragment

FFT of waveIn audio signals-程序员宅基地

文章浏览阅读2.8k次。FFT of waveIn audio signalsBy Aqiruse An article on using the Fast Fourier Transform on audio signals. IntroductionThe Fast Fourier Transform (FFT) allows users to view the spectrum content of _fft of wavein audio signals

随便推点

Awesome Mac:收集的非常全面好用的Mac应用程序、软件以及工具_awesomemac-程序员宅基地

文章浏览阅读5.9k次。https://jaywcjlove.github.io/awesome-mac/ 这个仓库主要是收集非常好用的Mac应用程序、软件以及工具,主要面向开发者和设计师。有这个想法是因为我最近发了一篇较为火爆的涨粉儿微信公众号文章《工具武装的前端开发工程师》,于是建了这么一个仓库,持续更新作为补充,搜集更多好用的软件工具。请Star、Pull Request或者使劲搓它 issu_awesomemac

java前端技术---jquery基础详解_简介java中jquery技术-程序员宅基地

文章浏览阅读616次。一.jquery简介 jQuery是一个快速的,简洁的javaScript库,使用户能更方便地处理HTML documents、events、实现动画效果,并且方便地为网站提供AJAX交互 jQuery 的功能概括1、html 的元素选取2、html的元素操作3、html dom遍历和修改4、js特效和动画效果5、css操作6、html事件操作7、ajax_简介java中jquery技术

Ant Design Table换滚动条的样式_ant design ::-webkit-scrollbar-corner-程序员宅基地

文章浏览阅读1.6w次,点赞5次,收藏19次。我修改的是表格的固定列滚动而产生的滚动条引用Table的组件的css文件中加入下面的样式:.ant-table-body{ &amp;amp;::-webkit-scrollbar { height: 5px; } &amp;amp;::-webkit-scrollbar-thumb { border-radius: 5px; -webkit-box..._ant design ::-webkit-scrollbar-corner

javaWeb毕设分享 健身俱乐部会员管理系统【源码+论文】-程序员宅基地

文章浏览阅读269次。基于JSP的健身俱乐部会员管理系统项目分享:见文末!

论文开题报告怎么写?_开题报告研究难点-程序员宅基地

文章浏览阅读1.8k次,点赞2次,收藏15次。同学们,是不是又到了一年一度写开题报告的时候呀?是不是还在为不知道论文的开题报告怎么写而苦恼?Take it easy!我带着倾尽我所有开题报告写作经验总结出来的最强保姆级开题报告解说来啦,一定让你脱胎换骨,顺利拿下开题报告这个高塔,你确定还不赶快点赞收藏学起来吗?_开题报告研究难点

原生JS 与 VUE获取父级、子级、兄弟节点的方法 及一些DOM对象的获取_获取子节点的路径 vue-程序员宅基地

文章浏览阅读6k次,点赞4次,收藏17次。原生先获取对象var a = document.getElementById("dom");vue先添加ref <div class="" ref="divBox">获取对象let a = this.$refs.divBox获取父、子、兄弟节点方法var b = a.childNodes; 获取a的全部子节点 var c = a.parentNode; 获取a的父节点var d = a.nextSbiling; 获取a的下一个兄弟节点 var e = a.previ_获取子节点的路径 vue

推荐文章

热门文章

相关标签