티스토리 뷰

Kafka

[Kafka] 3. 카프카 프로듀서

mandykr 2023. 2. 2. 11:03

목차

1. 프로듀서 Client

2. Serializer

3. Partitioner

4. Accumulator

5. Sender

6. 멱등성(idempotence) 프로듀서

7. 트랜잭션 프로듀서

8. 스프링 카프카 프로듀서

 

 

 

 

1. 프로듀서 Client

카프카 프로듀서는 브로커로 데이터를 전송할 때 내부적으로 직렬화, 파티셔너, 배치 생성 단계를 거친다.

 

기본 프로듀서 Client API 처리 로직

// 1. Producer 환경 설정(Properties 객체를 이용)
String topicName = "simple-topic";
Properties props  = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 2. 1에서 설정한 환경 설정값을 반영하여 KafkaProducer 객체 생성.
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

// 3. 토픽명과 메시지 값(Key, Value) 을 입력하여 보낼 메시지인 ProducerRecord 객체 생성
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello world");

// 4. KafkaProducer객체의 send( )메소드를 호출하여 ProducerRecord 전송
kafkaProducer.send(producerRecord);
kafkaProducer.flush();

// 5. KafkaProducer객체의 close( )메소드를 호출하여 종료
kafkaProducer.close();

 

2. Serializer

카프카 프로듀서는 Serializer를 통해 데이터를 Byte Array 형태로 전송하고 파티션에서도 Byte Array 형태로 저장한다.

자바 객체(Object)도 Serialization을 통해서 바이트 배열(바이트 스트림) 형태로 저장하기 때문에 객체의 유형, 데이터의 포맷, 적용 시스템에 상관없이 이동/저장/복원이 자유롭다.

카프카 컨슈머에서는 Byte Array를 받아서 DeSerialization 과정을 거쳐 원본 객체로 변환한다.

 

카프카 프로듀서 클라이언트에서 메시지를 Serialization 하는데 활용할 Serializer 를 명시해줘야 한다.

Properties props  = new Properties();
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()

 

카프카 클라이언트가 제공하는 기본 Serializer

  • StringSerializer
  • ShortSerializer
  • IntegerSerializer
  • LongSerializer
  • DoubleSerializer
  • BytesSerializer

 

3. Partitioner

파티셔너는 ProducerRecord를 토픽의 어느 파티션으로 전송할 지 결정한다.

기본 파티셔너는 DefaultPartitioner 이고 프로듀서 API는 RoundRobinPartitioner, UniformStickyPartitioner를 제공한다.

Partitioner 인터페이스를 상속받은 사용자 정의 클래스를 만들 수 있다.

 

메시지 키

메시지는 Partitioner를 통해 토픽의 어떤 파티션으로 전송되어야 할 지 미리 결정 되는데,

Key값을 가지지 않는 경우 라운드 로빈(Round Robin), 스티키 파티션(Sticky Partition)등의 파티션 전략등이 선택되어 파티션 별로 메시지가 전송된다.

카프카는 하나의 파티션 내에서만 메시지 순서를 보장하기 때문에 서로 다른 파티션에 저장된 메시지는 순서를 보장받지 못한채 컨슈머에 의해 읽혀질 수 있다.

 

메시지 순서를 보장하기 위해서는 메시지 키를 사용해야 한다.

같은 키를 가지는 메시지는 같은 파티션으로 파티셔닝되기 때문에 메시지 순서가 보장된다.

또한, 하나의 파티션에 서로 다른 키를 가지는 메시지가 저장될 수 있다.

# 키를 가지는 메시지 전송
$ bin/kafka-console-producer.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --topic topic-p2r3 \
--property key.separator=: --property parse.key=true
>K1:V1
>K2:V1
>K1:V2
>K2:V2
>K3:V1
# 키를 가지는 메시지 컨슘
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --topic topic-p2r3 \
--property print.key=true --property print.value=true \
--property print.partition=true

Partition:0    K1    V1
Partition:1    K2    V1
Partition:0    K1    V2
Partition:1    K2    V2
Partition:0    K3    V1

 

 

RoundRobinPartitioner

  • 카프카 클라이언트 2.4.0 이전 버전에서 기본 파티셔너
  • 메시지 키가 있으면 키의 해시값과 파티션을 매칭
  • ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송
  • 데이터가 배치로 묶이는 빈도가 적다.

 

UniformStickyPartitioner

  • 카프카 클라이언트 2.5.0 버전에서 기본 파티셔너
  • 메시지 키가 있으면 키의 해시값과 파티션을 매칭
  • 특정 파티션으로 전송되는 하나의 배치에 메시지를 빠르게 먼저 채워서 보내는 방식
  • 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 전송
  • RoundRobinPartitioner에 비해 향상된 성능을 가진다.

 

4. Accumulator

Record Accumulator는 Partitioner에 의해서 메시지 배치가 전송이 될 토픽과 Partition에 따라 저장되는 KafkaProducer 메모리 영역이다.

 

KafkaProducer 객체의 send( ) 메소드는 호출 시마다 하나의 ProducerRecord를 입력하지만 바로 전송 되지 않고 Accumulator에 단일 메시지를 토픽 파티션에 따라 Record Batch 단위로 묶인 뒤 전송된다.

메시지들은 Accumulator에 여러 개의 Batch들로 buffer.memory 설정 사이즈 만큼 보관될 수 있으며 메시지 전송은 Sender Thread에 의해 여러 개의 Batch들로 한꺼번에 전송할 수 있다.

Sender Thread는 Accumulator에 누적된 메시지 배치를 꺼내서 브로커로 전송한다.

1개의 Batch를 가져갈수도, 여러 개의 Batch를 가져 갈 수도 있다.

 

  • buffer.memory : Record accumulator의 전체 메모리 사이즈
  • linger.ms : Sender Thread로 메시지를 보내 기전 배치로 메시지를 만들어서 보내기 위한 최대 대기 시간
  • batch.size : 단일 배치의 사이즈

전반적인 Producer와 Broker간의 전송이 느리다면 linger.ms를 0보다 크게 설정하여 Sender Thread가 하나의 Record Batch를 가져갈 때 일정 시간 대기하여 Record Batch에 메시지를 보다 많이 채울 수 있도록 적용할 수 있다.

linger.ms는 보통 20ms 이하로 설정을 권장한다. 기본값은 0.

props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // 배치 사이즈 설정
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // linger ms 설정

 

5. Sender

Producer Client의 Main Thread가 send( ) 메소드를 호출하면 메시지 전송을 시작하지만 바로 전송되지 않으며 내부 Buffer에 메시지를 저장 후에 별도의 Thread가 Kafka Broker에 실제 전송을 하는 방식이다.

 

1) 동기 / 비동기 메시지 전송

(1) 동기 메시지 전송

Future<RecordMetaData> = KafkaProducer.send() 기본적으로 비동기 호출인데,

Future객체의 get( )을 호출하여 브로커로 부터 메시지 Ack 응답을 받을 때 까지 Main Thread를 대기 시키는 방식 으로 동기화를 구성 할 수 있다.

try {
    RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
    logger.info("\n ###### record metadata received ##### \n" +
            "partition:" + recordMetadata.partition() +"\n" +
            "offset:" + recordMetadata.offset() + "\n" +
            "timestamp:" + recordMetadata.timestamp());
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    kafkaProducer.close();
}

동기 메시지 전송은 Record Accumulator 의 배치에 메시지를 채우고 Acks 를 기다리기 때문에 linger.ms 설정값 만큼 대기한 후 Sender Thread 가 메시지를 카프카 브로커로 전송하고 Acks 를 받은 후 RecordMetadata 를 받는다.

따라서 배치에 담기는 메시지는 항상 1개이기 때문에 메시지 배치처리가 불가능하다.

 

 

(2) 비동기 메시지 전송

Main Thread 에서 콜백 메시지를 전송할 때 콜백 객체를 함께 보내고,

Sender 에서 카프카 클러스터의 응답을 받아 콜백 객체에 선언된 메소드를 실행한다.

Main Thread 에서는 카프카 클러스터 응답에 대한 Exception 이나 RecordMetadata 를 확인한다.

kafkaProducer.send(producerRecord, (metadata, exception)-> {
    if (exception == null) {
        logger.info("\n ###### record metadata received ##### \n" +
                "partition:" + metadata.partition() + "\n" +
                "offset:" + metadata.offset() + "\n" +
                "timestamp:" + metadata.timestamp());
    } else {
        logger.error("exception error from broker " + exception.getMessage());
    }
});

 

2) acks 옵션

acks 옵션으로 프로듀서가 메시지를 전송하고 다음 메시지를 전송하기 전에 리더 파티션으로부터 응답을 기다릴지에 대해 설정할 수 있다.

기본값은 -1이다.

props.setProperty(ProducerConfig.ACKS_CONFIG, "0");

 

 

(1) acks = 0

리더 브로커가 메시지를 정상적으로 받았는지에 대한 Ack 메시지를 받지 않고 다음 메시지를 바로 전송

메시지 손실의 우려가 가장 크지만 가장 빠르게 전송할 수 있음

 

(2) acks = 1 

리더 브로커가 메시지를 정상적으로 받았는지에 대한 Ack 메시지를 받은 후에 다음 메시지를 바로 전송

메시지를 복제 중에 다운될 경우 다음 리더가 될 브로커에는 메시지가 없을 수 있기 때문에 메시지를 소실할 우려가 있음

 

(3) acks = all, -1

min.insync.replicas 개수 만큼의 Replicator에 복제를 수행한 뒤에 리더 브로커가 보내는 Ack 를 받은 후 다음 메시지를 바로 전송. 만약 오류 메시지를 브로커로 부터 받으면 메시지를 재전송

 

3) 메시지 전송/재 전송 시간 파라미터

 

  • max.block.ms : Send( ) 호출 시 Record Accumulator에 입력하지 못하고 block되는 최대 시간.
    초과시 Timeout Exception
  • linger.ms : Sender Thread가 Record Accumulator에서 배치별로 가져가기 위한 최대 대기시간.
  • request.timeout.ms : 전송에 걸리는 최대 시간. 전송 재 시도 대기시간 제외.
    초과시 retry를 하거나 Timout Exception 발생
  • retry.backoff.ms : 전송 재 시도를 위한 대기 시간.
    retries 는 재 전송 횟수를 설정
  • delivery.timeout.ms : Producer 메시지(배치) 전송에 허용된 최대 시간.
    ( delivery.timeout.ms >= linger.ms + request.timeout.ms )
    초과시 Timeout Exception

예) retries=10, request.timeout.ms=10000ms, retry.backoff.ms=30 인 경우,
request.timeout.ms 만큼 기다린후 재 전송하기전 30ms 이후 재전송 시도

이와 같은 방식으로 재 전송을 10회 시도하고 더 이상 retry 시도 하지 않음

 

  • max.in.flight.requests.per.connection : 비동기 전송 시 브로커의 응답없이 한꺼번에 보낼 수 있는 Batch의 개수.
    기본값은 5. ( Kafka Producer의 메시지 전송 단위는 Batch 이다. )

 

 

6. 멱등성(idempotence) 프로듀서

Producer는 브로커로 부터 ACK를 받은 다음에 다음 메시지를 전송하는데, Producer ID와 메시지 Sequence를 Header에 저장 하여 전송한다.

브로커는 Producer가 보낸 메시지의 Sequence가 브로커가 가지고 있는 메시지의 Sequence보다 1만큼 큰 경우에만 브로커에 저장하고 브로커에서 메시지 Sequece가 중복 될 경우 이를 메시지 로그에 기록하지 않고 Ack만 전송한다.

 

 

Idempotence 를 위한 Producer 설정

  • enable.idempotence = true
  • acks = all
  • retries는 0 보다 큰 값
  • max.in.flight.requests.per.connection은 1에서 5사이 값

Kafka 3.0 버전 부터는 Producer의 기본 설정이 Idempotence 이다.

acks=1로 설정하는 등 다른 파라미터를 변경하면 Idempotence 로 동작하지 않지만 메시지는 전송된다.

enable.idempotence = true 를 명시적으로 설정하고 다른 파라미터를 잘못 설정하면 Config 오류가
발생한다.

 

시퀀스 넘버가 일정하지 않은 경우에는 OutOfOrderSequenceExceptioin 이 발생한다.

(예: 시퀀스 넘버 0 다음에 1이 아닌 2가 오는 경우)

 

 

7. 트랜잭션 프로듀서

트랜잭션 프로듀서는 다수의 파티션에 데이터를 저장할 경우 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용된다.

 

 

트랜잭션 프로듀서를 위한 Producer 설정

  • enable.idempotence = true
  • transactionl.id 를 임의의 String 값으로 설정
  • 컨슈머의 isolation.level = read_committed

트랜잭션 프로듀서는 사용자가 보낸 데이터를 레코드로 파티션에 저장할 뿐만 아니라 트랜잭션의 시작과 끝을 표현하기 위해 트랜잭션 레코드를 한 개 더 보낸다. (트랜잭션 레코드는 오프셋을 한 개 차지한다.)

트랜잭션 컨슈머는 파티션에 저장된 트랜잭션 레코드를 보고 트랜잭션이 완료되었음을 확인하고 데이터를 가져간다.

 

 

8. 스프링 카프카 프로듀서

 

스프링 카프카 라이브러리 디펜던시 추가

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
}

 

 

스프링 카프카 프로듀서는 Kafka Template 이라는 클래스를 사용해 데이터를 전송할 수 있다.

 

1) 기본 카프카 템플릿

application.properties 에 프로듀서 옵션을 설정할 수 있다.

별도로 선언하지 않으면 기본값으로 설정된다.

spring.kafka.producer.acks
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
spring.kafka.producer.buffer-memory
spring.kafka.producer.client-id
spring.kafka.producer.compression-type
spring.kafka.producer.key-serialzier
spring.kafka.producer.properties.*
spring.kafka.producer.retries
spring.kafka.producer.transaction-id-prefix

 

메시지 전송

private static String TOPIC_NAME = "test";

@Autowired
private KafkaTemplate<Integer, String> template;

template.send(TOPIC_NAME, "test");

KafkaTemplate 객체를 스프링 빈에서 주입받아 사용한다.

send() 메서드를 사용해서 토픽 이름과 케시지 값을 넣어 전송한다.

 

 

2) 커스텀 카프카 템플릿

커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용할 수 있다.

@Configuration
public class KafkaTemplateConfiguration {

    @Bean
    public KafkaTemplate<String, String> customKafkaTemplate() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(pf);
    }
}

커스텀 프로듀서는 필수 옵션(bootstrap.servers, key.serializer, value.serializer)을 포함한 옵션들을 선언해야 한다.

ProducerFactory 를 property 로 초기화 하고 KafkaTemplate을 빈으로 등록한다.

 

메시지 전송

@Autowired
private KafkaTemplate<String, String> customKafkaTemplate;

ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
future.addCallback(new KafkaSendCallback<String, String>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
    }

    @Override
    public void onFailure(KafkaProducerException ex) {
    }
});

ListenableFuture 인스턴스에 addCallback 함수를 붙여 프로듀서가 보낸 데이터의 브로커 적재여부를 비동기로 확인할 수 있다. 브로커에 정상 적재되었다면 onSuccess 메서드가, 적재되지 않고 이슈가 발생했다면 onFailure 메서드가 호출된다.

 

 

 

 

 

참고

아파치 카프카 애플리케이션 프로그래밍

카프카 완벽 가이드 - 코어편

 

 

728x90