I/Oの多重化
I/Oの多重化(I/O multiplexing)
2つ以上のI/Oに対して、どれかが入出力可能になった場合の通知をカーネルに依頼する機能I/Oの多重化が用いられる状況
I/Oモデル
最も一般的に用いられる。
ソケットを非ブロッキングに設定することは、そのソケットに要求したI/O操作が、プロセスをスリープ状態におかないと完了しないのであれば、プロセスをスリープ状態にせずエラーを返すことをカーネルに要求することになる。
I/Oの多重化(I/O multiplexing)を行う場合は、selectあるいはpollを呼び出し、これら関数の中でブロックする。利点は、複数のディスクリプタに対して、入出力の準備ができるのを待つことが可能になること。
ディスクリプタの用意ができた場合に、SIGIOシグナルを用いて通知するようにカーネルに支持すること。
非同期I/Oでは、I/O操作を開始し、操作が完了した時に通知するようにカーネルに対して指示する。
シグナル駆動I/Oモデルとの違いは、シグナル駆動I/OではカーネルはI/O操作の起動が可能になったことを通知するのに対し、非同期I/OではカーネルはI/O操作が完了したことを知らせる。
select関数
select関数を用いると、複数のイベントのいずれかが発生するまで待ち、1つ以上のイベントが発生した場合、あるいは指定した時間が経過した場合にプロセスに起こすようにカーネルに指示できる。カーネルに対して、監視すべきディスクリプタ(読み出し、書き込み、エラー状態)と待ち時間を指示する。
#include <sys/select.h> #include <sys/time.h> int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout); 戻り値:準備ができているディスクリプタの個数、 タイムアウトなら0、エラーの場合は -1 maxfdp1:検査するディスクリプタ番号の最大値 + 1 (maxfd plus 1) readset:読み出し可能性を検査するディスクリプタ集合 writeset:書き込み可能性を検査するディスクリプタ集合 exceptset:例外状態を検査するディスクリプタ集合
struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
timeout引数の扱い
指定したディスクリプタのどれかの準備が出来た場合のみ制御を返す。
timeout引数にNULLポインタを指定する。
指定したディスクリプタで入出力の準備ができた場合に制御を返すが、timeout引数が挿すtimaval構造体で指定した時間を越えて待たない。
ディスクリプタの検査を行った後即座に制御を返す。ポーリング(polling)。
timeout引数がtimeval構造体を指し、タイマ値をゼロにする。
selectでは、ディスクリプタ集合を用いる(通常が整数配列)。
配列ディスクリプタの実際のインプリメンテーションは、fd_setの定義と次の4つのマクロに隠蔽されている。
fdset中のすべてのビットをクリアする
fdset中のfdのビットをセットする
fdset中のfdのビットはクリアする
fdset中のfdビットはセットされているか?
アプリケーションは、fd_set型のディスクリプタ集合を割り当て、これら4つのマクロを用いてビットのセット、クリア、検査を行う。
FD_SETSIZE定数(
selectは、readset, writeset, exceptsetの各ポインタで指されるディスクリプタ集合の内容を変更する(結果-値引数)。戻り時のfd_set構造体内の特定のディスクリプタの状態を調べるには、FD_ISSETマクロを用いる。戻り辞には、用意できていないディスクリプタに対応するビットがクリアされている。従って、selectを呼び出すたびに、監視対象となっているディスクリプタのビットを各ディスクリプタ集合でセットしなおす必要がある。
selectの戻り値は、3つのディスクリプタ集合中でセットされいるビットの合計数。
タイムアウト時は、戻り値はゼロ。
戻り値が-1の場合は、エラー発生(シグナルの捕捉で関数が割り込まれたなど)。
ディスクリプタの用ができる条件
ソケットディスクリプタの読み出し可能条件
低水位標は、SO_RCVLOWATソケットオプションで設定することが出来る。TCP, UDPソケットのデフォルト値は1。
ソケットディスクリプタの書き込み可能条件
(鄯)ソケットが接続された状態にある
(鄱)ソケットが接続を必要としない場合(ex.UDP)
この場合ソケットが日ブロッキングに設定されていると、書込み操作はブロックせず正の値(トランスポートが受け付けたバイト数)を返す。低水位標はSO_SNDLOWATで変更でき、TCP, UDPのデフォルト値は2048。
ソケット上でエラーが起きると、selectはそのソケットを読み書き可能と見なすことに注意。
/* selectを用いたechoクライアント */ #include <stdio.h> #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <stdlib.h> #include <string.h> #include <errno.h> #define MAXLINE 64 int max(int a, int b) { return a > b ? a : b; } /* readline */ /* ディスクリプタから1バイトずつ1行分読み出す */ ssize_t readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { again: if ( (rc = read(fd, &c, 1) ) == 1) { *ptr++ = c; if (c == '\n') break; } else if (rc == 0) { if (n == 1) return 0;/* EOF データなし */ else break; } else { if (errno == EINTR) goto again; return -1;/* エラー発生 readがerrnoを設定している */ } } *ptr = 0;/* 終端をヌルにする */ return n; } /* writen */ /* ディスクリプタへのnバイトの書き込み */ ssize_t writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft) ) <= 0) { if (errno == EINTR) nwritten = 0;/* 再度write()を呼び出す */ else return -1;/* エラーの発生 */ } nleft -= nwritten; ptr += nwritten; } return n; } void str_cli(FILE *fp, int sockfd) { int maxfdp1; fd_set rset; char sendline[MAXLINE], recvline[MAXLINE]; FD_ZERO(&rset); for ( ; ; ) { FD_SET(fileno(fp), &rset); FD_SET(sockfd, &rset); maxfdp1 = max(fileno(fp), sockfd) + 1; select(maxfdp1, &rset, NULL, NULL, NULL); if (FD_ISSET(sockfd, &rset)) {/* ソケットが読み出し可能 */ if (readline(sockfd, recvline, MAXLINE) == 0) { perror("readline"); exit(1); } fputs(recvline, stdout); } if (FD_ISSET(fileno(fp), &rset)) { if (fgets(sendline, MAXLINE, fp) == NULL) return;/* 処理終了 */ writen(sockfd, sendline, strlen(sendline)); } } } int main(int argc, char *argv[]) { int sockfd; struct sockaddr_in servaddr; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(8004); servaddr.sin_addr.s_addr = inet_addr("192.168.1.3"); connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)); str_cli(stdin, sockfd); return 0; }
shutdown関数
#include <sys/socket.h> int shutdown(int sockfd, int howto); 戻り値:成功なら0、エラーなら-1
howto引数の値
//単一プロセスでselectを用いるechoサーバー #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #define MAXLINE 64 //ディスクリプタからnバイト読み出す ssize_t readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft) ) < 0) { if (errno == EINTR) nread = 0;// 再度 read()を呼び出す else return -1; } else if (nread == 0) { break;// EOF } nleft -= nread; ptr += nread; } return (n - nleft); } // writen // ディスクリプタへのnバイトの書き込み ssize_t writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft) ) <= 0) { if (errno == EINTR) nwritten = 0;//再度write()を呼び出す else return -1;//エラーの発生 } nleft -= nwritten; ptr += nwritten; } return n; } //readline //ディスクリプタから1バイトずつ1行分読み出す ssize_t readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { again: if ( (rc = read(fd, &c, 1) ) == 1) { *ptr++ = c; if (c == '\n') break; } else if (rc == 0) { if (n == 1) return 0;//EOF データなし else break; } else { if (errno == EINTR) goto again; return -1;//エラー発生 readがerrnoを設定している } } *ptr = 0;//終端をヌルにする return n; } void str_echo(int sockfd) { ssize_t n; char line[MAXLINE]; for (;;) { if ( (n = readline(sockfd, line, MAXLINE) ) == 0) { return; } writen(sockfd, line, n); } } int main(){ int i, maxi, maxfd, listenfd, connfd, sockfd; int nready, client[FD_SETSIZE]; ssize_t n; fd_set rset, allset; char line[MAXLINE]; socklen_t clilen; struct sockaddr_in cliaddr, servaddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(8005); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)); listen(listenfd, 64); maxfd = listenfd; //初期化 maxi = -1;//client[]配列の添え字 for (i = 0; i < FD_SETSIZE; i++) client[i] = -1;//-1は利用可能なエントリを示す FD_ZERO(&allset); FD_SET(listenfd, &allset); for (;;) { rset = allset;//構造体の代入 nready = select(maxfd + 1, &rset, NULL, NULL, NULL); if (FD_ISSET(listenfd, &rset)) {//新規クライアントコネクション clilen = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen); for (i = 0; i < FD_SETSIZE; i++) { if (client[i] < 0) { client[i] = connfd;//ディスクリプタの保存 break; } } if (i == FD_SETSIZE) { perror("too many clients"); exit(1); } FD_SET(connfd, &allset);//新しいディスクリプタを集合に加える if (connfd > maxfd) maxfd = connfd; if (i > maxi) maxi = i; if (-nready <= 0) continue; } for (i = 0; i <= maxi; i++) {//クライアントからのデータを検査 if ( (sockfd = client[i]) < 0) continue; if (FD_ISSET(sockfd, &rset)) { if ( (n = readline(sockfd, line, MAXLINE)) == 0 ) { //クライアントがコネクションをクローズした close(sockfd); FD_CLR(sockfd, &allset); client[i] = -1; } else { writen(sockfd, line, n); } if (-nready <= 0) break;//読み出し可能なディスクリプタなし } } } return 0; }
参考文献