学习环境 Centos6.5 Linux 内核 2.6
什么是消息队列?
消息队列是SystemV版本中三种进程通信机制之一,另外两种是信号量和共享存储段。消息队列提供了进程间发送数据块的方法,而且每个数据块都有一个类型标识。消息队列是基于消息的,而管道是基于字节流。创建的消息队列,生命周期随内核,只有内核重启或用户主动去删除,才可以真正关闭消息队列。
背景知识:
I P C 标识符:每一个内核中的IPC结构(消息队列,信号量,共享存储段)都用一个非负整数的标识符(identifier)加以引用。当一个消息队列发送或取消息,只需要知道其队列标示符。
struct ipc_perm
{
key_t __key;
uid_t uid;
gid_t gid;
uid_t cuid;
gid_t cgid;
unsigned short mode;
unsigned short __seq;
}
IPC关键字:因为IPC标识符是IPC结构的内部名。为使多个合作进程能够在同一IPC对象上会合,需要提供一个外部名方案。即键(key)每一个IPC对象都与一个键相关联,于是键就作为该结构的外部名。要想获得一个唯一标识符,必须使用一个IPC关键字。server和client进程必须双方都同意此关键字。 可以使用ftok( )函数为客户端和服务器产生关键字值。
struct msqid_ds
{
struct ipc_perm msg_perm;
struct msg* msg_first;
struct msg* msg_last;
__kernel_time_t msg_stime;
__kernel_time_t msg_rtime;
__kernel_time_t msg_ctime;
unsigned long msg_lcbytes;
unsigned long msg_lqbytes;
unsigned short msg_cbytes;
unsigned short msg_qnum;
__kernel_ipc_pid_t msg_lspid;
__kernel_ipc_pid_t msg_lrpid;
}
有关命令:
ipcs -q 消息队列列表ipcrm -q msqid(要删除的消息队列ID) 示例:
消息队列相关函数。
1、ftok函数
#include <sys/ipc.h>
#include <sys/types.h>
key_t ftok(
const char* path,
int id);
ftok 函数把一个已存在的路径名和一个整数标识转换成一个key_t值,即IPC关键字path 参数就是你指定的文件名(已经存在的文件名),一般使用当前目录。当产生键时,只使用id参数的低8位。id 是子序号, 只使用8bit (1-255)返回值:若成功返回键值,若出错返回(key_t)-1 在一般的UNIX实现中,是将文件的索引节点号取出(inode),前面加上子序号的到key_t的返回值
2、msgget函数
#include <sys/msg.h>
#include <sys/ipc.h>
int msgget(key_t key,
int msgflag);
msgget 通常是调用的第一个函数,功能是创建一个新的或已经存在的消息队列。此消息队列与key相对应。key 参数 即ftok函数生成的关键字flag参数 :
IPC_CREAT: 如果IPC不存在,则创建一个IPC资源,否则打开已存在的IPC。 IPC_EXCL :只有在共享内存不存在的时候,新的共享内存才建立,否则就产生错误。IPC_EXCL与IPC_CREAT一起使用,表示要创建的消息队列已经存在。如果该IPC资源存在,则返回-1。IPC_EXCL标识本身没有太大的意义,但是和IPC_CREAT标志一起使用可以用来保证所得的对象时新建的,而不是打开已有的对象。返回值 若成功返回消息队列ID,若出错则返回-1
3、msgsnd函数和msgrcv函数
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(
int msqid,
const void* msgp, size_t msgsz,
int msgflag);
ssize_t msgrcv(
int msqid,
void *ptr, size_t nbytes,
long type,
int msgflag);
msgsnd 将数据放到消息队列中 msgrcv 从消息队列中读取数据msqid:消息队列的识别码msgp:指向消息缓冲区的指针,用来暂时存储发送和接受的消息。是一个允许用户定义的通用结构,如下:
struct msgubf
{
long mtype;
char mtext[SIZE];
}
msgsz:消息的大小ptr 指向一个长整形数,将返回的消息类型存储在其中(即结构体 struct msgbuf 的mtype成员)nbyte 是存放实际消息数据的缓冲区的长度type :可以指定想要哪一种消息
type == 0 返回队列的第一个消息type > 0 返回队列中消息类型type的第一个消息type < 0 返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个,则类型值最小的消息msgflag:消息类型,这个参数是控制函数行为的标识。取值可以是0,标识忽略。
IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回给调用函数的进程。如果不指定这个参数,那么进程将被阻塞知道函数可以从队列中取得符合条件的消息为止。0 表示不关心,忽略此行为返回值:成功执行返回消息的数据部分的长度,若出错则返回-1 msgrcv成功执行时,内核更新与该消息队列相关联的msqid_ds结构以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并将队列中的消息数(msg_qnum)减1。*
4、msgctl函数
#include <sys/types.g>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(
int msqid,
int cmd,
struct msqid_ds *buf);
msgctl 函数 可以直接控制消息队列的行为msqid 消息队列idcmd :命令
IPC_STAT 读取消息队列的数据结构msqid_ds, 并将其存储在 buf指定的地址中IPC_SET 设置消息队列的数据结构msqid_ds 中的ipc_perm元素的值,这个值取自buf 参数IPC_RMID 从内核中移除消息队列。返回值:如果成功返回0,失败返回-1
代码示例
Makefile
client
_=client
server
_=server
cc=gcc
clientSrc=client.c common.c
serverSrc=server.c common.c
.
PHONY:all
all:$(client
_)
$(server
_)
$(client
_)
:$(clientSrc)
$(cc) -o
$@ $^
$(server
_)
:$(serverSrc)
$(cc) -o
$@ $^
.
PHONY:clean
clean:
rm -f
$(client
_)
$(server
_)
common_h
#ifndef _COMMON_H_
#define _COMMON_H_
#include <string.h> // strcpy
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h> // read
#include <sys/ipc.h>
#include <sys/msg.h>
#define PATHNAME "./"
#define PROJ_ID 0x666
#define MSGSIZE 1024
#define SERVER_TYPE 1 // 服务端发送消息类型
#define CLIENT_TYPE 2 // 客户端发送消息类型
struct msgbuf
{
long mtype;
char mtext[MSGSIZE];
};
int createMsgQueue();
int destroyMsgQueue(
int msqid);
int getMsgQueue();
int sendMsg(
int msqid,
long type,
const char *_sendInfo);
int recvMsg(
int msqid,
long type,
char buf[]);
#endif /* _COMMON_H*/
common_c
#include "common.h"
int commMsg(
int msgflag)
{
key_t _k = ftok(PATHNAME, PROJ_ID);
int msqid = msgget(_k, msgflag);
if(msqid <
0)
{
perror(
"msgget");
return -
2;
}
return msqid;
}
int createMsgQueue()
{
return commMsg(IPC_CREAT|IPC_EXCL|
0666);
}
int destroyMsgQueue(
int msqid)
{
int _ret = msgctl(msqid, IPC_RMID,
0);
if(_ret <
0)
{
perror(
"msgctl");
return -
1;
}
return 0;
}
int getMsgQueue()
{
return commMsg(IPC_CREAT);
}
int sendMsg(
int msqid,
long type,
const char *_sendInfo)
{
struct msgbuf msg;
msg.mtype = type;
strcpy(msg.mtext, _sendInfo);
int _snd = msgsnd(msqid, &msg,
sizeof(msg.mtext),
0);
if( _snd <
0)
{
perror(
"msgsnd");
return -
1;
}
return 0;
}
int recvMsg(
int msqid,
long type,
char buf[])
{
struct msgbuf msg;
int _rcv = msgrcv(msqid, &msg,
sizeof(msg.mtext), type,
0);
if( _rcv <
0)
{
perror(
"msgrcv");
return -
1;
}
strcpy(buf, msg.mtext);
return 0;
}
client_c
#include "common.h"
void client()
{
int msqid = getMsgQueue();
char buf[MSGSIZE];
while(
1)
{
printf(
"Please enter :");
fflush(stdout);
ssize_t _s = read(
0, buf,
sizeof(buf)-
1);
if(_s >
0)
{
buf[_s -
1] =
'\0';
sendMsg(msqid, CLIENT_TYPE, buf);
}
recvMsg(msqid, SERVER_TYPE, buf);
if(
strcmp(
"exit",buf) ==
0)
{
printf(
"服务端退出,客户端自动退出\n");
break;
}
printf(
"服务端说:%s\n", buf);
}
}
int main()
{
client();
return 0;
}
server_c
#include "common.h"
void server()
{
int msqid = createMsgQueue();
char buf[MSGSIZE];
while(
1)
{
recvMsg(msqid, CLIENT_TYPE, buf);
printf(
"客户端说:%s\n ", buf);
printf(
"Please enter :");
fflush(stdout);
ssize_t _s = read(
0, buf,
sizeof(buf)-
1);
if(_s >
0)
{
buf[_s-
1] =
'\0';
sendMsg(msqid, SERVER_TYPE, buf);
if(
strcmp(buf,
"exit") ==
0)
break;
}
}
destroyMsgQueue(msqid);
}
int main()
{
server();
return 0;
}
通信截图示例:
注意:如果在启动server 后 强制结束掉(ctrl+c)程序,则消息队列会一直存在,这时再次执行server会执行失败,需要使用命令 ipcrm -q xxx xx表示要关闭的msqid。
总结:消息队列发送的是数据块, 生命周期随内核,依赖于系统接口实现。适用于无血缘关系多个进程之间通信。
转载请注明原文地址: https://ju.6miu.com/read-35908.html