Kafka Producer

카프카에 메시지 쓰기

카프카는 메시지를 Produce(발행)하고 Consume(소비)하는 형태의 오픈소스이다.

메시지를 Produce하는 쪽을 Producer라고 부르며, 메시지를 Consume하는 쪽을 Consumer라고 부른다.

이번 장에서는 카프카 프로듀서에 대해서 배워볼 것이다.

프로듀서 API는 매우 간단하다.

우선 카프카에 쓰려는 메시지를 갖는 ProducerRecord를 생성해야한다.

ProducerRecord에는 토픽과 값으로 이루어져있으며 선택적으로 키와 파티션을 지정하는 것도 가능하다.

어쨌든 가장 중요한 것은 어떤 토픽어떤 값을 저장할 것인가가 프로듀서의 가장 큰 관심사이다.

ProducerRecord가 카프카로 전송하기위해서는 키와 값의 쌍으로 구성되는 메시지 객체들이 네트워크로 전송될 수 있도록 바이트 배열(Byte Array)로 직렬화해줘야한다는 점이다.

이를 위한 직렬화 처리 컴포넌트 Serializer가 담당한다.

이 후 파티셔너 컴포넌트에 전달된다.

ProducerRecord가 특정 파티션을 지정했다면 해당 파티션으로 가겠지만, 특별히 선택하지 않는다면 카프카가 ProducerRecord의 키를 기준으로 파티션을 하나 지정해준다.

이후 토픽과 파티션으로 전송될 레코드들을 모은 레코드 배치(Batch)에 추가되며, 별개의 스레드가 그 배치를 카프카 브로커에게 전송해준다.

브로커는 수신된 레코드의 메시지를 처리한 후 응답을 전송해주는데, 메시지가 성공하면 RecordMetadata 객체를 반환해준다.

만약 메시지 쓰기에 실패하면 에러를 반환하며 프로듀서는 메시지 쓰기를 포기하고 에러를 반환하기 전에 몇 번 더 재전송을 시도할 수 있다.

카프카 프로듀서 구성하기

ProducerRecord의 메시지를 카프카에 쓰기 위해 프로듀서의 기능을 수행하는 객체를 생성해야한다.

이 객체에 카프카에 관련된 설정을 해볼 수 있는데 다음 세 개의 필수 속성을 갖는다.

bootstrap.servers

카프카 클러스터에 최초 연결하기 위해 프로듀서가 사용하는 브로커들의 host:port 목록을 이 속성에 설정한다.

최소 2개정도의 host:port를 포함하는 것이 좋다.

key.serializer

프로듀서가 생성하는 레코드 (ProducerRecord)의 메시지 키를 직렬화하기 위해 사용되는 클래스 이름을 이 속성에 설정해야한다.

기본적으로 카프카 자체에서 ByteArraySerializer, StringSerializer, IntegerSerializer를 포함하고 있다.

value.serializer

레코드의 메시지 값을 직렬화하는데 사용되는 클래스를 이 속성에 설정한다.

방법은 key.serializer와 동일하다.

실제 사용법

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

책에서는 Java 클래스로 설정값을 관리하는 코드를 보여주었지만, 우리는 일반적으로 Spring Boot에서 개발할 것이므로 application.properties에서 다음과 같이 설정하여 설정값을 관리할 수 있다.

설정이 완료되면 카프카로 메시지 전송을 할 수 있다.

메시지 전송 방법을 선택할 수 있는데, 세 가지 방법이 있다.

File-and-forget(전송 후 망각)

send() 메서드로 메시지를 전송만 하고 성공 또는 실패 여부에 관계없이 후속 조치를 취하지 않는 방법이다.

이 방법은 잘못하면 메시지가 유실될 가능성도 존재한다.

Synchronous send(동기식 전송)

send() 메서드로 메시지를 전송하면 자바의 Future 객체가 반환된다.

Future의 get() 메서드를 곧장 호출하면 작업이 완료될 때까지 기다렸다가 브로커로부터 결과가 반환되므로 send()가 성공적으로 수행되었는지 알 수 있다.

Asynchronous send(비동기식 전송)

send() 메서드의 호출 후 콜백 메서드를 구현한 객체를 매개변수로 전달한다.

이 객체에 구현된 콜백 메서드는 카프카 브로커로부터 응답을 받을 때 자동으로 호출되므로 send()가 성공적으로 수행되었는지 알 수 있다.

콜백 메서드는 카프카 브로커로부터 응답을 받을 때 자동으로 호출되므로 send()가 성공적으로 수행되었는지 알 수 있다.

카프카에 메시지 전송하기

메시지를 전송하기 위한 코드는 다음과 같다.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        try{
            this.kafkaTemplate.send(TOPIC, message);
        } catch(Exception e) {
            e.printStackTrace();
        }

    }
}

이 역시 책의 예제 보다는, 스프링에서 제공하는 Spring Kafka를 예제로 가져와보았다.

여기서 메시지 전송에 응답을 받지 않았으므로 응답의 성공,실패 여부를 알 수 없다.

어디까지나 예제 코드이므로 실제 애플리케이션에서는 이렇게 처리하지 않는다.

또한 카프카에게 메시지를 전송하기 전에 프로듀서에서 에러가 생기면 예외가 발생할 수 있다.

메시지의 직렬화 실패에는 SerializationException, 버퍼가 가득차면 BufferExhaustedException, 시간 초과라면 TimeoutException 예외가 발생한다.

동기식 메시지 전송 예시

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        try{
            this.kafkaTemplate.send(TOPIC, message).get();
        } catch(Exception e) {
            e.printStackTrace();
        }

    }
}

여기서는 get 메서드를 사용해 카프카의 응답을 기다린다.

카프카 프로듀서를 사용할 때는 두 가지 유형의 에러가 발생할 수 있다고 한다.

우선 메시지를 다시 전송하면 해결될 수 있는 재시도 가능한 에러 (retriable) 에러가 있다.

이 메시지 전송을 다시 하여 해결할 수 있는 예는, Connection 에러이다. 연결을 다시 시도하면 해결된다.

그리고 no leader가 있는데, 이는 파티션의 새로운 리더가 선출되면 해결된다.

카프카는 이러한 에러들을 자동으로 재시도하도록 구성할 수 있다.

재시도 횟수가 소진되거나 에러가 해결되지 않으면 애플리케이션 코드가 재시도 관련 예외를 받는다.

그러나 재시도 해도 예외가 해결되지 않을 때가 있는데, 이는 메시지 크기가 너무 큰 경우이다.

이 경우 카프카 프로듀서는 재시도 없이 그 즉시 애플리케이션에 예외를 발생시킨다.

비동기식으로 메시지 전송하기

애플리케이션과 카프카 클러스터간의 네트워크 왕복시간(RTT, roundtrip time)이 최대 10밀리초라면, 100개의 메시지를 전송하는데 약 1초의 시간이 걸린다.

이와 달리 모든 메시지 전송만 하고 기다리지 않는다면 100개의 메시지 전송에는 아무리 느려도 10밀리초 밖에 걸리지 않는다.

실제로 대부분의 경우에 응답을 기다려서 받아야할 이유가 없다.

왜냐하면 프로듀서의 경우 메시지를 쓴 이후 응답에 대한 메타데이터(토픽, 파티션, 오프셋)가 필요없기 떄문이다.

그러나 메시지 전송에 완전히 실패했을 때는 해당 내용을 알아야 한다.

이를 위해 에러를 로그에 쓰는 방식으로 처리해볼 수 있다.

비동기식으로 메시지를 전송하고 받기 위해 프로듀서에 콜백을 추가할 수 있다.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));

        ListenableFuture<SendResult<String, String>> future;
        try{
            future = this.kafkaTemplate.send(TOPIC, message);
        } catch(Exception e) {
            e.printStackTrace();
        }

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            }
        });

    }
}

비동기 전송 방법은 응답으로 오는 future에 콜백 메소드를 등록해둠으로써 가능하다.

직렬처리기

카프카로 전송해야하는 객체가 단순한 문자열이나 정수가 아닌 경우라면 Avro,Thrift, Protobuf와 같은 범용 직렬화 라이브러리를 사용해 레코드를 생성하거나 이미 사용중인 객체의 커스텀 직렬처리기를 만들 수 있다.

그러나 저자는 가급적이면 범용 직렬화 라이브러리의 사용을 적극 권장하고 있다.

아파치 Avro를 사용해 직렬화해보기

아파치 Avro는 언어 중립적인 데이터 직렬화 시스템이다.

Avro는 스키마 데이터 구조 표현에 JSON을 지원하는데 직렬화에는 주로 이진 파일을 사용한다.

Avro가 카프카와 같은 메시지 시스템에 강점이 있는 이유는, 애플리케이션이 새로운 스키마(데이터 구조)로 변경하더라도 해당 메시지를 읽는 애플리케이션은 일체의 변경 없이 계속해서 메시지를 처리할 수 있어야하기 때문이다.

Avro는 이러한 메시지의 변경을 유연하게 처리해준다.

Avro는 이러한 스키마를 파일로 관리하는데, 이렇게 Avro 파일을 사용하는 것 보다 더 좋은 방법에 대해 알아보자.

카프카에서 Avro 레코드 사용하기

데이터 파일에 스키마 전체를 저장하는 Avro 파일과는 다르게, 데이터를 갖는 각 레코드에 스키마 전체를 저장하면 어떨까?

각 레코드 크기가 2배 이상이 되므로 이는 꽤 부담이 될 것이다.

그러나 레코드를 읽을 때 Avro는 스키마 전체를 필요로 하므로, 스키마를 레코드가 아닌 다른 곳에 저장해두어야한다.

이러한 아키텍처 패턴을 스키마 레지스트리 패턴을 사용한다.

카프카에 데이터를 쓰는데 사용하는 모든 스키마를 레지스트리에 저장하는 것이 스키마 레즈스티리이다.

그 후 카프카에 쓰는 레코드에 사용된 스키마의 식별자(id)만 저장하면 된다.

그러면 컨슈머는 해당 식별자를 사용해 스키마 레지스트리의 스키마를 가져온 후 이 스키마에 맞게 데이터를 역직렬화 할 수 있다.

파티션

ProducerRecord 객체는 토픽 이름과 키와 값을 포함한다.

키를 명시하지않으면 null이 되는데, 이 경우 카프카의 기본 파티셔너가 사용 가능한 토픽의 파티션들 중 하나를 무작위로 선택되어 해당 레코드가 저장되는 방식이다.

이 파티션을 선정하는데에는 라운드 로빈 알고리즘을 사용하고 있다.

당연히 키가 있으면서 기본 파티셔너가 사용되는 경우라면, 카프카에서 키의 해시 값을 구한 후 (파티셔너 자체의 해싱 알고리즘을 사용하므로 자바 버전 업그레이드와는 관련이 없음) 그 값에 따라 특정 파티션에 메시지를 저장한다.

같은 키와 대응되는 파티션이 변경되지 않게 하려면 충분한 수의 파티션을 갖는 토픽을 생성하고 이후에는 파티션을 추가하지 않는 것이 가장 이상적이라고 할 수 있다.

커스텀 파티셔너

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null)
            throw new InvalidRecordException("Message must have a key");
        else if (!(key instanceof String))
            throw new InvalidRecordException("A key must be instanceof String");

        if((String) key.equals("Harry")) {
            return numPartitions-1;
        }

        return (Math.abs(Utils.mumur2(keyBytes)) % (numPartitions-1))
    }

    public void close() {}

반드시 특정 키의 해시값을 특정 파티션에 매핑하고 싶을 수도 있다.

가령 B2B 서비스에서 특정 기업의 판매량이 매우 높아서 전체 트랜잭션의 10%이상이 해당 고객과의 거래로 발생할만큼 양이 많다면, 해당 서비스에서 발생한 Key는 다른 파티션에 매핑될 확률이 높다.

그 해당 기업의 이름이 Harry라고 했을 때 이 Harry 기업의 메시지는 특정 파티션에만 매핑되게 하려면 커스텀 파티셔너를 구현해야한다.

Reference

스크린샷 2021-04-28 오후 7 39 45

카프카 핵심 가이드



© 2022. by minkuk

Powered by minkuk