unix网络编程:Posix 消息队列

概述

一个进程可以可以往某个队列中写入一些消息,然后终止,另一个进程在以后的某个时刻从队列中读取,这是因为消息队列具有随内核的持续性(可以简单理解为系统的一个开关机周期),管道和 FIFO 则不同,当一个管道或 FIFO 最后一次关闭发生时,其中的数据会被丢弃。

本次讲解 Posix 消息队列,后续会讲解 System V 消息队列,两者在设计上有很大的相似性,主要差别如下:

  • Posix 消息队列总是返回优先级最高的的最早的一条消息,而 System V 消息队列则可以返回任意指定优先级的消息。

  • 当向一个空队列放置一个消息时,Posix 消息队列允许产生一个信号或启动一个线程,而 System V 则没有类似的机制。

  • Posix 消息队列的中消息的优先级使用一个无符号整数表示,而 System V 则使用一个长整数类型。

消息队列不同于管道和FIFO,后者是流式结构,没有固定的消息边界,在读写时需要用户自己处理。消息队列则是使用一个链表结构,链表的头中含有当前队列的最大长度以及每个消息的最大大小:

mq_open

mq_open 函数创建一个新的消息队列或打开一个已存在的消息队列。

code c
#include <mqueue.h>
mqd_t mq_open(const char * name, int oflag, ... /* mode_t mode, struct mq_attr * attr */);

oflag 参数是 O_RDONLYO_WRONLYO_RDWR 之一,还可以按位或上 O_CREATO_EXCLO_NONBLOCK。当实际操作是创建一个新队列时(已指定 O_CREAT 标志,且所请求的消息队列尚未存在),modeattr 参数是需要的,attr 如果为空指针则使用默认值。

mq_open 的返回值称为消息队列描述符,但它不必是像文件描述符或套接字描述符那样的短整数,这个值将用作其余 7 个消息队列函数的第一个参数。

#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <getopt.h>
#include <stdlib.h>

#define	FILE_MODE	(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

int main(int argc, char** argv) {
  int flags = O_RDWR | O_CREAT;
  int opt_index = 0;
  struct option opts[] = {
    { "maxmsg", required_argument, NULL, 'm' },
    { "msgsize", required_argument, NULL, 's' },
  };
  struct mq_attr attr = { 0, 10, 8192 };
  int c;
  while ((c = getopt_long(argc, argv, "hem:s:", opts, &opt_index)) != -1) {
    switch (c) {
    case 'h':
      printf(
        "Options:\n"
        "  -h, --help\n"
        "  -m, --maxmsg=mq_maxmsg Set mq_maxmsg\n"
        "  -s, --msgsize=msgsize Set mq_msgsize\n"
        "  -e, Check file exists, if exists then exit\n"
        "Usage:\n"
        "  mq_create -m 12 -s 100 -e /some.queue\n"
      );
      exit(0);
    case 'e':
      flags |= O_EXCL;
      break;
    case 'm':
      attr.mq_maxmsg = atoi(optarg);
      break;
    case 's':
      attr.mq_msgsize = atoi(optarg);
      break;
    }
  }
  if (optind != argc - 1) {
    printf("usage: mqcreate [ -e ] <name>\n");
    return -1;
  }
  mqd_t mqd = mq_open(argv[optind], flags, FILE_MODE, &attr);
  if (mqd < 0) {
    printf("open message queue %s error [%s]\n", argv[optind], strerror(errno));
    return -1;
  }
  mq_close(mqd);
  return 0;
}

上面代码中使用了 getopt_long 来解析命令行传参,具体介绍可以参考 C 语言处理命令行参数

Makefile Makefile
# $@: 目标文件
# $^: 所有的依赖文件
# $<: 第一个依赖文件。

.c.o:
	gcc -g -c $< -lrt

objects = $(patsubst %.c,%, $(wildcard *.c))

all: $(objects)

mq_create: mq_create.o
	gcc -g -o $@ $^ -lrt

clean:
	-rm -rf *.o $(objects)

由于 mq 系列函数不在标准库中,所以在编译时需额外指定链接库 -lrt

[root@fangjin posix]# make mq_create
gcc -g -c mq_create.c -lrt
gcc -g -o mq_create mq_create.o -lrt
[root@fangjin posix]# ./mq_create -h
Options:
  -h, --help
  -m, --maxmsg=mq_maxmsg Set mq_maxmsg
  -s, --msgsize=msgsize Set mq_msgsize
  -e, Check file exists, if exists then exit
Usage:
  mq_create -m 12 -s 100 -e /some.queue
[root@fangjin posix]# ./mq_create -m 100 -s 30 /test.queue
[root@fangjin posix]# ./mq_create -e -m 100 -s 30 /test.queue # 指定 -e 参数后创建失败
open message queue /test.queue error [File exists]
[root@fangjin posix]# ls /dev/mqueue/
test.queue

Linux 下的 Posix 消息队列是在 vfs 中创建的,可以用:

code shell
mount -t mqueue none /dev/mqueue

将消息队列挂在在 /dev/mqueue 目录下,便于查看。操作系统重启后该挂载会失效,需手动再次执行该命令。

mq_close

code c
#include <mqueue.h>
int mq_close(mqd_t mqdes);

其功能与关闭一个已打开文件的 close 函数类似:调用进程可以不再使用该描述符,但其消息队列并不从系统中删除。一个进程终止时,它的所有打开着的消息队列都关闭,就像调用了 mq_close 一样。

code c
#include <mqueue.h>
int mq_unlink(const char *name);

每个消息队列有一个保存其当前打开着描述符数的引用计数器,当一个消息队列的引用计数扔大于 0 时,其 name 就能删除,但是改队列的析构(这与从系统中删除 name 不同)要到最后一个 mq_close 发生时才进行。

#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

int main(int argc, char** argv) {
  if (argc < 2) {
    printf("please enter message queue name to be delete.\n");
    return -1;
  }
  if (mq_unlink(argv[1]) < 0) {
    printf("message queue unlink error: %s\n", strerror(errno));
  }
  return 0;
}

编译过程可参考上文 Makefile,后续代码使用方式类似,后文不再赘述,读者可根据自身需要自行更改。

[root@fangjin posix]# make mq_unlink
gcc -g -c mq_unlink.c -lrt
gcc -g -o mq_unlink mq_unlink.o -lrt
[root@fangjin posix]# ./mq_unlink /test.queue
[root@fangjin posix]# ls /dev/mqueue/
[root@fangjin posix]#

mq_setattrr

code c
#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);

mq_attr 的结构含有以下属性:

struct mq_attr {
  // message quque flag: 0, O_NONBLOCK
  long mq_flags;

  // max number of messages allowed on queue
  long mq_maxmsg;

  // max size of a message (in bytes)
  long mq_msgsize;

  // number of messages currenty on queue
  long mq_curmsgs;
}

指向某个 mq_attr 结构的指针可以作为 mq_open 的第四个参数传递,从而在创建消息队列时指定 mq_maxmsg 和 mq_msgsize 属性,mq_open 忽略该结构的另外两个参数。

mq_setattr 可以为指定队列设置属性,但是只有 mq_flags 生效,其余字段将被忽略。

mq_getattr

code c
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
#include <stdio.h>
#include <mqueue.h>
#include <string.h>
#include <errno.h>
#define	FILE_MODE	(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

int main(int argc, char** argv) {
  if (argc < 2) {
    printf("please enter message queue name to be getattr.\n");
    return -1;
  }
  mqd_t mqd = mq_open(argv[1], O_RDONLY, FILE_MODE, NULL);
  if (mqd < 0) {
    printf("open message queue %s fail: %s\n", argv[1], strerror(errno));
    return -1;
  }
  struct mq_attr attr;
  mq_getattr(mqd, &attr);
  printf("flags = %ld, max #msgs = %ld, max #bytes/msg = %ld, #currently on queue = %ld\n",             
    attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
  mq_close(mqd);
  return 0;
}
[root@fangjin posix]# make mq_getattr
gcc -g -c mq_getattr.c -lrt
gcc -g -o mq_getattr mq_getattr.o -lrt
[root@fangjin posix]# ./mq_create -m 100 -s 30 /test.queue
[root@fangjin posix]# ./mq_getattr /test.queue
flags = 0, max #msgs = 100, max #bytes/msg = 30, #currently on queue = 0

mq_send

code c
#include <mqueue.h>

// 成功返回 0,失败返回 -1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

mq_send 用于往一个队列中放置一个消息,prio 参数是待发送消息的优先级,其值必须小于 MQ_PRIO_MAX,如果该值是一个非空指针,所返回消息的优先级就通过该指针存放。如果应用不必使用优先级不同的消息,那就给 mq_send 指定值为 0 的优先级。

#include <stdio.h>
#include <getopt.h>
#include <errno.h>
#include <string.h>
#include <mqueue.h>
#include <stdlib.h>

int main(int argc, char** argv) {
  int c;
  struct option opts[] = {
    { "name", required_argument, NULL, 'n' },
    { "prio", required_argument, NULL, 'p' },
  };
  int opt_index = 0;
  int prio = 0;
  char* name = NULL;
  while ((c = getopt_long(argc, argv, "hn:p:c:", opts, &opt_index)) != -1) {
    switch (c) {
    case 'h':
      printf(
        "Options:\n"
        "  -h, --help\n"
        "  -n, --name=name  Message queue name\n"
        "  -p, --prio=number  Set message priority\n"
        "Usage:\n"
        "  mq_send -n /some.queue -p 4 -c hello!\n"
      );
      exit(0);
    case 'p':
      prio = atoi(optarg);
      break;
    case 'n':
      name = optarg;
      break;
    default:
      break;
    }
  }
  if (name == NULL) {
    printf("please enter message queue name\n");
    return -1;
  }
  if (optind == argc) {
    printf("please enter content\n");
    return -1;
  }
  mqd_t mqd = mq_open(name, O_WRONLY);
  struct mq_attr attr;
  mq_getattr(mqd, &attr);
  for (int i = optind; i < argc; i++) {
    size_t len = strlen(argv[i]);
    if (len > attr.mq_msgsize) {
      printf("msgsize max %ld, but now %ld\n", attr.mq_msgsize, len);
      return -1;
    }
    if (mq_send(mqd, argv[i], len, prio) == -1) {
      printf("send content faild, %s\n", strerror(errno));
      return -1;
    }
  }
  return 0;
}
[root@fangjin posix]# ./mq_send -h
Options:
  -h, --help
  -n, --name=name  Message queue name
  -p, --prio=number  Set message priority
Usage:
  mq_send -n /some.queue -p 4 -c hello!
[root@fangjin posix]# ./mq_send -n /test.queue hello!

mq_receive

code c
#include <mqueue.h>

// 成功返回消息中的字节数,失败返回 -1
int mq_send(mqd_t mqdes, char *ptr, size_t len, unsigned int * prio);
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <errno.h>

int main(int argc, char** argv) {
  int c;
  int opt_index = 0;
  struct option opts[] = {
    { "name", required_argument, NULL, 'n' },
    { "max", required_argument, NULL, 'm' },
  };
  int prio = 0;
  char* name = NULL;
  int flag = O_RDONLY;
  while ((c = getopt_long(argc, argv, "hn:p:m:b", opts, &opt_index)) != -1) {
    switch (c) {
    case 'h':
      printf("Options:\n"
        "  -h, --help\n"
        "  -n, --name=name  Message queue name\n"
        "  -p, --prio=number  Set message priority\n"
        "Usage:\n"
        "  mq_receive -n /some.queue -p 4\n"
      );
      exit(0);
    case 'n':
      name = optarg;
      break;
    case 'p':
      prio = atoi(optarg);
      break;
    case 'b':
      flag |= O_NONBLOCK;
      break;
    default:
      break;
    }
  }
  if (name == NULL) {
    printf("please enter message queue name\n");
    return -1;
  }
  mqd_t mqd = mq_open(name, flag);
  struct mq_attr attr;
  mq_getattr(mqd, &attr);
  char* content = malloc(attr.mq_msgsize);
  while (1) {
    int len = mq_receive(mqd, content, attr.mq_msgsize, &prio);
    if (len == -1) {
      printf("receive content faild, %s\n", strerror(errno));
      return -1;
    }
    content[len] = '\0';
    printf("%s\n", content);
  }
  return 0;
}
[root@fangjin posix]# ./mq_receive -h
Options:
  -h, --help
  -n, --name=name  Message queue name
Usage:
  mq_receive -n /some.queue -p 4
[root@fangjin posix]# ./mq_receive -n /test.queue
hello!

mq_notify

此章节涉及后续高级主题,暂时略过,待后续内容完成后再来补充。