Kafka Consumer

카프카에서 데이터 읽기

카프카 컨슈머를 이해하기 위해서는 카프카 컨슈머와 컨슈머 그룹을 알아야한다.

카프카와 컨슈머 그룹

카프카는 메시지를 발행하는 프로듀서가 여러 개가 될 수 있듯이, 컨슈머 또한 여러 컨슈머들이 존재할 수 있따.

이러한 컨슈머들은 컨슈머 그룹에 속한다.

가령 하나의 그룹은 특정 토픽 안에 있는 N개의 파티션을 읽어갈 수 있다.

중요한 것은 한 토픽 내의 파티션 수 보다 더 많은 컨슈머를 추가하는 것은 의미가 없다. ( 이렇게 되면 일부 컨슈머가 쉬게 된다. )

같은 토픽의 메시지를 다수의 애플리케이션이 읽어야하는 경우라면 카프카 그룹을 사용하자.

(각 컨슈머는 스레드로 구현되며 병행으로 실행된다.)

컨슈머 그룹과 리밸런싱

컨슈머 그룹의 컨슈머들은 자신이 읽는 토픽의 소유권을 공유한다.

그리고 새로운 컨슈머를 그룹에 추가하면 이전에 다른 커슈머가 읽던 파티션의 메시지들을 읽는다.

만약 컨슈머가 중단한다면 그 컨슈머가 읽던 파티션은 남은 컨슈머 중 하나가 재할당받아 읽는다.

해당 컨슈머 그룹들이 읽는 토픽에 변경 사항이 생기는 경우(관리자가 새로운 파티션을 추가하는 경우) 파티션의 재할당이 생길 수 있다.

한 컨슈머가 다른 컨슈머로 파티션 소유권 이전하는 것을 리밸런싱(rebalancing)이라 부른다.

리밸런시은 컨슈머 그룹의 강요성과 확장성을 높여주는 중요한 개념이다.

그러나 리밸런싱 되는 동안 컨슈머들이 메시지를 읽을 수 없으므로 해당 컨슈머 그룹 전체가 잠시 사용 불가 상태가 된다.

그룹 조정자(group coordinator)로 지정된 카프카 브로커에게 컨슈머가 하트비트를 전송하면 자신의 속한 컨슈머 그룹의 멤버십과 자신에게 할당된 파티션 소유권을 유지할 수 있다.

하트비트는 컨슈머가 폴링을 할 때 또는 읽은 메시지를 커밋할 때 자동 전송된다.

만약 컨슈머가 세션 타임아웃 시간이 경과될 때까지 하트비트 전송을 중단하면 GroupCoordinator가 해당 컨슈머를 중단된 것으로 간주하고 리밸런싱을 시작한다.

카프카 컨슈머 생성 및 토픽 구독하는 리스터 만들기

@Service
public class MessageListener {

    @KafkaListener(topics = "topic", groupId = "groupId", containerFactory = KafkaConsumerConfig.ConsumerContainer)
    public void listen(MessageDto dto) throws ResponseException {
        System.out.println("recv : " + dto);
    }
}

spring-kafka에서는 카프카 리스너를 애노테이션으로 간단하게 생성하고 구독시킬 수 있다.

카프카의 핵심 로직은 폴링이다.

리스너별 개별 스레드를 만들고 무한루프를 돌면서 카프카에게 폴링을 한다.

커밋과 오프셋

카프카는 다른 Java Message Service 시스템과는 다르게 컨슈머들이 읽는 레코드를 추적 관리한다.

카프카의 각 컨슈머는 파티션별로 자신이 읽는 레코드의 현재 위치를 추적 관리할 수 있다.

파티션 내부의 현재 위치를 변경하는 것을 커밋(commit)이라고 부른다.

컨슈머는 오프셋을 커밋하면 내부적으로 consumer_offsets라는 특별한 토픽에 메시지를 쓴다.

이 토픽은 모든 컨슈머의 오프셋을 갖는다.

그리고 모든 컨슈머들이 정상적으로 실행 중일 때는 오프셋을 커밋해도 아무런 영향을 주지 않는다.

그러나 기존 컨슈머가 비정상적으로 종료되었거나, 새로운 컨슈머가 컨슈머 그룹에 추가되면 오프셋 커밋은 리밸런싱을 유발한다.

리밸런싱이 끝나면 각 컨슈머는 종전과 다른 파티션들을 할당받게 될 수 있다.

따라서 어느 위치부터 메시지를 읽어야하는지에 대해 알기 위해 컨슈머는 각 파티션의 마지막 커밋 오프셋을 알아낸 후 거기서부터 계속 읽어야한다.

자동 커밋

자동 커밋은 가장 쉬운 오프셋 커밋 방법이며, 카프카 컨슈머 객체가 자동으로 오프셋을 커밋해준다.

그러나 중복이 발생할 수 있다는 잠재적인 문제를 안고 있다.

현재의 오프셋 커밋하기

enable.auto.commit을 false로 바꾸고 commitSync()를 사용하여 개발자가 명시적으로 반환된 오프셋을 받아볼 수 있다.

commitSync()poll()에서 반환된 가장 최근의 오프셋을 커밋한다.

비동기 커밋

브로커가 커밋 요청에 응답할 때 까지 애플리케이션이 일시 중단된다는 것이 수동 커밋의 단점이다.

이를 위해 비동기 커밋을 해볼 수 있다.

브로커의 커밋 응답을 기다리는 대신 커밋 요청을 전송하고 처리를 계속할 수 있다.

commitAsync() 재시도하지 않는다는 것이 단점이다.

왜냐하면 서버의 응답을 받는 사이에 이후의 다른 커밋이 먼저 성공할 수 있기 때문이다.

그래서 이런 경우에는 예외가 발생했을 때 로깅을 남겨 추적을 할 수 있다.

Reference

스크린샷 2021-04-28 오후 7 39 45

카프카 핵심 가이드



© 2022. by minkuk

Powered by minkuk