I/Oの多重化

I/Oの多重化(I/O multiplexing)

2つ以上のI/Oに対して、どれかが入出力可能になった場合の通知をカーネルに依頼する機能

I/Oの多重化が用いられる状況

  • クライアントが複数のディスクリプタ(普通はstdinとネットワークソケット)を扱っている時
  • クラインとが複数のソケットを同時に扱う場合
  • TCPサーバーが、リスニングソケットを接続済みソケットを同時に扱う場合
  • サーバーがTCPUDPの両方を扱っている場合
  • サーバーが複数のサービスや、複数のプロトコルを扱う場合



    I/Oモデル

  • ブロッキングI/O
  • ブロッキングI/O
  • I/Oの多重化 (select, poll)
  • シグナル駆動I/O (SIGIO)
  • 非同期I/O (aio_関数群)



  • ブロッキングI/Oモデル
    最も一般的に用いられる。

  • ブロッキングI/Oモデル
    ソケットを非ブロッキングに設定することは、そのソケットに要求したI/O操作が、プロセスをスリープ状態におかないと完了しないのであれば、プロセスをスリープ状態にせずエラーを返すことをカーネルに要求することになる。

  • 多重化I/Oモデル
    I/Oの多重化(I/O multiplexing)を行う場合は、selectあるいはpollを呼び出し、これら関数の中でブロックする。利点は、複数のディスクリプタに対して、入出力の準備ができるのを待つことが可能になること。

  • シグナル駆動I/Oモデル
    ディスクリプタの用意ができた場合に、SIGIOシグナルを用いて通知するようにカーネルに支持すること。

  • 非同期I/Oモデル
    非同期I/Oでは、I/O操作を開始し、操作が完了した時に通知するようにカーネルに対して指示する。
    シグナル駆動I/Oモデルとの違いは、シグナル駆動I/OではカーネルはI/O操作の起動が可能になったことを通知するのに対し、非同期I/OではカーネルはI/O操作が完了したことを知らせる。




    select関数

    select関数を用いると、複数のイベントのいずれかが発生するまで待ち、1つ以上のイベントが発生した場合、あるいは指定した時間が経過した場合にプロセスに起こすようにカーネルに指示できる。
    カーネルに対して、監視すべきディスクリプタ(読み出し、書き込み、エラー状態)と待ち時間を指示する。

    #include <sys/select.h>
    #include <sys/time.h>
    
    int select(int maxfdp1, fd_set *readset, fd_set *writeset,
       fd_set *exceptset, const struct timeval *timeout);
    
    戻り値:準備ができているディスクリプタの個数、
    タイムアウトなら0、エラーの場合は -1
    
    maxfdp1:検査するディスクリプタ番号の最大値 + 1 (maxfd plus 1)
    readset:読み出し可能性を検査するディスクリプタ集合
    writeset:書き込み可能性を検査するディスクリプタ集合
    exceptset:例外状態を検査するディスクリプタ集合


    struct timeval {
      long tv_sec; /* seconds */
      long tv_usec; /* microseconds */
    };

    timeout引数の扱い

  • 永久に待ち続ける
    指定したディスクリプタのどれかの準備が出来た場合のみ制御を返す。
    timeout引数にNULLポインタを指定する。

  • 指定した時間だけ待つ
    指定したディスクリプタで入出力の準備ができた場合に制御を返すが、timeout引数が挿すtimaval構造体で指定した時間を越えて待たない。

  • 全く待たない
    ディスクリプタの検査を行った後即座に制御を返す。ポーリング(polling)。
    timeout引数がtimeval構造体を指し、タイマ値をゼロにする。


    selectでは、ディスクリプタ集合を用いる(通常が整数配列)。
    配列ディスクリプタの実際のインプリメンテーションは、fd_setの定義と次の4つのマクロに隠蔽されている。

  • void FD_ZERO(fd_set *fdset);
    fdset中のすべてのビットをクリアする

  • void FD_SET(int fd, fd_set *fdset);
    fdset中のfdのビットをセットする

  • void FD_CLR(int fd, fd_set *fdset);
    fdset中のfdのビットはクリアする

  • int FD_ISSET(int fd, fd_set *fdset);
    fdset中のfdビットはセットされているか?


    アプリケーションは、fd_set型のディスクリプタ集合を割り当て、これら4つのマクロを用いてビットのセット、クリア、検査を行う。


    FD_SETSIZE定数()h、fd_set型で表現できる最大ディスクリプタ数。

    selectは、readset, writeset, exceptsetの各ポインタで指されるディスクリプタ集合の内容を変更する(結果-値引数)。戻り時のfd_set構造体内の特定のディスクリプタの状態を調べるには、FD_ISSETマクロを用いる。戻り辞には、用意できていないディスクリプタに対応するビットがクリアされている。従って、selectを呼び出すたびに、監視対象となっているディスクリプタのビットを各ディスクリプタ集合でセットしなおす必要がある。


    selectの戻り値は、3つのディスクリプタ集合中でセットされいるビットの合計数。
    タイムアウト時は、戻り値はゼロ。
    戻り値が-1の場合は、エラー発生(シグナルの捕捉で関数が割り込まれたなど)。



    ディスクリプタの用ができる条件

    ソケットディスクリプタの読み出し可能条件

  • ソケットの受信バッファ中のデータのバイト数が、そのソケットの受信バッファの低水位標よりも等しいか多くなった場合。
    低水位標は、SO_RCVLOWATソケットオプションで設定することが出来る。TCP, UDPソケットのデフォルト値は1。

  • コネクションの読み出し側がクローズされた場合(そのTCPコネクションでFINが受信された場合)。そのソケットに対する呼び出し操作はブロックせず、戻り値としてゼロ(EOF)を返す。

  • ソケットがリスニングソケットであり、確立済みコネクションの数がゼロでない場合。

  • ソケット上に保留中のエラーがある場合。そのソケットに対する読み出し操作はブロックせず、errnoに特定のエラー状態を設定し、-1を戻り値として返す。保留中のエラーは、getsockopt呼び出しで、SO_ERRORソケットオプションを指定することによって、取得とエラー状態の解除が可能。

    ソケットディスクリプタの書き込み可能条件

  • ソケットの送信バッファの空きバイト数がそのソケットの送信バッファの低水位標より大きいか等しい場合、でかつ
    (鄯)ソケットが接続された状態にある
    (鄱)ソケットが接続を必要としない場合(ex.UDP)

    この場合ソケットが日ブロッキングに設定されていると、書込み操作はブロックせず正の値(トランスポートが受け付けたバイト数)を返す。低水位標はSO_SNDLOWATで変更でき、TCP, UDPのデフォルト値は2048。

  • コネクションの書き込み側がクローズされている場合。そのソケットへの書き込み操作はSIGPIPEシグナルを発生する。

  • 保留中のソケットエラーが存在する場合。


    ソケット上でエラーが起きると、selectはそのソケットを読み書き可能と見なすことに注意。



    /* selectを用いたechoクライアント */
    
    #include <stdio.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <arpa/inet.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #define MAXLINE 64
    
    int
    max(int a, int b)
    {
      return a > b ? a : b;
    }
    
    /* readline */
    /* ディスクリプタから1バイトずつ1行分読み出す */
    ssize_t
    readline(int fd, void *vptr, size_t maxlen)
    {
      ssize_t n, rc;
      char c, *ptr;
    
      ptr = vptr;
      for (n = 1; n < maxlen; n++) {
    again:
        if ( (rc = read(fd, &c, 1) ) == 1) {      *ptr++ = c;
          if (c == '\n')
            break;
        } else if (rc == 0) {
          if (n == 1)
            return 0;/* EOF データなし */
          else
            break;
        } else {
          if (errno == EINTR)
            goto again;
          return -1;/* エラー発生 readがerrnoを設定している */
        }
      }
      *ptr = 0;/* 終端をヌルにする */
      return n;
    }
    
    
    /* writen */
    /* ディスクリプタへのnバイトの書き込み */
    ssize_t
    writen(int fd, const void *vptr, size_t n)
    {
      size_t nleft;
      ssize_t nwritten;
      const char *ptr;
    
      ptr = vptr;
      nleft = n;
      while (nleft > 0) {
        if ( (nwritten = write(fd, ptr, nleft) ) <= 0) {
          if (errno == EINTR)
            nwritten = 0;/* 再度write()を呼び出す */
          else
            return -1;/* エラーの発生 */
        }
        nleft -= nwritten;
        ptr += nwritten;
      }
      return n;
    }
    
    
    void
    str_cli(FILE *fp, int sockfd)
    {
      int maxfdp1;
      fd_set rset;
      char sendline[MAXLINE], recvline[MAXLINE];
    
      FD_ZERO(&rset);
      for ( ; ; ) {
        FD_SET(fileno(fp), &rset);
        FD_SET(sockfd, &rset);
        maxfdp1 = max(fileno(fp), sockfd) + 1;
        select(maxfdp1, &rset, NULL, NULL, NULL);
    
        if (FD_ISSET(sockfd, &rset)) {/* ソケットが読み出し可能 */
          if (readline(sockfd, recvline, MAXLINE) == 0) {
            perror("readline");
            exit(1);
          }
          fputs(recvline, stdout);
        }
        if (FD_ISSET(fileno(fp), &rset)) {
          if (fgets(sendline, MAXLINE, fp) == NULL)
            return;/* 処理終了 */
          writen(sockfd, sendline, strlen(sendline));
        }
      }
    }
    
    
    int
    main(int argc, char *argv[])
    {
      int sockfd;
      struct sockaddr_in servaddr;
    
      sockfd = socket(AF_INET, SOCK_STREAM, 0);
      bzero(&servaddr, sizeof(servaddr));
    
      servaddr.sin_family = AF_INET;
      servaddr.sin_port = htons(8004);
      servaddr.sin_addr.s_addr = inet_addr("192.168.1.3");
      connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    
      str_cli(stdin, sockfd);
    
      return 0;
    }
    



    shutdown関数

  • shutdown関数を用いると、ソケットディスクリプタの参照カウントの値に関わらず、TCPのコネクション終了シーケンス(FINで始まる4セグメントのシーケンス)を開始することが出来る。

    #include <sys/socket.h>
    int shutdown(int sockfd, int howto);
    
    戻り値:成功なら0、エラーなら-1
    

    howto引数の値

  • SHUT_RD:コネクションの読み出し側がクローズされ、そのソケットからはデータを受信することができなくなり、ソケットの受信バッファ中のデータは破棄される。プロセスは、そのソケットに対する読み出し操作をできなくなる。

  • SHUT_WR:コネクションの書き込み側がクローズされる。TCPの場合、ハーフクローズと呼ばれる。

  • SHUT_RDWR:読み出し側と書き込み側がどちらもクローズされる。

    //単一プロセスでselectを用いるechoサーバー
    
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <arpa/inet.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    
    #define MAXLINE 64
    
    
    //ディスクリプタからnバイト読み出す
    ssize_t
    readn(int fd, void *vptr, size_t n)
    {
      size_t nleft;
      ssize_t nread;
      char *ptr;
    
      ptr = vptr;
      nleft = n;
      while (nleft > 0) {
        if ( (nread = read(fd, ptr, nleft) ) < 0) {
          if (errno == EINTR)
            nread = 0;// 再度 read()を呼び出す
          else
            return -1;
        } else if (nread == 0) {
          break;// EOF
        }
        nleft -= nread;
        ptr += nread;
      }
      return (n - nleft);
    }
    
    
    
    // writen
    // ディスクリプタへのnバイトの書き込み
    ssize_t
    writen(int fd, const void *vptr, size_t n)
    {
      size_t nleft;
      ssize_t nwritten;
      const char *ptr;
    
      ptr = vptr;
      nleft = n;
      while (nleft > 0) {
        if ( (nwritten = write(fd, ptr, nleft) ) <= 0) {
          if (errno == EINTR)
            nwritten = 0;//再度write()を呼び出す
          else
            return -1;//エラーの発生
        }
        nleft -= nwritten;
        ptr += nwritten;
      }
      return n;
    }
    
    
    //readline
    //ディスクリプタから1バイトずつ1行分読み出す
    ssize_t
    readline(int fd, void *vptr, size_t maxlen)
    {
      ssize_t n, rc;
      char c, *ptr;
    
      ptr = vptr;
      for (n = 1; n < maxlen; n++) {
    again:
        if ( (rc = read(fd, &c, 1) ) == 1) {
          *ptr++ = c;
          if (c == '\n')
            break;
        } else if (rc == 0) {
          if (n == 1)
            return 0;//EOF データなし
          else
            break;
        } else {
          if (errno == EINTR)
            goto again;
          return -1;//エラー発生 readがerrnoを設定している
        }
      }
      *ptr = 0;//終端をヌルにする
      return n;
    }
    
    
    void
    str_echo(int sockfd)
    {
      ssize_t n;
      char line[MAXLINE];
      for (;;) {
        if ( (n = readline(sockfd, line, MAXLINE) ) == 0) {
          return;
        }
        writen(sockfd, line, n);
      }
    }
    
    
    int
    main(){
      int i, maxi, maxfd, listenfd, connfd, sockfd;
      int nready, client[FD_SETSIZE];
      ssize_t n;
      fd_set rset, allset;
      char line[MAXLINE];
      socklen_t clilen;
      struct sockaddr_in cliaddr, servaddr;
    
      listenfd = socket(AF_INET, SOCK_STREAM, 0);
      bzero(&servaddr, sizeof(servaddr));
      servaddr.sin_family = AF_INET;
      servaddr.sin_port = htons(8005);
      servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    
      bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    
      listen(listenfd, 64);
    
      maxfd = listenfd; //初期化
      maxi = -1;//client[]配列の添え字
      for (i = 0; i < FD_SETSIZE; i++)
        client[i] = -1;//-1は利用可能なエントリを示す
    
      FD_ZERO(&allset);
      FD_SET(listenfd, &allset);
    
      for (;;) {
        rset = allset;//構造体の代入
        nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
        if (FD_ISSET(listenfd, &rset)) {//新規クライアントコネクション
          clilen = sizeof(cliaddr);
          connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
          for (i = 0; i < FD_SETSIZE; i++) {
            if (client[i] < 0) {
              client[i] = connfd;//ディスクリプタの保存
              break;
            }
          }
          if (i == FD_SETSIZE) {
            perror("too many clients");
            exit(1);
          }
          FD_SET(connfd, &allset);//新しいディスクリプタを集合に加える
          if (connfd > maxfd)
            maxfd = connfd;
          if (i > maxi)
            maxi = i;
          if (-nready <= 0)
            continue;
        }
        for (i = 0; i <= maxi; i++) {//クライアントからのデータを検査
          if ( (sockfd = client[i]) < 0)
            continue;
          if (FD_ISSET(sockfd, &rset)) {
            if ( (n = readline(sockfd, line, MAXLINE))  == 0 ) {
              //クライアントがコネクションをクローズした
              close(sockfd);
              FD_CLR(sockfd, &allset);
              client[i] = -1;
            } else {
              writen(sockfd, line, n);
            }
            if (-nready <= 0)
              break;//読み出し可能なディスクリプタなし
          }
        }
      }
    
      return 0;
    }
    


    参考文献

  • UNIXネットワークプログラミング スティーブンズ