说一下这个从内核接收数据到EPOLL原理

1.网卡发现 MAC 地址符合,就将包收进来;发现 IP 地址符合,根据 IP 头中协议项,知道上一层是 TCP 协议;

2.DMA把TCP数据包copy到内核缓冲区;

3.触发CPU中断,中断程序摘除TCP头通过socket五要素(源IP/PORT、目的IP/PORT、协议)找到对应的socket文件,并把原始二进制数据报copy到socket接收缓冲区;

4.中断程序唤醒被阻塞的内核线程;

5.内核线程切换到用户线程把数据从socket接口缓冲区copy到应用内存;

二、中断处理流程

中断处理.png

I/O发出的信号的异常代码,拿到异常代码之后,CPU就会触发异常处理的流程。计算机在内存里会保存中断向量表,用来存放不同的异常代码对应的异常处理程序所在的地址。

CPU在拿到了异常码之后,会先把当前的程序执行的现场,保存到程序栈里面,然后根据异常码查询,找到对应的异常处理程序,最后把后续指令执行的指挥权,交给这个异常处理程序。

异常处理程序结束之后返回到原来指令执行的位置继续执行;

三、阻塞不占用 cpu

网卡何时接收到数据是依赖发送方和传输路径的,这个延迟通常都很高,是毫秒(ms)级别的。而应用程序处理数据是纳秒(ns)级别的。也就是说整个过程中,内核态等待数据,处理协议栈是个相对很慢的过程。这么长的时间里,用户态的进程是无事可做的,因此用到了“阻塞(挂起)”。

阻塞是进程调度的关键一环,指的是进程在等待某事件发生之前的等待状态。请看下表,在 Linux 中,进程状态大致有 7 种(在

include/linux/sched.h 中有更多状态):

从说明中可以发现,“可运行状态”会占用 CPU 资源,另外创建和销毁进程也需要占用 CPU 资源(内核)。重点是,当进程被”阻塞/挂起”时,是不会占用

CPU 资源的。

为了支持多任务,Linux 实现了进程调度的功能(CPU

时间片的调度)。而这个时间片的切换,只会在“可运行状态”的进程间进行。因此“阻塞/挂起”的进程是不占用 CPU 资源的。

四、工作队列和等待队列

工作队列和等待队列.png

工作队列:为了方便时间片的调度,所有“可运行状态”状态的进程组成的队列;

fd文件列表:内核打开的文件句柄,Linux一切皆文件,用户线程执行创建Socket时内核就会创建一个由文件系统管理的sock对象;

sock:socket内核中的数据结构,主要包含发送缓冲区、接收缓冲区、等待队列;

struct sock {

    __u32   daddr;  /* 外部IP地址   */

    __u32   rcv_saddr; /* 绑定的本地IP地址  */

    __u16   dport;  /* 目标端口   */

    __u16   sport;  /* 源端口    */

    unsigned short  family;  /* 地址簇   */

    int   rcvbuf;  /* 接受缓冲区长度(单位:字节) */

    struct sk_buff_head receive_queue; /* 接收包队列   */

    int   sndbuf;  /* 发送缓冲区长度(单位:字节)  */

    struct sk_buff_head write_queue; /* 包发送队列   */

    wait_queue_head_t *sleep;  /* 等待队列,通常指向socket的wait域 */

    ......

}

等待队列:等待当前socket的线程;

工作队列中线程执行到阻塞操作等待socket时,会从工作队列中移除,移动到该socket的等待队列中;当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。

五、BIO

public static void main(String[] args) throws IOException {

    ServerSocket serverSocket = new ServerSocket(9000);

    while (true) {

        // 没有连接-阻塞

        Socket socket = serverSocket.accept();

        byte[] bytes = new byte[1024];

        InputStream inputStream = socket.getInputStream();

        while (true) {

            // 没有数据-阻塞

            int read = inputStream.read(bytes);

            if (read != -1) {

                System.out.println(new String(bytes, 0, read));

            } else {

                break;

            }

        }

        socket.close();

    }

}

BIO模式存在两个阻塞点,一个时accept阻塞等待客户端连接,一个是阻塞等待socket请求数据;简单跟以下源码就会发现

new ServerSocket(9000)最终通过

  • int newfd = socket0(stream, false /*v6 Only*/);调用Linux int socket(int domain, int type, int protocol);创建服务端socket;
  • bind0(nativefd, address, port, exclusiveBind);调用Linux int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);绑定端口;
  • listen0(nativefd, backlog);调用Linux int listen(int sockfd, int backlog);设置监听;

接下来serverSocket.accept()最终通过

  • newfd = accept0(nativefd, isaa);调用Linux int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);阻塞等待客户端连接,会创建一个socket用于跟该客户端进行通信,返回socket文件描述符fd;

最后inputStream.read(bytes);调用Linux `ssize_t recv(int sockfd, void *buf,

size_t len, int flags);`阻塞从socket读缓冲区读取客户端请求数据;

可以通过 man 命令查看Linux 系统调用方法具体描述;

通过传统BIO的操作方式可以看出一个请求必须要创建一个内核线程进行处理,recv只能监视单个socket,当并发比较高时就会消耗大量系统资源,也就是所谓的C10K问题;那么如何解决这个问题呢?后面出现的多路复用

select / poll / epoll思路都是使用一个线程来处理若干个连接(监视若干个socket),类似餐厅服务员的角色。

六、select

select 方案是用一个 fd_set 结构体来告诉内核同时监控多个socket,当其中有socket的状态发生变化或超时,则调用返回。之后应用使用

FD_ISSET 来逐个查看是哪个socket的状态发生了变化。

6.1 select原理

以下面select伪代码为例:

int s = socket(AF_INET, SOCK_STREAM, 0);  

bind(s, ...)

listen(s, ...)

//接受客户端连接

int c = accept(s, ...)

int readSet[] =  存放需要监听的socket文件描述符

while(1){

    int n = select(..., readSet, ...)

    for(int i=0; i < readSet.count; i++){

        if(FD_ISSET(readSet[i], ...)){

            //fds[i]的数据处理

        }

    }

}

select 系统调用函数

# maxfd:表示的是待测试的描述符基数,它的值是待测试的最大描述符加 1

# readset:读描述符集合

# writeset:写描述符集合

# exceptset:错误描述符集合

# timeout:超时时间

int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);

上面示例代码中,先准备一个数组 readSet 存放着所有需要监视读事件的socket。然后调用select,如果 readSet

中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒线程。用户可以遍历

readSet,通过FD_ISSET (对这个数组进行检测,判断出对应socket的元素 readSetfd是 0 还是 1)判断具体哪个 socket

收到数据,然后做出处理。

6.2 select流程

select-1.png

线程 A 调用 select readSet 数组元素为sock1、sock2、sock3 ,此时3个socket 都没有数据可读,就把线程A

从工作队列中移除,并分别添加到3个sock 的等待队列中;

select-2.png

当3个sock中任意一个有数据可读时,中断程序都会把线程A

从所有sock等待队列中移除并重新加入工作队列,等待cpu时间片继续执行(即从select中返回)。但是此时线程A

并不知道哪个socket有数据,于是还要遍历readSet使用 FD_ISSET 来逐个查看是哪个socket的有数据可读。

6.3 select的缺点

1、每次调用select都需要将线程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fd_set列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。

2、由于只有一个字段记录关注和发生事件,每次调用之前要重新初始化 fd_set 结构体。

3、线程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。

七、poll

总结以下 select 的缺点就是句柄上限+重复初始化+逐个排查所有文件句柄状态效率不高。poll 主要解决 select

的前两个问题:通过改变跟内核交互的数据结构突破了文件描述符的限制,同时使用不同字段分别标注关注事件和发生事件,来避免重复初始化。

int poll(struct pollfd *fds, unsigned long nfds, int timeout); 

# fd:描述符 fd

# events:描述符上待检测的事件类型events,注意这里的 events 可以表示多个不同的事件,

#         具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLINPOLLOUT 可以表示读和写事件

# revents:

struct pollfd {

    int    fd;       /* file descriptor */    

    short  events;   /* events to look for */    

    short  revents;  /* events returned */ 

};

和 select 非常不同的地方在于:

  • poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。
  • 在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置;而在 poll 函数里,可以控制 pollfd 结构的数组大小,这意味着可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。

八、epoll

epoll 使用 eventpoll 作为中间层,线程不用加入在被监视的每一个 socket 阻塞队列中,也不用再遍历 socket

列表查看哪些有事件发生。

epoll提供了三个函数:

# 创建了一个 epoll 实例 epollevent

int epoll_create(int size);

# 往这个 epoll 实例增加或删除监控的事件

# epfd:epoll_create 创建的 epoll 实例句柄

# op:增加还是删除一个监控事件

# fd:注册的事件的文件描述符

# event:注册的事件类型,并且可以在这个结构体里设置用户需要的数据

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

# 类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发

# epfd:实例描述字,也就是 epoll 句柄

# events:用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定,这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样,这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据

# maxevents:大于 0 的整数,表示 epoll_wait 可以返回的最大事件值

# timeout:阻塞调用的超时值,如果这个值设置为 -1,表示不超时;如果设置为 0 则立即返回,即使没有任何 I/O 事件发生

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

8.1 代码示例
public class ServerConnect {

    private static final int BUF_SIZE = 1024;

    private static final int PORT = 8080;

    private static final int TIMEOUT = 3000;

    public static void main(String[] args) {

        selector();

    }

    public static void selector() {

        Selector selector = null;

        ServerSocketChannel ssc = null;

        try {

            // 打开一个Channel

            ssc = ServerSocketChannel.open();

            // 将Channel绑定端口

            ssc.socket().bind(new InetSocketAddress(PORT));

            // 设置Channel为非阻塞,如果设置为阻塞,其实和BIO差不多了。

            ssc.configureBlocking(false);

            // 打开一个Slectore

            selector = Selector.open();

            // 向selector中注册Channel和感兴趣的事件

            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {

                // selector监听事件,select会被阻塞,直到selector监听的channel中有事件发生或者超时,会返回一个事件数量

                //TIMEOUT就是超时时间,selector初始化的时候会添加一个用于主动唤醒的pipe,待会源码分析会说

                if (selector.select(TIMEOUT) == 0) {

                    System.out.println("==");

                    continue;

                }

                /**

                 * SelectionKey的组成是selector和Channel
                 * 有事件发生的channel会被包装成selectionKey添加到selector的publicSelectedKeys属性中
                 * publicSelectedKeys是SelectionKey的Set集合
                 *下面这一部分遍历,就是遍历有事件的channel
                 */
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable() && key.isValid()) {
                        handleWrite(key);
                    }
                    if (key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    //每次使用完,必须将该SelectionKey移除,否则会一直存储在publicSelectedKeys中
                    //下一次遍历又会重复处理
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
    }
    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            sc.close();
        }
    }
    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }
}

调用到内核的流程如下:

1、ssc = ServerSocketChannel.open() -> EPollSelectorProvider -> int

socket(int domain, int type, int protocol);

2、ssc.socket().bind(newInetSocketAddress(PORT)); -> int bind(int sockfd,

const struct sockaddr _addr, socklen_t addrlen);

3、getImpl().listen(backlog); -> int listen(int sockfd, int backlog);

4、ssc.configureBlocking(false); -> int fcntl(int fd, int cmd, … /_ arg

*/ );

5、selector=Selector.open(); -> EPollSelectorImpl epollCreate() -> int

epoll_create(int size);

6、ssc.register(selector,SelectionKey.OP_ACCEPT); -> int epoll_ctl(int

epfd, int op, int fd, struct epoll_event *event);

while

7、selector.select(TIMEOUT) -> int epoll_wait(int epfd, struct epoll_event

*events, int maxevents, int timeout);

8.2 epoll流程

epoll.png

如图,eventpoll 主要包含3个结构

  • 监视队列:epoll_ctl添加或删除所要监听的socket。例如:图中通过epoll_ctl添加sock1、sock2的监视,内核会将eventpoll添加到这socket的等待队列中(图中虚线)。为了方便的添加和移除,还要便于搜索,以避免重复添加,使用红黑树结构进行存储(搜索、插入和删除时间复杂度都是O(log(N)),效率较好。
  • 等待队列:eventpoll也是一个文件句柄,因此也有等待队列,记录阻塞的线程,例如:图中线程A 执行了 epoll_wait 操作,被从工作队列中移除,然后被eventpoll 的等待队列引用;
  • 就绪队列:当某个socket有事件发生时,中断处理程序就会把该socket加入到就绪队列,同时唤醒eventpoll 阻塞队列中的线程,此时线程只需要遍历就绪队列就可以知道哪个socket有事件发生,例如:图中sock2有数据到来时,中断处理程序先把sock2 放入就绪队列中,然后唤醒等待队列中的线程A,这时线程A 被重新加入工作队列中,等到CPU时间片轮询到线程A时,遍历就绪队列中的socket进行处理。

九、总结

select,poll,epoll都是IO多路复用机制,即可以同时监视多个文件描述符,一旦某个描述符就绪(读或写就绪),能够通知程序进行相应读写操作

  • select,poll需要多次遍历文件描述符集合 fd_set,把阻塞线程放到每一个socket中,当某个描述符就绪时再把这些线程从socket阻塞队列中移除;而epoll 通过eventpoll作为中介,只需要把eventpoll放到socket 的阻塞队列中,当有描述符就绪时只需要遍历就绪队列,而不需要遍历所有fd;
  • select,poll每次调用都要把整个fd_set集合从用户态往内核态拷贝一次,而epoll只要一次拷贝,这也能节省不少的开销。
正文完