【并发程序设计】14.消息队列

2024-06-02 23:12

本文主要是介绍【并发程序设计】14.消息队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

14.消息队列

消息队列(Message Queue)是一种通信机制,用于在分布式系统中传递和管理消息的队列型数据结构

  • 消息队列通常是一个先进先出(FIFO)的数据结构,它允许多个进程或线程之间以异步方式进行通信。
  • 它可以被看作是一个系统内核中的内部链表,其中发送进程将消息添加到队列中,接收进程从队列中读取消息进行处理。
  • 这种通信机制传递的数据通常是结构化的,而不是简单的字节流。

在这里插入图片描述

消息队列使用步骤

  • 发送端:
    1. 申请Key ftok
    2. 打开/创建消息队列 msgget
    3. 向消息队列发送消息 msgsnd
  • 接收端:
    1. 打开/创建消息队列 msgget
    2. 从消息队列接收消息 msgrcv
    3. 控制(删除)消息队列 msgctl

发送端

1.申请Key

ftok函数

  1. 原型key_t ftok(const char *pathname, int proj_id);
  2. 功能:生成一个唯一的键值,通常用于创建共享内存或消息队列等
  3. 参数
    • pathname:一个已经存在的文件路径,通常是程序中已经打开的文件。
    • proj_id:一个整数,用于与pathname组合生成唯一的键值。
  4. 返回值
    • 成功,返回一个key_t类型的键值,用于后续的系统调用(如shmget、msgget等)
    • 失败,返回-1

2.打开/创建消息队列

msgget函数

  1. 原型int msgget(key_t key, int msgflg);
  2. 功能:创建或打开一个消息队列
  3. 参数
    • key:一个唯一的键值,通常由ftok函数生成。
    • msgflg:消息队列的访问权限和创建标志,IPC_CREAT、IPC_EXCL、IPC_NOWAIT等
      1. IPC_CREAT
        • 若消息队列不存在,则创建一个新的消息队列。
      2. IPC_EXCL
        • 当与 IPC_CREAT 同时使用时,如果已经存在与 key 相关联的消息队列,msgget 会失败并返回 -1。
        • 如果没有同时指定 IPC_CREATIPC_EXCL 将被忽略。
      3. IPC_NOWAIT
        • 如果消息队列不能立即创建(例如,由于资源限制),msgget 会立即返回 -1,而不是等待直到可以创建为止。
        • 如果没有指定 IPC_NOWAIT,系统会等待直到可以创建消息队列为止。
      4. IPC_PRIVATE
        • 在 Linux 中,IPC_PRIVATE 是一种特殊的键值,它允许不使用 ftok 来生成唯一的 key
        • 使用 IPC_PRIVATE 作为 key 时,系统会为调用者分配一个私有的、唯一的消息队列标识符。
      5. IPC_RMID
        • 这是一个较新的选项,用于删除所有关联的消息队列,而不仅仅是与给定 key 关联的那个。
      6. 06660777 等权限位
        • 这些数字代表消息队列的访问权限。通常,它们被设置为八进制数,其中每个数字对应于用户、组和其他人的读/写权限。
  4. 返回值
    • 成功,返回消息队列的标识符(非负整数)
    • 失败,返回-1
  5. 向消息队列发送消息 msgsnd函数
    1. 原型int msgsnd(int msqid, const struct msgbuf *msgp, int msgsz, int msgflg);
    2. 功能:向消息队列发送一个消息
    3. 参数
      • msqid:消息队列的标识符,通常由msgget函数返回。
      • msgp:指向要发送的消息结构的指针,该结构包含消息类型和消息正文。
      • msgsz:消息的大小(以字节为单位)。
      • msgflg:指定消息发送的标志,可以是IPC_NOWAIT等,同msgget函数中的msgflg参数。
    4. 返回值
      • 成功,返回0
      • 失败,返回-1

示例

发送一条消息到队列

#include <stdio.h> 
#include <sys/ipc.h> 
#include <sys/msg.h> 
#include <string.h> typedef struct // 定义一个结构体类型
{long msg_type; // 消息类型,必须是long型char buf[128]; // 消息内容缓冲区,用于存储消息内容
}msgT;    #define MSGLEN  (sizeof(msgT)-sizeof(long)) // 定义消息长度宏,减去长整型变量的大小int main() // 主函数
{key_t key; // 定义一个key_t类型的变量key,用于存储生成的键值int msgid; // 定义一个整型变量msgid,用于存储消息队列的标识符int ret;   // 定义一个整型变量ret,用于存储函数调用的返回值msgT msg;  // 定义一个msgT类型的结构体变量msg,用于存储要发送的消息//1.生成一个键值key = ftok(".",100); // "."表示当前目录,100是一个任意的整数if(key<0) // 如果生成键值失败{perror("ftok"); // 打印错误信息并返回0return 0;}//2.函数创建msgid = msgget(key,IPC_CREAT|0666); // key为键值,IPC_CREAT表示如果不存在则创建,0666表示权限设置为可读写if(msgid<0) // 如果获取消息队列失败{perror("msgget"); // 打印错误信息并返回0return 0;}//3.发送消息msg.msg_type = 1; // 设置消息类型为1strcpy(msg.buf,"this msg type 1"); // 将消息内容复制到buf中ret = msgsnd(msgid,&msg,MSGLEN,0);//0表示不等待if(ret<0) // 如果发送消息失败{perror("msgsnd"); // 打印错误信息并返回0return 0;}
}

运行程序后,在终端输入ipcs -qc查看消息队列

在这里插入图片描述

这就是刚刚发送到消息队列的信息

接收端

  1. 打开/创建消息队列 msgget函数(发送端用过)

从消息队列接收消息

msgrcv函数

  1. 原型ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  2. 功能:从消息队列中接收一个消息
  3. 参数
    • msqid:消息队列的标识符,通常由msgget函数返回。
    • msgp:指向要接收的消息结构的指针,该结构包含消息类型和消息正文。
    • msgsz:消息的大小(以字节为单位)。
    • msgtyp指定要接收的消息类型
      • msgtyp=0:收到的第一条消息,任意类型。
      • msgtyp>0:收到的第一条 msg_typ类型的消息。
      • msgtyp<0:接收类型等于或者小于msgtyp绝对值的第一个消息。
    • msgflg
      • 0:阻塞式接收消息
      • IPC_NOWAIT:如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG
      • MSG_EXCEPT:与msgtyp配合使用返回队列中第一个类型不为msgtyp的消息
  4. 返回值
    • 成功时返回实际接收到的消息大小(以字节为单位)
    • 失败时返回-1
  5. 控制(删除)消息队列 msgctl函数
    1. 原型int msgctl(int msqid, int cmd, struct msqid_ds *buf);
    2. 功能:控制消息队列,包括获取和设置消息队列的属性
    3. 参数
      • msqid:消息队列的标识符,通常由msgget函数返回。
      • cmd:指定要执行的命令,可以是IPC_STATIPC_SET等。
      • buf:指向一个msqid_ds结构体的指针,用于存储或设置消息队列的属性。
    4. 返回值
      • 成功时返回0
      • 失败时返回-1

示例

接收消息队列的一条消息

#include <stdio.h> 
#include <sys/ipc.h> 
#include <sys/msg.h> 
#include <string.h> // 定义消息结构体
typedef struct
{long msg_type; // 消息类型char buf[128]; // 消息内容缓冲区
}msgT;    #define MSGLEN  (sizeof(msgT)-sizeof(long)) // 计算消息长度,减去长整型成员的长度
int main()
{int msgid; // 消息队列IDkey_t key; // 消息队列关键字msgT msg; // 消息结构体实例int ret; // 返回值key = ftok(".",100); // 生成消息队列关键字if(key<0){perror("ftok"); // 如果生成失败,打印错误信息return 0;}// 1.创建或获取消息队列IDmsgid = msgget(key,IPC_CREAT|0666); if(msgid<0){perror("msgget"); // 如果获取失败,打印错误信息return 0;}// 2.从消息队列中接收消息ret = msgrcv(msgid,&msg,MSGLEN,0,0); if(ret<0){perror("msgrcv"); // 如果接收失败,打印错误信息return 0;}printf("receiv msg type=%d,buf=%s\n",(int)msg.msg_type,msg.buf); // 打印接收到的消息类型和内容// 3.删除消息队列ret = msgctl(msgid,IPC_RMID,NULL);if(ret<0){perror("msgctl"); // 如果删除失败,打印错误信息return 0;}
}

这篇关于【并发程序设计】14.消息队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1025356

相关文章

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf