Posixメッセージキュー
Introduction
PosixメッセージキューとSystem V メッセージキューの違い
キュー上のメッセージの属性
mq_open,mq_close,mq_unlink関数
mq_open関数は、新しいメッセージキューを作成するか、既存のメッセージキューをオープンする。
#include <mqueue.h> mqd_t mq_open(const char *name,int oflag,... /* mode_t mode,struct mq_attr *attr */); 戻り値: 成功ならメッセージキューディスクリプタを返す。失敗なら-1
オープンされているメッセージキューをクローズするにはmq_closeを用いる。
#include <mqueue.h> int mq_close(mqd_t mqdes); 戻り値: 成功なら0,エラーなら-1
この関数は、呼び出したプロセスは指定したディスクリプタをそれ以降使用することは出来ないが、メッセージキュー自体はシステムから削除されないという点で、closeと似ている。プロセス終了時には、オープンされているメッセージキューはクローズされる。
mq_openの引数に用いたnameをシステムから削除するには、mq_unlinkを呼び出す必要がある。
#include <mqueue.h> int mq_unlink(const char *name); 戻り値: 成功なら0,エラーなら-1
メッセージキューは(ファイルのように)、オープンされている数を参照カウントとして保持し、参照カウントがゼロより大きくてもnameをシステムから削除することができるが、名前の削除ではなくキュー自身の削除は、最後のmq_closeが実行されるまで行われない。
Posixメッセージキューは、少なくともカーネル持続性を持つ。これはキューをオープンしているプロセスが存在しなくとも、mq_unlinkによって削除され、かつ参照カウントがゼロに達するまで、その内容と共に存在を続けるということ。
//メッセージキューを作成する //PosixのIPC名は、スラッシュで始める。 #include <mqueue.h> #include <stdio.h> #include <sys/stat.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) int main() { mqd_t mqd; int flags; flags = O_RDWR | O_CREAT; mqd = mq_open("/mq",flags,FILE_MODE,NULL); mq_close(mqd); return 0; } ///////////////////////////////// //gccで-lrtでリンクしてビルドする //gcc -lrt mq_open.c
//システムからメッセージキューを削除する #include <mqueue.h> #include <stdio.h> int main() { mq_unlink("/mq"); return 0; } ///////////////////////////////// //gccで-lrtでリンクしてビルドする //gcc -lrt mq_unlink.c
mq_getattr関数とmq_setattr関数
各メッセージキューは4種類の属性を持つ。これら属性の値は、mq_getattr関数で取得することが出来る。1つの属性は、mq_setattr関数で値を設定することが出来る。
int mq_getattr(mqd_t mqdes,struct mq_attr *attr); int mq_setattr(mqd_t mqdes,const struct mq_attr *attr,n struct mq_attr *oattr); 戻り値:2関数とも成功なら0,エラーなら-1
mq_attr構造体 struct mq_attr { long mq_flags;//メッセージキューフラグ:0またはO_NONBLOCK long mq_maxmsg;//キューは保持できる最大メッセージ数 long mq_msgsize;//メッセージの最大長(バイト数) long mq_curmsgs;//キュー内の現在のメッセージ数 };
mq_setattrは、実際にはmq_attr構造体のmq_flagsメンバのみが使用される。mq_flagsを用いて非ブロッキングフラグの設定・解除が行われる。mq_maxmsgとmq_msgsizeはキューの作成時にしか設定することはできない。mq_curmsgsは取得できるが、設定できない。
oattrポインタは、非ヌルであれば、キューの現在のステータスに加えて、以前の属性(mq_flags,mq_maxmsg,mq_msgsize)が返される。
//メッセージキューの属性を取得して表示 //mq_getattrのサンプル #include <mqueue.h> #include <stdio.h> #include <string.h> #include <sys/stat.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) int main() { mqd_t mqd; struct mq_attr attr; int ret; memset(&attr,0,sizeof(attr)); mqd = mq_open("/mq",O_RDWR | O_CREAT,FILE_MODE,NULL); printf("mqd = %d\n",mqd); ret = mq_getattr(mqd,&attr); printf("ret = %d\n",ret); printf("max #msgs = %ld,max #bytes/msg = %ld," "#currently on queue = %ld\n",n attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); mq_close(mqd); return 0; } /////////////////////////////// //ビルド gcc -rlt mq_getattr.c ////////////////////////////// //実行結果 max #msgs = 10,max #bytes/msg = 8192,#currently on queue = 0
mq_send関数とmq_receive関数
#include <mqueue.h> int mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio); ssize_t mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);
//mq_send()サンプル #include <mqueue.h> #include <stdio.h> #include <stdlib.h> #include <string.h> int main() { mqd_t mqd; void *ptr; unsigned int prio; char *msg; msg = "Hello World!"; prio = 10; mqd = mq_open("/mq",O_WRONLY); ptr = calloc(strlen(msg) + 1,sizeof(char)); strcpy(ptr,msg); mq_send(mqd,ptr,strlen(ptr),prio); return 0; }
//mq_receive()サンプル #include <mqueue.h> #include <stdio.h> #include <stdlib.h> int main() { mqd_t mqd; unsigned int prio; ssize_t n; void *buff; int flags; struct mq_attr attr; int i; char *p; flags = O_RDONLY; mqd = mq_open("/mq",flags); mq_getattr(mqd,&attr); buff = malloc(attr.mq_msgsize); n = mq_receive(mqd,buff,attr.mq_msgsize,&prio); printf("read %ld,bytes,priority = %u\n",n (long)n,prio); p = (char*)buff; for (i = 0; i < n; i++) putchar(p[i]); return 0; }
メッセージキューの量的制限
#include <unistd.h> #include <stdio.h> int main() { printf("MQ_OPEN_MAX = %ld,MQ_PRIO_MAX = %ld\n",n sysconf(_SC_MQ_OPEN_MAX),n sysconf(_SC_MQ_PRIO_MAX)); return 0; }
Fedora 5での実行結果
MQ_OPEN_MAX = -1,MQ_PRIO_MAX = 32768
mq_notify
System Vメッセージキューの問題点の1つは、メッセージキューに置かれたことをプロセスに通知する手段がないこと。よってメッセージの到着を判定するにはポーリングする必要があり、これはCPU時間の無駄使い。Posixメッセージキューでは、空のメッセージキューにメッセージが置かれたときに、以下の2種類の非同期イベント通知が行える。
1.シグナルの生成
2.あらかじめ指定された関数を実行するスレッドの生成
#include <mqueue.h> int mq_notify(mqd_t mqdes,const struct sigevent *notification); 戻り値: 成功なら0,エラーなら-1
union sigval { int sigval_int; /* 整数値 */ void *sigval_ptr; /* ポインタ値 */ }; struct sigevent { int sigev_notify; /* SIGEV_NONE,SIGNAL,THREAD */ int sigev_signo; /* SIGEV_SIGNALの場合はシグナル番号 */ union sigval sigev_value; /* シグナルハンドラまたはスレッドに渡される。 */ };
mq_notifyに関する一般的な規則
1.notification引数が非ヌルであった場合、指定したキューが空の状態の時にメッセージが到着した際に、その通知をプロセスが要求していることを意味する。これをプロセスがそのキューの通知先に登録されていると呼ぶ。
2.notification引数がヌルポインタで、かつ指定したキューにおいて、プロセスが現在の通知先に登録されている場合、その登録は抹消される。
3.あるキューにおいて、同時に通知先に登録できるのは、1つのプロセスに限られる。
4.空のキューにメッセージが到着し、かつあるプロセスがそのキューの通知先に登録されている場合、mq_receiveの呼び出しでブロックしているスレッドが存在しない場合に限って通知が送られる。
5.登録されているプロセスに通知が送られると、その登録は抹消される。プロセスは、必要であれば、mq_notifyを呼び出して再登録をしなければならない。
Posixシグナル:非同期シグナルに対して安全な関数
シグナルハンドラから、mq_notify、mq_receive, printfを呼び出してはならない。
Posixでは、シグナルハンドラから呼び出すことの出来る関数を、非同期シグナル安全と呼んでいる。
//mq_notifyサンプル #include <mqueue.h> #include <stdio.h> #include <signal.h> #include <errno.h> #include <stdlib.h> #include <sys/stat.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) volatile sig_atomic_t mqflag; /* シグナルハンドラが非ゼロ値に設定 */ static void sig_usr1(int); int main() { mqd_t mqd; void *buff; ssize_t n; sigset_t zeromask, newmask, oldmask; struct mq_attr attr; struct sigevent sigev; mqd = mq_open("/mq", O_RDONLY | O_NONBLOCK | O_CREAT, FILE_MODE, NULL); mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); sigemptyset(&zeromask); sigemptyset(&newmask); sigemptyset(&oldmask); sigaddset(&newmask, SIGUSR1); signal(SIGUSR1, sig_usr1); sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqd, &sigev); for (;;) { sigprocmask(SIG_BLOCK, &newmask, &oldmask);//SIGUSR1のブロック while (mqflag == 0) sigsuspend(&zeromask); mqflag = 0; mq_notify(mqd, &sigev); while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) { printf("read %ld bytes\n", (long)n); } if (errno != EAGAIN) { perror("mq_receive"); } sigprocmask(SIG_UNBLOCK, &newmask, NULL);//SIGUSR1のブロック解除 } return 0; } static void sig_usr1(int signo) { mqflag = 1; return; }