筆記:使用 Spring Cloud Stream 和 RabbitMQ 發送與接收訊息

Spring Cloud Stream 提供 Spring 應用程式建立以事件驅動為主的微服務系統,並可跟 RabbitMQ、Kafka 等 Message Queue 系統進行整合。本文將介紹如何使用 Spring Cloud Stream 加上 RabbitMQ 撰寫一個小型的事件驅動範例應用程式。

Spring Cloud Stream 和 RabbitMQ 主要是使用 Producer-Processor-Consumer 架構來處理訊息,Producer 傳送資料,Processor 接收並處理資料,然後 Consumer 接收處理過的資料並輸出結果:

讓我們來看看 3 個簡單的微服務——生產者 (Producer)、處理器 (Processor) 和消費者 (Consumer)。 生產者將透過 REST 端點接受字串並將訊息發佈到 RabbitMQ 主題。 處理器將從主題訂閱,將字串轉換為大寫並發佈到輸出主題。 消費者將訂閱輸出主題並在控制台 (Console) 中列印該值。 這 3 個服務將示範 Spring Cloud Stream 中的來源(Source)-處理器(Processor)-接收器(Sink)概念:

開始實作:

首先使用 Docker 在本地端運行一個 RabbitMQ 服務,在終端機輸入以下指令:

$ docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 5672:5672 -p 15672:15672 --rm rabbitmq:3.8-management

運行成功後,可以使用 http://localhost:15672/ 開啟 RabbitMQ 的 Web 管理介面,帳號為 admin,密碼為 123456:

接著用 https://start.spring.io/ 建立一個新的 Spring Boot 專案,Artifact 為 demqmq;Dependencies 選擇 Spring WebCloud StreamSpring for RabbitMQ

然後用 IntelliJ IDEA 開啟 Spring Boot 專案的 pom.xml 檔案,載入專案,然後新增一個 Java Class,名稱為 ValueController,並加入以下程式碼:

package com.example.demomq;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ValueController {
    private StreamBridge streamBridge;

    public ValueController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @GetMapping("values/{value}")
    public ResponseEntity<String> values(@PathVariable String value) {
        System.out.println("Sending value to topic: " + value);
        streamBridge.send("values-topic", value);
        return ResponseEntity.ok("ok");
    }
}

說明:透過 streamBridge.send() 將 value 傳送到 values-topic 主題。

然後在 IntelliJ IDEA 裡新增一個 Java Class,名稱為 ValueProcessor,並加入以下程式碼:

package com.example.demomq;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Function;

@Component
public class ValueProcessor {
    @Bean
    public Function<String, String> convertToUppercase() {
        return (value) -> {
            System.out.println("Received: " + value);
            String upperCaseValue = value.toUpperCase();
            System.out.println("Sending: " + upperCaseValue);
            return upperCaseValue;
        };
    }
}

說明:Function<String, String> 說明了 convertToUppercase 這個 bean 函數將接收一個 String,然後傳送一個 String。在 application.yml 裡可以設定使用 <函數名稱>-in-0 接收資料,這裡就是使用 convertToUppercase -in-0 這個名稱來接收資料;使用 <函數名稱>-out-0 來傳送資料,這裡就是使用 convertToUppercase -out-0 這個名稱來傳送資料。(完整的 application.yml 設定請看後面)。

接著在 IntelliJ IDEA 裡新增一個 Java Class,名稱為 ValueConsumer,並加入以下程式碼:

package com.example.demomq;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class ValueConsumer {
    @Bean
    public Consumer<String> onReceive() {
        return (value) -> {
            System.out.println("Received value in Consumer: " + value);
        };
    }
}

說明:Consumer<String> 說明了 onReceive 這個 bean 函數將接收一個 String。在 application.yml 裡可以使用 <函數名稱>-in-0 接收資料,這裡就是使用 onReceive-in-0 這個名稱來接收資料。(完整的 application.yml 設定請看後面)

然後刪除 src/main/resources 下的 application.properties 檔案,在 src/main/resources 下新增一個 application.yml 檔案,並填入以下內容:

spring:
  cloud:
    function:
      definition: convertToUppercase;onReceive
    stream:
      bindings:
        convertToUppercase-in-0:
          destination: values-topic
          group: processor
        convertToUppercase-out-0:
          destination: uppercase-values-topic
        onReceive-in-0:
          destination: uppercase-values-topic
          group: consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456

說明:

spring.cloud.function.definition 填入 convertToUppercase、onReceive 這兩個 bean 函數名稱。

spring.cloud.stream.bindings.convertToUppercase-in-0.destination 指定 convertToUppercase 函數要從 values-topic 接收訊息,spring.cloud.stream.bindings.convertToUppercase-out-0.destination 指定 convertToUppercase 函數要將訊息輸出到 uppercase-values-topic,spring.cloud.stream.bindings.onReceive-in-0.destination 指定 onReceive 函數從 uppercase-values-topic 接收訊息。

spring.rabbitmq.host 填入 localhost,spring.rabbitmq.port 填入 5672,spring.rabbitmq.username 跟 spring.rabbitmq.password 分別填入 RabbitMQ 的帳號跟密碼。

接著運行 Spring Boot 應用程式,用 postman 或 API Tester 等工具進行測試:

然後可以看到在 IntelliJ IDEA 的 Console 裡有輸出結果,表示應用程式成功的從 RabbitMQ Cluster 傳送與接收訊息了:

最後,附帶一提,如果 Spring Boot 應用程式是連接到 RabbitMQ Cluster,那 yml 設定應該使用 addresses 而不是 host,addresses 填入多個 IP 位址,且用逗號隔開,如下圖所示: