1. 程式人生 > >多執行緒 執行緒池 sock IO複用

多執行緒 執行緒池 sock IO複用

前言
之前有看到用很幽默的方式講解Windows的socket IO模型,
借用這個故事,講解下linux的socket IO模型;

老陳有一個在外地工作的女兒,不能經常回來,老陳和她通過信件聯絡。
他們的信會被郵遞員投遞到他們小區門口的收發室裡。這和Socket模型非常類似。
下面就以老陳接收信件為例講解linux的 Socket I/O模型。

一、同步阻塞模型
老陳的女兒第一次去外地工作,送走她之後,老陳非常的掛心她安全到達沒有;
於是老陳什麼也不幹,一直在小區門口收發室裡等著她女兒的報平安的信到;

這就是linux的同步阻塞模式;
在這個模式中,使用者空間的應用程式執行一個系統呼叫,並阻塞,

直到系統呼叫完成為止(資料傳輸完成或發生錯誤)。

Socket設定為阻塞模式,當socket不能立即完成I/O操作時,程序或執行緒進入等待狀態,直到操作完成。
如圖1所示:



/*
 * \brief
 * tcp client
 */


#include
#include
#include
#include
#include
#define SERVPORT 8080
#define MAXDATASIZE 100


int main(int argc, char *argv[])
{
  int sockfd, recvbytes;

  char rcv_buf[MAXDATASIZE]; /*./client 127.0.0.1 hello */
  char snd_buf[MAXDATASIZE];
  struct hostent *host;             /* struct hostent
                                     * {
                                     * char *h_name; // general hostname
                                     * char **h_aliases; // hostname's alias

                                     * int h_addrtype; // AF_INET
                                     * int h_length; 
                                     * char **h_addr_list;
                                     * };
                                     */
  struct sockaddr_in server_addr;


  if (argc < 3)
  {
    printf("Usage:%s [ip address] [any string]\n", argv[0]);
    return 1;
  }


  *snd_buf = '\0';
  strcat(snd_buf, argv[2]);


  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  {
    perror("socket:");
    exit(1);
  }


  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(SERVPORT);
  inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
  memset(&(server_addr.sin_zero), 0, 8);


  /* create the connection by socket 
   * means that connect "sockfd" to "server_addr"
   * 同步阻塞模式 
   */
  if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
  {
    perror("connect");
    exit(1);
  }


  /* 同步阻塞模式  */
  if (send(sockfd, snd_buf, sizeof(snd_buf), 0) == -1)
  {
    perror("send:");
    exit(1);
  }
  printf("send:%s\n", snd_buf);


   /* 同步阻塞模式  */
  if ((recvbytes = recv(sockfd, rcv_buf, MAXDATASIZE, 0)) == -1)
  {
    perror("recv:");
    exit(1);
  }


  rcv_buf[recvbytes] = '\0';
  printf("recv:%s\n", rcv_buf);


  close(sockfd);
  return 0;
}

顯然,程式碼中的connect, send, recv都是同步阻塞工作模式,
在結果沒有返回時,程式什麼也不做。
這種模型非常經典,也被廣泛使用。
優勢在於非常簡單,等待的過程中佔用的系統資源微乎其微,程式呼叫返回時,必定可以拿到資料;
但簡單也帶來一些缺點,程式在資料到來並準備好以前,不能進行其他操作,
需要有一個執行緒專門用於等待,這種代價對於需要處理大量連線的伺服器而言,是很難接受的。

二、同步非阻塞模型
收到平安信後,老陳稍稍放心了,就不再一直在收發室前等信;
而是每隔一段時間就去收發室檢查信箱;
這樣,老陳也能在間隔時間內休息一會,或喝杯荼,看會電視,做點別的事情;

這就是同步非阻塞模型;
同步阻塞 I/O 的一種效率稍低的變種是同步非阻塞 I/O。
在這種模型中,系統呼叫是以非阻塞的形式開啟的。
這意味著 I/O 操作不會立即完成, 操作可能會返回一個錯誤程式碼,
說明這個命令不能立即滿足(EAGAIN 或 EWOULDBLOCK),
非阻塞的實現是 I/O 命令可能並不會立即滿足,需要應用程式呼叫許多次來等待操作完成。
這可能效率不高,
因為在很多情況下,當核心執行這個命令時,應用程式必須要進行忙碌等待,直到資料可用為止,或者試圖執行其他工作。
因為資料在核心中變為可用到使用者呼叫 read 返回資料之間存在一定的間隔,這會導致整體資料吞吐量的降低。
如圖2所示:



/*
 * \brief
 * tcp client
 */


#include
#include
#include
#include
#include
#include
#include
#include
#include

#define SERVPORT 8080
#define MAXDATASIZE 100

int main(int argc, char *argv[])
{
  int sockfd, recvbytes;
  char rcv_buf[MAXDATASIZE]; /*./client 127.0.0.1 hello */
  char snd_buf[MAXDATASIZE];
  struct hostent *host;             /* struct hostent
                                     * {
                                     * char *h_name; // general hostname
                                     * char **h_aliases; // hostname's alias
                                     * int h_addrtype; // AF_INET
                                     * int h_length; 
                                     * char **h_addr_list;
                                     * };
                                     */
  struct sockaddr_in server_addr;
  int flags;
  int addr_len;


  if (argc < 3)
  {
    printf("Usage:%s [ip address] [any string]\n", argv[0]);
    return 1;
  }


  *snd_buf = '\0';
  strcat(snd_buf, argv[2]);


  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  {
    perror("socket:");
    exit(1);
  }


  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(SERVPORT);
  inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
  memset(&(server_addr.sin_zero), 0, 8);
  addr_len = sizeof(struct sockaddr_in);


  /* Setting socket to nonblock */
  flags = fcntl(sockfd, F_GETFL, 0);
  fcntl(sockfd, flags|O_NONBLOCK);


  /* create the connection by socket 
   * means that connect "sockfd" to "server_addr"
   * 同步阻塞模式  
  */
  if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
  {
    perror("connect");
    exit(1);
  }


  /* 同步非阻塞模式 */
  while (send(sockfd, snd_buf, sizeof(snd_buf), MSG_DONTWAIT) == -1)
  {
    sleep(10);
    printf("sleep\n");
  }
  printf("send:%s\n", snd_buf);

  /* 同步非阻塞模式 */
  while ((recvbytes = recv(sockfd, rcv_buf, MAXDATASIZE, MSG_DONTWAIT)) == -1)
  {
    sleep(10);
    printf("sleep\n");
  }


  rcv_buf[recvbytes] = '\0';
  printf("recv:%s\n", rcv_buf);


  close(sockfd);
  return 0;
}

這種模式在沒有資料可以接收時,可以進行其他的一些操作,
比如有多個socket時,可以去檢視其他socket有沒有可以接收的資料;
實際應用中,這種I/O模型的直接使用並不常見,因為它需要不停的查詢,
而這些查詢大部分會是無必要的呼叫,白白浪費了系統資源;
非阻塞I/O應該算是一個鋪墊,為I/O複用和訊號驅動奠定了非阻塞使用的基礎。


我們可以使用 fcntl(fd, F_SETFL, flag | O_NONBLOCK); 
將套接字標誌變成非阻塞,呼叫recv,
如果裝置暫時沒有資料可讀就返回-1,同時置errno為EWOULDBLOCK(或者EAGAIN,這兩個巨集定義的值相同),
表示本來應該阻塞在這裡(would block,虛擬語氣),事實上並沒有阻塞而是直接返回錯誤,呼叫者應該試著再讀一次(again)。
這種行為方式稱為輪詢(Poll),呼叫者只是查詢一下,而不是阻塞在這裡死等,這樣可以同時監視多個裝置:

while(1)
{
  非阻塞read(裝置1);
  if(裝置1有資料到達)
    處理資料;

  非阻塞read(裝置2);
  if(裝置2有資料到達)
    處理資料;

  ..............................
}

如果read(裝置1)是阻塞的,那麼只要裝置1沒有資料到達就會一直阻塞在裝置1的read呼叫上,
即使裝置2有資料到達也不能處理,使用非阻塞I/O就可以避免裝置2得不到及時處理。
非阻塞I/O有一個缺點,如果所有裝置都一直沒有資料到達,呼叫者需要反覆查詢做無用功,如果阻塞在那裡,
作業系統可以排程別的程序執行,就不會做無用功了,在實際應用中非阻塞I/O模型比較少用

三、I/O複用(非同步阻塞)模式
頻繁地去收發室對老陳來說太累了,在間隔的時間內能做的事也很少,而且取到信的效率也很低.
於是,老陳向小區物業提了建議;
小區物業改進了他們的信箱系統:
住戶先向小區物業註冊,之後小區物業會在已註冊的住戶的家中新增一個提醒裝置,
每當有註冊住房的新的信件來臨,此裝置會發出 "新信件到達"聲,
提醒老陳去看是不是自己的信到了。


這就是非同步阻塞模型;
在這種模型中,配置的是非阻塞 I/O,然後使用阻塞 select 系統呼叫來確定一個 I/O 描述符何時有操作。
使 select 呼叫非常有趣的是它可以用來為多個描述符提供通知,而不僅僅為一個描述符提供通知。
對於每個提示符來說,我們可以請求這個描述符可以寫資料、有讀資料可用以及是否發生錯誤的通知

I/O複用模型能讓一個或多個socket可讀或可寫準備好時,應用能被通知到;
I/O複用模型早期用select實現,它的工作流程如下圖:
圖3



用select來管理多個I/O,當沒有資料時select阻塞,如果在超時時間內資料到來則select返回,
再呼叫recv進行資料的複製,recv返回後處理資料。


下面的C語言實現的例子,它從網路上接受資料寫入一個檔案中:
/*
 * \brief
 * tcp client
 */

#include
#include
#include
#include
#include
#include
#include

#include
#include
#include
#define SERVPORT 8080
#define MAXDATASIZE 100
#define TFILE "data_from_socket.txt"

int main(int argc, char *argv[])
{
  int sockfd, recvbytes;
  char rcv_buf[MAXDATASIZE]; /*./client 127.0.0.1 hello */
  char snd_buf[MAXDATASIZE];
  struct hostent *host;             /* struct hostent
                                     * {
                                     * char *h_name; // general hostname
                                     * char **h_aliases; // hostname's alias
                                     * int h_addrtype; // AF_INET
                                     * int h_length; 
                                     * char **h_addr_list;
                                     * };
                                     */
  struct sockaddr_in server_addr;

  /* */
  fd_set readset, writeset;
  int check_timeval = 1;
  struct timeval timeout={check_timeval,0}; //阻塞式select, 等待1秒,1秒輪詢
  int maxfd;
  int fp;
  int cir_count = 0;
  int ret;

  if (argc < 3)
  {
    printf("Usage:%s [ip address] [any string]\n", argv[0]);
    return 1;
  }

  *snd_buf = '\0';
  strcat(snd_buf, argv[2]);

  if ((fp = open(TFILE,O_WRONLY)) < 0)    //不是用fopen
  {
    perror("fopen:");
    exit(1);
  }

  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  {
    perror("socket:");
    exit(1);
  }

  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(SERVPORT);
  inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
  memset(&(server_addr.sin_zero), 0, 8);


  /* create the connection by socket 
   * means that connect "sockfd" to "server_addr"
   */
  if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
  {
    perror("connect");
    exit(1);
  }




  /**/
  if (send(sockfd, snd_buf, sizeof(snd_buf), 0) == -1)
  {
    perror("send:");
    exit(1);
  }
  printf("send:%s\n", snd_buf);

  while (1)
  {
    FD_ZERO(&readset);            //每次迴圈都要清空集合,否則不能檢測描述符變化
    FD_SET(sockfd, &readset);     //新增描述符       
    FD_ZERO(&writeset);
    FD_SET(fp,     &writeset);


    maxfd = sockfd > fp ? (sockfd+1) : (fp+1);    //描述符最大值加1


    ret = select(maxfd, &readset, NULL, NULL, NULL);   // 阻塞模式
    switch( ret)
    {
      case -1:
        exit(-1);
        break;
      case 0:
        break;
      default:
        if (FD_ISSET(sockfd, &readset))  //測試sock是否可讀,即是否網路上有資料
        {
          recvbytes = recv(sockfd, rcv_buf, MAXDATASIZE, MSG_DONTWAIT);
          rcv_buf[recvbytes] = '\0';
          printf("recv:%s\n", rcv_buf);


          if (FD_ISSET(fp, &writeset))
          {
            write(fp, rcv_buf, strlen(rcv_buf));   // 不是用fwrite
          }
          goto end;
        }
    }
    cir_count++;
    printf("CNT : %d \n",cir_count);
  }


end:
  close(fp);
  close(sockfd);




  return 0;
}

perl實現:
#! /usr/bin/perl
###############################################################################
# \File
#  tcp_client.pl
# \Descript
#  send message to server
###############################################################################
use IO::Socket;
use IO::Select;




#hash to install IP Port
%srv_info =(
#"srv_ip"  => "61.184.93.197",
      "srv_ip"  => "192.168.1.73",
      "srv_port"=> "8080",
      );

my $srv_addr = $srv_info{"srv_ip"};
my $srv_port = $srv_info{"srv_port"};

my $sock = IO::Socket::INET->new(
      PeerAddr => "$srv_addr",
      PeerPort => "$srv_port",
      Type     => SOCK_STREAM,
      Blocking => 1,
#     Timeout  => 5,
      Proto    => "tcp")
or die "Can not create socket connect. $@";

$sock->send("Hello server!\n", 0) or warn "send failed: $!, $@";
$sock->autoflush(1);

my $sel = IO::Select->new($sock);
while(my @ready = $sel->can_read)
{
  foreach my $fh(@ready)
  {
    if($fh == $sock)
    {
      while()
      {
        print $_;
      }
      $sel->remove($fh);
      close $fh;
    }
  }
}
$sock->close();

四、訊號驅動I/O模型
老陳接收到新的信件後,一般的程式是:
開啟信封----掏出信紙 ----閱讀信件----回覆信件 ......
為了進一步減輕使用者負擔,小區物業又開發了一種新的技術:
住戶只要告訴小區物業對信件的操作步驟,小區物業信箱將按照這些步驟去處理信件,
不再需要使用者親自拆信 /閱讀/回覆了!

這就是訊號驅動I/O模型
我們也可以用訊號,讓核心在描述字就緒時傳送SIGIO訊號通知我們。
首先開啟套介面的訊號驅動 I/O功能,並通過sigaction系統呼叫安裝一個訊號處理函式。
該系統呼叫將立即返回,我們的程序繼續工作,也就是說沒被阻塞。
當資料報準備好讀取時,核心就為該程序產生一個SIGIO訊號,
我們隨後既可以在訊號處理函式中呼叫recvfrom讀取資料報,並通知主迴圈資料已準備好待處理,
也可以立即通知主迴圈,讓它讀取資料報。


無論如何處理SIGIO訊號,這種模型的優勢在於等待資料報到達期間,程序不被阻塞,主迴圈可以繼續執行,
只要不時地等待來自訊號處理函式的通知:既可以是資料已準備好被處理,也可以是資料報已準備好被讀取。

五、非同步非阻塞模式
linux下的asynchronous IO其實用得很少。
與前面的訊號驅動模型的主要區別在於:訊號驅動 I/O是由核心通知我們何時可以啟動一個 I/O操作,
而非同步 I/O模型是由核心通知我們 I/O操作何時完成 。 
先看一下它的流程:
圖5:

這就是非同步非阻塞模式
以read系統呼叫為例
steps:
a. 呼叫read;
b. read請求會立即返回,說明請求已經成功發起了。
c. 在後臺完成讀操作這段時間內,應用程式可以執行其他處理操作。
d. 當 read 的響應到達時,就會產生一個訊號或執行一個基於執行緒的回撥函式來完成這次 I/O 處理過程。

/*
 * \brief
 * tcp client
 */

#include
#include
#include
#include
#include
#include
#include

#include
#include
#include
#define SERVPORT 8080
#define MAXDATASIZE 100
#define TFILE "data_from_socket.txt"




int main(int argc, char *argv[])
{
  int sockfd, recvbytes;
  char rcv_buf[MAXDATASIZE]; /*./client 127.0.0.1 hello */
  char snd_buf[MAXDATASIZE];
  struct hostent *host;             /* struct hostent
                                     * {
                                     * char *h_name; // general hostname
                                     * char **h_aliases; // hostname's alias
                                     * int h_addrtype; // AF_INET
                                     * int h_length; 
                                     * char **h_addr_list;
                                     * };
                                     */
  struct sockaddr_in server_addr;




  /* */
  fd_set readset, writeset;
  int check_timeval = 1;
  struct timeval timeout={check_timeval,0}; //阻塞式select, 等待1秒,1秒輪詢
  int maxfd;
  int fp;
  int cir_count = 0;
  int ret;




  if (argc < 3)
  {
    printf("Usage:%s [ip address] [any string]\n", argv[0]);
    return 1;
  }




  *snd_buf = '\0';
  strcat(snd_buf, argv[2]);




  if ((fp = open(TFILE,O_WRONLY)) < 0)    //不是用fopen
  {
    perror("fopen:");
    exit(1);
  }




  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  {
    perror("socket:");
    exit(1);
  }




  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(SERVPORT);
  inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
  memset(&(server_addr.sin_zero), 0, 8);




  /* create the connection by socket 
   * means that connect "sockfd" to "server_addr"
   */
  if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
  {
    perror("connect");
    exit(1);
  }




  /**/
  if (send(sockfd, snd_buf, sizeof(snd_buf), 0) == -1)
  {
    perror("send:");
    exit(1);
  }
  printf("send:%s\n", snd_buf);




  while (1)
  {
    FD_ZERO(&readset);            //每次迴圈都要清空集合,否則不能檢測描述符變化
    FD_SET(sockfd, &readset);     //新增描述符       
    FD_ZERO(&writeset);
    FD_SET(fp,     &writeset);


    maxfd = sockfd > fp ? (sockfd+1) : (fp+1);    //描述符最大值加1


    ret = select(maxfd, &readset, NULL, NULL, &timeout);   // 非阻塞模式
    switch( ret)
    {
      case -1:
        exit(-1);
        break;
      case 0:
        break;
      default:
        if (FD_ISSET(sockfd, &readset))  //測試sock是否可讀,即是否網路上有資料
        {
          recvbytes = recv(sockfd, rcv_buf, MAXDATASIZE, MSG_DONTWAIT);
          rcv_buf[recvbytes] = '\0';
          printf("recv:%s\n", rcv_buf);




          if (FD_ISSET(fp, &writeset))
          {
            write(fp, rcv_buf, strlen(rcv_buf));   // 不是用fwrite
          }
          goto end;
        }
    }
    timeout.tv_sec = check_timeval;    // 必須重新設定,因為超時時間到後會將其置零


    cir_count++;
    printf("CNT : %d \n",cir_count);
  }


end:
  close(fp);
  close(sockfd);


  return 0;
}


server端程式:
/*
 * \brief
 * tcp server
 */
#include
#include
#include
#include
#include
#include
#include
#define SERVPORT 8080
#define BACKLOG 10 // max numbef of client connection
#define MAXDATASIZE 100




int main(char argc, char *argv[])
{
  int sockfd, client_fd, addr_size, recvbytes;
  char rcv_buf[MAXDATASIZE], snd_buf[MAXDATASIZE];
  char* val;
  struct sockaddr_in server_addr;
  struct sockaddr_in client_addr;
  int bReuseaddr = 1;




  char IPdotdec[20];




  /* create a new socket and regiter it to os .
   * SOCK_STREAM means that supply tcp service, 
   * and must connect() before data transfort.
   */
  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  {
    perror("socket:");
    exit(1);
  }


  /* setting server's socket */
  server_addr.sin_family = AF_INET;         // IPv4 network protocol
  server_addr.sin_port = htons(SERVPORT);
  server_addr.sin_addr.s_addr = INADDR_ANY; // auto IP detect
  memset(&(server_addr.sin_zero),0, 8);


  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(int));
  if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr))== -1)
  {
    perror("bind:");
    exit(1);
  }


  /* 
   * watting for connection , 
   * and server permit to recive the requestion from sockfd 
   */
  if (listen(sockfd, BACKLOG) == -1) // BACKLOG assign thd max number of connection
  {
    perror("listen:");
    exit(1);                                                                 
  }                                                                          

  while(1)                                                                   
  {                                                                          
    addr_size = sizeof(struct sockaddr_in);                                  

    /*                                                                       
     * accept the sockfd's connection,                                       
     * return an new socket and assign far host to client_addr               
     */                                                                      
    printf("watting for connect...\n");                                      
    if ((client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &addr_size)) == -1)   
    {                                                                        
      /* Nonblocking mode */                                                 
      perror("accept:");                                                     
      continue;                                                              
    }                                                                        

    /* network-digital to ip address */                                      
    inet_ntop(AF_INET, (void*)&client_addr, IPdotdec, 16);                   
    printf("connetion from:%d : %s\n",client_addr.sin_addr, IPdotdec);       

    //if (!fork())                                                           
    {                                                                        
      /* child process handle with the client connection */                  

      /* recive the client's data by client_fd */                            
      if ((recvbytes = recv(client_fd, rcv_buf, MAXDATASIZE, 0)) == -1)      
      {                                                                      
        perror("recv:");                                                     
        exit(1);                                                             
      }                                                                      
      rcv_buf[recvbytes]='\0';                                               
      printf("recv:%s\n", rcv_buf);                                          


      *snd_buf='\0';                                                         
      strcat(snd_buf, "welcome");                                            

      sleep(3);                                                              
      /* send the message to far-hosts by client_fd */                       
      if (send(client_fd, snd_buf, strlen(snd_buf), 0) == -1)                
      {                                                                      
        perror("send:");                                                     
        exit(1);                                                             
      }                                                                      
      printf("send:%s\n", snd_buf);                                          

      close(client_fd);