I/Oの多重化2
poll関数
#include <poll.h> int poll(struct pollfd *darray, unsigned long nfds, int timeout); 戻り値:準備のできているディスクリプタ数、タイムアウトなら0、エラーなら-1
struct pollfd { int fd; //検査するディスクリプタ short event;//fd上で興味のあるイベント short revent;//fd上で発生したイベント };
//pollによるechoサーバー #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <limits.h> #include <poll.h> #define MAXLINE 64 #define OPEN_MAX 128 //ディスクリプタからnバイト読み出す ssize_t readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft) ) < 0) { if (errno == EINTR) nread = 0;// 再度 read()を呼び出す else return -1; } else if (nread == 0) { break;// EOF } nleft -= nread; ptr += nread; } return (n - nleft); } //writen //ディスクリプタへのnバイトの書き込み ssize_t writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft) ) <= 0) { if (errno == EINTR) nwritten = 0;// 再度write()を呼び出す else return -1;// エラーの発生 } nleft -= nwritten; ptr += nwritten; } return n; } //readline //ディスクリプタから1バイトずつ1行分読み出す ssize_t readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { again: if ( (rc = read(fd, &c, 1) ) == 1) { *ptr++ = c; if (c == '\n') break; } else if (rc == 0) { if (n == 1) return 0;// EOF データなし else break; } else { if (errno == EINTR) goto again; return -1;// エラー発生 readがerrnoを設定している } } *ptr = 0;//終端をヌルにする return n; } void str_echo(int sockfd) { ssize_t n; char line[MAXLINE]; for (;;) { if ( (n = readline(sockfd, line, MAXLINE) ) == 0) { return; } writen(sockfd, line, n); } } int main() { int i, maxi, listenfd, connfd, sockfd; int nready; ssize_t n; char line[MAXLINE]; socklen_t clilen; struct pollfd client[OPEN_MAX]; struct sockaddr_in cliaddr, servaddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(8004); bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)); listen(listenfd, 64); client[0].fd = listenfd; client[0].events = POLLIN; for (i = 1; i < OPEN_MAX; i++) client[i].fd = -1;//-1は利用可能なエントリを示す maxi = 0;//client[]配列の最大添え字 for (;;) { nready = poll(client, maxi + 1, 1); if (client[0].revents & POLLIN) {//新規クライアントコネクション clilen = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen); for (i = 1; i < OPEN_MAX; i++) { if (client[i].fd < 0) { client[i].fd = connfd;//ディスクリプタの保存 break; } } if (i == OPEN_MAX) { perror("too many clients"); exit(1); } client[i].events = POLLIN; if (i > maxi) maxi = i; if (-nready <= 0) continue;//読み出し可能なディスクリプタがない } for (i = 1; i <= maxi; i++) {//クライアントからのデータ検査 if ( (sockfd = client[i].fd) < 0 ) continue; if (client[i].revents & (POLLIN | POLLERR)) { if ( (n = readline(sockfd, line, MAXLINE)) < 0 ) { if (errno == ECONNRESET) { //クライアントがコネクションをリセットした close(sockfd); client[i].fd = -1; } else { perror("readline error"); exit(1); } } else if (n == 0) { close(sockfd); client[i].fd = -1; } else { writen(sockfd, line, n); } if (-nready <= 0) break;//読み出し可能なディスクリプタがない } } } return 0; }
参考文献
基本TCPソケット2−5
Tips
netstatコマンド
リスニングソケットを表示するには、-a フラグが必要。
connectは、3WHSの2番目のセグメントを受信すると、制御を戻す。
acceptは、3WHSの3番目のセグメントを受信すると、制御を戻す。
POSIXシグナル
シグナルはプロセスに対するイベント発生の通知。ソフトウェア割り込みとも呼ばれる。シグナルは普通、非同期に発生する。
シグナルは、
送ることができる。
シグナルには、処理配備(アクション)が伴う。3種類の処理配備がある。
1.シグナルハンドラにより、シグナルを捕捉する。
SIGKILLとSIGSTOPは捕捉できない。
シグナルハンドラの関数プロトタイプ
void handler(int signo);
2.シグナルを無視する
シグナルの処理配備をSIG_IGNに設定することにより、そのシグナルを無視するように設定できる。SIGKILLとSIGSTOPは無視できない。
3.デフォルトの動作に設定する
シグナル処理配備をSIG_DFLに設定することにより、そのシグナルに対するデフォルトの動作を行うように設定できる。SIGCHLDとSIGURGは、デフォルト動作がシグナル無視となっている。
シグナルの処理配備を設定するためには、sigaction関数を呼び出す。
typedef void Sigfunc(int); Sigfunc* signal(int signo, Sigfunc *func) { struct sigaction act, oact; act.sa_handler = func; sigemptyset(&act.sa_mask); act.sa_flags = 0; if (signo == SIGALRM) { act.sa_flags |= SA_RESTART; } if (sigaction(signo, &act, &oact) < 0) return SIG_ERR; return oact.sa_handler; }
singaction構造体のsa_handlerメンバにfunc引数を設定する。
シグナルハンドラが呼び出されている間ブロックされるシグナルの集合を指定できる。ブロックされたシグナルは、プロセスに配送されなくなる。Posixでは、捕捉されたシグナルは、対応するハンドラが呼び出されている間ブロックされることを保証している。
sigaction構造体のSA_RESTARTが設定されていると、シグナルによって割り込まれたシステムコールを、カーネルが自動的に再実行する。
Posix適合システムにおけるシグナル処理について
この機能を用いるとコードの危険領域(実行中にシグナルの捕捉が起きてはならない部分を保護することが可能になる)
SIGCHLDシグナルの処理
ゾンビ状態の目的は、子プロセスのリソースの利用状況(プロセスID、終了状態、CPU時間、メモリ使用量など)を親プロセスがあとで取得できるようにすること。
プロセスが終了した際、そのプロセスにゾンビ状態になっている子プロセスがあれば、これらの子プロセスの親プロセスIDはすべて1(init)に付け替えられ、initは子プロセスを引き継いで終了処理を行う(子プロセスに対してwaitを実行し、ゾンビプロセスを消滅させる)。psを実行したときに、ゾンビプロセスは、defunctと表示される。
SIGCHLDシグナルを捕捉sるうシグナルハンドラを設定し、このシグナルハンドラ中からwaitを呼び出す。
printfのような標準入出力関数をシグナルハンドラから呼び出すことは、良くない。
//waitを呼び出すシグナルハンドラ void sig_child(int signo) { pid_t pid; int stat; pid = wait(&stat); printf("Child %d terminated\n", pid);/*これは良くないが。表示して分かりやすくため*/ return; }
割り込まれたシステムコールの処理
プロセスが遅いシステムコール(永久のブロックする可能性のあるシステムコール)でブロックして、かつプロセスによるシグナルの捕捉が起き、かつシグナルハンドラから制御が戻った際に、そのシステムコールがEINTRエラーを返す可能性がある。よって、遅いシステムコールがEINTRを返す場合に備えておかなければならない。
//acceptへの割り込みへの対処の例 for (;;) { clilen = sizeof(cliaddr); if (connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen) < 0) { if (errno == EINTR) continue; else exit(1); } }
システムコールが割り込まれたことを検出し、
単にシステムコールを再実行している。
参考文献
POSIXシグナル2
同じシグナルがほぼ同時に複数配送される場合
シグナルハンドラを設定してwaitを呼び出すだけでは、ゾンビ防止には不十分。Unixのシグナルはキューイングされないために、シグナルが発生した数だけ、シグナルハンドラが実行されない場合があるから。
正しい解決方法は、waitの代わりにwaitpidを呼び出すこと。
//waitpidを呼び出すシグナルハンドラ void sig_child(int signo) { pid_t pid; int stat; while ( (pid = waitpid(-1, &stat, WNOHANG)) > 0) printf("child %d terminated\n", pid); return; }
ネットワークプログラミングで遭遇する3種類の状況
参考文献
I/Oの多重化
I/Oの多重化(I/O multiplexing)
2つ以上のI/Oに対して、どれかが入出力可能になった場合の通知をカーネルに依頼する機能I/Oの多重化が用いられる状況
I/Oモデル
最も一般的に用いられる。
ソケットを非ブロッキングに設定することは、そのソケットに要求したI/O操作が、プロセスをスリープ状態におかないと完了しないのであれば、プロセスをスリープ状態にせずエラーを返すことをカーネルに要求することになる。
I/Oの多重化(I/O multiplexing)を行う場合は、selectあるいはpollを呼び出し、これら関数の中でブロックする。利点は、複数のディスクリプタに対して、入出力の準備ができるのを待つことが可能になること。
ディスクリプタの用意ができた場合に、SIGIOシグナルを用いて通知するようにカーネルに支持すること。
非同期I/Oでは、I/O操作を開始し、操作が完了した時に通知するようにカーネルに対して指示する。
シグナル駆動I/Oモデルとの違いは、シグナル駆動I/OではカーネルはI/O操作の起動が可能になったことを通知するのに対し、非同期I/OではカーネルはI/O操作が完了したことを知らせる。
select関数
select関数を用いると、複数のイベントのいずれかが発生するまで待ち、1つ以上のイベントが発生した場合、あるいは指定した時間が経過した場合にプロセスに起こすようにカーネルに指示できる。カーネルに対して、監視すべきディスクリプタ(読み出し、書き込み、エラー状態)と待ち時間を指示する。
#include <sys/select.h> #include <sys/time.h> int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout); 戻り値:準備ができているディスクリプタの個数、 タイムアウトなら0、エラーの場合は -1 maxfdp1:検査するディスクリプタ番号の最大値 + 1 (maxfd plus 1) readset:読み出し可能性を検査するディスクリプタ集合 writeset:書き込み可能性を検査するディスクリプタ集合 exceptset:例外状態を検査するディスクリプタ集合
struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
timeout引数の扱い
指定したディスクリプタのどれかの準備が出来た場合のみ制御を返す。
timeout引数にNULLポインタを指定する。
指定したディスクリプタで入出力の準備ができた場合に制御を返すが、timeout引数が挿すtimaval構造体で指定した時間を越えて待たない。
ディスクリプタの検査を行った後即座に制御を返す。ポーリング(polling)。
timeout引数がtimeval構造体を指し、タイマ値をゼロにする。
selectでは、ディスクリプタ集合を用いる(通常が整数配列)。
配列ディスクリプタの実際のインプリメンテーションは、fd_setの定義と次の4つのマクロに隠蔽されている。
fdset中のすべてのビットをクリアする
fdset中のfdのビットをセットする
fdset中のfdのビットはクリアする
fdset中のfdビットはセットされているか?
アプリケーションは、fd_set型のディスクリプタ集合を割り当て、これら4つのマクロを用いてビットのセット、クリア、検査を行う。
FD_SETSIZE定数(
selectは、readset, writeset, exceptsetの各ポインタで指されるディスクリプタ集合の内容を変更する(結果-値引数)。戻り時のfd_set構造体内の特定のディスクリプタの状態を調べるには、FD_ISSETマクロを用いる。戻り辞には、用意できていないディスクリプタに対応するビットがクリアされている。従って、selectを呼び出すたびに、監視対象となっているディスクリプタのビットを各ディスクリプタ集合でセットしなおす必要がある。
selectの戻り値は、3つのディスクリプタ集合中でセットされいるビットの合計数。
タイムアウト時は、戻り値はゼロ。
戻り値が-1の場合は、エラー発生(シグナルの捕捉で関数が割り込まれたなど)。
ディスクリプタの用ができる条件
ソケットディスクリプタの読み出し可能条件
低水位標は、SO_RCVLOWATソケットオプションで設定することが出来る。TCP, UDPソケットのデフォルト値は1。
ソケットディスクリプタの書き込み可能条件
(鄯)ソケットが接続された状態にある
(鄱)ソケットが接続を必要としない場合(ex.UDP)
この場合ソケットが日ブロッキングに設定されていると、書込み操作はブロックせず正の値(トランスポートが受け付けたバイト数)を返す。低水位標はSO_SNDLOWATで変更でき、TCP, UDPのデフォルト値は2048。
ソケット上でエラーが起きると、selectはそのソケットを読み書き可能と見なすことに注意。
/* selectを用いたechoクライアント */ #include <stdio.h> #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <stdlib.h> #include <string.h> #include <errno.h> #define MAXLINE 64 int max(int a, int b) { return a > b ? a : b; } /* readline */ /* ディスクリプタから1バイトずつ1行分読み出す */ ssize_t readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { again: if ( (rc = read(fd, &c, 1) ) == 1) { *ptr++ = c; if (c == '\n') break; } else if (rc == 0) { if (n == 1) return 0;/* EOF データなし */ else break; } else { if (errno == EINTR) goto again; return -1;/* エラー発生 readがerrnoを設定している */ } } *ptr = 0;/* 終端をヌルにする */ return n; } /* writen */ /* ディスクリプタへのnバイトの書き込み */ ssize_t writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft) ) <= 0) { if (errno == EINTR) nwritten = 0;/* 再度write()を呼び出す */ else return -1;/* エラーの発生 */ } nleft -= nwritten; ptr += nwritten; } return n; } void str_cli(FILE *fp, int sockfd) { int maxfdp1; fd_set rset; char sendline[MAXLINE], recvline[MAXLINE]; FD_ZERO(&rset); for ( ; ; ) { FD_SET(fileno(fp), &rset); FD_SET(sockfd, &rset); maxfdp1 = max(fileno(fp), sockfd) + 1; select(maxfdp1, &rset, NULL, NULL, NULL); if (FD_ISSET(sockfd, &rset)) {/* ソケットが読み出し可能 */ if (readline(sockfd, recvline, MAXLINE) == 0) { perror("readline"); exit(1); } fputs(recvline, stdout); } if (FD_ISSET(fileno(fp), &rset)) { if (fgets(sendline, MAXLINE, fp) == NULL) return;/* 処理終了 */ writen(sockfd, sendline, strlen(sendline)); } } } int main(int argc, char *argv[]) { int sockfd; struct sockaddr_in servaddr; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(8004); servaddr.sin_addr.s_addr = inet_addr("192.168.1.3"); connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)); str_cli(stdin, sockfd); return 0; }
shutdown関数
#include <sys/socket.h> int shutdown(int sockfd, int howto); 戻り値:成功なら0、エラーなら-1
howto引数の値
//単一プロセスでselectを用いるechoサーバー #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #define MAXLINE 64 //ディスクリプタからnバイト読み出す ssize_t readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft) ) < 0) { if (errno == EINTR) nread = 0;// 再度 read()を呼び出す else return -1; } else if (nread == 0) { break;// EOF } nleft -= nread; ptr += nread; } return (n - nleft); } // writen // ディスクリプタへのnバイトの書き込み ssize_t writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft) ) <= 0) { if (errno == EINTR) nwritten = 0;//再度write()を呼び出す else return -1;//エラーの発生 } nleft -= nwritten; ptr += nwritten; } return n; } //readline //ディスクリプタから1バイトずつ1行分読み出す ssize_t readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { again: if ( (rc = read(fd, &c, 1) ) == 1) { *ptr++ = c; if (c == '\n') break; } else if (rc == 0) { if (n == 1) return 0;//EOF データなし else break; } else { if (errno == EINTR) goto again; return -1;//エラー発生 readがerrnoを設定している } } *ptr = 0;//終端をヌルにする return n; } void str_echo(int sockfd) { ssize_t n; char line[MAXLINE]; for (;;) { if ( (n = readline(sockfd, line, MAXLINE) ) == 0) { return; } writen(sockfd, line, n); } } int main(){ int i, maxi, maxfd, listenfd, connfd, sockfd; int nready, client[FD_SETSIZE]; ssize_t n; fd_set rset, allset; char line[MAXLINE]; socklen_t clilen; struct sockaddr_in cliaddr, servaddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(8005); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)); listen(listenfd, 64); maxfd = listenfd; //初期化 maxi = -1;//client[]配列の添え字 for (i = 0; i < FD_SETSIZE; i++) client[i] = -1;//-1は利用可能なエントリを示す FD_ZERO(&allset); FD_SET(listenfd, &allset); for (;;) { rset = allset;//構造体の代入 nready = select(maxfd + 1, &rset, NULL, NULL, NULL); if (FD_ISSET(listenfd, &rset)) {//新規クライアントコネクション clilen = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen); for (i = 0; i < FD_SETSIZE; i++) { if (client[i] < 0) { client[i] = connfd;//ディスクリプタの保存 break; } } if (i == FD_SETSIZE) { perror("too many clients"); exit(1); } FD_SET(connfd, &allset);//新しいディスクリプタを集合に加える if (connfd > maxfd) maxfd = connfd; if (i > maxi) maxi = i; if (-nready <= 0) continue; } for (i = 0; i <= maxi; i++) {//クライアントからのデータを検査 if ( (sockfd = client[i]) < 0) continue; if (FD_ISSET(sockfd, &rset)) { if ( (n = readline(sockfd, line, MAXLINE)) == 0 ) { //クライアントがコネクションをクローズした close(sockfd); FD_CLR(sockfd, &allset); client[i] = -1; } else { writen(sockfd, line, n); } if (-nready <= 0) break;//読み出し可能なディスクリプタなし } } } return 0; }
参考文献