实验报告
实验内容
进程间通信—消息机制。
(1) 编译运行课件 Lecture 09 例程代码:Algorithms 9-1 ~ 9-2.
(2) 修改代码,观察在 msgsnd 和 msgrcv 并发执行情况下消息队列的变化情况。
(3) 仿照 alg.8-4~8-6,编制基于 POSIX API 的进程间消息发送和消息接收例程。
实验环境
Ubuntu 20.04.2.0(64位)
实验过程
一. alg.9-0 ~ alg.9-2
(一)对代码中不熟悉的内容进行解析:
msgget()
(参考资料)
函数原型:int msgget(key_t key, int msgflg);
函数功能: 用来创建和访问一个消息队列。
参数:
key:IPC key 。
msgflg: 权限标志。
返回值: 成功返回文件标识符,失败返回-1。
fopen()
(参考资料)
函数原型:FILE *fopen(char *filename, *type);
函数功能: 用于打开文件。
参数:
FILE: 一个包括了文件管理有关信息的数据结构。
filename: 表示文件名, 可以包含路径和文件名两部分。
type: 表示打开文件的类型。参数:
"r" 打开文字文件只读
"w" 创建文字文件只写
"a" 增补, 如果文件不存在则创建一个
"r+" 打开一个文字文件读/写
"w+" 创建一个文字文件读/写
"a+" 打开或创建一个文件增补
"b" 二进制文件(可以和上面每一项合用)
"t" 文本文件(默认项)"rb" 表示打开文字文件只读的二进制文件
返回值: 成功返回文件指针,失败返回NULL。
msgctl()
(参考资料)
函数原型:int msgctl(int msgid, int command, struct msgid_ds *buf);
函数功能: 用来控制消息队列,它与共享内存的shmctl函数相似。
参数:
msgid:消息队列对象的标识符。
command: 是将要采取的动作。它可以取3个值:
IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖 msgid_ds 的值。IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值IPC_RMID:删除消息队列
buf: 指向 msgid_ds 结构的指针,它指向消息队列模式和访问权限的结构。
返回值: 成功返回0,失败返回-1
feof()
(参考资料)
函数原型:int feof(FILE *stream);
函数功能: 检测流上的文件结束符的函数。
参数:
stream:指向 FILE 对象的指针,该 FILE 对象标识了流。
返回值: 当检测到与流关联的文件结束标识符时,该函数返回一个非零值,否则返回零。
fscanf
(参考资料)
函数原型:int fscanf(FILE *stream, const char *format, ...);
函数功能: 从流 stream 读取格式化输入。
返回值:
stream: 指向 FILE 对象的指针,该 FILE 对象标识了流。
format: C 字符串。
例:
代码中
fscanf(fp, "%ld %s", &msg_type, buffer);第一次循环:
msg_type = 1
buffer = "Luffy"
msgsnd()
(参考资料)
函数原型:int msgsnd(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
函数功能: 用来把消息添加到消息队列中。
参数:
msgid:消息队列对象的标识符。
msg_ptr: 一个指向准备发送消息的指针。
msg_sz: msg_ptr指向的消息的长度。
msgflg: 控制函数行为的标志。
0: 阻塞方式IPC_NOWAIT:非阻塞方式
返回值: 消息数据的一份副本将被放到消息队列中,并返回0,失败时返回-1。
fclose()
(参考资料)
函数原型:int fclose(FILE *stream);
函数功能: 关闭流 stream,刷新所有的缓冲区。
参数:
stream: 指向 FILE 对象的指针,该 FILE 对象指定了要被关闭的流。
返回值: 如果流成功关闭,则该方法返回零。如果失败,则返回 EOF。
msgrcv()
(参考资料)
函数原型:int msgrcv(int msgid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
函数功能: 用来从一个消息队列获取消息。
参数:
msgid:消息队列对象的标识符。
msg_ptr: 一个指向准备发送消息的指针。
msg_sz: msg_ptr指向的消息的长度。
msgtype: 可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。
msgflg: 控制函数行为的标志。
0: 阻塞方式IPC_NOWAIT:非阻塞方式
返回值: 调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1。
(二)运行
-
查看本机的消息队列大小限制
-
运行alg.9-1
创建了一个名为 msg 消息队列,一开始队列中没有消息,有32个空位。将alg.9-0-msgsnd.txt
中的信息逐个加入到队列中,完成后队列中有10个消息,用掉了10个空位,即用掉了5120bytes。 -
运行alg.9-2
msgtype = 0: 输出消息队列中的第一个消息,所以循环之后,就是按顺序输出消息队列中的元素。然后不删除消息队列。
观察到此时消息队列中的消息数为0。通过
ipcrm -q
删除msqid = 6
的消息队列(即此队列)。
消息队列已被删除。 -
改变传入的参数再次运行 alg.9-2
此时 msgtype = 1,通过循环输出消息队列中标号为1的消息。进程 alg.9-2 接收到了3个消息,消息队列中还有7个消息。继续改变 msgtype。
消息队列中没有标号为7的消息,所以进程没有接收到消息。继续改变 msgtype。
进程接收到标号为4的两个消息,消息队列中只剩下5个消息。将队列中剩余的消息全部输出。
将消息队列删除。
二. 修改代码,观察在 msgsnd 和 msgrcv 并发执行情况下消息队列的变化情况。
(一) 修改后的代码
- alg.9-1-msgsnd.c
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/msg.h>#include <sys/stat.h>#include <fcntl.h>#include "alg.9-0-msgdata.h"int main(int argc, char *argv[]){char pathname[80];struct stat fileattr;key_t key;struct msg_struct data;long int msg_type;char buffer[TEXT_SIZE];int msqid, ret, count = 0;FILE *fp;if(argc < 2) {printf("Usage: ./a.out pathname\n");return EXIT_FAILURE;}strcpy(pathname, argv[1]);if(stat(pathname, &fileattr) == -1) {ret = creat(pathname, O_RDWR);if (ret == -1) {ERR_EXIT("creat()");}printf("shared file object created\n");}key = ftok(pathname, 0x27); /* project_id can be any nonzero integer */if(key < 0) {ERR_EXIT("ftok()");}printf("\nIPC key = 0x%x\n", key); msqid = msgget((key_t)key, 0666 | IPC_CREAT);if(msqid == -1) {ERR_EXIT("msgget()");}struct msqid_ds msqattr;ret = msgctl(msqid, IPC_STAT, &msqattr);printf("number of messages remainded = %ld, empty slots = %ld\n", msqattr.msg_qnum, 16384/TEXT_SIZE-msqattr.msg_qnum);printf("NonBlocking Sending ... \n");while (1) {printf("Enter the number of msgtype and name(when msgtype < 0, quit):\n");scanf("%ld", &msg_type);if(msg_type < 0){break;}getchar();fgets(buffer, TEXT_SIZE, stdin);data.msg_type = msg_type;strcpy(data.mtext, buffer);ret = msgsnd(msqid, (void *)&data, TEXT_SIZE, 0); /* 0: blocking send, waiting when msg queue is full */if(ret == -1) {ERR_EXIT("msgsnd()");}count++;printf("number of sent messages = %d\n", count);printf("\n");}system("ipcs -q");exit(EXIT_SUCCESS);}
- alg.9-2-msgrcv.c
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/msg.h>#include <sys/stat.h>#include "alg.9-0-msgdata.h" int main(int argc, char *argv[]) /* Usage: ./b.out pathname msg_type */{key_t key;struct stat fileattr;char pathname[80];int msqid, ret, count = 0, sum = 0;struct msg_struct data;long int msgtype = 0; /* 0 - type of any messages */if(argc < 2) {printf("Usage: ./b.out pathname msg_type\n");return EXIT_FAILURE;}strcpy(pathname, argv[1]);if(stat(pathname, &fileattr) == -1) {ERR_EXIT("shared file object stat error");}if((key = ftok(pathname, 0x27)) < 0) {ERR_EXIT("ftok()");}printf("\nIPC key = 0x%x\n", key);msqid = msgget((key_t)key, 0666); /* do not create a new msg queue */if(msqid == -1) {ERR_EXIT("msgget()");}struct msqid_ds msqattr;ret = msgctl(msqid, IPC_STAT, &msqattr);while (1) {printf("Enter the number of msgtype(when msgtype < 0, quit):\n");scanf("%ld", &msgtype);getchar();if(msgtype < 0){break;}while(1){ret = msgrcv(msqid, (void *)&data, TEXT_SIZE, msgtype, IPC_NOWAIT); /* Non_blocking receive */if(ret == -1) { /* end of this msgtype */printf("number of received messages = %d\n", count);break;}printf("%ld %s\n", data.msg_type, data.mtext);count++;}sum += count;count = 0;ret = msgctl(msqid, IPC_STAT, &msqattr);printf("number of messages remainding = %ld\n", msqattr.msg_qnum); system("ipcs -q");}printf("number of messages remainding = %ld\n", msqattr.msg_qnum); printf("sum of received messages = %d\n", sum);if(msqattr.msg_qnum == 0) {printf("do you want to delete this msg queue?(y/n)\n");if(getchar() == 'y') {if(msgctl(msqid, IPC_RMID, 0) == -1)perror("msgctl(IPC_RMID)");}}system("ipcs -q");exit(EXIT_SUCCESS);
}
(二) 运行并观察
-
多终端同时运行 alg.9-1 和 alg.9-2。
-
先试试接收(通过 alg.9-2)。
此时消息队列里没有消息,所以接收不到消息。 -
发送3条消息(通过 alg.9-1)。
-
在查看消息队列的变化(通过 alg.9-2)。
此时队列中有3条消息,与前面的输入相符。 -
接收消息(通过 alg.9-2)。
-
发送一条消息后退出(通过 alg.9-1)。
-
直接退出并且不删除消息队列(通过 alg.9-2)。
从显示看,最后消息队列还在,且队列中还有一条消息。 -
删除消息队列。
三. 仿照 alg.8-4~8-6,编制基于 POSIX API 的进程间消息发送和消息接收例程。
(一)对代码中不熟悉的内容进行解析:
mq_open()
(参考资料)
函数原型:mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr );
函数功能: 用来创建和访问一个消息队列。
参数:
name:表示消息队列的名字,它符合POSIX IPC的名字规则。
oflag:表示打开的方式,和open函数的类似。有必选的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。
mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。
attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。
返回值: 成功返回消息队列描述符,失败返回-1。
mq_close()
(参考资料)
函数原型:mqd_t mq_close(mqd_t mqdes);
函数功能: 用来关闭一个消息队列。
参数:
mqdes: 消息队列的描述符。
返回值: 成功返回0,失败返回-1。
mq_unlink()
(参考资料)
函数原型:mqd_t mq_unlink(const char *name);
函数功能: 用来删除一个消息队列。
参数:
name: 指向消息队列名字的指针,名字的形式是 “/somename”。
返回值: 成功返回0,失败返回-1。
mq_send()
(参考资料)
函数原型:mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
函数功能: 向消息队列中写入一条消息。
参数:
mqdes:消息队列描述符;
msg_ptr:指向消息结构体体缓冲区的指针;
msg_len:消息体的长度。
msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
返回值: 成功返回0,失败返回-1。
mq_receive()
(参考资料)
函数原型:mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
函数功能: 从消息队列中获取消息。
参数:
mqdes:消息队列描述符;
msg_ptr:指向消息结构体缓冲区的指针;
msg_len:消息体的长度,一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。
msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
返回值: 成功返回0,失败返回-1。
mq_getattr()
(参考资料)
函数原型:int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
函数功能: 获取消息队列的属性,取得的结果存放在参数 mqstat 指针指向的内存。
参数:
mqdes:消息队列描述符;
mqstat:指向消息队列属性结构体的指针
struct mq_attr {long mq_flags //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞 long mq_maxmsg //消息队列的最大消息数long mq_msgsize //消息队列中每个消息的最大字节数long mq_curmsgs //消息队列中当前的消息数目
};
返回值: 成功返回0,失败返回-1。
(二) 代码
- 发送单个固定信息
alg.9-3-mqctr.c
/* gcc -lrt */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <mqueue.h>#include "alg.9-0-msgdata.h"int main(int argc, char *argv[])
{int ret;pid_t childpid1, childpid2;printf("Control:\n");mqd_t mqID;mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);if(mqID == -1){ERR_EXIT("mq_open()");}printf("\nmqID = %d\n", mqID);struct mq_attr mqAttr;mq_getattr(mqID, &mqAttr);printf("number of messages remainded = %ld, empty slots = %ld\n", mqAttr.mq_curmsgs, 16384 / TEXT_SIZE - mqAttr.mq_curmsgs);printf("\n");system("ls -l /dev/mqueue/"); printf("\n");char *argv1[] = {" ", argv[1], 0};childpid1 = vfork();if(childpid1 < 0) {ERR_EXIT("mqctr: 1st vfork()");} else if(childpid1 == 0) {execv("./alg.9-4-mqsnd.o", argv1); /* call sender with filename */}else {childpid2 = vfork();if(childpid2 < 0) {ERR_EXIT("mqctr: 2nd vfork()");}else if (childpid2 == 0) {execv("./alg.9-5-mqrcv.o", argv1); /* call receiver with filename */}else {wait(&childpid1);wait(&childpid2);printf("Control:\n\n");ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); if(mqAttr.mq_curmsgs == 0) {printf("do you want to delete this msg queue?(y/n)\n");if(getchar() == 'y') {if(mq_unlink("/anonymQueue") == -1){ERR_EXIT("mq_unlink()");} }}system("ls -l /dev/mqueue/"); }}exit(EXIT_SUCCESS);
}
alg.9-4-mqsnd.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <unistd.h>
#include <fcntl.h>#include "alg.9-0-msgdata.h"int main(int argc, char *argv[])
{char buffer[TEXT_SIZE] = "POSIX message queue test.";int ret;printf("Sender:\n");mqd_t mqID;mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);if(mqID == -1){ERR_EXIT("mq_open");}printf("\nmqID = %d\n", mqID);struct mq_attr mqAttr;mq_getattr(mqID, &mqAttr);printf("number of messages remainded = %ld, empty slots = %ld\n", mqAttr.mq_curmsgs, 16384 / TEXT_SIZE - mqAttr.mq_curmsgs);printf("Max msgsize = %ld\n", mqAttr.mq_msgsize);printf("Sending ... \n");ret = mq_send(mqID, buffer, TEXT_SIZE, 0); if(ret == -1) {ERR_EXIT("mq_send()");}ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("Send message: %s\n", buffer);printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); system("ls -l /dev/mqueue/"); printf("\n"); if(mq_close(mqID) == -1){ERR_EXIT("mq_close()");} exit(EXIT_SUCCESS);
}
alg.9-5-mqrcv.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mqueue.h>#include "alg.9-0-msgdata.h" int main(int argc, char *argv[])
{char buffer[TEXT_SIZE];int ret;sleep(1);printf("Receiver:\n");mqd_t mqID;mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);if(mqID == -1){ERR_EXIT("mq_open");}printf("\nmqID = %d\n", mqID);struct mq_attr mqAttr;ret = mq_getattr(mqID, &mqAttr);printf("Max msgsize = %ld\n", mqAttr.mq_msgsize);ret = mq_receive(mqID, buffer, mqAttr.mq_msgsize, NULL);if(ret == -1) { ERR_EXIT("mq_reveive()");}printf("Receive message: %s\n",buffer);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); system("ls -l /dev/mqueue/"); printf("\n");if(mq_close(mqID) == -1){ERR_EXIT("mq_close()");} exit(EXIT_SUCCESS);
}
运行
mqctr 先创建了消息队列 anonymQueue,此时队列为空。
然后 mqsnd 连接到消息队列,发送了一条消息到消息队列中,可以看到此时队列中有一条消息。发送完毕后 mqsnd 关闭消息队列并结束进程。
mqrcv 连接到消息队列中并从消息队列中将 mqsnd 发送的那条消息接收并打印出来。结束进程。
两个子进程结束后父进程 mqctr 继续执行。此时消息队列为空。
不删除消息队列,结束父进程。
再运行一次。这次删除消息队列。
可以看到消息队列已被删除。
- 循环发送固定信息。
- 并发执行
alg.9-4-mqsnd.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <unistd.h>
#include <fcntl.h>#include "alg.9-0-msgdata.h"int main(int argc, char *argv[])
{char buffer[TEXT_SIZE];int ret, count = 0;int flag;mqd_t mqID;mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);if(mqID == -1){ERR_EXIT("mq_open");}printf("\nmqID = %d\n", mqID);struct mq_attr mqAttr;mq_getattr(mqID, &mqAttr);printf("number of messages remainded = %ld, empty slots = %ld\n", mqAttr.mq_curmsgs, 16384 / TEXT_SIZE - mqAttr.mq_curmsgs);printf("Max msgsize = %ld\n", mqAttr.mq_msgsize);printf("Sending ... \n");while(1){printf("Enter flag(1: send; 0: quit):\n");scanf("%d", &flag);getchar();if(flag == 0){break;}printf("Enter message:\n");fgets(buffer, TEXT_SIZE, stdin);ret = mq_send(mqID, buffer, TEXT_SIZE, 0); if(ret == -1) {ERR_EXIT("mq_send()");}ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("Send message: %s\n", buffer);printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); system("ls -l /dev/mqueue/"); printf("\n"); }ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); printf("do you want to delete this msg queue?(y/n)\n");if(getchar() == 'y') {if(mq_unlink("/anonymQueue") == -1){ERR_EXIT("mq_unlink()");} }else{if(mq_close(mqID) == -1){ERR_EXIT("mq_close()");}}system("ls -l /dev/mqueue/"); exit(EXIT_SUCCESS);
}
alg.9-5-mqrcv.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mqueue.h>#include "alg.9-0-msgdata.h" int main(int argc, char *argv[])
{char buffer[TEXT_SIZE];int ret, count = 0;int flag;mqd_t mqID;mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);if(mqID == -1){ERR_EXIT("mq_open");}printf("\nmqID = %d\n", mqID);struct mq_attr mqAttr;ret = mq_getattr(mqID, &mqAttr);printf("Max msgsize = %ld\n", mqAttr.mq_msgsize);while(1){printf("Enter flag(1: receive; 0: quit):\n");scanf("%d", &flag);getchar();if(flag == 0){break;}ret = mq_receive(mqID, buffer, mqAttr.mq_msgsize, NULL);if(ret == -1) { ERR_EXIT("mq_reveive()");}printf("Receive message: %s\n",buffer);ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs);system("ls -l /dev/mqueue/"); printf("\n");}ret = mq_getattr(mqID, &mqAttr);if(ret == -1){ERR_EXIT("mq_getattr()");}printf("number of messages remainding = %ld\n", mqAttr.mq_curmsgs); system("ls -l /dev/mqueue/"); if(mq_close(mqID) == -1){ERR_EXIT("mq_close()");} exit(EXIT_SUCCESS);
}
运行
(1)两个程序并发运行
(2)发送消息
发送3条手动输入的消息,发送成功,可以看到队列中有3条消息。
(3)接收消息
手动接收了2条消息,消息内容与发送的一致。
(4)退出
mqrcv
mqsnd
都显示此时队列中还有一条消息。不删除消息队列。
(5)再运行一次
输出消息队列中剩下的消息。
删除队列。