Kafka + Spring Boot: Hello World Example
GOAL
Создать два простейших Spring Boot приложения и настроить их общение друг с другом через Kafka.
PREREQUISITES
Считаем, что у Вас уже установлена Kafka и доступна по адресу localhost:9092
Если, Kafka еще не установлена, следуйте инструкции по установке Kafka: akuzmin.hashnode.dev/kafka-ustanovka-kafka
Также предполагаем, что Вы обладаете минимальными знаниями Spring Boot.
STEP BY STEP INSTRUCTION
- Создаем два Spring-Boot приложения с помощью Spring Initializr: start.spring.io
В качестве зависимостей в оба проекта добавим Web и kafka.
Первое - Kafka Consumer. Это приложение будет получать сообщения из Kafka.
Второе - Kafka Provider. Оно будет отправлять сообщения в Kafka.
- Откроем приложение Kafka Provider и создадим в нем ендпоинт для отпраки сообщения через Kafka:
package com.akuzmin.kafkaprovider.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "kafka")
public class KafkaController {
private static final String KAFKA_TOPIC_NAME = "topic_name";
private static final String KAFKA_DATA_KEY = "data_key";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping(value = "/send")
public String send(@RequestParam String message) {
kafkaTemplate.send(KAFKA_TOPIC_NAME, KAFKA_DATA_KEY, message);
return "Message has been sent to Kafka";
}
}
Как видите работа с Kafka очень проста: используем бин KafkaTemplate, и одной строчкой отправляем сообщение.
- В этом же проекте вместо application.properties создадим файл application.yml (более устоявшийся в наше время формат) со следующим содержимым:
server:
port: 8081
spring:
kafka:
bootstrap-servers: localhost:9092
- Откроем приложение Kafka Consumer и настроим получение сообщений. Для этого создадим простой класс MessageListener:
ppackage com.akuzmin.kafkaconsumer;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@EnableKafka
@Component
public class MessageListener {
private static final String KAFKA_TOPIC_NAME = "topic_name";
private final List<String> messages = new CopyOnWriteArrayList<>();
@KafkaListener(topics=KAFKA_TOPIC_NAME)
public void msgListener(String msg){
System.out.println("Kafka message received: " + msg);
messages.add(msg);
}
public List<String> getMessages() {
return new ArrayList<>(messages);
}
}
Также создадим класс ConsumerController, с помощью которого мы сможем просматривать принятые сообщения через web.
package com.akuzmin.kafkaconsumer.controller;
import com.akuzmin.kafkaconsumer.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping
public class ConsumerController {
@Autowired
private MessageListener messageListener;
@GetMapping(value = "/")
public List<String> list() {
return messageListener.getMessages();
}
}
- Также отредактируем файл application.yml:
server:
port: 8082
spring:
kafka:
consumer:
group-id: app.1
bootstrap-servers: localhost:9092
- Запустим оба приложения.
Отправка сообщения теперь доступна по URL: localhost:8081/kafka/send?message=hello_world
При каждой отправке сообщения увидим получение сообщения в консоли приложения Kafka Consumer:
Также мы можем просмотреть список всех принятых сообщений по адресу
ИТОГ
Таким образом мы реализовали отправку сообщения в одном Spring Boot приложении и его получение в другом Spring Boot приложении.