Spring Cloud Stream 提供 Spring 應用程式建立以事件驅動為主的微服務系統,並可跟 RabbitMQ、Kafka 等 Message Queue 系統進行整合。本文將介紹如何使用 Spring Cloud Stream 加上 RabbitMQ 撰寫一個小型的事件驅動範例應用程式。
Spring Cloud Stream 和 RabbitMQ 主要是使用 Producer-Processor-Consumer 架構來處理訊息,Producer 傳送資料,Processor 接收並處理資料,然後 Consumer 接收處理過的資料並輸出結果:
讓我們來看看 3 個簡單的微服務——生產者 (Producer)、處理器 (Processor) 和消費者 (Consumer)。 生產者將透過 REST 端點接受字串並將訊息發佈到 RabbitMQ 主題。 處理器將從主題訂閱,將字串轉換為大寫並發佈到輸出主題。 消費者將訂閱輸出主題並在控制台 (Console) 中列印該值。 這 3 個服務將示範 Spring Cloud Stream 中的來源(Source)-處理器(Processor)-接收器(Sink)概念:
開始實作:
首先使用 Docker 在本地端運行一個 RabbitMQ 服務,在終端機輸入以下指令:
$ docker run -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 5672:5672 -p 15672:15672 --rm rabbitmq:3.8-management
運行成功後,可以使用 http://localhost:15672/ 開啟 RabbitMQ 的 Web 管理介面,帳號為 admin,密碼為 123456:
接著用 https://start.spring.io/ 建立一個新的 Spring Boot 專案,Artifact 為 demqmq;Dependencies 選擇 Spring Web、Cloud Stream、Spring for RabbitMQ:
然後用 IntelliJ IDEA 開啟 Spring Boot 專案的 pom.xml 檔案,載入專案,然後新增一個 Java Class,名稱為 ValueController,並加入以下程式碼:
package com.example.demomq;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ValueController {
private StreamBridge streamBridge;
public ValueController(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@GetMapping("values/{value}")
public ResponseEntity<String> values(@PathVariable String value) {
System.out.println("Sending value to topic: " + value);
streamBridge.send("values-topic", value);
return ResponseEntity.ok("ok");
}
}
說明:透過 streamBridge.send() 將 value 傳送到 values-topic 主題。
然後在 IntelliJ IDEA 裡新增一個 Java Class,名稱為 ValueProcessor,並加入以下程式碼:
package com.example.demomq;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Function;
@Component
public class ValueProcessor {
@Bean
public Function<String, String> convertToUppercase() {
return (value) -> {
System.out.println("Received: " + value);
String upperCaseValue = value.toUpperCase();
System.out.println("Sending: " + upperCaseValue);
return upperCaseValue;
};
}
}
說明:Function<String, String> 說明了 convertToUppercase 這個 bean 函數將接收一個 String,然後傳送一個 String。在 application.yml 裡可以設定使用 <函數名稱>-in-0 接收資料,這裡就是使用 convertToUppercase -in-0 這個名稱來接收資料;使用 <函數名稱>-out-0 來傳送資料,這裡就是使用 convertToUppercase -out-0 這個名稱來傳送資料。(完整的 application.yml 設定請看後面)。
接著在 IntelliJ IDEA 裡新增一個 Java Class,名稱為 ValueConsumer,並加入以下程式碼:
package com.example.demomq;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class ValueConsumer {
@Bean
public Consumer<String> onReceive() {
return (value) -> {
System.out.println("Received value in Consumer: " + value);
};
}
}
說明:Consumer<String> 說明了 onReceive 這個 bean 函數將接收一個 String。在 application.yml 裡可以使用 <函數名稱>-in-0 接收資料,這裡就是使用 onReceive-in-0 這個名稱來接收資料。(完整的 application.yml 設定請看後面)
然後刪除 src/main/resources 下的 application.properties 檔案,在 src/main/resources 下新增一個 application.yml 檔案,並填入以下內容:
spring:
cloud:
function:
definition: convertToUppercase;onReceive
stream:
bindings:
convertToUppercase-in-0:
destination: values-topic
group: processor
convertToUppercase-out-0:
destination: uppercase-values-topic
onReceive-in-0:
destination: uppercase-values-topic
group: consumer
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
說明:
spring.cloud.function.definition 填入 convertToUppercase、onReceive 這兩個 bean 函數名稱。
spring.cloud.stream.bindings.convertToUppercase-in-0.destination 指定 convertToUppercase 函數要從 values-topic 接收訊息,spring.cloud.stream.bindings.convertToUppercase-out-0.destination 指定 convertToUppercase 函數要將訊息輸出到 uppercase-values-topic,spring.cloud.stream.bindings.onReceive-in-0.destination 指定 onReceive 函數從 uppercase-values-topic 接收訊息。
spring.rabbitmq.host 填入 localhost,spring.rabbitmq.port 填入 5672,spring.rabbitmq.username 跟 spring.rabbitmq.password 分別填入 RabbitMQ 的帳號跟密碼。
接著運行 Spring Boot 應用程式,用 postman 或 API Tester 等工具進行測試:
然後可以看到在 IntelliJ IDEA 的 Console 裡有輸出結果,表示應用程式成功的從 RabbitMQ Cluster 傳送與接收訊息了:
最後,附帶一提,如果 Spring Boot 應用程式是連接到 RabbitMQ Cluster,那 yml 設定應該使用 addresses 而不是 host,addresses 填入多個 IP 位址,且用逗號隔開,如下圖所示: