티스토리 뷰

Kafka

[Kafka] 4. 카프카 컨슈머

mandykr 2023. 2. 2. 11:04

목차

1. 컨슈머 Client

2. Poll

3. Heartbeat Thread

4. 리밸런싱

5. 오프셋 커밋

6. 특정 파티션만 할당

7. 컨슈머의 안전한 종료

8. 멀티 스레드 컨슈머

9. 컨슈머 랙

10. 스프링 카프카 컨슈머

 

 

 

1. 컨슈머 Client

카프카 컨슈머는 하나의 컨슈머 그룹에 속해 실행되며 카프카 브로커의 파티션은 컨슈머 그룹내 하나 이상의 컨슈머에 할당된다.

 

기본 컨슈머 Client API 처리 로직

// 1. Consumer 환경 설정(Properties 객체를 이용)
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-01");
        
// 2. 1에서 설정한 환경 설정값을 반영하여 KafkaConsumer 객체 생성.
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

// 3. 읽어들일 Topic을 subscribe()를 호출하여 설정
kafkaConsumer.subscribe(List.of(topicName));

// 4. 지속적으로 poll( ) 을 호출하여 Topic의 새로운 메시지를 계속 읽어 들임.
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord record : consumerRecords) {
        logger.info("record key:{}, record value:{}, partition:{}",
                record.key(), record.value(), record.partition());
    }
}
        
// 5. KafkaConsumer객체를 명확하게 close() 수행
kafkaConsumer.close();

 

기본 CLI 명령어

# Consumer Group id group_01을 가지는 consumer를 생성
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --group group_01 --topic topic-p3r3 \
--property print.key=true --property print.value=true \
--property print.partition=true

 

kafka-consumer-groups.sh

# 컨슈머 그룹 목록 확인
$ bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --list

# 컨슈머 그룹 내의 컨슈머 정보 확인
$ bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --describe \
--group group_01

GROUP    TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG    CONSUMER-ID    HOST    CLIENT-ID
goupe_01 topic-p3r3 ...

# 컨슈머 그룹 삭제
$ bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --delete \
--group group_01

 

2. Poll

 

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

 

동작 과정

  1. ConsumerNetworkClient는 비동기로 계속 브로커의 메시지를 가져와서 Linked Queue에 저장
  2. Fetcher는 Linked Queue에서 데이터를 읽음
  3. Linked Queue에 데이터가 있을 경우 Fetcher는 데이터를 가져오고 반환 하며 poll() 수행 완료
  4. Linked Queue에 데이터가 없을 경우 Fetcher가 ConsumerNetworkClient에게 데이터를 브로커로 부터 가져올 것을 요청
  5. ConsumerNetworkClient는 1000ms 까지 Broker에 메시지 요청 후 poll 수행() 완료

 

Consumer Fetcher 설정 파라미터

파라미터 기본값 설명
fetch.min.bytes 1 Fetcher가 record들을 읽어들이는 최소 bytes. 
브로커는 지정된 fetch.min.bytes 이상의 새로운 메시지가 쌓일때 까지 전송을 하지 않음. 
fetch.max.wait.ms 500ms 브로커에 fetch.min.bytes 이상의 메시지가 쌓일 때까지 최대 대기 시간. 
fetch.max.bytes 50MB Fetcher가 한번에 가져올 수 있는 최대 데이터 bytes. 
max.partition.fetch.bytes 1MB Fetcher가 파티션별 한번에 최대로 가져올 수 있는 bytes.
max.poll.records 500 Fetcher가 한번에 가져올 수 있는 레코드 수. 

 

Consumer Fetcher 설정 파라미터 동작 방식

  1. 가져올 데이터가 1건도 없으면 poll( ) 인자 시간만큼 대기 후 Return
  2. 가져와야할 과거 데이터가 많을 경우 max.partition.fetch.bytes로 배치 크기 설정.
    그렇지 않을 경우 fetch.min.bytes로 배치 크기 설정
  3. 가장 최신의 offset 데이터를 가져오고 있다면 fetch.min.bytes 만큼 가져오고 return 하고 
    fetch.min.bytes만큼 쌓이지 않는다면 fetch.max.wait.ms 만큼 기다린 후 return
  4. 오랜 과거 offset 데이터를 가져 온다면 최대 max.partition.fetch.bytes 만큼 파티션에서 읽은 뒤 반환
  5. max.partition.feth.bytes에 도달하지 못하여도 가장 최신의 offset에 도달하면 반환
  6. 토픽에 파티션이 많아도 가져오는 데이터량은 fetch.max.bytes로 제한
  7. Fetcher가 Linked Queue에서 가져오는 레코드의 개수는 max.poll.records로 제한

 

3. Heartbeat Thread

카프카 컨슈머 클라이언트는 Heart Beat Thread 를 통해서 주기적으로 브로커의 Group Coordinator에 컨슈머의 상태를 전송한다.

Group Coordinator 는 카프카 브로커중 한대가 담당하는데 컨슈머 Hearbeat 을 받아 컨슈머가 동작중인지 확인하고,

Heartbeat 이 오지 않으면 컨슈머가 다운되었다고 판단해 컨슈머를 컨슈머 그룹에서 제외시킨다.(리밸런스를 컨슈머로 명령한다.)

 

4. 리밸런싱

리밸런싱은 컨슈머 그룹 내에서 컨슈머에 변경이 생길 때 파티션과 컨슈머의 조합을 변경하는 과정이다.

브로커의 그룹 코디네이터가 컨슈머들에게 파티션을 재할당하는 리밸런싱을 지시한다.

 

1) 리밸런싱 과정

① 컨슈머 그룹내의 컨슈머가 브로커에 최초 접속을 요청하면 Group Coordinator 생성
② 동일 group.id로 여러 개의 컨슈머로 브로커의 Group Coordinator로 접속
③ 가장 빨리 Group에 Join 요청을 한 컨슈머에게 컨슈머 그룹내의 리더 컨슈머로 지정
④ 리더 컨슈머는 파티션 할당전략에 따라 컨슈머들에게 파티션 할당
⑤ 리더 컨슈머는 최종 할당된 파티션 정보를 Group Coordinator에게 전달
⑥ 정보 전달 성공을 공유한 뒤 개별 컨슈머들은 할당된 파티션에서 메시지 읽음

 

⑦  컨슈머 그룹 내에 새로운 컨슈머가 추가되거나 기존 컨슈머가 종료되면(Heartbeat 를 받지 못함) Group Coordinator는 컨슈머들에게 파티션을 재할당하는 리밸런싱을 수행하도록 지시

 

2) Consumer Group Status

컨슈머 그룹은 Group Meta Data 에 컨슈머 그룹의 상태를 저장한다.

  • Empty : 컨슈머 그룹은 존재하지만 컨슈머는 없음
  • Rebalance : Rebalance 수행
  • Stable : Rebalance가 종료되고 안정적으로 컨슈머 운영

 

3) Consumer Static Group Membership

컨슈머 그룹 내의 컨슈머들에게 고정된 IP를 부여하여,

컨슈머 별로 컨슈머 그룹에 최초 조인 시 할당된 파티션을 그대로 유지하고 컨슈머가 shutdown되어도
session.timeout.ms내에 재기동되면 리밸런스가 수행되지 않고, 기존 파티션이 재할당 된다.

 

컨슈머 그룹 내에서 리밸런스가 수행되면 모든 컨슈머들이 데이터를 소비하지 못하고 Consumer LAG 이 길어지게 되는데

Consumer 스태틱 그룹 멤버쉽으로 Consumer Restart 에서 불필요한 리밸런스가 발생하지 않도록 한다.

 

Consumer LAG
프로듀서가 데이터를 넣는 속도가 컨슈머가 가져가는 속도보다 빠른 경우,
토픽의 가장 최신 오프셋(LOG-END-OFFSET)과 컨슈머가 가져간 데이터의 오프셋(CURRENT-OFFSET)의 차이를 kafka consumer lag이라 부른다.

 

4) 리밸런스 관련 컨슈머 파라미터

파라미터 기본값(ms) 설명
heartbeat.interval.ms 3000 Heart Beat Thread가 Heart Beat을 보내는 간격.
session.timeout.ms 보다 낮게 설정되어야 함.
Session.timeout.ms의 1/3 보다 낮 게 설정 권장.
session.timeout.ms 45000 브로커가 Consumer로 Heart Beat을 기다리는 최대 시간.
브로커는 이 시간동 안 Heart beat을 consumer로 부터 받지 못하면 해당 consumer를 Group에서 제 외하도록 rebalancing 명령을 지시
max.poll.interval.ms 300000 이전 poll( )호출 후 다음 호출 poll( )까지 브로커가 기다리는 시간.
해당 시간동 안 poll( )호출이 Consumer로 부터 이뤄지지 않으면 해당 consumer는 문제가 있 는 것으로 판단하고 브로커는 rebalance 명령을 보냄.

heartbeat.interval.ms 마다 컨슈머는 heartbeat 을 브로커로 보내고 컨슈머가 session.timeout.ms 만큼 heartbeat 을 보내지 않거나 max.poll.interval.ms 만큼 poll() 호출을 하지 않으면 브로커(코디네이터)는 리밸런스 명령을 리더 컨슈머에게 보낸다.

컨슈머에서 메시지를 읽어서 처리하는 서비스 로직이 max.poll.interval.ms 보다 커지면 의도치 않게 리밸런스가 발생할 수 있기 때문에 서비스 로직의 수행 시간을 줄일 수 없다면 max.poll.interval.ms 를 늘리거나 파티션을 추가하는 등의 방법을 고려해야 한다.

 

 

5) Consumer Rebalancing Protocol

Eager 모드

Rebalance 수행시 컨슈머들의 모든 파티션 할당을 취소하고, 새롭게 Consumer에 파티션을 다시 할당 받고 다시 메시지를 읽는다. 모든 컨슈머가 Rebalance 되는 동안 잠시 메시지를 읽지 않는 시간으로 인해 Lag가 상대적으로 크게 발생할 가능성 있다.

 

Cooperative 모드

Rebalance 수행 시 컨슈머들의 모든 파티션 할당을 취소하지 않고 대상이 되는 컨슈머들에 대해서 파티션에 따라 점진적으로(Incremental) 컨슈머를 할당하면서 Rebalance를 수행한다. 많은 컨슈머를 가지는 컨슈머 그룹내에서 Rebalance 시간이 오래 걸릴 시 활용도 높다.

 

6) Consumer 파티션 할당 전략

카프카 컨슈머는 Consumer의 부하를 파티션 별로 균등하게 할당하고 데이터 처리 및 리밸런싱의 효율성 극대화하기 위해 4개의 파티션 할당 전략 유형을 제공한다.

 

(1) Round Robin 할당 전략

Round Robin 전략은 토픽들의 파티션별로 순차적으로 Consumer에 할당하므로 파티션 매핑이 Consumer별로 비교적 균일하게 할당한다.

Rebalancing 시에도 토픽들의 파티션과 Consumer들을 균등하게 매핑하므로 Rebalance 이전의 파티션과 Consumer들의 매핑이 변경되기 쉽다.

 

 

(2) Range 할당 전략 (기본 전략)

Range 전략은 서로 다른 토픽들의 동일한 파티션들을 같은 Consumer로 할당한다.

Rebalancing 시에도 서로 다른 토픽에서 동일한 키값을 가지는 파티션들은 같은 Consumer에서 처리 할 수 있도록 유도한다.

 

(3) Sticky 할당 전략

Sticky 전략은 Rebalancing 시 기존 토픽들의 파티션과 Consumer 매핑은 최대한 유지하고 재할당되어야 하는 파티션들만 Consumer들에 할당한다.
하지만 모든 Consumer들의 파티션이 일제히 취소되는 Eager Protocol 기반에서 동작한다.

 

 

(4) Cooperative Sticky 할당 전략

 

Rebalancing 시 기존 토픽들의 파티션과 Consumer 매핑은 최대한 유지하고 재할당되어야 하는 파티션들만 Consumer들에 할당한다.

모든 매핑을 다 취소하지 않고 기존 매핑을 그대로 유지한채 재할당되어야 할 파티션만 Consumer에 따라 순차적으로 Rebalance를 수행한다.

 

5. 오프셋 커밋

컨슈머 그룹은 토픽이 특정 파티션으로부터 데이터를 가져가서 처리하고 이 파티션의 어느 레코드까지 가져갔는지 확인하기 위해 오프셋을 커밋한다.

 

1) __consumer_offsets

커밋한 오프셋은 브로커의 __consumer_offsets 이라는 내부 토픽에 저장하고 __consumer_offsets 토픽은 컨슈머 그룹별로 관리된다.

리밸런싱과 같이 컨슈머가 동일한 컨슈머 그룹으로 새롭게 접속할 시 __consumer_offsets에 있는 offset 정보를 기반으로 메시지를 가져오기 때문에 메시지 중복이나 손실이 발생하지 않는다.

 

Consumer는 poll( ) 메소드를 이용하여 주기적으로 브로커의 토픽 파티션에서 메시지를 가져오고

메시지를 성공적으로 가져 왔으면 commit을 통해서 __consumer_offse 에 다음에 읽을 offset 위치를 기재한다.

 

 

__consumer_offsets 저장

  • 컨슈머 그룹의 컨슈머가 모두 종료되어도 offset 정보는 7일동안 __consumer_offsets에 저장 (offsets.retention.minutes)
  • Topic이 삭제되고 재생성될 경우에는 해당 offset 정보는 0으로 __consumer_offsets에 기록

 

2) auto.offset.reset

auto.offset.reset 은 __consumer_offsets에 컨슈머 그룹이 해당 토픽의 파티션별로 offset 정보를 가지고 있지 않을 때 컨슈머가 접속하면 파티션의 처음 offset 부터(earliest) 가져올 것인지, 마지막 offset 이후 부터 가져올 것인지를 설정하는 파라미터이다.

 

__consumer_offsets 에 오프셋이 저장되어 있을 경우

  • earliest, latest 모두 마지막 오프셋 이 후의 메시지를 읽는다.

 

__consumer_offsets 에 오프셋이 저장되어 있지 않을 경우

  • earliest 는 0번 오프셋 부터 메시지를 읽는다.
  • latest 는 마지막 오프셋 이 후의 메시지를 읽는다

 

3) Auto Commit

Auto Commit 수행 시점

  • poll()
  • close()

Consumer의 파라미터로 auto.enable.commit=true 인 경우 읽어온 메시지를 브로커에 바로 commit 적용하지 않고,
auto.commit.interval.ms 값(기본 5초)마다 Consumer가 자동으로 Commit을 수행한다.

auto.commit.interval.ms 값만큼 지난 후 수행되는 poll()에서 이전 poll()에서 가져온 마지막 메시지의 offset을 commit 한다.

 

auto.commit.interval.ms=5000 인 경우,

첫번째 poll() 에서 5초보다 작은 3초가 소요되면 offset을 commit하지 않는다.

두번째 poll() 에서 5초가 지나면 이전 poll() 의 offset을 commit 한다.

컨슈머가 비정상적으로 종료되면 현재까지 읽은 offset을 commit 하지 못해 메시지 중복 읽기가 발생할 수 있다.

__consumer_offsets 에 커밋된 offset 부터 메시지를 읽기 때문에 partition 0의 offset 2와 partition 1의 offset 1이 중복으로 읽히게 된다.

 

4) Manual Commit

Consumer Property를 enable.auto.commit = false 로 설정하고 명시적으로 동기 / 비동기 방식 오프셋 커밋을 적용할 수 있다.

 

(1) 동기 오프셋 커밋

  • Consumer객체의 commitSync( ) 메소드를 사용
  • 메시지 배치를 poll( )을 통해서 읽어오고 해당 메시지들의 마지막 offset을 브로커에 commit 적용
  • 브로커에 commit 적용이 성공적으로 될 때까지 블로킹 적용
  • Commit 적용 완료 후에 다시 메시지를 읽어옴
  • 브로커에 Commit 적용이 실패할 경우 다시 Commit 적용 요청
  • Commit 적용 재시도가 일정 횟수동안 실패할 경우 CommitFailedException 발생
  • 비동기 방식 대비 더 느린 수행 시간

 

(2) 비동기 오프셋 커밋

  • Consumer객체의 commitAsync( ) 메소드를 사용
  • 메시지 배치를 poll( )을 통해서 읽어오고 해당 메시지들의 마지막 offset을 브로커에 commit 적용 요청하지만 브로커에 commit 적용이 성공적으로 되었음을 기다리지 않고(블로킹 하지 않음) 계속 메시지를 읽어옴
  • Callback(OffsetCommitCallback) 을 통해 Exception 을 처리 
  • 브로커에 Commit 적용이 실패해도 다시 Commit 시도 안함
    (Consumer 장애 또는 Rebalance 시 한번 읽은 메시지를 다시 중복해서 가져 올 수 있음)
  • 동기 방식 대비 더 빠른 수행 시간

 

5) 리밸런스 리스너

Auto와 Manual offset commit 모두 offset 을 커밋하지 못하고 리밸런스가 발생하면 offset 커밋 이 후 읽은 메시지는

리밸런스가 완료되고 중복해서 읽게된다. 이런 문제를 해결하기 위해 리밸런스가 발생하면 현재까지 처리한 메시지를 기준으로 커밋을 시도해야 한다. 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다.

 

public static void main(String[] args) {
	// ...
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    consumer = new KafkaConsumer<>(configs);
    consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            currentOffsets.put(
            	new TopicPartition(record.topic(), record.partition()), 
                new offsetAndMetadata(record + 1, null)
            );
        }
    }
}

public class RebalanceListener implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
    }
}
  • onPartitionsAssigned() : 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출
  • onPartitionsRevoked() : 리밸런스가 시작되기 직전에 호출
    가장 마지막 레코드를 기준으로 커밋을 실시한다.

 

6. 특정 파티션만 할당

Consumer에게 여러 개의 파티션이 있는 Topic에서 특정 파티션만 할당할 수 있다.

배치 처리시 특정 key레벨의 파티션을 특정 Consumer에 할당하여 처리할 경우에 적용한다.

TopicPartition topicPartition = new TopicPartition(topicName, 0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

 

특정 파티션의 특정 오프셋부터 메시지를 읽을 수도 있다.

특정 메시지가 누락되었을 경우 해당 메시지를 다시 읽어 오기 위해 유지 보수 차원에서 일반적으로 사용한다.

TopicPartition topicPartition = new TopicPartition(topicName, 1);
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 6L);

 

7. 컨슈머의 안전한 종료

컨슈머 애플리케이션이 비정상적으로 종료되면 세션 타임아웃이 발생할때까지 컨슈머가 컨슈머 그룹에 남게되기 때문에 더는 동작을 하지 않는 컨슈머가 존재하기 때문에 파티션의 데이터는 소모되지 못하고 컨슈머 랙이 늘어나게 된다.

 

KafkaConsumer.wakeup()

public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(new ShutdownThread());

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // ...
            consumer.commitSync();
        }
    } catch (WakeupException e) {
        logger.warn("Wakeup consumer");
    } finally {
        logger.warn("Consumer close");
        consumer.close();
    }
}

static class ShutdownThread extends Thread {
    public void run() {
        logger.info("Shutdown hook");
        consumer.wakeup();
    }
}

셧다운 훅은 사용자 또는 운영체제로부터 종료 요청을 받으면 실행되는 스레드를 뜻한다.

컨슈머 애플리케이션이 종료되면 셧다운 훅을 사용해 wakeup() 메서드를 호출한다. wakeup() 메서드가 실행된 이후 poll() 메서드가 호출되면 WakeupException 예외가 발생하는데 WakeupException 예외처리에서 사용한 자원들을 해제하고 마지막으로 close() 메서드로 카프카 클러스터에게 컨슈머의 종료를 전달해야 한다.

 

8. 멀티 스레드 컨슈머

파티션을 여러개로 운영하는 경우 데이터를 병렬처리하기 위해서 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 방법이다. 그러므로 컨슈머는 멀티 프로세스 방식이나 멀티 스레드 방식을 선택해 개발할 수 있다.

자바는 멀티 스레드를 지원하므로 멀티 컨슈머 스레드를 개발할 수 있는데 멀티 스레드로 컨슈머를 운영하기 위해서는 각 컨슈머 스레드 간에 영향이 미치지 않도록 스레드 세이프 로직, 변수를 적용해야 한다.

 

1) 컨슈머 멀티 워크 스레드 전략

컨슈머 멀티 워크 스레드 전략은 컨슈머 스레드는 1개만 실행하고 데이터 처리를 담당하는 워커 스레드를 여러개 실행하는 전략이다.

 

ExecutorService executorService = Executors.newCachedThreadPool();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<String, String> record : records) {
        ConsumerWorker worker = new ConsumerWorker(record.value());
        executorService.execute(worker);
    }
}

public class ConsumerWorker implements Runnable {
    private String recordValue;

    ConsumerWorker(String recordValue) {
        this.recordValue = recordValue;
    }

    @Override
    public void run() {
        // record 처리
    }
}
  • ConsumerWorker 클래스는 Runnable 인터페이스를 구현해 스레드로 실행된다.
  • ExecutorService 자바 라이브러리를 사용하면 레코드를 병렬처리하는 스레드를 효율적으로 생성하고 관리할 수 있다.
  • Executors 를 사용해 CashedThreadPool을 사용하면 레코드 처리가 완료되면 스레드는 종료된다.

 

컨슈머 멀티 워크 스레드 전략 문제점

데이터 유실

각 레코드의 데이터 처리가 끝났음을 리턴받지 않고 다음 poll() 메서드를 호출하기 때문에 데이터 처리가 스레드에서 진행중임에도 다음 poll() 메서드를 호출해 커밋을 하게되면 데이터의 유실이 발생할 수 있다.

 

레코드 처리 역전

스레드의 생성은 순서대로 진행되지만 나중에 생성된 스레드의 레코드 처리시간이 더 짧을 경우 이전 레코드가 다음 레코드보다 나중에 처리될 수 있다.

 

 

2) 컨슈머 멀티 스레드 전략

컨슈머 멀티 스레드 전략은 파티션의 개수만큼 컨슈머 스레드를 늘려서 운영하는 전략이다.

데이터 유실의 가능성이 작고 파티션 별로 컨슈머에서 레코드를 처리하므로 메시지 키를 갖는 경우 레코드 처리 역전이 발생하지 않는다.

private final static int CONSUMER_COUNT = 3;

ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < CONSUMER_COUNT; i++) {
    ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
    executorService.execute(worker);
}

public class ConsumerWorker implements Runnable {
    private final Properties prop;
    private final String topic;
    private final String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("{}", record);
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

 

9. 컨슈머 랙

컨슈머 랙은 토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이다.

컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다.

컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 필수적으로 모니터링하고 컨슈머 랙이 늘어나 지연이 발생하면 일시적으로 파티션 개수와 컨슈머 개수를 늘리는 등의 처리를 할 수 있다.

 

컨슈머 랙 모니터링

1) 카프카 명령어

bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 \
--group group_01 --describe

 

2) 컨슈머 애플리케이션

kafkaConsumer.metrics()

  • record-lag-max
  • record-lag
  • record-lag-avg

 

3) 외부 모니터링 툴

  • Datadog
  • 컨플루언트 컨트롤 센터
  • 카프카 버로우

 

 

10. 스프링 카프카 컨슈머

1) 메시지 리스너

스프링 카프카에서는 리스너 컨테이너를 사용해 컨슈머를 2개의 타입으로 래핑하였다.

  • 레코드 리스너(MessageListener) : 1개의 레코드를 처리
  • 배치 리스너(BatchMessageListener) : 한 번에 여러개의 레코드들을 처리

 

컨슈머 타입별 리스너

타입 리스너 설명
RECORD MessageListener Record 인스턴스 단위로 프로세싱.
오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
Acknowledging
MessageListener
Record 인스턴스 단위로 프로세싱.
매뉴얼 커밋을 사용하는 경우
ConsumerAware
MessageListener
Record 인스턴스 단위로 프로세싱.
컨슈머 객체를 활용하고 싶은 경우
AcknowledgingConsumerAware
MessageListener
Record 인스턴스 단위로 프로세싱.
매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우
BATCH BatchMessageListener Records 인스턴스 단위로 프로세싱.
오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
BatchAcknowledging
MessageListener
Records 인스턴스 단위로 프로세싱.
매뉴얼 커밋을 사용하는 경우
BatchConsumerAware
MessageListener
Records 인스턴스 단위로 프로세싱.
컨슈머 객체를 활용하고 싶은 경우
BatchAcknowledgingConsumer
MessageListener
Records 인스턴스 단위로 프로세싱.
매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우

매뉴얼 커밋을 사용할 경우 Ackowledging이 붙은 리스너를 사용하고,

KafkaConsumer 인스턴스에 직접 접근하여 컨트롤하고 싶다면 ConsumerAware가 붙은 리스너를 사용하면된다.

리스너별 생성 메서드 파라미터

 

2) AcksMode

카프카 클라이언트에서는 3가지로 나뉘지만 스프링 카프카에서는 커밋의 종류를 7가지로 세분화 하고 로직을 만들어 놓았다.

AckMode의 기본값은 BATCH이고 컨슈머의 enable.auto.commit 옵션은 false로 지정된다.

 

AcksMode 종류

AcksMode 설명
RECORD 레코드 단위로 프로세싱 이후 커밋
BATCH poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋. 기본값.
TIME 특정 시간 이후에 커밋
시간 간격을 선언하는 AckTime 옵션을 설정해야 한다.
COUNT 특정 개수만큼 레코드가 처리된 이후에 커밋
레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다.
COUNT_TIME TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋
MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 한다.
매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 작동한다.
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 한다.
MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다.
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 한다.

 

3) 기본 리스너 컨테이너

(1) 레코드 리스너(MessageListener)

spring:
  kafka:
    consumer:
      bootstrap-servers: my-kafka:9092
    listener:
      type: RECORD

리스너 타입을 RECORD로 설정한다.

 

● 기본 리스너

poll()이 호출되어 가져온 레코드들은 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다.

@KafkaListener(topics = "test",
        groupId = "test-group-00")
public void recordListener(ConsumerRecord<String,String> record) {
    logger.info(record.toString());
}

 

● 메시지 값을 파라미터로 받는 리스너

기본값인 StringDeserializer 를 사용해 String 클래스로 메시지 값을 전달받는다.

@KafkaListener(topics = "test",
        groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
    logger.info(messageValue);
}

 

개별 리스너에 컨슈머 옵션값을 부여

@KafkaListener(topics = "test",
        groupId = "test-group-02", properties = {
        "max.poll.interval.ms:60000",
        "auto.offset.reset:earliest"
})
public void singleTopicWithPropertiesListener(String messageValue) {
    logger.info(messageValue);
}

 

멀티 스레드 리스너

concurrency 옵션값에 해당하는 만큼 컨슈머 스레드를 만들어서 병렬처리한다.

@KafkaListener(topics = "test",
        groupId = "test-group-03",
        concurrency = "3")
public void concurrentTopicListener(String messageValue) {
    logger.info(messageValue);
}

 

특정 토픽의 특정 파티션만 구독

@KafkaListener(topicPartitions =
        {
                @TopicPartition(topic = "test01", partitions = {"0", "1"}),
                @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
        },
        groupId = "test-group-04")
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
    logger.info(record.toString());
}

 


 

(2) 배치 리스너(BatchMessageListener)

spring:
  kafka:
    consumer:
      bootstrap-servers: my-kafka:9092
    listener:
      type: BATCH

 

기본 리스너

컨슈머 레코드의 묶음을 파라미터로 받는다.

카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턷받아 사용하는 것과 동일하다.

@KafkaListener(topics = "test",
        groupId = "test-group-01")
public void batchListener(ConsumerRecords<String, String> records) {
    records.forEach(record -> logger.info(record.toString()));
}

 

메시지 값의 묶음을 파라미터로 받는 리스너

@KafkaListener(topics = "test",
        groupId = "test-group-02")
public void batchListener(List<String> list) {
    list.forEach(recordValue -> logger.info(recordValue));
}

 

멀티 스레드 리스너

concurrency 옵션값에 해당하는 만큼 컨슈머 스레드를 만들어서 병렬처리한다.

@KafkaListener(topics = "test",
        groupId = "test-group-03",
        concurrency = "3")
public void concurrentBatchListener(ConsumerRecords<String, String> records) {
    records.forEach(record -> logger.info(record.toString()));
}

 


 

(3) 배치 커밋 컨슈머 리스너(BatchAcknowledgingConsumerMessageListener)

배치 컨슈머 리스너는 컨슈머 인스턴스를 파라미터로 받아 동기, 비동기 커밋을 사용할 수 있고,

배치 커밋 리스너는 Acknoledgment 인스턴스를 파라미터로 받아 AckMode 를 사용할 수 있다.

spring:
  kafka:
    consumer:
      bootstrap-servers: my-kafka:9092
    listener:
      type: BATCH
      ack-mode: MANUAL_IMMEDIATE

 

배치 커밋 리스너

AckMode를 MANUAL 또는 MANUAL_IMMEDIATE로 사용할 경우 수동 커밋을 하기 위해 파라미터로 Acknowledgment 인스턴스를 받아야 한다. acknowledge() 메서드를 호출함으로써 커밋을 수행할 수 있다.

@KafkaListener(topics = "test", groupId = "test-group-01")
public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
    records.forEach(record -> logger.info(record.toString()));
    ack.acknowledge();
}

 

배치 컨슈머 리스너

동기, 비동기 커밋을 사용하고 싶다면 컨슈머 인스턴스를 파라미터로 받아서 사용할 수 있다.

consumer 인스턴스의 conmmitSync(), conmmitAsync() 메서드를 호출하면 사용자가 원하는 타이밍에 커밋할 수 있다.

@KafkaListener(topics = "test", groupId = "test-group-02")
public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
    records.forEach(record -> logger.info(record.toString()));
    consumer.commitSync();
}

 

4) 커스텀 리스너 컨테이너

서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서는 커스텀 리스너 컨테이너를 사용해야 한다. 카프카 리스너 컨테이너 팩토리(KafkaListenerContainerFactory) 인스턴스를 생성해야 한다.

 

@Configuration
public class ListenerContainerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
            }
        });
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setConsumerFactory(cf);
        return factory;
    }
}

@KafkaListener(topics = "test",
        groupId = "test-group",
        containerFactory = "customContainerFactory")
public void customListener(String data) {
    logger.info(data);
}
  • DefaultKafkaConsumerFactory로 리스너 컨테이너 팩토리를 생성할 때 컨슈머 기본 옵션을 설정한다.
  • ConcurrentKafkaListenerContainerFactory로 concurrency 옵션을 설정할 수 있는 리스너를 만들 때 사용하는 리스너 컨테이너를 만든다.
  • setConsumerRebalanceListener() 메서드를 호출해 리밸런스 리스너를 만든다.
  • setBatchListener() 메서드에 파라미터로 false를 넣어 리코드 리스너임을 명시한다.
  • setAckMode() 메서드에 파라미터로 RECORD를 넣어 레코드 단위 커밋을 설정한다.

 

 

 

 

 

 

 

참고

아파치 카프카 애플리케이션 프로그래밍

카프카 완벽 가이드 - 코어편

728x90