스프링 부트와 아파치 카프카를 활용한 고급 메시징 패턴

스프링 부트와 아파치 카프카를 활용한 고급 메시징 패턴

Kafka and Spring Boot

스프링 부트와 아파치 카프카는 많은 기업에서 메시징 시스템으로 사용되고 있습니다. 이 두 기술을 함께 활용하면 더욱 높은 수준의 메시징 패턴을 구현할 수 있습니다. 이번 글에서는 스프링 부트와 아파치 카프카를 활용하여 고급 메시징 패턴을 구현하는 방법에 대해 알아보겠습니다.

스프링 부트와 아파치 카프카 이해하기

스프링 부트

스프링 부트는 스프링 프레임워크를 기반으로 한 프레임워크입니다. 스프링 부트는 스프링의 여러 기능을 자동 구성해줍니다. 이를 통해 개발자는 더 적은 코드로 웹 애플리케이션을 개발할 수 있습니다.

스프링 부트는 내장형 서버를 제공합니다. 따라서 개발자는 별도의 웹 서버를 설치할 필요가 없습니다. 또한 스프링 부트는 자동으로 필요한 라이브러리를 다운로드하여 빌드합니다. 이를 통해 개발자는 더욱 더 쉽게 개발할 수 있습니다.

아파치 카프카

아파치 카프카는 대용량 메시지 처리에 특화된 분산 메시징 시스템입니다. 아파치 카프카는 수많은 메시지를 안정적으로 처리할 수 있습니다.

아파치 카프카는 다음과 같은 특징을 가지고 있습니다.

  • 대용량 메시지 처리: 아파치 카프카는 초당 수백만 개의 메시지를 처리할 수 있습니다.
  • 분산 처리: 아파치 카프카는 여러 대의 서버에 데이터를 분산하여 처리할 수 있습니다.
  • 안정성: 아파치 카프카는 메시지 손실 없이 안정적으로 메시지를 처리할 수 있습니다.

고급 메시징 패턴 구현하기

Publisher-Subscriber 패턴

Publisher-Subscriber 패턴은 메시지를 발행하는 Publisher와 메시지를 구독하는 Subscriber로 구성됩니다. Publisher는 토픽에 메시지를 발행하고, Subscriber는 해당 토픽을 구독하여 메시지를 수신합니다.

아파치 카프카는 이러한 Publisher-Subscriber 패턴을 기본적으로 지원합니다. Publisher는 Producer, Subscriber는 Consumer로 대응됩니다.

아래는 스프링 부트와 아파치 카프카를 활용하여 Publisher-Subscriber 패턴을 구현한 예시입니다.

@Service
public class KafkaMessageProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult>() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            }
        });
    }
}

@Service
public class KafkaMessageConsumer {

    @KafkaListener(topics = "${kafka.topic}")
    public void receive(String message) {
        System.out.println("Received message=[" + message + "]");
    }
}

Request-Reply 패턴

Request-Reply 패턴은 클라이언트가 요청을 보내면 서버는 요청을 처리하고 응답을 보내는 패턴입니다. 아파치 카프카에서는 Request-Reply 패턴을 기본적으로 지원하지 않습니다. 따라서 개발자는 이를 직접 구현해야 합니다.

아래는 Request-Reply 패턴을 구현한 예시입니다.

@Service
public class KafkaRequestProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public String sendMessage(String topic, String message) throws ExecutionException, InterruptedException {
        RequestReplyFuture future = RequestReplyFuture.newFuture(message, Duration.ofMillis(3000), kafkaTemplate, topic);
        ProducerRecord record = new ProducerRecord(topic, message);
        kafkaTemplate.send(record, future);
        ConsumerRecord response = future.get();
        return response.value();
    }
}

@Service
public class KafkaRequestConsumer {

    @KafkaListener(topics = "${kafka.topic}")
    @SendTo
    public String receive(String message) {
        System.out.println("Received message=[" + message + "]");
        return "Hello, " + message;
    }
}

Message Router 패턴

Message Router 패턴은 메시지를 받아서 여러 대상으로 라우팅하는 패턴입니다. 아파치 카프카에서는 Message Router 패턴을 기본적으로 지원하지 않습니다. 따라서 개발자는 이를 직접 구현해야 합니다.

아래는 Message Router 패턴을 구현한 예시입니다.

@Service
public class KafkaMessageRouter {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private List messageProcessors;

    @KafkaListener(topics = "${kafka.topic}")
    public void receive(String message) {
        KafkaMessageProcessor messageProcessor = messageProcessors.stream()
                .filter(processor -> processor.supports(message))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("No message processor found for message: " + message));
        String outputTopic = messageProcessor.process(message);
        kafkaTemplate.send(outputTopic, message);
        System.out.println("Routed message=[" + message + "] to topic=[" + outputTopic + "]");
    }
}

public interface KafkaMessageProcessor {
    boolean supports(String message);
    String process(String message);
}

@Component
public class FooMessageProcessor implements KafkaMessageProcessor {

    @Override
    public boolean supports(String message) {
        return message.startsWith("foo");
    }

    @Override
    public String process(String message) {
        return "foo-output";
    }
}

@Component
public class BarMessageProcessor implements KafkaMessageProcessor {

    @Override
    public boolean supports(String message) {
        return message.startsWith("bar");
    }

    @Override
    public String process(String message) {
        return "bar-output";
    }
}

아파치 카프카 스트림즈 활용하기

아파치 카프카 스트림즈는 아파치 카프카에서 제공하는 스트림 처리 라이브러리입니다. 이를 활용하면 스트림 데이터를 처리할 수 있습니다.

아래는 아파치 카프카 스트림즈를 활용하여 데이터를 처리하는 예시입니다.

final StreamsBuilder builder = new StreamsBuilder();
KStream inputStream = builder.stream("input-topic");
KStream outputStream = inputStream.filter((key, value) -> value.contains("important"));
outputStream.to("output-topic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

메시지 트랜잭션 처리와 오류 대응하기

아파치 카프카에서는 메시지 트랜잭션 처리를 지원합니다. 이를 활용하면 여러 메시지를 하나의 트랜잭션으로 처리할 수 있습니다.

아래는 메시지 트랜잭션 처리를 구현한 예시입니다.

@Autowired
private KafkaTemplate kafkaTemplate;

@Transactional
public void sendMessageInTransaction(String topic, String message) {
    ListenableFuture<SendResult> future = kafkaTemplate.send(topic, message);
    future.addCallback(new ListenableFutureCallback<SendResult>() {
        @Override
        public void onSuccess(SendResult result) {
            System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.err.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            throw new RuntimeException(ex);
        }
    });
}

아파치 카프카에서는 오류 대응을 위한 다양한 기능을 제공합니다. 예를 들어, 오류 발생 시 재시도를 수행하는 기능이 있습니다.

아래는 오류 대응을 구현한 예시입니다.

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

    private ConsumerFactory consumerFactory() {
        Map props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory(props);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }
}

@Service
public class KafkaMessageConsumer {

    @KafkaListener(topics = "${kafka.topic}")
    public void receive(String message) {
        System.out.println("Received message=[" + message + "]");
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }

    @KafkaListener(topics = "${kafka.retry.topic}")
    public void retry(String message) {
        System.out.println("Retrying message=[" + message + "]");
    }
}

결론

스프링 부트와 아파치 카프카는 메시징 시스템을 구현하는 데 매우 유용한 기술입니다. 이러한 기술을 활용하면 더욱 높은 수준의 메시징 패턴을 구현할 수 있습니다. 이번 글에서는 스프링 부트와 아파치 카프카를 활용하여 고급 메시징 패턴을 구현하는 방법에 대해 알아보았습니다.