【Linux】进程间通信(IPC)之消息队列详解及测试用例

    xiaoxiao2021-03-25  66

    学习环境 Centos6.5 Linux 内核 2.6

    什么是消息队列?

    消息队列是SystemV版本中三种进程通信机制之一,另外两种是信号量和共享存储段。消息队列提供了进程间发送数据块的方法,而且每个数据块都有一个类型标识。消息队列是基于消息的,而管道是基于字节流。创建的消息队列,生命周期随内核,只有内核重启或用户主动去删除,才可以真正关闭消息队列。

    背景知识:

    I P C 标识符:每一个内核中的IPC结构(消息队列,信号量,共享存储段)都用一个非负整数的标识符(identifier)加以引用。当一个消息队列发送或取消息,只需要知道其队列标示符。 // 内核为每个IPC对象维护一个数据结构(/usr/include/linux/ipc.h) struct ipc_perm { key_t __key; /* key supplied to xxxget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permission */ unsigned short __seq; /* Sequeence number*/ } IPC关键字:因为IPC标识符是IPC结构的内部名。为使多个合作进程能够在同一IPC对象上会合,需要提供一个外部名方案。即键(key)每一个IPC对象都与一个键相关联,于是键就作为该结构的外部名。要想获得一个唯一标识符,必须使用一个IPC关键字。server和client进程必须双方都同意此关键字。 可以使用ftok( )函数为客户端和服务器产生关键字值。 //消息队列的结构 ( /usr/include/linux/msg.h) // message queue id // defined in <linux/ipc.h> struct msqid_ds { struct ipc_perm msg_perm; struct msg* msg_first; /* first message on queue, unused */ struct msg* msg_last; /* last message in queue, unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto == 同上... */ unsigned short msg_cbytes; /* current number of butes on queue */ unsigned short msg_qnum; /* number of messages in queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ }

    有关命令:

    ipcs -q 消息队列列表ipcrm -q msqid(要删除的消息队列ID) 示例:

    消息队列相关函数。

    1、ftok函数

    #include <sys/ipc.h> #include <sys/types.h> key_t ftok(const char* path, int id); ftok 函数把一个已存在的路径名和一个整数标识转换成一个key_t值,即IPC关键字path 参数就是你指定的文件名(已经存在的文件名),一般使用当前目录。当产生键时,只使用id参数的低8位。id 是子序号, 只使用8bit (1-255)返回值:若成功返回键值,若出错返回(key_t)-1 在一般的UNIX实现中,是将文件的索引节点号取出(inode),前面加上子序号的到key_t的返回值

    2、msgget函数

    #include <sys/msg.h> #include <sys/ipc.h> int msgget(key_t key, int msgflag); msgget 通常是调用的第一个函数,功能是创建一个新的或已经存在的消息队列。此消息队列与key相对应。key 参数 即ftok函数生成的关键字flag参数 : IPC_CREAT: 如果IPC不存在,则创建一个IPC资源,否则打开已存在的IPC。 IPC_EXCL :只有在共享内存不存在的时候,新的共享内存才建立,否则就产生错误。IPC_EXCL与IPC_CREAT一起使用,表示要创建的消息队列已经存在。如果该IPC资源存在,则返回-1。IPC_EXCL标识本身没有太大的意义,但是和IPC_CREAT标志一起使用可以用来保证所得的对象时新建的,而不是打开已有的对象。返回值 若成功返回消息队列ID,若出错则返回-1

    3、msgsnd函数和msgrcv函数

    #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd(int msqid, const void* msgp, size_t msgsz, int msgflag); ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int msgflag); msgsnd 将数据放到消息队列中 msgrcv 从消息队列中读取数据msqid:消息队列的识别码msgp:指向消息缓冲区的指针,用来暂时存储发送和接受的消息。是一个允许用户定义的通用结构,如下: struct msgubf { long mtype; // 消息类型, 必须大于零 char mtext[SIZE]; // 消息文本 } msgsz:消息的大小ptr 指向一个长整形数,将返回的消息类型存储在其中(即结构体 struct msgbuf 的mtype成员)nbyte 是存放实际消息数据的缓冲区的长度type :可以指定想要哪一种消息 type == 0 返回队列的第一个消息type > 0 返回队列中消息类型type的第一个消息type < 0 返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个,则类型值最小的消息msgflag:消息类型,这个参数是控制函数行为的标识。取值可以是0,标识忽略。 IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回给调用函数的进程。如果不指定这个参数,那么进程将被阻塞知道函数可以从队列中取得符合条件的消息为止。0 表示不关心,忽略此行为返回值:成功执行返回消息的数据部分的长度,若出错则返回-1 msgrcv成功执行时,内核更新与该消息队列相关联的msqid_ds结构以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并将队列中的消息数(msg_qnum)减1。*

    4、msgctl函数

    #include <sys/types.g> #include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf); msgctl 函数 可以直接控制消息队列的行为msqid 消息队列idcmd :命令 IPC_STAT 读取消息队列的数据结构msqid_ds, 并将其存储在 buf指定的地址中IPC_SET 设置消息队列的数据结构msqid_ds 中的ipc_perm元素的值,这个值取自buf 参数IPC_RMID 从内核中移除消息队列。返回值:如果成功返回0,失败返回-1

    代码示例

    Makefile

    client_=client server_=server cc=gcc clientSrc=client.c common.c serverSrc=server.c common.c .PHONY:all all:$(client_) $(server_) $(client_):$(clientSrc) $(cc) -o $@ $^ $(server_):$(serverSrc) $(cc) -o $@ $^ .PHONY:clean clean: rm -f $(client_) $(server_)

    common_h

    #ifndef _COMMON_H_ #define _COMMON_H_ #include <string.h> // strcpy #include <stdio.h> #include <sys/types.h> #include <unistd.h> // read #include <sys/ipc.h> #include <sys/msg.h> #define PATHNAME "./" #define PROJ_ID 0x666 #define MSGSIZE 1024 #define SERVER_TYPE 1 // 服务端发送消息类型 #define CLIENT_TYPE 2 // 客户端发送消息类型 struct msgbuf // 消息结构 { long mtype; // 消息类型 char mtext[MSGSIZE]; // 消息buf }; int createMsgQueue(); // 创建消息队列 int destroyMsgQueue( int msqid); // 销毁消息队列 int getMsgQueue(); // 获取消息队列 int sendMsg( int msqid, long type, const char *_sendInfo); // 发送消息 int recvMsg(int msqid, long type, char buf[]); // 接收消息 #endif /* _COMMON_H*/

    common_c

    #include "common.h" int commMsg(int msgflag) { // 生成IPC 关键字 key_t _k = ftok(PATHNAME, PROJ_ID); int msqid = msgget(_k, msgflag); // 获取消息队列ID if(msqid < 0) { perror("msgget"); return -2; } return msqid; } int createMsgQueue() // 创建消息队列 { return commMsg(IPC_CREAT|IPC_EXCL|0666); } int destroyMsgQueue( int msqid) // 销毁消息队列 { int _ret = msgctl(msqid, IPC_RMID, 0); if(_ret < 0) { perror("msgctl"); return -1; } return 0; } int getMsgQueue() // 获取消息队列 { return commMsg(IPC_CREAT); } int sendMsg( int msqid, long type, const char *_sendInfo) // 发送消息 { struct msgbuf msg; msg.mtype = type; strcpy(msg.mtext, _sendInfo); int _snd = msgsnd(msqid, &msg, sizeof(msg.mtext), 0); if( _snd < 0) { perror("msgsnd"); return -1; } return 0; } int recvMsg(int msqid, long type, char buf[]) // 接收消息 { struct msgbuf msg; int _rcv = msgrcv(msqid, &msg, sizeof(msg.mtext), type, 0); if( _rcv < 0) { perror("msgrcv"); return -1; } strcpy(buf, msg.mtext); return 0; }

    client_c

    #include "common.h" void client() { int msqid = getMsgQueue(); char buf[MSGSIZE]; while(1) { printf("Please enter :"); fflush(stdout); ssize_t _s = read(0, buf, sizeof(buf)-1); if(_s > 0) { buf[_s -1] = '\0'; sendMsg(msqid, CLIENT_TYPE, buf); } recvMsg(msqid, SERVER_TYPE, buf); if(strcmp("exit",buf) == 0) { printf("服务端退出,客户端自动退出\n"); break; } printf("服务端说:%s\n", buf); } } int main() { client(); return 0; }

    server_c

    #include "common.h" void server() { int msqid = createMsgQueue(); char buf[MSGSIZE]; while(1) { // 服务端先接收 recvMsg(msqid, CLIENT_TYPE, buf); printf("客户端说:%s\n ", buf); printf("Please enter :"); fflush(stdout); ssize_t _s = read(0, buf, sizeof(buf)-1); if(_s > 0) { buf[_s-1] = '\0'; sendMsg(msqid, SERVER_TYPE, buf); if(strcmp(buf, "exit") == 0) break; } } destroyMsgQueue(msqid); } int main() { server(); return 0; }

    通信截图示例:

    注意:如果在启动server 后 强制结束掉(ctrl+c)程序,则消息队列会一直存在,这时再次执行server会执行失败,需要使用命令 ipcrm -q xxx xx表示要关闭的msqid。

    总结:消息队列发送的是数据块, 生命周期随内核,依赖于系统接口实现。适用于无血缘关系多个进程之间通信。

    转载请注明原文地址: https://ju.6miu.com/read-35908.html

    最新回复(0)