I/O多路複用(select)
在之前寫過一篇五種I/O模型,感興趣的可以去看一下,今天主要講其中的一種,那就是I/O多路複用。因為I/O多路複用可以使一個程序同時處理多個連線。這對提高程式的效能至關重要。對於IO複用的概念與理解在上文說的挺清楚了。本文主要說實現IO複用的系統呼叫。
在linux下,實現IO複用的系統呼叫主要有三個:select、poll 和 epoll,下面我們將對其進行逐一講解。
select
Select是通過將需要監聽的檔案描述符加入相應的檔案描述符集合(readset、writeset,exceptset),由核心負責監視相應的檔案描述符是否就緒。
select API
select函式原形如下:
#include <sys/select.h>
#include <sys/time.h>
int select(int nfds,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
引數
Select函式的第一個引數是nfds,是監聽的檔案描述符的集合中的最大檔案描述符加1,他會告訴核心需要監聽的檔案描述符的個數。Select內部的實現是一個迴圈遍歷1024個檔案描述符,而這個nfds就是迴圈的上限。因為檔案描述符石從0開始的,迴圈從0開始,[0 , nfds),所以是最大檔案描述符加1。
第二個引數readset
這三個引數都是fd_set 結構體型別指標,fd_set結構體的定義如下:
#include <sys/select.h>
typedef long int __fd_mask;
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
fd_set結構體的定義實際包含的是fds_bits位陣列,該陣列的每個元素的每一位標記一個檔案描述符其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在當前核心中數值為1024,可見每次select系統呼叫可監聽處理的檔案描述符最大數量為1024。
由於位操作過於繁瑣,所以我們採用下列一些的巨集來操作該結構體中的位:
FD_ZERO(fd_set *fdset);將指定的檔案描述符集清空。在對檔案描述符集合進行設定前,必須對其進行初始化,如果不清空,由於在系統分配記憶體空間後,通常並不作清空處理,所以結果是不可知的。
FD_SET(fd_set *fdset);用於在檔案描述符集合中增加一個新的檔案描述符。
FD_CLR(fd_set *fdset);用於在檔案描述符集合中刪除一個檔案描述符。
FD_ISSET(int fd,fd_set *fdset);用於測試指定的檔案描述符是否在該集合中。
最後一個引數是超時時間,是一個struct timeval結構體指標,該結構體定義如下:
有兩個成員,一個是秒,一個是毫秒,
struct timeval{
long tv_sec; //second
long tv_usec; //microseconds
}
超時時間可以設定到微秒級別,有三種設定情況:
NULL:阻塞等待,直到某個檔案描述符上發生了事件。
0:僅檢測描述符集合的狀態,然後立即返回。
> 0: 指定超時時間,如果在該時間段裡沒有事件發生,select將超時返回。
返回值
成功:返回就緒的檔案描述符(可讀、可寫、異常)的總數。
超時:返回0
失敗:返回-1,並設定errno
若在等待的過程中被訊號打斷,也返回-1,errno設定為EINTR。
檔案描述符就緒的條件
在網路程式設計中,下列情況下socket可讀:
1、socket核心接收快取區中的位元組數大於或等於其低水位標記SO_RCVLOWAT。此時可以無阻塞地讀該socket,並且讀操作返回的位元組數大於0。
2、socket通訊對方關閉連線。此時對該socket讀操作將返回0。
3、監聽socket上有新的連線請求。
4、socket上有未處理的錯誤。此時我們可以使用getsockopt來讀取和清除該錯誤。
下列情況下socket可寫:
1、socket核心傳送緩衝區中的可用位元組數大於或等於其低水位標記SO_SNDLOWAT。此時我們可以無阻塞寫該socket,並且寫操作返回的位元組數大於0。
2、socket寫操作被關閉。對寫操作被關閉的socket執行寫操作將觸發一個SIGPIPE訊號。
3、socket使用非阻塞connect連線成功或者失敗(超時)之後。
4、socket上有未處理的錯誤。此時我們可以使用getsockopt來讀取和清除該錯誤。
網路程式中,select能處理的異常情況只有一種:socket上接收到帶外資料。
不足:Select監聽的最大檔案描述符受限於FD_SETSIZE,UNIX系統通常會在標頭檔案 “sys/select.h” 中定義常量FD_SETSIZE,一般為1024,要想更改需要重新編譯核心。而且因為select採取的是輪詢機制,當監聽的檔案描述符過多的話,效率會大大折扣。
在下文中將介紹一種新的實現IO複用的函式,poll,與select相比,poll突破了最大檔案描述符是1024的限制。
程式示例
以select實現的簡單回射伺服器作為本文的結束。
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<ctype.h>
#define MYPORT 8888
#define BACKLOG 10
#define MAXDATASIZE 1024
int main()
{
char buf[MAXDATASIZE];
int numbytes;
int sock_fd,new_fd,connfd; //定義主動套接字和被動套接字
//定義IPV4套介面地址結構
struct sockaddr_in my_addr; //service 地址
struct sockaddr_in their_addr; //client 地址
int sin_size;
//初始化IPV4套介面地址結構
my_addr.sin_family =AF_INET; //指定該地址家族
my_addr.sin_port =htons(MYPORT); //埠
my_addr.sin_addr.s_addr = INADDR_ANY; //IPV4的地址
bzero(&(my_addr.sin_zero),8);
//socket()函式
if((sock_fd = socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket");
exit(1);
}
//地址重複利用
int on = 1;
if(setsockopt(sock_fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
{
perror("setsockopt");
exit(1);
}
//bind()函式
if(bind(sock_fd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1)
{
perror("bind");
exit(1);
}
//listen()函式
if(listen(sock_fd,BACKLOG)==-1)
{
perror("listen");
exit(1);
}
int i;
int client[FD_SETSIZE];//儲存客戶端的套接字
//初始化
for(i = 0;i < FD_SETSIZE;i++)
{
client[i] = -1;
}
int nready = 0; //接受select的返回值
int maxi = -1; //儲存陣列下標
int maxfd = sock_fd;//初始化nfds
fd_set rset; //定義可讀事件集合
fd_set allset; //備份
FD_ZERO(&allset);
FD_SET(sock_fd,&allset);
while(1)
{
rset = allset;
nready = select(maxfd +1,&rset,NULL,NULL,NULL);
if(nready < 0)
{
perror("select error!\n");
exit(1);
}
else if(0 == nready)
{
continue;
}
//當客戶端請求連線
if(FD_ISSET(sock_fd,&rset))
{
sin_size = sizeof(struct sockaddr_in);
if((new_fd=accept(sock_fd,(struct sockaddr *)&their_addr,&sin_size))==-1)
{
perror("accept");
exit(1);
}
printf("client IP: %s\t PORT : %d\n",inet_ntoa(their_addr.sin_addr),ntohs(their_addr.sin_port));
//將客戶端的套接字存入client[]
for(i = 0;i < FD_SETSIZE;i++)
{
if(client[i] < 0)
{
client[i] = new_fd;
break; //找到合適位置就沒必要繼續遍歷
}
}
//判斷是否達到連線上限
if(i == FD_SETSIZE)
{
printf("too many client!\n");
break;
}
//將新加入的客戶端放入監聽隊伍
FD_SET(new_fd,&allset);
//更新nfds
if(new_fd > maxfd)
{
maxfd = new_fd;
}
//更新maxi
if(i > maxi)
{
maxi = i;
}
//判斷是否已經處理完事件
if((--nready) == 0)
{
continue;
}
}
//當客戶端傳送資料
for(i = 0;i <= maxi;i++)
{
if((connfd = client[i]) < 0)
{
continue;
}
if(FD_ISSET(connfd,&rset))
{
memset(buf,0,sizeof(buf));
numbytes = recv(connfd,buf,MAXDATASIZE,0);
if(numbytes == -1)
{
perror("recv\n");
exit(1);
}
else if(numbytes == 0)
{
printf("client close!");
FD_CLR(connfd,&allset);
client[i] = -1;
}
send(connfd,buf,numbytes,0);
}
if((--nready) == 0)
{
continue;
}
}
}
close(sock_fd);
close(new_fd);
return 0;
}