System V消息队列使用消息队列标识符标识。跟Posix消息队列一样,进程在往消息队列写入消息之前,不要求另外某个进程在该队列上等待一个消息的到达。
消息队列的结构:
Posix消息队列:#include <sys/msg.h>
struct msqid_ds
{ struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ };
其中
struct ipc_perm
{
uid_t uid;/*owner's ...*/
gid_t gid;
uid_t cuid;/*creator's ...*/
gid_t cgid;
mode_t mode;
ulong_t seq;
key_t key;
};
内核中System V消息队列的结构如下图所示:
可见,消息队列中每个消息由类型、长度、数据、指向下一个消息的指针构成。
几个API之间的关系如下图:
总的System V 的三种IPC的函数关系如下:
System V IPC使用key_t作为它们的名字,函数ftok把一个已存在的路径名和一个整数转换成一个key_t值,称为IPC键
#include <sys/ipc.h>
key_t ftok(const chr *pathname, int id);
返回值:成功,IPC键;出错,-1
本机环境为centos 6.3下,key_t值的构成:id的低8位+st_dev的低8位+st_ino的低16位。(st_dev和st_ino来自ftok的路径名的stat结构里)
创建一个新的消息队列或打开一个已存在的消息队列
int msgget(key_t key, int oflag);
返回值:成功,标识符;出错,-1
key:既可以是ftok的返回值,也可以是IPC_PRIVATE(保证创建一个新的消息队列)
oflag:读写权限位与IPC_CREAT或IPC_EXCL的按位或。其中读写权限位如下:
注意:图中的几个常量MSG_R、SEM_R 等等是要自己定义的宏,如#define MSG_R 0400
例1.创建一个消息队列
程序:
#include <stdio.h> #include <sys/msg.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #define err_exit(m)\ {\ printf("%s() error %d: %s\n", m, errno, strerror(errno));\ exit(EXIT_FAILURE);\ } #define MSG_R 0400 #define MSG_W 0200 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >> 3 | MSG_R >> 6) int main(int argc, char *argv[]) { //创建方式设置 int oflag = SVMSG_MODE | IPC_CREAT; int c; while ((c = getopt(argc, argv, "e")) != -1) { switch(c) { case 'e': oflag |= IPC_EXCL; break; } } if (optind != argc -1) { printf("usage: ./msgcreate [-e] <name>\n"); exit(-1); } //创建消息队列 key_t key; if ((key = ftok(argv[optind], 0)) == -1) err_exit("ftok"); if (msgget(key, oflag) == -1) err_exit("msgget"); exit(0); }分析:
1.要查看系统的Posix消息队列,可以使用命令ipcs -q查看
2.访问权限的宏是要自己定义的,不是在哪个头文件里面的。
结果:
向消息队列发送一个消息
int msgsnd(int msgid, const void *ptr, size_t length, int flag);
返回值:成功,0;出错,-1
msgid:msgget返回的标识符
ptr:一个结构指针,形式如下:
struct msgbuf
{
long mtype; //消息类型,必须大于0
char mtext[1]; //消息数据
};
length:待发送的消息的长度(字节为单位),有关系length + sizeof(long) = sizeof(Message)
flag:0或IPC_NOWAIT(非阻塞,没有空间可用,则立刻返回)
例2.向已创建的消息队列发送消息,发送时指定消息大小及类型
程序:
#include <stdio.h> #include <sys/msg.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #define err_exit(m)\ {\ printf("%s() error %d: %s\n", m, errno, strerror(errno));\ exit(EXIT_FAILURE);\ } #define MSG_W 0200 //自定义消息结构 struct msgbuf { long mtype; char mtext[100]; }; int main(int argc, char *argv[]) { if (argc != 4) { printf("usage: ./msgsnd <name> <#bytes> <type>"); exit(-1); } //打开消息队列 int msgid; if ((msgid = msgget(ftok(argv[1], 0), MSG_W)) == -1) err_exit("msgget"); //发送消息 size_t len = atoi(argv[2]);//消息长度 long type = atoi(argv[3]);//消息类型 struct msgbuf *buff = calloc(sizeof(long) + len, sizeof(char)); buff->mtype = type; if (msgsnd(msgid, buff, len, 0) == -1)//发送消息 err_exit("msgsnd"); exit(0); } 分析:1.要自己定义消息结构
2.路径名--->ftok--->key_t--->msgget--->msgsnd 结果:
从消息队列中读出一个消息
ssize_t msgrcv(int msgid, void *ptr, size_t length, long type, int flag);
返回值:成功,读入缓冲区的字节数;出错,-1
msgid、ptr:和msgsnd函数一样
length:ptr指向的缓冲区中数据部分大小
type:为0,返回队列中第一个消息;大于0,返回类型为type的第一个消息;小于0,返回类型小于type绝对值的最小类型消息
flag:IPC_NOWAIT,不阻塞,立即返回;MSG_NOERROR,接收数据大于length时,截断而不返回错误
例3.读取消息队列中的消息,使得选择非阻塞模式和指定类型这两个选项是可选的。
程序:
#include <stdio.h> #include <sys/msg.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #define err_exit(m)\ {\ printf("%s() error %d: %s\n", m, errno, strerror(errno));\ exit(EXIT_FAILURE);\ } #define MSG_R 0400 #define MAXMSG (8192 + sizeof(long)) //自定义消息结构 struct msgbuf { long mtype; char mtext[100]; }; int main(int argc, char *argv[]) { ssize_t n; int c; int msgid; int type, flag; struct msgbuf *buff; type = flag = 0; while ((c = getopt(argc, argv, "nt:")) != -1) { switch(c) { case 'n'://非阻塞 flag |= IPC_NOWAIT; break; case 't'://类型 type = atol(optarg); break; } } if (optind != argc -1) { printf("usage: ./msgrcv [-n] [-t type] <name>\n"); exit(-1); } if ((msgid = msgget(ftok(argv[optind], 0), MSG_R)) == -1)//打开消息队列 err_exit("msgid"); buff = malloc(MAXMSG); if (buff == NULL) err_exit("malloc"); if ((n = msgrcv(msgid, buff, MAXMSG, type, flag)) == -1)//读出消息 err_exit("msgrcv"); printf("read %d bytes, type = %d\n", n, buff->mtype); exit(0); } 分析: 1.自定义消息结构2.msgrcv的参数length是缓冲区的最大值,最大值算法为:sizeof(long)+length
结果:
对一个消息队列进行各种控制
int msgctl(int msgid, int cmd, struct msqid_ds *buff);
cmd:
IPC_RMID:删除msgid指定的消息队列,此时buff被忽略
IPC_STAT:返回消息队列当前的msqid_ds结构
IPC_SET:设置消息队列的msqid_ds结构
例4.删除一个消息队列
#include <stdio.h> #include <sys/msg.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #define err_exit(m)\ {\ printf("%s() error %d: %s\n", m, errno, strerror(errno));\ exit(EXIT_FAILURE);\ } int main(int argc, char *argv[]) { if (argc != 2) { printf("usage: ./msgrmid <name>\n"); exit(-1); } key_t key; int msgid; if ((key = ftok(argv[1], 0)) == -1)//取得消息队列名 err_exit("ftok"); if ((msgid = msgget(key, 0)) == -1)//取得消息队列标识符 err_exit("msgget"); if (msgctl(msgid, IPC_RMID, NULL) == -1)//删除消息队列 err_exit("msgget"); exit(0); } 结果:
使用System V 消息队列实现一个客户端和一个服务器端之间的通信。用一个消息队列实现客户端发送消息到服务器,用另一个消息队列实现服务器发送消息到客户端。
思路:服务器创建两个消息队列,通信结束后客户端销毁两个消息队列。客户端发消息--->服务器接收消息--->服务器发消息---->客户端接收消息--->结束
程序:
公共头文件 svmsg.h:
#ifndef SVMSG_H_H #define SVMSG_H_H #include <sys/msg.h> #include <stdio.h> #include <errno.h> #include <stdlib.h> #define KEY_1 12345L //客户端到服务器的消息队列 #define KEY_2 23456L //服务器到客户端的消息队列 #define MAXMSGDATA 1024 #define MAXMSG (1024 + sizeof(long))//消息的最大数据量 //消息队列打开模式 #define MSG_R 0400 #define MSG_W 0200 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >> 3 | MSG_R >> 6) //消息结构 struct msgbuff { long msg_type; char msg_data[1]; }; //错误处理 #define err_exit(m);\ {\ printf("%s() error %d: %s\n", m, errno, strerror(errno));\ exit(EXIT_FAILURE);\ } #endif client.c: #include <stdio.h> #include <sys/msg.h> #include "svmsg.h" void client(int readid, int writeid); int main() { int readid; //从KEY_2中读 int writeid; //往KEY_1中写 //创建消息队列 if ((writeid = msgget(KEY_1, 0)) == -1) { err_exit("msgget"); } if ((readid = msgget(KEY_2, 0)) == -1) { err_exit("msgget"); } client(readid, writeid); //删除消息队列 if (msgctl(readid, IPC_RMID, NULL) == -1) { err_exit("msgctl"); } if (msgctl(writeid, IPC_RMID, NULL) == -1) { err_exit("msgctl"); } exit(0); } void client(int readid, int writeid) { struct msgbuff *buff, *ptr; long type; char content[MAXMSGDATA]; size_t len; ssize_t n; printf("client:\nenter message type: "); scanf("%ld", &type);//输入消息类型 printf("enter message content: "); scanf("%s", content);//输入消息内容 len = strlen(content) + sizeof(long); ptr = calloc(len, sizeof(char)); ptr->msg_type = type; strcpy(ptr->msg_data, content); if (msgsnd(writeid, ptr, len, 0) == -1)//发送消息到KEY_1 { err_exit("msgsnd"); } free(ptr); buff = malloc(MAXMSGDATA); if (buff == NULL) { printf("malloc() error"); exit(-1); } if ((n = msgrcv(readid, buff, MAXMSG, 0, 0)) == -1)//从KEY_2接收消息 { err_exit("msgrcv"); } printf("received message from server, read %d bytes, type = %d, content = %s\n", n, buff->msg_type, buff->msg_data); free(buff); } server.c: #include <stdio.h> #include <sys/msg.h> #include "svmsg.h" void server(int readid, int writeid); int main() { int readid; //从KEY_1中读 int writeid; //往KEY_2中写 //创建消息队列 if ((readid = msgget(KEY_1, SVMSG_MODE | IPC_CREAT)) == -1) { err_exit("msgget, KEY_1"); } if ((writeid = msgget(KEY_2, SVMSG_MODE | IPC_CREAT)) == -1) { err_exit("msgget, KEY_2"); } server(readid, writeid); exit(0); } void server(int readid, int writeid) { struct msgbuff *buff, *ptr; ssize_t n; long type; char content[MAXMSGDATA]; size_t len; buff = (struct msgbuff *)malloc(MAXMSGDATA); if ((n = msgrcv(readid, buff, MAXMSGDATA, 0, 0)) == -1)//服务器接收消息 { err_exit("msgrcv"); } printf("server:\nreceived message from client: read %d bytes, type = %d, content = %s\n", n, buff->msg_type, buff->msg_data); free(buff); printf("enter message type: "); scanf("%ld", &type); printf("enter message content: "); scanf("%s", content); len = strlen(content) + sizeof(long); ptr = calloc(len, sizeof(char)); ptr->msg_type = type; strcpy(ptr->msg_data, content); if (msgsnd(writeid, ptr, len, 0) == -1)//服务器发送消息 { err_exit("msgrcv"); } free(ptr); } 结果: