Posixメッセージキュー

Introduction

  • メッセージキューは、メッセージのリンク構造と考えることが出来る

  • 各メッセージはレコードであり、各メッセージには送信側が指定した優先度が付いている。

  • メッセージの書き込みに際しては、そのキューにおいて何らかのプロセスがメッセージの到着を待っていることが要求されない。(パイプと対照的)

  • メッセージキューはパイプと異なり、カーネル持続性(kernel persistence)を持つ。(パイプ、FIFOでは、それらが最後にクローズされる際に、残っているデータは破棄される。)

  • PosixメッセージキューとSystem V メッセージキューの違い

  • Posixメッセージキューからの読み出しは、常に最も高い優先度の最も古いメッセージを返す。System V メッセージキューでは、任意の優先度のメッセージを読み出すことが出来る。

  • Posixメッセージキューでは、空のキューにメッセージが置かれた際にシグナルの生成やスレッドの起動が行えるが、System V メッセージキューでは、このような操作が行えない。
  • キュー上のメッセージの属性

  • 優先度(Posixでは符号なし整数、System Vでは長い整数)

  • メッセージのデータ部の長さ(ゼロでも良い)

  • データ自身(長さがゼロより大きい場合)

  • mq_open,mq_close,mq_unlink関数

    mq_open関数は、新しいメッセージキューを作成するか、既存のメッセージキューをオープンする。

    #include <mqueue.h>
    
    mqd_t mq_open(const char *name,int oflag,...
        /* mode_t mode,struct mq_attr *attr */);
    
    戻り値:  成功ならメッセージキューディスクリプタを返す。失敗なら-1

  • oflag引数は、O_RDONLY,O_WRONLY,O_RDWRのいずれかで、O_CREAT,O_EXCL,O_NONBLOCKとの論理和を指定してもよい。

  • mq_openの戻り値はメッセージキューディスクリプタと呼ばれる。

  • オープンされているメッセージキューをクローズするにはmq_closeを用いる。

    #include <mqueue.h>
    
    int mq_close(mqd_t mqdes);
    
    戻り値:  成功なら0,エラーなら-1

    この関数は、呼び出したプロセスは指定したディスクリプタをそれ以降使用することは出来ないが、メッセージキュー自体はシステムから削除されないという点で、closeと似ている。プロセス終了時には、オープンされているメッセージキューはクローズされる。


    mq_openの引数に用いたnameをシステムから削除するには、mq_unlinkを呼び出す必要がある。

    #include <mqueue.h>
    
    int mq_unlink(const char *name);
    
    戻り値:  成功なら0,エラーなら-1

    メッセージキューは(ファイルのように)、オープンされている数を参照カウントとして保持し、参照カウントがゼロより大きくてもnameをシステムから削除することができるが、名前の削除ではなくキュー自身の削除は、最後のmq_closeが実行されるまで行われない。


    Posixメッセージキューは、少なくともカーネル持続性を持つ。これはキューをオープンしているプロセスが存在しなくとも、mq_unlinkによって削除され、かつ参照カウントがゼロに達するまで、その内容と共に存在を続けるということ。



    //メッセージキューを作成する
    //PosixのIPC名は、スラッシュで始める。
    #include <mqueue.h>
    #include <stdio.h>
    #include <sys/stat.h>
    
    #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    
    int
    main()
    {
        mqd_t mqd;
        int flags;
    
        flags = O_RDWR | O_CREAT;
        mqd = mq_open("/mq",flags,FILE_MODE,NULL);
        mq_close(mqd);
    
        return 0;
    }
    
    /////////////////////////////////
    //gccで-lrtでリンクしてビルドする
    //gcc -lrt mq_open.c
    





    //システムからメッセージキューを削除する
    #include <mqueue.h>
    #include <stdio.h>
    
    int
    main()
    {    
        mq_unlink("/mq");
        return 0;
    }
    
    /////////////////////////////////
    //gccで-lrtでリンクしてビルドする
    //gcc -lrt mq_unlink.c



    mq_getattr関数とmq_setattr関数

    各メッセージキューは4種類の属性を持つ。これら属性の値は、mq_getattr関数で取得することが出来る。
    1つの属性は、mq_setattr関数で値を設定することが出来る。

    int mq_getattr(mqd_t mqdes,struct mq_attr *attr);
    int mq_setattr(mqd_t mqdes,const struct mq_attr *attr,n    struct mq_attr *oattr);
    
    戻り値:2関数とも成功なら0,エラーなら-1
    mq_attr構造体
    
    struct mq_attr {
        long mq_flags;//メッセージキューフラグ:0またはO_NONBLOCK
        long mq_maxmsg;//キューは保持できる最大メッセージ数
        long mq_msgsize;//メッセージの最大長(バイト数)
        long mq_curmsgs;//キュー内の現在のメッセージ数
    };

    mq_setattrは、実際にはmq_attr構造体のmq_flagsメンバのみが使用される。mq_flagsを用いて非ブロッキングフラグの設定・解除が行われる。mq_maxmsgとmq_msgsizeはキューの作成時にしか設定することはできない。mq_curmsgsは取得できるが、設定できない。
    oattrポインタは、非ヌルであれば、キューの現在のステータスに加えて、以前の属性(mq_flags,mq_maxmsg,mq_msgsize)が返される。

    //メッセージキューの属性を取得して表示
    //mq_getattrのサンプル
    
    #include <mqueue.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/stat.h>
    
    #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    
    int
    main()
    {
        mqd_t mqd;
        struct mq_attr attr;
        int ret;
    
        memset(&attr,0,sizeof(attr)); 
        mqd = mq_open("/mq",O_RDWR | O_CREAT,FILE_MODE,NULL);
        printf("mqd = %d\n",mqd);
        ret = mq_getattr(mqd,&attr);
        printf("ret = %d\n",ret);
        printf("max #msgs = %ld,max #bytes/msg = %ld,"
                "#currently on queue = %ld\n",n            attr.mq_maxmsg,
                attr.mq_msgsize,
                attr.mq_curmsgs);
    
        mq_close(mqd);
        return 0;
    }
    
    
    ///////////////////////////////
    //ビルド
    gcc -rlt mq_getattr.c
    
    //////////////////////////////
    //実行結果
    max #msgs = 10,max #bytes/msg = 8192,#currently on queue = 0



    mq_send関数とmq_receive関数

    #include <mqueue.h>
    
    int mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio);
    
    ssize_t mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);
    
    //mq_send()サンプル
    
    #include <mqueue.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    int
    main()
    {
        mqd_t mqd;
        void *ptr;    unsigned int prio;
        char *msg;
    
        msg = "Hello World!";
        prio = 10; 
    
        mqd = mq_open("/mq",O_WRONLY);
    
        ptr = calloc(strlen(msg) + 1,sizeof(char));
        strcpy(ptr,msg);
        mq_send(mqd,ptr,strlen(ptr),prio);
        return 0;
    }
    
    //mq_receive()サンプル
    #include <mqueue.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    int
    main()
    {
        mqd_t mqd;
        unsigned int prio;
        ssize_t n;
        void *buff;
        int flags;
        struct mq_attr attr;
        int i;
        char *p; 
    
        flags = O_RDONLY;
        mqd = mq_open("/mq",flags);
        mq_getattr(mqd,&attr);
        buff = malloc(attr.mq_msgsize);
        n = mq_receive(mqd,buff,attr.mq_msgsize,&prio);
        printf("read %ld,bytes,priority = %u\n",n        (long)n,prio);
    
        p = (char*)buff;
        for (i = 0; i < n; i++)
            putchar(p[i]);
    
        return 0;
    }
    



    メッセージキューの量的制限

    #include <unistd.h>
    #include <stdio.h>
    
    
    int
    main()
    {
        printf("MQ_OPEN_MAX = %ld,MQ_PRIO_MAX = %ld\n",n        sysconf(_SC_MQ_OPEN_MAX),n        sysconf(_SC_MQ_PRIO_MAX));
    
        return 0;
    }

    Fedora 5での実行結果
    MQ_OPEN_MAX = -1,MQ_PRIO_MAX = 32768



    mq_notify

    System Vメッセージキューの問題点の1つは、メッセージキューに置かれたことをプロセスに通知する手段がないこと。よってメッセージの到着を判定するにはポーリングする必要があり、これはCPU時間の無駄使い。

    Posixメッセージキューでは、空のメッセージキューにメッセージが置かれたときに、以下の2種類の非同期イベント通知が行える。

    1.シグナルの生成
    2.あらかじめ指定された関数を実行するスレッドの生成

    #include <mqueue.h>
    
    int mq_notify(mqd_t mqdes,const struct sigevent *notification);
    
    戻り値:  成功なら0,エラーなら-1
    union sigval {
        int sigval_int; /* 整数値 */
        void *sigval_ptr;  /* ポインタ値 */
    };
    
    struct sigevent {
        int sigev_notify;  /* SIGEV_NONE,SIGNAL,THREAD */
        int sigev_signo;  /* SIGEV_SIGNALの場合はシグナル番号 */
        union sigval sigev_value; /* シグナルハンドラまたはスレッドに渡される。 */
    };


    mq_notifyに関する一般的な規則
    1.notification引数が非ヌルであった場合、指定したキューが空の状態の時にメッセージが到着した際に、その通知をプロセスが要求していることを意味する。これをプロセスがそのキューの通知先に登録されていると呼ぶ。

    2.notification引数がヌルポインタで、かつ指定したキューにおいて、プロセスが現在の通知先に登録されている場合、その登録は抹消される。

    3.あるキューにおいて、同時に通知先に登録できるのは、1つのプロセスに限られる。

    4.空のキューにメッセージが到着し、かつあるプロセスがそのキューの通知先に登録されている場合、mq_receiveの呼び出しでブロックしているスレッドが存在しない場合に限って通知が送られる。


    5.登録されているプロセスに通知が送られると、その登録は抹消される。プロセスは、必要であれば、mq_notifyを呼び出して再登録をしなければならない。



    Posixシグナル:非同期シグナルに対して安全な関数
    シグナルハンドラから、mq_notify、mq_receive, printfを呼び出してはならない。
    Posixでは、シグナルハンドラから呼び出すことの出来る関数を、非同期シグナル安全と呼んでいる。

    //mq_notifyサンプル
    
    #include <mqueue.h>
    #include <stdio.h>
    #include <signal.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <sys/stat.h>
    
    
    #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    
    volatile sig_atomic_t mqflag; /* シグナルハンドラが非ゼロ値に設定 */
    static void sig_usr1(int);
    
    
    int
    main()
    {
        mqd_t mqd;
        void *buff;
        ssize_t n;
        sigset_t zeromask, newmask, oldmask;
        struct mq_attr attr;
        struct sigevent sigev;    
        mqd = mq_open("/mq",
                O_RDONLY | O_NONBLOCK | O_CREAT,
                FILE_MODE,
                NULL);    
        mq_getattr(mqd, &attr);
        buff = malloc(attr.mq_msgsize);
        sigemptyset(&zeromask);
        sigemptyset(&newmask);
        sigemptyset(&oldmask);
        sigaddset(&newmask, SIGUSR1);
    
        signal(SIGUSR1, sig_usr1);
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
        mq_notify(mqd, &sigev);
    
        for (;;) {
            sigprocmask(SIG_BLOCK, &newmask, &oldmask);//SIGUSR1のブロック
            while (mqflag == 0)
                sigsuspend(&zeromask);
            mqflag = 0;
       
            mq_notify(mqd, &sigev);
            while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                printf("read %ld bytes\n", (long)n);
            }
            if (errno != EAGAIN) {
                perror("mq_receive");
            }
            sigprocmask(SIG_UNBLOCK, &newmask, NULL);//SIGUSR1のブロック解除
        }
    
        return 0;
    }
    
    static void
    sig_usr1(int signo)
    {
        mqflag = 1;
        return;
    }