Linux程序間通訊(IPC)程式設計實踐(十二)Posix訊息佇列--基本API的使用
posix訊息佇列與system v訊息佇列的差別:
(1)對posix訊息佇列的讀總是返回最高優先順序的最早訊息,對system v訊息佇列的讀則可以返回任意指定優先順序的訊息。(2)當往一個空佇列放置一個訊息時,posix訊息佇列允許產生一個訊號或啟動一個執行緒,system v訊息佇列則不提供類似機制。
佇列中的每個訊息具有如下屬性:
1、一個無符號整數優先順序(posix)或一個長整數型別(system v)2、訊息的資料部分長度(可以為0)
3、資料本身(如果長度大於0)
Posix訊息佇列操作函式如下:
1. 建立/獲取一個訊息佇列
mqd_t mq_open(const char *name, int oflag); //專用於開啟一個訊息佇列 mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
引數:
name: 訊息佇列名字;
oflag: 與open函式型別, 可以是O_RDONLY, O_WRONLY, O_RDWR, 還可以按位或上O_CREAT, O_EXCL, O_NONBLOCK.
mode: 如果oflag指定了O_CREAT, 需要指定mode引數;
attr: 指定訊息佇列的屬性;
返回值:
成功: 返回訊息佇列檔案描述符;
失敗: 返回-1;
注意-Posix IPC名字限制:
1. 必須以”/”開頭, 並且後面不能還有”/”, 形如:/file-name;
2. 名字長度不能超過NAME_MAX
3. 連結時:Link with -lrt(Makefile中使用實時連結庫-lrt)
2. 關閉一個訊息佇列
#include <mqueue.h>
int mq_close(mqd_t mqdes);
返回: 成功時為0,出錯時為-1。
功能: 關閉已開啟的訊息佇列。
注意:System V沒有此功能函式呼叫
3. 刪除一個訊息佇列
返回: 成功時為0,出錯時為-1int mq_unlink(const char *name); /** System V 訊息佇列 通過msgctl函式, 並將cmd指定為IPC_RMID來實現 int msgctl(int msqid, int cmd, struct msqid_ds *buf); **/
功能: 從系統中刪除訊息佇列。
對上述三個函式的綜合使用:
int main()
{
mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
cout << "mq_open success" << endl;
mq_close(mqid);
mq_unlink("/abc");
cout << "unlink success" << endl;
}
4. 獲取/設定訊息佇列屬性#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
均返回:成功時為0, 出錯時為-1引數:
newattr: 需要設定的屬性
oldattr: 原來的屬性
每個訊息佇列有四個屬性:
struct mq_attr
{
long mq_flags; /* message queue flag : 0, O_NONBLOCK */
long mq_maxmsg; /* max number of messages allowed on queue*/
long mq_msgsize; /* max size of a message (in bytes)*/
long mq_curmsgs; /* number of messages currently on queue */
};
int main(int argc,char **argv)
{
mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
cout << "Max messages on queue: " << attr.mq_maxmsg << endl;
cout << "Max message size: " << attr.mq_msgsize << endl;
cout << "current messages: " << attr.mq_curmsgs << endl;
mq_close(mqid);
return 0;
}
對比System V:
通過msgctl函式, 並將cmd指定為IPC_STAT/IPC_SET來實現
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
另外每個訊息均有一個優先順序,它是一個小於MQ_PRIO_MAX的無符號整數#define MQ_PRIO_MAX 32768
5. 傳送訊息/讀取訊息
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
返回:成功時為0,出錯為-1
返回:成功時為訊息中的位元組數,出錯為-1
引數: 最後一個是訊息的優先順序
訊息佇列的限制:
MQ_OPEN_MAX : 一個程序能夠同時擁有的開啟著訊息佇列的最大數目
MQ_PRIO_MAX : 任意訊息的最大優先順序值加1
/** 示例: 向訊息佇列中傳送訊息, prio需要從命令列引數中讀取 **/
struct Student
{
char name[36];
int age;
};
int main(int argc,char **argv)
{
if (argc != 2)
err_quit("./send <prio>");
mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
struct Student stu = {"xiaofang", 23};
unsigned prio = atoi(argv[1]);
if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1)
err_exit("mq_send error");
mq_close(mqid);
return 0;
}
/** 示例: 從訊息佇列中獲取訊息 **/
int main(int argc,char **argv)
{
mqd_t mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
struct Student buf;
int nrcv;
unsigned prio;
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
mq_close(mqid);
return 0;
}
6.建立/刪除訊息到達通知事件#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
返回: 成功時為0,出錯時為-1功能: 給指定佇列建立或刪除非同步事件通知
sigev_notify代表通知的方式: 一般常用兩種取值:SIGEV_SIGNAL, 以訊號方式通知; SIGEV_THREAD, 以執行緒方式通知
如果以訊號方式通知: 則需要設定一下兩個引數:
sigev_signo: 訊號的程式碼
sigev_value: 訊號的附加資料(實時訊號)
如果以執行緒方式通知: 則需要設定以下兩個引數:
sigev_notify_function
sigev_notify_attributes
union sigval
{
int sival_int; /* Integer value */
void *sival_ptr; /* pointer value */
};
struct sigevent
{
int sigev_notify; /* SIGEV_{ NONE, ISGNAL, THREAD} */
int sigev_signo; /* signal number if SIGEV_SIGNAL */
union sigval sigev_value; /* passed to signal handler or thread */
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attribute;
};
引數sevp:
NULL: 表示撤銷已註冊通知;
非空: 表示當訊息到達且訊息隊列當前為空, 那麼將得到通知;
通知方式:
1. 產生一個訊號, 需要自己繫結
2. 建立一個執行緒, 執行指定的函式
注意: 這種註冊的方式只是在訊息佇列從空到非空時才產生訊息通知事件, 而且這種註冊方式是一次性的!
** Posix IPC所特有的功能, System V沒有 **//**示例: 將下面程式多執行幾遍, 尤其是當訊息佇列”從空->非空”, 多次”從空->非空”, 當訊息佇列不空時執行該程式時, 觀察該程式的狀態;
**/
mqd_t mqid;
long size;
void sigHandlerForUSR1(int signo)
{
//將資料的讀取轉移到對訊號SIGUSR1的響應函式中來
struct Student buf;
int nrcv;
unsigned prio;
if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
}
int main(int argc,char **argv)
{
// 安裝訊號響應函式
if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
err_exit("signal error");
mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
// 獲取訊息的最大長度
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
size = attr.mq_msgsize;
// 註冊訊息到達通知事件
struct sigevent event;
event.sigev_notify = SIGEV_SIGNAL; //指定以訊號方式通知
event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//死迴圈, 等待訊號到來
while (true)
pause();
mq_close(mqid);
return 0;
}
/** 示例:多次註冊notify, 這樣就能過多次接收訊息, 但是還是不能從佇列非空的時候進行接收, 將程式改造如下:
**/
mqd_t mqid;
long size;
struct sigevent event;
void sigHandlerForUSR1(int signo)
{
// 注意: 是在訊息被讀走之前進行註冊,
// 不然該程式就感應不到訊息佇列"從空->非空"的一個過程變化了
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//將資料的讀取轉移到對訊號SIGUSR1的響應函式中來
struct Student buf;
int nrcv;
unsigned prio;
if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
}
int main(int argc,char **argv)
{
// 安裝訊號響應函式
if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
err_exit("signal error");
mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
// 獲取訊息的最大長度
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
size = attr.mq_msgsize;
// 註冊訊息到達通知事件
event.sigev_notify = SIGEV_SIGNAL; //指定以訊號方式通知
event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//死迴圈, 等待訊號到來
while (true)
pause();
mq_close(mqid);
return 0;
}
mq_notify 注意點總結:
1. 任何時刻只能有一個程序可以被註冊為接收某個給定佇列的通知;
2. 當有一個訊息到達某個先前為空的佇列, 而且已有一個程序被註冊為接收該佇列的通知時, 只有沒有任何執行緒阻塞在該佇列的mq_receive呼叫的前提下, 通知才會發出;
3. 當通知被髮送給它的註冊程序時, 該程序的註冊被撤銷. 程序必須再次呼叫mq_notify以重新註冊(如果需要的話),但是要注意: 重新註冊要放在從訊息佇列讀出訊息之前而不是之後(如同示例程式);
非同步訊號安全函式
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
可以使用sigwait函式代替訊號處理程式的訊號通知,將訊號阻塞到某個函式中,僅僅等待該訊號的遞交。採用sigwait實現上面的程式如下:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
int main(int argc,char *argv[])
{
mqd_t mqd;
int signo;
void *buff;
ssize_t n;
sigset_t newmask;
struct mq_attr attr;
struct sigevent sigev;
if(argc != 2)
{
printf("usage :mqnotify <name>");
exit(0);
}
mqd = mq_open(argv[1],O_RDONLY);
mq_getattr(mqd,&attr);
buff = malloc(attr.mq_msgsize);
sigemptyset(&newmask);
sigaddset(&newmask,SIGUSR1);
sigprocmask(SIG_BLOCK,&newmask,NULL);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
if(mq_notify(mqd,&sigev) == -1)
{
perror("mq_notify error");
exit(-1);
}
for(; ;)
{
sigwait(&newmask,&signo); //阻塞並等待該訊號
if(signo == SIGUSR1)
{
mq_notify(mqd,&sigev);
while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
printf("read %ld bytes\n",(long) n);
if(errno != EAGAIN)
{
perror("mq_receive error");
exit(-1);
}
}
}
eixt(0);
}
啟動執行緒處理訊息通知,程式如下:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc,char *argv[])
{
if(argc != 2)
{
printf("usage :mqnotify <name>");
exit(0);
}
mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
mq_getattr(mqd,&attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
if(mq_notify(mqd,&sigev) == -1)
{
perror("mq_notify error");
exit(-1);
}
for(; ;)
{
pause();
}
eixt(0);
}
static void notify_thread(union sigval arg)
{
ssize_t n;
void *buff;
printf("notify_thread started\n");
buff = malloc(attr.mq_msgsize);
mq_notify(mqd,&sigev);
while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
printf("read %ld bytes\n",(long) n);
if(errno != EAGAIN)
{
perror("mq_receive error");
exit(-1);
}
free(buff);
pthread_exit(NULL);
}
附-檢視已經成功建立的Posix訊息佇列
#其存在與一個虛擬檔案系統中, 需要將其掛載到系統中才能檢視
Mounting the message queue filesystem On Linux, message queues are created in a virtual filesystem.
(Other implementations may also provide such a feature, but the details are likely to differ.) This
file system can be mounted (by the superuser, 注意是使用root使用者才能成功) using the following commands:
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
還可以使用cat檢視該訊息佇列的狀態, rm刪除:
cat /dev/mqueue/abc
rm abc
還可umount該檔案系統
umount /dev/mqueue