Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

2023-12-25 17:58

本文主要是介绍Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》


  1. 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    • 创建 rabbiqmq.yaml

      apiVersion: dapr.io/v1alpha1
      kind: Component
      metadata:
      name: messagebus
      spec:
      type: pubsub.rabbitmq
      metadata:
      - name: hostvalue: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
      - name: consumerIDvalue: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
      - name: durablevalue: "true" # Optional. Default: "false"
      - name: deletedWhenUnusedvalue: "false" # Optional. Default: "false"
      - name: autoAckvalue: "false" # Optional. Default: "false"
      - name: deliveryModevalue: "2" # Optional. Default: "0". Values between 0 - 2.
      - name: requeueInFailurevalue: "true" # Optional. Default: "false".
  2. 改造 StorageService.Api

    目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代码为

      public static IHostBuilder CreateHostBuilder(string[] args)
      {return Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.ConfigureKestrel(options =>{options.Listen(IPAddress.Loopback, 5003, listenOptions =>{listenOptions.Protocols = HttpProtocols.Http2;});});webBuilder.UseStartup<Startup>();});
      }
    • 添加 DaprClientService

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}
      }

      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

    • 修改 Startup.cs

       /// <summary>
      /// This method gets called by the runtime. Use this method to add services to the container.
      /// </summary>
      /// <param name="services">Services.</param>
      public void ConfigureServices(IServiceCollection services)
      {services.AddGrpc();services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });
      }
      /// <summary>
      /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
      /// </summary>
      /// <param name="app">app.</param>
      /// <param name="env">env.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {if (env.IsDevelopment()){app.UseDeveloperExceptionPage();}app.UseRouting();app.UseEndpoints(endpoints =>{endpoints.MapSubscribeHandler();endpoints.MapGrpcService<DaprClientService>();});
      }
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件

    • 启动 StorageService 服务

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
  3. 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为
    • 下单
    • 查看订单详情
    • 获取订单列表

    在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

    • 创建 CreateOrder.proto 文件

      syntax = "proto3";package daprexamples;option java_outer_classname = "CreateOrderProtos";
      option java_package = "generate.protos";service OrderService {rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
      }message CreateOrderRequest {string ProductID = 1; //Product IDint32 Amount=2; //Product Amountstring CustomerID=3; //Customer ID
      }message CreateOrderResponse {bool Succeed = 1; //Create Order Result,true:success,false:fail
      }message RetrieveOrderRequest{string OrderID=1;
      }message RetrieveOrderResponse{Order Order=1;
      }message GetOrderListRequest{string CustomerID=1;
      }message GetOrderListResponse{repeated Order Orders=1;
      }message Order{string ID=1;string ProductID=2;int32 Amount=3;string CustomerID=4;
      }
    • 使用 protoc 生成 Java 代码

      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
    • 引用 MyBatis 做为 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 服务

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
  4. 创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件

    • 引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

      如未安装 protoc-gen-gogo ,通过一下命令获取并安装

      go get github.com/gogo/protobuf/gogoproto

      安装 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto

      根据 proto 文件生成代码

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 客户端代码,创建订单

      ...response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{Id:     "OrderService",Data:   createOrderRequestData,Method: "createOrder",})if err != nil {fmt.Println(err)return}...
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构

      syntax = "proto3";package daprexamples;option java_outer_classname = "DataToPublishProtos";
      option java_package = "generate.protos";message StorageReduceData {string ProductID = 1;int32 Amount=2;
      }
    • 生成 DataToPublish 代码

       protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列

      ...createOrderResponse := &daprexamples.CreateOrderResponse{}if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {fmt.Println(err)return
      }
      fmt.Println(createOrderResponse.Succeed)if !createOrderResponse.Succeed {//下单失败return
      }storageReduceData := &daprexamples.StorageReduceData{ProductID: createOrderRequest.ProductID,Amount:    createOrderRequest.Amount,
      }
      storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
      if err != nil {fmt.Println(err)return
      }_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{Topic: "Storage.Reduce",Data:  &any.Any{Value: storageReduceDataData},
      })fmt.Println(storageReduceDataData)if err != nil {fmt.Println(err)
      } else {fmt.Println("Published message!")
      }
      ...

      注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端

       dapr run --app-id client go run main.go

      输出

      == APP == true
      == APP == Published message!
  5. RabbitMQ

    • 在浏览器中输入 http://localhost:15672/ ,账号和密码均为 guest
    • 查看 Connections ,有3个连接
      • 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
    • 查看 Exchanges

      Name            Type    Features    Message rate in Message rate out
      (AMQP default)  direct  D
      Storage.Reduce  fanout  D
      amq.direct      direct  D
      amq.fanout      fanout  D
      ...

      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

    • 查看 Queues

      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打开 DaprClientService.cs 文件,更改内容为

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {private readonly StorageContext _storageContext;public DaprClientService(StorageContext storageContext){_storageContext = storageContext;}public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context){if (request.Topic.Equals("Storage.Reduce")){StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());Console.WriteLine("ProductID:" + storageReduceData.ProductID);Console.WriteLine("Amount:" + storageReduceData.Amount);await HandlerStorageReduce(storageReduceData);}return new Empty();}private async Task HandlerStorageReduce(StorageReduceData storageReduceData){Guid productID = Guid.Parse(storageReduceData.ProductID);Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));if (storageFromDb == null){return;}if (storageFromDb.Amount < storageReduceData.Amount){return;}storageFromDb.Amount -= storageReduceData.Amount;Console.WriteLine(storageFromDb.Amount);await _storageContext.SaveChangesAsync();}
    • 说明
      • 添加 GetTopicSubscriptions() 将完成对主题的关注
        • 当应用停止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理
      • HandlerStorageReduce 用于减少库存
  7. 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
    • Java

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
    • go

      dapr run --app-id client  go run main.go

      go grpc 输出为

      == APP == true
      == APP == Published message!

    查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

源码地址

这篇关于Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/536411

相关文章

Spring Boot项目部署命令java -jar的各种参数及作用详解

《SpringBoot项目部署命令java-jar的各种参数及作用详解》:本文主要介绍SpringBoot项目部署命令java-jar的各种参数及作用的相关资料,包括设置内存大小、垃圾回收... 目录前言一、基础命令结构二、常见的 Java 命令参数1. 设置内存大小2. 配置垃圾回收器3. 配置线程栈大小

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S

Spring中配置ContextLoaderListener方式

《Spring中配置ContextLoaderListener方式》:本文主要介绍Spring中配置ContextLoaderListener方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录Spring中配置ContextLoaderLishttp://www.chinasem.cntene

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码