티스토리 뷰

 

 

도메인 이벤트를 발생시키는 프로듀서와 이벤트를 가져와 데이터를 변경하는 컨슈머를 구성한다.

메시지 중복 저장을 방지하는 멱등성 프로듀서를 구성하고 카프카 클러스터는 3개의 브로커를 가지는 멀티 노드 클러스터로 만들어 가용성을 높인다. 컨슈머는 브로커 파티션의 갯수만큼 멀티 스레드로 구성해 데이터를 병렬로 처리하도록 한다. 

 

 

1. 개발 환경

  • java 11
  • spring boot 2.7.x
  • gradle
  • confluent kafka 3.x
  • oracle VM

 

2. 카프카 환경 설정

confluent-kafka 설치

$ wget https://packages.confluent.io/archive/7.2/confluent-community-7.2.1.tar.gz
$ tar xvf confluent-community-7.2.1.tar.gz

 

힙 메모리,  path 설정

$ mv confluent-7.2.1 confluent

$ vi ~/.bashrc
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
export CONFLUENT_HOME=/home/ec2-user/confluent
export PATH=.:$PATH:$CONFLUENT_HOME/bin

$ source ~/.bashrc
$ echo $KAFKA_HEAP_OPTS
$ echo $CONFLUENT_HOME

 

멀티 노드 설정

[zookeeper]
$ vi etc/kafka/zookeeper.properties
dataDir=/home/mandy/data/zookeeper-logs

$ mkdir /home/mandy/data/zookeeper-logs

 

kafka broker 실행 옵션 설정

[kafka-x]
$ vi etc/kafka/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-x-IP:9092
log.dirs=/home/mandy/data/kafka-logs
zookeeper.connect=zookeeper:2181

$ mkdir -p /home/mandy/data/kafka-logs

 

schema registry 실행 옵션 설정

$ vi etc/schema-registry/schema-registry.properties

 

zookeeper, broker, schema registry 구동

[zookeeper]
$ zookeeper-server-start etc/kafka/zookeeper.properties

[kafka-x]
$ kafka-server-start etc/kafka/server_0.properties
$ kafka-server-start etc/kafka/server_1.properties
$ kafka-server-start etc/kafka/server_2.properties

[zookeeper]
$ schema-registry-start etc/schema-registry/schema-registry.properties

 

 

파일 시스템 구조

home
├── mandy
│   ├── data
│   │   ├── zookeeper-logs
│   │   ├── kafka-logs
│   ├── confluent
│   │   ├── bin
│   │   │   ├── kafka-configs
│   │   │   ├── kafka-console-consumer
│   │   │   ├── kafka-console-producer
│   │   │   ├── kafka-server-start
│   │   │   ├── kafka-topics
│   │   │   ├── zookeeper-server-start
│   │   │   ├── schema-registry-start
│   │   │   ├── ksql-server-start
│   │   ├── etc
│   │   │   ├── kafka
│   │   │   │   ├── server.properties
│   │   │   │   └── zookeeper.properties
│   │   ├── lib
│   │   ├── LICENSE
│   │   ├── logs
│   │   │   ├── server.log

 

 

3. 개발

1) 토픽

 

토픽 생성

$ kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic papaco.review \
--partitions 3 \
--replication-factor 3
--config min.insync.replicas=2

파티션 개수

● partitions 3

메시지 키를 사용해 데이터 처리 순서를 지켜야 하기 때문에 토픽을 생성할 때 파티션을 엄격하게 결정한다.

프로듀서가 기본 파티셔너를 사용할 때 파티션 개수가 달라지면 이미 매칭된 파티션과 메시지 키의 매칭이 깨지고 전혀 다른 파티션에 데이터가 할당될 수 있기 때문이다.

 

복제 개수

● replication-factor 3

복제 개수가 높으면 높을수록 데이터의 복구 확률이 높아지지만 복제 개수가 너무 높으면 팔로워 파티션이 데이터를 복제하는 데에 시간이 오래 걸릴 수 있으면 클러스터 전체를 봤을 때 저장되는 데이터의 용량도 그만큼 더 늘어난다.

클러스터의 브로커 1대에 이슈가 발생했을 경우에도 안정적으로 데이터를 받기 위해 최소 2 이상으로 설정한다.

 

최소 동기화 리플리카

● min.insync.replicas=2

ISR의 2개 이상의 파티션에 적재되었음을 확인한다. 리더 파티션과 1개의 팔로워 파티션에 데이터가 정상적으로 적재되었음을 보장한다. min.insync.replicas 옵션은 프로듀서의 acks를 all로 설정한 경우에만 유효하다. 

브로커의 복제 개수와 동일하게 설정하면 클러스터의 브로커 1대만이라도 이슈가 발생하여 동작하지 못하는 경우 최소 파티션의 개수가 부족하기 때문에 프로듀서가 데이터를 토픽에 전송할 수 없게 된다.

 

토픽 확인

$ kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name papaco.review --describe --all
All configs for topic papaco.review are:
  compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
  leader.replication.throttled.replicas= sensitive=false synonyms={}
  message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
  min.insync.replicas=2 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
  segment.jitter.ms=0 sensitive=false synonyms={}
  cleanup.policy=delete sensitive=false synonyms={DEFAULT_CONFIG:log.cleanup.policy=delete}
  flush.ms=9223372036854775807 sensitive=false synonyms={}
  follower.replication.throttled.replicas= sensitive=false synonyms={}
  segment.bytes=1073741824 sensitive=false synonyms={STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=604800000 sensitive=false synonyms={}
  flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  message.format.version=3.0-IV1 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=3.0-IV1}
  max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
  file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
  max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
  min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
  preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
  min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
  delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=604800000 sensitive=false synonyms={}
  message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
  segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}

리더 파티션 선출

● unclean.leader.election.enable=false

리더 파티션의 브로커에 장애가 발생한 경우, ISR이 아닌 팔로워 파티션을 리더 파티션으로 선출하지 않고 리더 파티션이 존재하는 브로커가 다시 실행될 때까지 기다린다.

그동안 서비스가 중단되지만 데이터의 유실은 발생하지 않는다.

 

 

2) 프로듀서

 

(1) 프로듀서 설정

application.properties

spring.kafka.producer.bootstrap-servers=192.168.56.101:9092
spring.kafka.producer.properties.schema.registry.url=http://192.168.56.101:8081
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.properties.enable.idempotence=true

 

ProducerConfig

(org.apache.kafka.clients.producer.ProducerConfig)

ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [192.168.56.101:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer

 

acks 옵션

● ack=all

ack 를 all로 설정하면 min.insync.replicas 개수 만큼의 Replicator에 복제를 수행한 뒤에 리더 브로커가 보내는 Ack 를 받은 후 다음 메시지를 전송한다. 1이나 0에 비해 프로듀서가 클러스터로 데이터를 저장하는 데에 시간이 오래 걸리지만 클러스터 또는 네트워크에 이상이 생겼을 경우 복구할 확률이 가장 높다. 

 

파티셔너

● partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner

DefaultPartitioner는 키를 가지는 메시지의 경우 키 값을 Hashing하여 키 값에 따라 파티션 별로 균일하게 전송한다.

파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터에 데이터를 버퍼로 쌓아놓고 발송한다. 버퍼로 쌓인 데이터는 배치로 묶어서 전송함으로써 프로듀서 처리량을 향상시킨다.

 

어큐뮬레이터

● buffer.memory = 33554432

● linger.ms = 0

● batch.size = 16384

전반적인 Producer와 Broker간의 전송이 느리다면 linger.ms를 0보다 크게 설정하여 Sender Thread가 하나의 Record Batch를 가져갈 때 일정 시간 대기하여 Record Batch에 메시지를 보다 많이 채울 수 있도록 적용할 수 있다.

linger.ms는 보통 20ms 이하로 설정을 권장한다.

 

재시도

● retries = 2147483647

● request.timeout.ms = 30000

● linger.ms = 0

● retry.backoff.ms = 100

● delivery.timeout.ms = 120000

Sender Thread가 0ms(linger.ms)만큼 대기한 후에 Record Accumulator에서 레코드를 배치별로 가져간다.

Sender Thread는 배치를 브로커로 전송하고 30000ms(request.timeout.ms)만큼 대기한 후 

리더 브로커로부터 에러 메시지를 받으면 100ms(retry.backoff.ms) 후에 재전송을 시도한다.

이와 같은 방식으로 재전송을 2147483647회(retries) 시도하고 더 이상 retry 시도 하지 않는다.

 

멱등성 프로듀서

● enable.idempotence=true

동일한 데이터를 여러번 전송하더라도 카프카 클러스터에 단 한번만 저장되도록 한다.(정확히 한번 전달)

멱등성 프로듀서는 기본 프로듀서와 달리 데이터를 브로커로 전달할 때 프로듀서 PID와 시퀀스 넘버를 함께 전달한다. 그러면 브로커는 프로듀서의 PID와 시퀀스 넘버를 확인하여 동이한 메시지의 적재 요청이 오더라도 단 한번만 데이터를 적재한다.

 

 

(2) 데이터 포맷

 

Avro

● value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer

 

KafkaAvroSerializerConfig

(io.confluent.kafka.serializers.KafkaAvroSerializerConfig)

KafkaAvroSerializerConfig values: 
	auto.register.schemas = true
	basic.auth.credentials.source = URL
	basic.auth.user.info = [hidden]
	bearer.auth.credentials.source = STATIC_TOKEN
	bearer.auth.token = [hidden]
	key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
	latest.compatibility.strict = true
	max.schemas.per.subject = 1000
	proxy.host = 
	proxy.port = -1
	schema.reflection = false
	schema.registry.basic.auth.user.info = [hidden]
	schema.registry.ssl.cipher.suites = null
	schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	schema.registry.ssl.endpoint.identification.algorithm = https
	schema.registry.ssl.engine.factory.class = null
	schema.registry.ssl.key.password = null
	schema.registry.ssl.keymanager.algorithm = SunX509
	schema.registry.ssl.keystore.certificate.chain = null
	schema.registry.ssl.keystore.key = null
	schema.registry.ssl.keystore.location = null
	schema.registry.ssl.keystore.password = null
	schema.registry.ssl.keystore.type = JKS
	schema.registry.ssl.protocol = TLSv1.3
	schema.registry.ssl.provider = null
	schema.registry.ssl.secure.random.implementation = null
	schema.registry.ssl.trustmanager.algorithm = PKIX
	schema.registry.ssl.truststore.certificates = null
	schema.registry.ssl.truststore.location = null
	schema.registry.ssl.truststore.password = null
	schema.registry.ssl.truststore.type = JKS
	schema.registry.url = [http://192.168.56.101:8081]
	use.latest.version = false
	value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

 

 

스키마 레지스트리

● schema.registry.url = [http://192.168.56.101:8081]

 key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

 value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

 

 

(3) 메시지 전송

private final KafkaTemplate<String, ReviewMessage> kafkaTemplate;

public void sendMessage(ReviewEvent event) {
    // ...
    
    kafkaTemplate.send(topic, key, message).addCallback(new ListenableFutureCallback<SendResult<String, ReviewMessage>>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("kafka send error: {}", ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, ReviewMessage> result) {
            log.info("kafka send result: {}", result.toString());
        }
    });
}

 

 

 

3) 컨슈머

(1) 컨슈머 설정

 

application.properties

spring.kafka.consumer.bootstrap-servers=192.168.56.101:9092,192.168.56.101:9093,192.168.56.101:9094
spring.kafka.consumer.properties.schema.registry.url=http://192.168.56.101:8081
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.allow.auto.create.topics=false
spring.kafka.consumer.properties.auto.offset.reset=earliest
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.listener.type=BATCH
spring.kafka.listener.ack-mode=MANUAL

 

ConsumerConfig

(org.apache.kafka.clients.consumer.ConsumerConfig)

ConsumerConfig values: 
	allow.auto.create.topics = false
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.56.101:9092, 192.168.56.101:9093, 192.168.56.101:9094]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-papaco-review-project-query-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = papaco-review-project-query
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

 

bootstrap-servers

 spring.kafka.consumer.bootstrap-servers=192.168.56.101:9092,192.168.56.101:9093,192.168.56.101:9094

bootstrap-servers 는 브로커 리스너들의 리스트이고 브로커들의 메타정보를 가져오는데 사용된다.

카프카 브로커들은 토픽 파티션의 Leader와 Follower들의 메타 정보를 서로 공유하고 있는데 카프카 컨슈머는 bootstrap-servers 에 접속해 이 메타 정보를 가져오고 메타 정보를 통해 토픽의 파티션이 있는 브로커로 다시 접속하게 된다.

2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.

(프로듀서도 동일)

 

auto.offset.reset

 auto.offset.reset = earliest

__consumer_offsets에 컨슈머 그룹이 해당 토픽의 파티션별로 offset 정보를 가지고 있지 않을 때 컨슈머가 접속하면 파티션의 처음 offset 부터 가져오도록 설정한다.

 

Consumer Fetcher 설정 파라미터

 fetch.min.bytes = 1

 fetch.max.wait.ms = 500

 fetch.max.bytes = 52428800

 max.partition.fetch.bytes = 1048576

 max.poll.records = 500

 BatchConsumerAwareMessageListener.DEFAULT_COMMIT_TIMEOUT = 30; (스프링 카프카 컨슈머)

Fetcher가 Linked Queue에서 가져오는 레코드의 개수는 500(max.poll.records)으로 제한한다.

Linked Queue에 데이터가 없으면 ConsumerNetworkClient는 30초(DEFAULT_COMMIT_TIMEOUT)까지 Broker에 메시지를 요청한다.

ConsumerNetworkClient가 브로커에서 가져오는 레코드 배치의 최대 크기는 max.partition.fetch.bytes와 fetch.max.bytes로 설정한다.

 

리밸런스 관련 컨슈머 파라미터

 heartbeat.interval.ms = 3000

 session.timeout.ms = 45000

 max.poll.interval.ms = 300000

컨슈머는 3초(heartbeat.interval.ms)마다 브로커로 heartbeat을 보낸다.

컨슈머가 45초(session.timeout.ms)만큼 heartbeat을 보내지 않거나,

300초(max.poll.interval.ms)만큼 poll() 호출을 하지 않으면 브로커(코디네이터)는 리밸런스 명령을 리더 컨슈머에게 보낸다.

 

컨슈머 파티션 할당 전략(Cooperative Sticky)

 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

리밸런스 발생시 기존 토픽들의 파티션과 컨슈머 매핑은 최대한 유지하고 재할당되어야 하는 파티션들만 컨슈머에 따라 순차적으로 재할당을 수행한다. 모든 파티션 할당을 취소하지 않는 Cooperative 모드에서 동작한다.

 

 

 

(2) 메시지 컨슘

@KafkaListener(
            topics = "${papaco.kafka.review.avro.topic.name}", 
            groupId = "papaco-review-project-query",
            concurrency = "3")
public void listenReview(ConsumerRecords<String, ReviewMessage> records, Consumer<String, String> consumer) {
    consumer.commitSync();
}

 

컨슈머 그룹 ID

 @KafkaListener(groupId = "papaco-review-project-query")

review 관련 메시지를 수신하는 컨슈머들을 컨슈머 그룹으로 묶어서 관리한다. 컨슈머 그룹을 기준으로 브로커에서 오프셋(__consumer_offsets)을 관리하고 컨슈머 리밸런스가 발생하면 해당 오프셋을 기준으로 다시 메시지를 가져오기 때문에 메시지 중복이나 유실이 발생하지 않는다.

 

배치 컨슈머 리스너

 spring.kafka.listener.type=BATCH

 spring.kafka.listener.ack-mode=MANUAL

 public void listenReview(ConsumerRecords<String, ReviewMessage> records, Consumer<String, String> consumer)

 consumer.commitSync();

스프링 카프카 컨슈머에서 제공하는 BATCH 타입의 리스너를 사용해 레코드 묶음(ConsumerRecords)을 파라미터로 받아 한 번에 처리하도록 한다. 또한 AcksMode를 MANUAL로 설정해 Auto commit 이 아닌 동기 혹은 비동기 Manual commit 을 사용하도록 설정한다. Comsumer 객체를 파라미터로 받아 commitSync() 와 같이 Manual commit 메소드를 호출할 수 있다.

동기 오프셋 커밋은 메시지를 읽고 브로커에 commit 적용이 성공적으로 될 때까지 블로킹 하도록해 비동기 방식에 비해 메시지 중복 읽기 가능성을 줄여준다.

 

멀티 스레드 컨슈머

 @KafkaListener(concurrency = "3")

스프링 카프카 컨슈머는 concurrency 옵션값에 해당하는 만큼 컨슈머 스레드를 만들어서 병렬처리한다.

파티션의 개수만큼 컨슈머 스레드를 늘려서 운영하면 각 스레드에 각 파티션이 할당되며, 데이터 유실의 가능성이 작고 파티션 별로 컨슈머에서 레코드를 처리하므로 메시지 키를 갖는 경우 레코드 처리 역전이 발생하지 않는다.

 

 

 

 

 

정리

높은 처리량

카프카는 데이터 송수신을 묶어서 처리할 수 있기 때문에 동일한 양의 데이터에서 네트워크 통신 횟수를 줄일 수 있고, 데이터 배치 처리와 파티션 단위로 병렬 처리가 가능하기 때문에 높은 데이터 처리량을 보인다.

 

확장성

카프카는 데이터가 적을 때 클러스터의 브로커를 최소한의 개수로 운영하다가 데이터가 많아지면 브로커의 개수를 늘려 스케일 아웃할 수 있다. 반대로 스케일 인도 가능하다.

 

영속성

카프카는 디스크 기반의 파일 시스템에 데이터를 저장하기 때문에 브로커에 장애가 발생해 종료되더라도 데이터가 사라지지 않는다.

 

고가용성

3개 이상의 서버들로 운영되는 카프카 클러스터는 데이터를 복제해 저장하기 때문에 일부 서버에 장애가 발생하더라도 무중단으로 안전하고 지속적으로 데이터를 처리할 수 있다.

 

 

 

 

 

 

 

 

 

참고

아파치 카프카 애플리케이션 프로그래밍

카프카 완벽 가이드 - 코어편

 

 

 

 

 

 

728x90

'Kafka' 카테고리의 다른 글

[Kafka] 5. 스키마 레지스트리  (0) 2023.04.18
[Kafka] 4. 카프카 컨슈머  (0) 2023.02.02
[Kafka] 3. 카프카 프로듀서  (0) 2023.02.02
[Kafka] 2. 토픽과 파티션  (0) 2023.02.02
[Kafka] 1. 카프카 기본 개념  (0) 2023.02.01