Kafka + Spring Boot: Hello World Example

GOAL

Создать два простейших Spring Boot приложения и настроить их общение друг с другом через Kafka.

PREREQUISITES

Считаем, что у Вас уже установлена Kafka и доступна по адресу localhost:9092

Если, Kafka еще не установлена, следуйте инструкции по установке Kafka: akuzmin.hashnode.dev/kafka-ustanovka-kafka

Также предполагаем, что Вы обладаете минимальными знаниями Spring Boot.

STEP BY STEP INSTRUCTION

  • Создаем два Spring-Boot приложения с помощью Spring Initializr: start.spring.io

В качестве зависимостей в оба проекта добавим Web и kafka.

Первое - Kafka Consumer. Это приложение будет получать сообщения из Kafka.

kafka-consumer.jpg

Второе - Kafka Provider. Оно будет отправлять сообщения в Kafka.

kafka-provider.jpg

  • Откроем приложение Kafka Provider и создадим в нем ендпоинт для отпраки сообщения через Kafka:
package com.akuzmin.kafkaprovider.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(path = "kafka")
public class KafkaController {

    private static final String KAFKA_TOPIC_NAME = "topic_name";
    private static final String KAFKA_DATA_KEY = "data_key";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping(value = "/send")
    public String send(@RequestParam String message) {
        kafkaTemplate.send(KAFKA_TOPIC_NAME, KAFKA_DATA_KEY, message);
        return "Message has been sent to Kafka";
    }
}

Как видите работа с Kafka очень проста: используем бин KafkaTemplate, и одной строчкой отправляем сообщение.

  • В этом же проекте вместо application.properties создадим файл application.yml (более устоявшийся в наше время формат) со следующим содержимым:
server:
  port: 8081

spring:
  kafka:
    bootstrap-servers: localhost:9092
  • Откроем приложение Kafka Consumer и настроим получение сообщений. Для этого создадим простой класс MessageListener:
ppackage com.akuzmin.kafkaconsumer;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@EnableKafka
@Component
public class MessageListener {

    private static final String KAFKA_TOPIC_NAME = "topic_name";

    private final List<String> messages = new CopyOnWriteArrayList<>();

    @KafkaListener(topics=KAFKA_TOPIC_NAME)
    public void msgListener(String msg){
        System.out.println("Kafka message received: " + msg);
        messages.add(msg);
    }

    public List<String> getMessages() {
        return new ArrayList<>(messages);
    }
}

Также создадим класс ConsumerController, с помощью которого мы сможем просматривать принятые сообщения через web.

package com.akuzmin.kafkaconsumer.controller;

import com.akuzmin.kafkaconsumer.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping
public class ConsumerController {

    @Autowired
    private MessageListener messageListener;

    @GetMapping(value = "/")
    public List<String> list() {
        return messageListener.getMessages();
    }
}
  • Также отредактируем файл application.yml:
server:
  port: 8082

spring:
  kafka:
    consumer:
      group-id: app.1
    bootstrap-servers: localhost:9092
  • Запустим оба приложения.

Отправка сообщения теперь доступна по URL: localhost:8081/kafka/send?message=hello_world

При каждой отправке сообщения увидим получение сообщения в консоли приложения Kafka Consumer:

messages.jpg

Также мы можем просмотреть список всех принятых сообщений по адресу

localhost:8082

ИТОГ

Таким образом мы реализовали отправку сообщения в одном Spring Boot приложении и его получение в другом Spring Boot приложении.