概述
一个进程可以可以往某个队列中写入一些消息,然后终止,另一个进程在以后的某个时刻从队列中读取,这是因为消息队列具有随内核的持续性(可以简单理解为系统的一个开关机周期)
,管道和 FIFO 则不同,当一个管道或 FIFO 最后一次关闭发生时,其中的数据会被丢弃。
本次讲解 Posix 消息队列,后续会讲解 System V 消息队列,两者在设计上有很大的相似性,主要差别如下:
-
Posix 消息队列总是返回优先级最高的的最早的一条消息,而 System V 消息队列则可以返回任意指定优先级的消息。
-
当向一个空队列放置一个消息时,Posix 消息队列允许产生一个信号或启动一个线程,而 System V 则没有类似的机制。
-
Posix 消息队列的中消息的优先级使用一个无符号整数表示,而 System V 则使用一个长整数类型。
消息队列不同于管道和FIFO,后者是流式结构,没有固定的消息边界,在读写时需要用户自己处理。消息队列则是使用一个链表结构,链表的头中含有当前队列的最大长度以及每个消息的最大大小:
mq_open
mq_open
函数创建一个新的消息队列或打开一个已存在的消息队列。
#include <mqueue.h> mqd_t mq_open(const char * name, int oflag, ... /* mode_t mode, struct mq_attr * attr */);
oflag
参数是 O_RDONLY
、O_WRONLY
或 O_RDWR
之一,还可以按位或上 O_CREAT
、O_EXCL
或 O_NONBLOCK
。当实际操作是创建一个新队列时(已指定 O_CREAT
标志,且所请求的消息队列尚未存在),mode
和 attr
参数是需要的,attr
如果为空指针则使用默认值。
mq_open 的返回值称为消息队列描述符,但它不必是像文件描述符或套接字描述符那样的短整数,这个值将用作其余 7 个消息队列函数的第一个参数。
#include <mqueue.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <getopt.h> #include <stdlib.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) int main(int argc, char** argv) { int flags = O_RDWR | O_CREAT; int opt_index = 0; struct option opts[] = { { "maxmsg", required_argument, NULL, 'm' }, { "msgsize", required_argument, NULL, 's' }, }; struct mq_attr attr = { 0, 10, 8192 }; int c; while ((c = getopt_long(argc, argv, "hem:s:", opts, &opt_index)) != -1) { switch (c) { case 'h': printf( "Options:\n" " -h, --help\n" " -m, --maxmsg=mq_maxmsg Set mq_maxmsg\n" " -s, --msgsize=msgsize Set mq_msgsize\n" " -e, Check file exists, if exists then exit\n" "Usage:\n" " mq_create -m 12 -s 100 -e /some.queue\n" ); exit(0); case 'e': flags |= O_EXCL; break; case 'm': attr.mq_maxmsg = atoi(optarg); break; case 's': attr.mq_msgsize = atoi(optarg); break; } } if (optind != argc - 1) { printf("usage: mqcreate [ -e ] <name>\n"); return -1; } mqd_t mqd = mq_open(argv[optind], flags, FILE_MODE, &attr); if (mqd < 0) { printf("open message queue %s error [%s]\n", argv[optind], strerror(errno)); return -1; } mq_close(mqd); return 0; }
上面代码中使用了
getopt_long
来解析命令行传参,具体介绍可以参考 C 语言处理命令行参数
# $@: 目标文件 # $^: 所有的依赖文件 # $<: 第一个依赖文件。 .c.o: gcc -g -c $< -lrt objects = $(patsubst %.c,%, $(wildcard *.c)) all: $(objects) mq_create: mq_create.o gcc -g -o $@ $^ -lrt clean: -rm -rf *.o $(objects)
由于 mq 系列函数不在标准库中,所以在编译时需额外指定链接库
-lrt
[root@fangjin posix]# make mq_create gcc -g -c mq_create.c -lrt gcc -g -o mq_create mq_create.o -lrt [root@fangjin posix]# ./mq_create -h Options: -h, --help -m, --maxmsg=mq_maxmsg Set mq_maxmsg -s, --msgsize=msgsize Set mq_msgsize -e, Check file exists, if exists then exit Usage: mq_create -m 12 -s 100 -e /some.queue [root@fangjin posix]# ./mq_create -m 100 -s 30 /test.queue [root@fangjin posix]# ./mq_create -e -m 100 -s 30 /test.queue # 指定 -e 参数后创建失败 open message queue /test.queue error [File exists] [root@fangjin posix]# ls /dev/mqueue/ test.queue
Linux 下的 Posix 消息队列是在 vfs 中创建的,可以用:
mount -t mqueue none /dev/mqueue
将消息队列挂在在 /dev/mqueue 目录下,便于查看。操作系统重启后该挂载会失效,需手动再次执行该命令。
mq_close
#include <mqueue.h> int mq_close(mqd_t mqdes);
其功能与关闭一个已打开文件的 close 函数类似:调用进程可以不再使用该描述符,但其消息队列并不从系统中删除。一个进程终止时,它的所有打开着的消息队列都关闭,就像调用了 mq_close 一样。
mq_unlink
#include <mqueue.h> int mq_unlink(const char *name);
每个消息队列有一个保存其当前打开着描述符数的引用计数器,当一个消息队列的引用计数扔大于 0 时,其 name 就能删除,但是改队列的析构(这与从系统中删除 name 不同)要到最后一个 mq_close 发生时才进行。
#include <mqueue.h> #include <stdio.h> #include <string.h> #include <errno.h> int main(int argc, char** argv) { if (argc < 2) { printf("please enter message queue name to be delete.\n"); return -1; } if (mq_unlink(argv[1]) < 0) { printf("message queue unlink error: %s\n", strerror(errno)); } return 0; }
编译过程可参考上文 Makefile,后续代码使用方式类似,后文不再赘述,读者可根据自身需要自行更改。
[root@fangjin posix]# make mq_unlink gcc -g -c mq_unlink.c -lrt gcc -g -o mq_unlink mq_unlink.o -lrt [root@fangjin posix]# ./mq_unlink /test.queue [root@fangjin posix]# ls /dev/mqueue/ [root@fangjin posix]#
mq_setattrr
#include <mqueue.h> int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);
mq_attr
的结构含有以下属性:
struct mq_attr { // message quque flag: 0, O_NONBLOCK long mq_flags; // max number of messages allowed on queue long mq_maxmsg; // max size of a message (in bytes) long mq_msgsize; // number of messages currenty on queue long mq_curmsgs; }
指向某个 mq_attr
结构的指针可以作为 mq_open 的第四个参数传递,从而在创建消息队列时指定 mq_maxmsg 和 mq_msgsize 属性,mq_open 忽略该结构的另外两个参数。
mq_setattr 可以为指定队列设置属性,但是只有 mq_flags 生效,其余字段将被忽略。
mq_getattr
#include <mqueue.h> int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
#include <stdio.h> #include <mqueue.h> #include <string.h> #include <errno.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) int main(int argc, char** argv) { if (argc < 2) { printf("please enter message queue name to be getattr.\n"); return -1; } mqd_t mqd = mq_open(argv[1], O_RDONLY, FILE_MODE, NULL); if (mqd < 0) { printf("open message queue %s fail: %s\n", argv[1], strerror(errno)); return -1; } struct mq_attr attr; mq_getattr(mqd, &attr); printf("flags = %ld, max #msgs = %ld, max #bytes/msg = %ld, #currently on queue = %ld\n", attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); mq_close(mqd); return 0; }
[root@fangjin posix]# make mq_getattr gcc -g -c mq_getattr.c -lrt gcc -g -o mq_getattr mq_getattr.o -lrt [root@fangjin posix]# ./mq_create -m 100 -s 30 /test.queue [root@fangjin posix]# ./mq_getattr /test.queue flags = 0, max #msgs = 100, max #bytes/msg = 30, #currently on queue = 0
mq_send
#include <mqueue.h> // 成功返回 0,失败返回 -1 int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
mq_send 用于往一个队列中放置一个消息,prio
参数是待发送消息的优先级,其值必须小于 MQ_PRIO_MAX
,如果该值是一个非空指针,所返回消息的优先级就通过该指针存放。如果应用不必使用优先级不同的消息,那就给 mq_send 指定值为 0 的优先级。
#include <stdio.h> #include <getopt.h> #include <errno.h> #include <string.h> #include <mqueue.h> #include <stdlib.h> int main(int argc, char** argv) { int c; struct option opts[] = { { "name", required_argument, NULL, 'n' }, { "prio", required_argument, NULL, 'p' }, }; int opt_index = 0; int prio = 0; char* name = NULL; while ((c = getopt_long(argc, argv, "hn:p:c:", opts, &opt_index)) != -1) { switch (c) { case 'h': printf( "Options:\n" " -h, --help\n" " -n, --name=name Message queue name\n" " -p, --prio=number Set message priority\n" "Usage:\n" " mq_send -n /some.queue -p 4 -c hello!\n" ); exit(0); case 'p': prio = atoi(optarg); break; case 'n': name = optarg; break; default: break; } } if (name == NULL) { printf("please enter message queue name\n"); return -1; } if (optind == argc) { printf("please enter content\n"); return -1; } mqd_t mqd = mq_open(name, O_WRONLY); struct mq_attr attr; mq_getattr(mqd, &attr); for (int i = optind; i < argc; i++) { size_t len = strlen(argv[i]); if (len > attr.mq_msgsize) { printf("msgsize max %ld, but now %ld\n", attr.mq_msgsize, len); return -1; } if (mq_send(mqd, argv[i], len, prio) == -1) { printf("send content faild, %s\n", strerror(errno)); return -1; } } return 0; }
[root@fangjin posix]# ./mq_send -h Options: -h, --help -n, --name=name Message queue name -p, --prio=number Set message priority Usage: mq_send -n /some.queue -p 4 -c hello! [root@fangjin posix]# ./mq_send -n /test.queue hello!
mq_receive
#include <mqueue.h> // 成功返回消息中的字节数,失败返回 -1 int mq_send(mqd_t mqdes, char *ptr, size_t len, unsigned int * prio);
#include <getopt.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <mqueue.h> #include <errno.h> int main(int argc, char** argv) { int c; int opt_index = 0; struct option opts[] = { { "name", required_argument, NULL, 'n' }, { "max", required_argument, NULL, 'm' }, }; int prio = 0; char* name = NULL; int flag = O_RDONLY; while ((c = getopt_long(argc, argv, "hn:p:m:b", opts, &opt_index)) != -1) { switch (c) { case 'h': printf("Options:\n" " -h, --help\n" " -n, --name=name Message queue name\n" " -p, --prio=number Set message priority\n" "Usage:\n" " mq_receive -n /some.queue -p 4\n" ); exit(0); case 'n': name = optarg; break; case 'p': prio = atoi(optarg); break; case 'b': flag |= O_NONBLOCK; break; default: break; } } if (name == NULL) { printf("please enter message queue name\n"); return -1; } mqd_t mqd = mq_open(name, flag); struct mq_attr attr; mq_getattr(mqd, &attr); char* content = malloc(attr.mq_msgsize); while (1) { int len = mq_receive(mqd, content, attr.mq_msgsize, &prio); if (len == -1) { printf("receive content faild, %s\n", strerror(errno)); return -1; } content[len] = '\0'; printf("%s\n", content); } return 0; }
[root@fangjin posix]# ./mq_receive -h Options: -h, --help -n, --name=name Message queue name Usage: mq_receive -n /some.queue -p 4 [root@fangjin posix]# ./mq_receive -n /test.queue hello!
mq_notify
此章节涉及后续高级主题,暂时略过,待后续内容完成后再来补充。