1. 程式人生 > >I/O多路複用(select)

I/O多路複用(select)

  在之前寫過一篇五種I/O模型,感興趣的可以去看一下,今天主要講其中的一種,那就是I/O多路複用。因為I/O多路複用可以使一個程序同時處理多個連線。這對提高程式的效能至關重要。對於IO複用的概念與理解在上文說的挺清楚了。本文主要說實現IO複用的系統呼叫。
  在linux下,實現IO複用的系統呼叫主要有三個:selectpollepoll,下面我們將對其進行逐一講解。

select

  Select是通過將需要監聽的檔案描述符加入相應的檔案描述符集合(readset、writeset,exceptset),由核心負責監視相應的檔案描述符是否就緒。

select API

select函式原形如下:

#include <sys/select.h>
#include <sys/time.h>

int select(int nfds,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

引數

  Select函式的第一個引數是nfds,是監聽的檔案描述符的集合中的最大檔案描述符加1,他會告訴核心需要監聽的檔案描述符的個數。Select內部的實現是一個迴圈遍歷1024個檔案描述符,而這個nfds就是迴圈的上限。因為檔案描述符石從0開始的,迴圈從0開始,[0 , nfds),所以是最大檔案描述符加1。
  
  第二個引數readset

是讀事件集合,第三個引數writeset是寫事件集合,第四個exceptset是異常事件集合,這三個引數都是傳入傳出引數。使用者可以將需要監聽的檔案描述符加入對應的集合中,若如果對某一個集合不感興趣,就可以把它設為NULL。核心通過最這些引數的線上修改反饋其中就緒的事件。所以每次呼叫select都需要重置這三個引數
  這三個引數都是fd_set 結構體型別指標,fd_set結構體的定義如下:
  

#include <sys/select.h>

typedef long int __fd_mask;
#define __NFDBITS       (8 * (int) sizeof (__fd_mask))
typedef struct { /* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */ #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->fds_bits) #else __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->__fds_bits) #endif } fd_set;

  fd_set結構體的定義實際包含的是fds_bits位陣列,該陣列的每個元素的每一位標記一個檔案描述符其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在當前核心中數值為1024,可見每次select系統呼叫可監聽處理的檔案描述符最大數量為1024。
  
  由於位操作過於繁瑣,所以我們採用下列一些的巨集來操作該結構體中的位:
  FD_ZERO(fd_set *fdset);將指定的檔案描述符集清空。在對檔案描述符集合進行設定前,必須對其進行初始化,如果不清空,由於在系統分配記憶體空間後,通常並不作清空處理,所以結果是不可知的。

  FD_SET(fd_set *fdset);用於在檔案描述符集合中增加一個新的檔案描述符。

  FD_CLR(fd_set *fdset);用於在檔案描述符集合中刪除一個檔案描述符。

  FD_ISSET(int fd,fd_set *fdset);用於測試指定的檔案描述符是否在該集合中。
  最後一個引數是超時時間,是一個struct timeval結構體指標,該結構體定義如下:
  有兩個成員,一個是秒,一個是毫秒,

struct timeval{

  long tv_sec;    //second

  long tv_usec;   //microseconds

  }

超時時間可以設定到微秒級別,有三種設定情況:

  NULL:阻塞等待,直到某個檔案描述符上發生了事件。

  0:僅檢測描述符集合的狀態,然後立即返回。

  > 0: 指定超時時間,如果在該時間段裡沒有事件發生,select將超時返回。

返回值

  成功:返回就緒的檔案描述符(可讀、可寫、異常)的總數。
  超時:返回0
  失敗:返回-1,並設定errno
  若在等待的過程中被訊號打斷,也返回-1,errno設定為EINTR。

檔案描述符就緒的條件

在網路程式設計中,下列情況下socket可讀:
  1、socket核心接收快取區中的位元組數大於或等於其低水位標記SO_RCVLOWAT。此時可以無阻塞地讀該socket,並且讀操作返回的位元組數大於0。

  2、socket通訊對方關閉連線。此時對該socket讀操作將返回0。

  3、監聽socket上有新的連線請求。

  4、socket上有未處理的錯誤。此時我們可以使用getsockopt來讀取和清除該錯誤。

下列情況下socket可寫:
  1、socket核心傳送緩衝區中的可用位元組數大於或等於其低水位標記SO_SNDLOWAT。此時我們可以無阻塞寫該socket,並且寫操作返回的位元組數大於0。

  2、socket寫操作被關閉。對寫操作被關閉的socket執行寫操作將觸發一個SIGPIPE訊號。

  3、socket使用非阻塞connect連線成功或者失敗(超時)之後。

  4、socket上有未處理的錯誤。此時我們可以使用getsockopt來讀取和清除該錯誤。

網路程式中,select能處理的異常情況只有一種:socket上接收到帶外資料。

不足:Select監聽的最大檔案描述符受限於FD_SETSIZE,UNIX系統通常會在標頭檔案 “sys/select.h” 中定義常量FD_SETSIZE,一般為1024,要想更改需要重新編譯核心。而且因為select採取的是輪詢機制,當監聽的檔案描述符過多的話,效率會大大折扣。
 在下文中將介紹一種新的實現IO複用的函式,poll,與select相比,poll突破了最大檔案描述符是1024的限制。

程式示例

  以select實現的簡單回射伺服器作為本文的結束。
  

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<ctype.h>


#define   MYPORT  8888
#define  BACKLOG  10
#define MAXDATASIZE 1024

int main()
{
    char buf[MAXDATASIZE];
    int numbytes;

    int sock_fd,new_fd,connfd;   //定義主動套接字和被動套接字

    //定義IPV4套介面地址結構
    struct sockaddr_in my_addr;        //service 地址

    struct sockaddr_in their_addr;     //client 地址
    int sin_size;

    //初始化IPV4套介面地址結構

    my_addr.sin_family =AF_INET; //指定該地址家族
    my_addr.sin_port =htons(MYPORT);  //埠
    my_addr.sin_addr.s_addr = INADDR_ANY;  //IPV4的地址
    bzero(&(my_addr.sin_zero),8);

    //socket()函式
    if((sock_fd = socket(AF_INET,SOCK_STREAM,0))==-1)
    {
        perror("socket");
        exit(1);

    }

    //地址重複利用
    int on = 1;
    if(setsockopt(sock_fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
    {
        perror("setsockopt");
        exit(1);
    }

    //bind()函式
    if(bind(sock_fd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    //listen()函式
    if(listen(sock_fd,BACKLOG)==-1)
    {
        perror("listen");
        exit(1);
    }

    int i;
    int client[FD_SETSIZE];//儲存客戶端的套接字

    //初始化
    for(i = 0;i < FD_SETSIZE;i++)
    {
        client[i] = -1;
    }

    int nready = 0; //接受select的返回值
    int maxi = -1; //儲存陣列下標
    int maxfd = sock_fd;//初始化nfds

    fd_set rset;  //定義可讀事件集合
    fd_set allset; //備份

    FD_ZERO(&allset);
    FD_SET(sock_fd,&allset);


   while(1)
   {
       rset = allset;

       nready = select(maxfd +1,&rset,NULL,NULL,NULL);

       if(nready < 0)
       {
           perror("select error!\n");
           exit(1);
       }
       else if(0 == nready)
       {
           continue;
       }

       //當客戶端請求連線
       if(FD_ISSET(sock_fd,&rset))
       {

           sin_size = sizeof(struct sockaddr_in);
           if((new_fd=accept(sock_fd,(struct sockaddr *)&their_addr,&sin_size))==-1)
           {
               perror("accept");
               exit(1);
           }
           printf("client IP: %s\t PORT : %d\n",inet_ntoa(their_addr.sin_addr),ntohs(their_addr.sin_port));

           //將客戶端的套接字存入client[]
           for(i = 0;i < FD_SETSIZE;i++)
           {
               if(client[i] < 0)
               {
                   client[i] = new_fd;
                   break; //找到合適位置就沒必要繼續遍歷
               }
           }

           //判斷是否達到連線上限
           if(i == FD_SETSIZE)
           {
               printf("too many client!\n");
               break;
           }
           //將新加入的客戶端放入監聽隊伍
           FD_SET(new_fd,&allset);

           //更新nfds
           if(new_fd > maxfd)
           {
               maxfd = new_fd;
           }

           //更新maxi
           if(i > maxi)
           {
               maxi = i;
           }
           //判斷是否已經處理完事件
           if((--nready) == 0)
           {
               continue;
           }
       }
   //當客戶端傳送資料
       for(i = 0;i <= maxi;i++)
       {
           if((connfd = client[i]) < 0)
           {
               continue;
           }
           if(FD_ISSET(connfd,&rset))
           {

               memset(buf,0,sizeof(buf));
               numbytes = recv(connfd,buf,MAXDATASIZE,0);

               if(numbytes == -1)
               {
                   perror("recv\n");
                   exit(1);
               }
               else if(numbytes == 0)
               {
                   printf("client close!");
                   FD_CLR(connfd,&allset);
                   client[i] = -1;
               }
               send(connfd,buf,numbytes,0);
           }
           if((--nready) == 0)
           {
               continue;
           }
       }
   }

   close(sock_fd);
   close(new_fd);


   return 0;
}