Dapr生产者消费者示例

[TOC]


新一代云原生应用开发流程:

框架 版本号
spring boot 5.5.2
dapr 1.3.1

1、Dapr开发及调试 Spring Boot + Rabbitmq 应用流程

1.1、安装开发环境

  • 安装 docker - 自行百度

  • 安装 dapr, 建议使用 windows 11 的 linux 子系统,对于多语言开发非常方便 - 自行百度

  • 初始化 dapr

    dapr init
    

    初始化完成会看到 docker 中启动了以下三个镜像:

    image-20211229150533042

1.2、创建 Spring Boot 应用

  • 新建 spring-boot 项目如图:

    image-20220118153328061

1.3、生产端开发

  • cloud-native-publisher 生产端 pom.xml 文件中引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk-actors</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk-springboot</artifactId>
    </dependency>
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-zipkin</artifactId>
    </dependency>
    
  • 编写控制器类

    @RestController
    public class PublishMsgController {
    
        /**
         * 消息数量
         */
        private static final int NUM_MESSAGES = 10000;
    
        /**
         * 消息生存时间
         */
        private static final String MESSAGE_TTL_IN_SECONDS = "1000";
    
        /**
         * 消息队列名称
         */
        private static final String TOPIC_NAME = "lite.product.save";
    
        /**
         * 消息队列服务器名称
         */
        private static final String PUBSUB_NAME = "rabbitbus";
    
        @Autowired
        protected DaprClient daprClient;
    
        @GetMapping
        public void sendMsg() {
            for (int i = 0; i < NUM_MESSAGES; i++) {
                String message = String.format("This is message #%d", i);
                daprClient.publishEvent(PUBSUB_NAME, TOPIC_NAME, message, singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
                System.out.println("Published message: " + message);
            }
            System.out.println("消息发送成功...");
        }
    }
    

1.4、消费端开发

  • cloud-native-subscriber 消费端 pom.xml 文件中引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk-actors</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dapr</groupId>
        <artifactId>dapr-sdk-springboot</artifactId>
    </dependency>
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-zipkin</artifactId>
    </dependency>
    
  • 编写消费端控制器

    public class SubscriberMsgController {
    
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    
        /**
         * 消息队列名称
         */
        private static final String TOPIC_NAME = "lite.product.save";
    
        /**
         * 消息队列服务器名称
         */
        private static final String PUBSUB_NAME = "rabbitbus";
    
        @Autowired
        protected DaprClient daprClient;
    
        @Topic(name = TOPIC_NAME, pubsubName = PUBSUB_NAME)
        @PostMapping(path = "/liteProductSave")
        public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
            return Mono.fromRunnable(() -> {
                try {
                    System.out.println("Subscriber got: " + cloudEvent.getData());
                    System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
    

1.5、调试工具配置及运行

  • Intellij 打开菜单 File / Settings / Tools / External Tools, 添加两个 daprd.exe 外部调试工具, 如图:

    image-20220118154759506

    • 第一个:

      Name: Daprd Publisher Service、

      Program: 配置为 C:\dapr\daprd.exe, daprd.exe 在初始化dapr以后生成在 C:\Users\wyf.dapr\bin 这个路径

      Arguments: -app-id cloud-native-publisher –components-path ./components/pubsub -app-port 8000 -dapr-http-port 60000 -dapr-grpc-port 50000 -placement-host-address localhost:50005 -metrics-port 9090

    • -app-id 应用名字

    • -app-port 应用端口号

    • -dapr-http-port 相当于docker中的映射参数, 将内部端口号 3000 映射到主机端口号 3005,浏览器中访问的时候用

    • -dapr-grpc-port 相当于docker中的映射参数, 将内部端口号 3000 映射到主机端口号 52000

    • -placement-host-address

    • 第二个:

      Name: Daprd Publisher Service、

      Program: 配置为 C:\dapr\daprd.exe, daprd.exe 在初始化dapr以后生成在 C:\Users\wyf.dapr\bin 这个路径

      Arguments: -app-id cloud-native-subscriber –components-path ./components/pubsub -app-port 8001 -dapr-http-port 60001 -dapr-grpc-port 50001 -placement-host-address localhost:50005 -metrics-port 9091

    • -app-id 应用名字

    • -app-port 应用端口号

    • -dapr-http-port 相当于docker中的映射参数, 将内部端口号 3000 映射到主机端口号 3005,浏览器中访问的时候用

    • -dapr-grpc-port 相当于docker中的映射参数, 将内部端口号 3000 映射到主机端口号 52000

    • -placement-host-address

  • 启动调试工具,点击 Intellij 菜单栏 Tools / External Tools 里面能看到

1.6、Spring Boot启动参数配置及运行

  • 配置 Spring Boot 应用启动参数,如图:

    image-20220118155029542

    image-20220118155047840

  • Debug模式启动应用

  • 打开浏览器访问: http://localhost:60000/v1.0/invoke/cloud-native-publisher/method 即可看到接口返回值 “test”

  • 访问方式:

    http://localhost:<端口号>/v1.0/invoke/<应用名称>/method/<方法名称, 没有不传, 比如上面的接口有个POST的 /say 接口, 就需要传 say>

1.7、启动 rabbitmq 消息队列服务

  • 启动后会发现消费者正在监听队列

    image-20220118155659967

1.8、启动 rabbitmq 消息队列服务



感谢您的反馈。如果您有关于如何使用 KubeSphere 的具体问题,请在 Slack 上提问。如果您想报告问题或提出改进建议,请在 GitHub 存储库中打开问题。