Dev Log

Kafka Consumer #4 Offset Commit Strategy

북극곰은콜라 2023. 6. 22. 18:08


Overview

A survey of the commit settings and commit strategies available to a Kafka Consumer.

 


Kafka Commit Config

Name | Description | Default
enable.auto.commit | Whether auto-commit is enabled | true
auto.commit.interval.ms | Interval in milliseconds between auto-commits when auto-commit is enabled | 5000

 


AUTO-Commit

enable.auto.commit : whether to auto-commit
auto.commit.interval.ms : interval between auto-commits
public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            autoCommitOffsetsAsync();
        }
    }
}
When enable.auto.commit is true,
every call to poll() on the consumer also calls poll() on the ConsumerCoordinator,
and that poll() internally runs maybeAutoCommitOffsetsAsync(), which takes effect only when auto-commit is configured.
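As described above, the auto-commit check runs inside poll(), so a commit can only be sent when poll() is called after the interval has expired. The following is a minimal, stdlib-only sketch of that timer check. AutoCommitTimerSketch and its members are hypothetical names for illustration, not Kafka classes, and the real ConsumerCoordinator does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the timer check in maybeAutoCommitOffsetsAsync(). */
final class AutoCommitTimerSketch {
    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private long nextCommitDeadlineMs;
    private final List<Long> commitTimes = new ArrayList<>();

    AutoCommitTimerSketch(boolean enabled, long intervalMs, long startMs) {
        this.autoCommitEnabled = enabled;
        this.autoCommitIntervalMs = intervalMs;
        this.nextCommitDeadlineMs = startMs + intervalMs;
    }

    /** Called once per simulated poll(); commits only if the interval has elapsed. */
    void poll(long nowMs) {
        if (autoCommitEnabled && nowMs >= nextCommitDeadlineMs) {
            nextCommitDeadlineMs = nowMs + autoCommitIntervalMs;
            commitTimes.add(nowMs); // stands in for autoCommitOffsetsAsync()
        }
    }

    List<Long> commitTimes() {
        return commitTimes;
    }

    public static void main(String[] args) {
        AutoCommitTimerSketch timer = new AutoCommitTimerSketch(true, 5000, 0);
        // poll() arrives at irregular times; a commit is recorded only on a
        // poll that arrives after the current deadline.
        for (long now : new long[] {100, 4900, 5200, 9000, 12000}) {
            timer.poll(now);
        }
        System.out.println(timer.commitTimes());
    }
}
```

In this model, a long gap between poll() calls (for example, slow processing of a batch) also delays the next commit, because nothing else triggers the check.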

 


Confluent Client Sample Code

final Consumer<String, DataRecord> consumer = new KafkaConsumer<String, DataRecord>(props);
consumer.subscribe(Arrays.asList(topic));

Long total_count = 0L;

try {
  while (true) {
    ConsumerRecords<String, DataRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DataRecord> record : records) {
      String key = record.key();
      DataRecord value = record.value();
      total_count += value.getCount();
      System.out.printf("Consumed record with key %s and value %s, and updated total count to %d%n", key, value, total_count);
    }
  }
} finally {
  consumer.close();
}
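The plain client sample processes the data by iterating over the polled records one at a time.

Problems:
 - When records are processed by asynchronous code, committing becomes a problem.
 - When each record is processed asynchronously, the point at which to commit needs careful consideration.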
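To make the problem concrete, the sketch below shows what can go wrong when records finish out of order and the application simply commits the position after the latest finished offset. It is a stdlib-only illustration. AsyncCommitProblemSketch and lostOnRestart are hypothetical names, and "committed position" here means the next offset to be read after a restart.

```java
import java.util.Set;
import java.util.TreeSet;

final class AsyncCommitProblemSketch {
    /**
     * Returns the offsets below the committed position that never finished.
     * After a crash and restart these are not read again, so they are lost.
     */
    static Set<Long> lostOnRestart(long firstOffset, long committedPosition, Set<Long> finished) {
        Set<Long> lost = new TreeSet<>();
        for (long offset = firstOffset; offset < committedPosition; offset++) {
            if (!finished.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Offsets 0..4 were dispatched; 0, 1, 3 and 4 completed, 2 is still running.
        Set<Long> finished = Set.of(0L, 1L, 3L, 4L);
        // Naive policy: commit "latest finished offset + 1".
        long naiveCommit = 5;
        System.out.println(lostOnRestart(0, naiveCommit, finished));
    }
}
```

Committing position 2 instead (the end of the contiguous finished prefix) would lose nothing, at the cost of reprocessing offsets 3 and 4 after a restart.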

 


Spring with Kafka (Commit Strategy)

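This section analyzes the interface and configuration of the library that integrates Kafka into Spring.
Spring internally creates and manages the KafkaConsumer object through Spring configuration.
Spring refers to its commit policy as AckMode.
Spring provides listeners for consuming through the KafkaConsumer.
Two kinds of listener are provided, MessageListener and BatchMessageListener; they differ in whether one record or several records are handled per call.
Details follow below.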

Spring Kafka AckMode

AckMode | Description
RECORD | Commit the offset after each record is processed by the listener. One processed record is committed at a time.
BATCH | Commit the offsets of all records returned by the previous poll after they all have been processed by the listener. The commit happens once every record of the previous poll() has been processed.
TIME | Commit pending offsets after ackTime (ContainerProperties#setAckTime(long)) has elapsed. Pending records are committed once ackTime has passed.
COUNT | Commit pending offsets after ackCount (ContainerProperties#setAckCount(int)) has been exceeded. The commit happens after ackCount records have been processed.
COUNT_TIME | Commit pending offsets after ackCount has been exceeded or after ackTime has elapsed. The commit happens when either the TIME or the COUNT condition is met.
MANUAL | The listener is responsible for acking, using an org.springframework.kafka.listener.AcknowledgingMessageListener; acks are queued and offsets are committed when all the records returned by the previous poll have been processed by the listener. In other words, commits are queued through the AcknowledgingMessageListener: once Acknowledgment.acknowledge() has been called, the previous records are committed at the next poll().
MANUAL_IMMEDIATE | The listener is responsible for acking, using an org.springframework.kafka.listener.AcknowledgingMessageListener; the commit is performed immediately if the Acknowledgment is acknowledged on the calling consumer thread; otherwise the acks are queued and offsets are committed when all the records returned by the previous poll have been processed by the listener. Results are indeterminate if you sometimes acknowledge on the calling thread and sometimes not. In other words, the commit is issued immediately through the AcknowledgingMessageListener.
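The COUNT, TIME and COUNT_TIME rows differ only in which condition triggers the commit. The following stdlib-only sketch expresses that decision as a single function. AckModeSketch, its Mode enum and shouldCommit are hypothetical names and this is not Spring's implementation; in particular, treating the thresholds as inclusive is an assumption.

```java
final class AckModeSketch {
    /** Hypothetical subset of the modes in the table above. */
    enum Mode { COUNT, TIME, COUNT_TIME }

    /**
     * Decides whether pending offsets should be committed now.
     *
     * @param pending   records processed since the last commit
     * @param elapsedMs milliseconds since the last commit
     */
    static boolean shouldCommit(Mode mode, int pending, long elapsedMs, int ackCount, long ackTimeMs) {
        boolean countReached = pending >= ackCount;   // assumption: inclusive threshold
        boolean timeReached = elapsedMs >= ackTimeMs; // assumption: inclusive threshold
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            case COUNT_TIME:
                return countReached || timeReached;
            default:
                throw new IllegalArgumentException("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        // Illustrative values: ackCount = 100, ackTime = 5000 ms,
        // 40 records pending, 6000 ms since the last commit.
        System.out.println(shouldCommit(Mode.COUNT, 40, 6000, 100, 5000));      // count not reached
        System.out.println(shouldCommit(Mode.TIME, 40, 6000, 100, 5000));       // time reached
        System.out.println(shouldCommit(Mode.COUNT_TIME, 40, 6000, 100, 5000)); // either condition
    }
}
```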

 


Quarkus with Kafka (Commit Strategy)

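Note:

Quarkus is a Java-based framework built for microservice architectures (MSA).
It was created under the lead of Red Hat, and its main advantages are:
 - Developer friendly
 - Optimizes Java for containers (Kubernetes), including GraalVM optimization
 - Integrates with common frameworks
 - Supports a reactive code flow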

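Quarkus commit strategies for asynchronous Kafka processing

Strategy | Description
Throttled | Throttling strategy. Tracks the ack of every asynchronously processed record and commits a position only when all earlier acks have succeeded. An ack that might never arrive is checked against a separate max-age property and ignored once that time has passed.
Ignore | Ignoring strategy. Does not take part in committing; a commit happens only on graceful shutdown or on rebalance.
Latest | Latest strategy. Periodically commits the offset of the most recent ack from asynchronous processing.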
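The core of the throttled idea, committing only up to the end of the contiguous run of acked offsets, can be sketched with a sorted map. This is a stdlib-only illustration under stated assumptions: ThrottledCommitSketch is a hypothetical class, not the Quarkus implementation, and the max-age handling for acks that never arrive is not modeled.

```java
import java.util.TreeMap;

final class ThrottledCommitSketch {
    // offset -> acked?, for every record handed to the application, kept in offset order
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    private long committedPosition; // next offset to read after a restart

    ThrottledCommitSketch(long startPosition) {
        this.committedPosition = startPosition;
    }

    /** Registers a record that has been delivered but not yet acked. */
    void received(long offset) {
        inFlight.put(offset, false);
    }

    /** Marks an offset as processed and returns the position that is now safe to commit. */
    long ack(long offset) {
        inFlight.replace(offset, true);
        // Advance only across a contiguous prefix of acked offsets.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            committedPosition = inFlight.pollFirstEntry().getKey() + 1;
        }
        return committedPosition;
    }

    public static void main(String[] args) {
        ThrottledCommitSketch tracker = new ThrottledCommitSketch(0);
        for (long offset = 0; offset < 4; offset++) {
            tracker.received(offset);
        }
        System.out.println(tracker.ack(2)); // offset 0 is still pending, nothing moves
        System.out.println(tracker.ack(0)); // only offset 0 is covered
        System.out.println(tracker.ack(1)); // 0, 1 and 2 are now contiguous
        System.out.println(tracker.ack(3));
    }
}
```

A "Latest"-style policy would instead commit the position after the most recent ack regardless of gaps, which is cheaper to track but can skip unfinished records after a crash.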

 


Reactive Kafka (Commit Strategy)

ACK Mode Summary

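AckMode | Description
AUTO_ACK | When a Flux is created in AUTO_ACK mode, the records returned by each KafkaConsumer poll() are delivered as a Flux. When that Flux (the records of one poll) completes, the commit is performed automatically. Duplicate messages can occur, up to the size of one poll().
MANUAL_ACK | Mode for controlling offsets manually. The commit is performed manually based on the offset of the received record (internally, an offset lower than the committed offset is ignored). If the offset is higher than the current one, it is registered with the commit scheduler as the offset to commit.
ATMOST_ONCE | Commit mode that prevents duplicate processing. After poll() and before the Flux is created, the polled records are committed synchronously. Because the commit is synchronous, receiving records carries a cost. In effect the commit happens at the moment a record is received, and depending on the commit settings messages can be lost.
EXACTLY_ONCE | Mode that guarantees the consume → process → produce flow as a transaction. commit() is executed with the producer's transaction.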

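AUTO_ACK vs ATMOST_ONCE performance

AUTO_ACK mode uses commitAsync(), so a commit request can be lost, which can lead to duplicate processing of messages.
ATMOST_ONCE uses commitSync(), so commit requests are not lost, and because it commits ahead of processing there are no duplicates, but messages can be lost.
In terms of performance, therefore, AUTO_ACK with commitAsync() does better than ATMOST_ONCE with commitSync(), which waits for each commit to complete before records are emitted.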
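The duplicate-versus-loss trade-off comes down to where the consumer resumes after a crash. The sketch below is a stdlib-only model with hypothetical names (CrashSemanticsSketch and its methods). It assumes offsets start at 0 and are aligned to the poll size, that the batch-style commit happens only after a whole poll has been processed, and that the commit-first style commits each record's offset before emitting it.

```java
final class CrashSemanticsSketch {
    /** Resume position when offsets are committed only after a whole poll batch is processed. */
    static long resumeAfterBatchCommit(long crashOffset, int batchSize) {
        return (crashOffset / batchSize) * batchSize;
    }

    /** Resume position when each record's offset is committed before the record is emitted. */
    static long resumeAfterCommitFirst(long crashOffset) {
        return crashOffset + 1;
    }

    public static void main(String[] args) {
        long crashOffset = 14; // processing of this record never completes
        int batchSize = 10;

        long batchResume = resumeAfterBatchCommit(crashOffset, batchSize);
        long commitFirstResume = resumeAfterCommitFirst(crashOffset);

        // Records that were already processed but are read again after the restart.
        System.out.println("batch commit: resume=" + batchResume
                + ", duplicates=" + (crashOffset - batchResume));
        // Records skipped after the restart although their processing never finished.
        System.out.println("commit first: resume=" + commitFirstResume
                + ", lost=" + (commitFirstResume - crashOffset));
    }
}
```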

Reactive Kafka Commit Config

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.89:29091");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-1");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, List.of("LoggingCommitInterceptor"));

ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerConfig);

this.kafkaReceiver = KafkaReceiver
    .create(receiverOptions
        .commitInterval(Duration.ZERO)
        .commitBatchSize(1)
        .subscription(List.of("commit-test-1")));
ReceiverOptions is created from the ConsumerConfig (the Kafka client config),
and the options defined by Reactor Kafka are then set on top of it.
KafkaReceiver is created from the ReceiverOptions.
Internally, the Kafka client runs on the ConsumerConfig, while ConsumerHandler operates on the combined configuration.
Note: KafkaReceiver manages commits itself according to the ack mode, so the auto-commit options in ConsumerConfig can be regarded as ignored.
Name | Description
commitInterval(Duration commitInterval) | Commit interval
    When set to a value:
    - a commit is requested once the interval elapses, even if batchSize has not been reached
    When set to 0:
    - if batchSize is set, commits are driven by batchSize alone
    - if batchSize is also 0, commits must be handled manually (ReceiverOffset.commit())
commitBatchSize(int commitBatchSize) | Minimum number of records required for a commit
    When set to a value:
    - a commit is made when the value is reached
    When set to 0:
    - commits are driven by the interval alone
    - if the interval is also 0, commits must be handled manually (ReceiverOffset.commit())
atmostOnceCommitAheadSize(int commitAheadSize) | Bound on the number of records that can be lost in ATMOST_ONCE mode
    When set to a value:
    - before each record is emitted the committed offset is checked; if the record to emit is beyond the current commit, a synchronous commit is made size + 1 records ahead
    - the maximum number of messages that can be lost is therefore size + 1
    When set to 0:
    - a synchronous commit is made ahead of every record that is emitted
maxCommitAttempts(int maxAttempts) | Maximum number of commit attempts
commitRetryInterval(Duration commitRetryInterval) | Interval between commit retries
maxDeferredCommits(int maxDeferred) | Maximum number of deferred commits
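The "size + 1" bound for atmostOnceCommitAheadSize can be checked with a small model of the rule stated in the table. This is a simplified, stdlib-only sketch with hypothetical names (CommitAheadSketch, maxLost). It follows only the rule as written above and does not attempt to reproduce the exact commit sequence the library issues.

```java
final class CommitAheadSketch {
    /**
     * Simplified rule from the table: before emitting a record, if its offset is not
     * covered by the committed position, commit size + 1 records ahead.
     * Returns the worst-case number of records skipped on restart if the process
     * dies just before processing any record in [first, first + count).
     */
    static long maxLost(long first, int count, int commitAheadSize) {
        long committed = first; // nothing has been committed ahead yet
        long worst = 0;
        for (long offset = first; offset < first + count; offset++) {
            if (offset >= committed) {
                committed = offset + commitAheadSize + 1; // stands in for a synchronous commit
            }
            // Crash before this record is processed: [offset, committed) is skipped on restart.
            worst = Math.max(worst, committed - offset);
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(maxLost(3800, 60, 20)); // commit-ahead size 20
        System.out.println(maxLost(3800, 60, 0));  // commit before every record
    }
}
```

A larger commit-ahead size means fewer blocking commits but a larger window of records that can be skipped after a crash.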

AUTO_ACK

default Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
    return receiveAutoAck(null);
}
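A Flux is created for each batch of records returned by poll().
Commits are handled per Flux.
Therefore, if the server goes down abruptly, the records of the last poll() that was being processed have not been committed yet, so they are processed again.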
this.kafkaReceiver = KafkaReceiver
    .create(receiverOptions
        .commitInterval(Duration.of(10, ChronoUnit.MILLIS))
        .commitBatchSize(1)
        .subscription(List.of("commit-test-1")));

...

private static void subscribeAutoAck() {
  KafkaReceiverManager kafkaReceiverManager = new KafkaReceiverManager();

  kafkaReceiverManager.getReceiver()
      .receiveAutoAck()
      .concatMap(r -> r)
      .doOnNext(record -> errorByPrefix("55", record.value()))
      .onErrorContinue((e, o) -> {
        log.error("error on consume", e);
        System.exit(1);
      })
      .subscribe(record -> log.info("key: {}, value: {}", record.key(), record.value()));
}

// concatMap

Test log

// poll() returns 10 records at a time
14:15:46.014 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-2740,commit-test-1-0-2741,commit-test-1-0-2742,commit-test-1-0-2743,commit-test-1-0-2744,commit-test-1-0-2745,commit-test-1-0-2746,commit-test-1-0-2747,commit-test-1-0-2748,commit-test-1-0-2749

// Emit
14:15:46.014 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0

// All 10 records processed normally
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 41-1260010f-ded8-4c5e-b8db-8006d2491bff
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 42-04dbe9e7-b21f-445e-8d3f-09a582dbe3b9
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 43-14d5c8e4-e1b5-4610-be9d-edac51cf9f6c
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 44-161c3aae-6d60-49bd-bc4a-95a811988d57
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 45-c7690625-9ae5-4987-9519-e8b6dfb82831
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 46-9f9e68b5-a199-4b25-abca-263742e6cb33
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 47-474d1bfa-c755-4ca8-ab57-771cb4d718e2
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 48-0ab63698-8ff2-4369-bc93-e9c763c4b152
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 49-cb46f2a2-79a7-4949-bda3-8c62719113da
14:15:46.014 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 50-7bc420c8-1a00-4c41-9f63-599bf49a80d0
14:15:46.014 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- onRequest.toAdd 1, paused false

// Async Commit (check by ConsumerEventLoop)
14:15:46.014 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Async committing: {commit-test-1-0=OffsetAndMetadata{offset=2750, leaderEpoch=null, metadata=''}}

// Commit call on KafkaConsumer
14:15:46.014 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Committing offsets: {commit-test-1-0=OffsetAndMetadata{offset=2750, leaderEpoch=null, metadata=''}}

// Commit request sent through NetworkClient, response received from the broker
14:15:46.014 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=35) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='test-1', generationId=90, memberId='consumer-test-1-1-97557ddd-e091-4e03-9db5-c687955bd175', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='commit-test-1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=2750, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
14:15:46.015 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=33): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='commit-test-1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])

// Next poll() of 10 records
14:15:46.015 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-2750,commit-test-1-0-2751,commit-test-1-0-2752,commit-test-1-0-2753,commit-test-1-0-2754,commit-test-1-0-2755,commit-test-1-0-2756,commit-test-1-0-2757,commit-test-1-0-2758,commit-test-1-0-2759

// Emit
14:15:46.015 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0

// Values up to 54 processed normally, error at 55, then System.exit(1)
14:15:46.015 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 51-22044176-c4bf-4f77-bbfa-1ba66f638db6
14:15:46.015 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 52-c830bc30-c4b1-4155-9916-982c8d2fba6d
14:15:46.015 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 53-aefd0b98-9309-440c-998f-c2f6c27acb92
14:15:46.015 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 54-4262041c-9ace-45ea-b87d-cb993c10922c
14:15:46.017 [reactive-kafka-test-1-1] ERROR TestMain -- error on consume
java.lang.RuntimeException: prefix error !!
	at TestMain.errorByPrefix(TestMain.java:40)
	at TestMain.lambda$subscribeAutoAck$1(TestMain.java:22)
...

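Because AUTO_ACK works in units of polled records, messages can be processed more than once. In the run above the last committed offset was 2750 and the process exited at value 55, so values 51 to 54 (offsets 2750 to 2753) would be consumed again after a restart.
Commits run separately and asynchronously, so commitInterval and commitBatchSize need to be set appropriately.
In some cases the number of duplicated messages can grow.
 - If the application is killed while commitAsync requests are still queued inside NetworkClient, every message after the last commit that was actually sent is processed again.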

ATMOST_ONCE

default Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
    return receiveAtmostOnce(null);
}
this.kafkaReceiver = KafkaReceiver
        .create(receiverOptions
            .atmostOnceCommitAheadSize(20)
            .subscription(List.of("commit-test-1")));

private static void subscribeAtMostOnce() {
  KafkaReceiverManager kafkaReceiverManager = new KafkaReceiverManager();

  kafkaReceiverManager.getReceiver()
      .receiveAtmostOnce()
      .doOnNext(record -> errorByPrefix("55", record.value()))
      .onErrorContinue((e, o) -> {
        log.error("error on consume", e);
        System.exit(1);
      })
      .subscribe(record -> {
        log.info("key: {}, value: {}", record.key(), record.value());
      });
}

Test Log

size = 0

...
16:29:08.387 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 52-9f1921f1-9eee-4d50-a42d-f008687aada2
16:29:08.387 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Sync committing: {commit-test-1-0=OffsetAndMetadata{offset=2953, leaderEpoch=null, metadata=''}}
16:29:08.387 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=62) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='test-1', generationId=94, memberId='consumer-test-1-1-2c2a4598-c0ca-463a-a31a-40c591c40daa', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='commit-test-1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=2953, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
16:29:08.388 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=62): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='commit-test-1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
16:29:08.389 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Committed offset 2953 for partition commit-test-1-0
16:29:08.389 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-2953
16:29:08.389 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 53-fd5794b0-0252-4345-a110-cd1e5eab7dbc
16:29:08.389 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-2990,commit-test-1-0-2991,commit-test-1-0-2992,commit-test-1-0-2993,commit-test-1-0-2994,commit-test-1-0-2995,commit-test-1-0-2996,commit-test-1-0-2997,commit-test-1-0-2998,commit-test-1-0-2999
16:29:08.389 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 10
16:29:08.389 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Sync committing: {commit-test-1-0=OffsetAndMetadata{offset=2954, leaderEpoch=null, metadata=''}}
16:29:08.389 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=63) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='test-1', generationId=94, memberId='consumer-test-1-1-2c2a4598-c0ca-463a-a31a-40c591c40daa', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='commit-test-1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=2954, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
16:29:08.390 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=63): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='commit-test-1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
16:29:08.391 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Committed offset 2954 for partition commit-test-1-0
16:29:08.391 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-2954
16:29:08.391 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 54-d5942adc-529f-4af9-99d7-5c8df55bb1aa
16:29:08.391 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Sync committing: {commit-test-1-0=OffsetAndMetadata{offset=2955, leaderEpoch=null, metadata=''}}
16:29:08.391 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=64) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='test-1', generationId=94, memberId='consumer-test-1-1-2c2a4598-c0ca-463a-a31a-40c591c40daa', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='commit-test-1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=2955, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
16:29:08.392 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1-1, correlationId=64): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='commit-test-1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
16:29:08.392 [reactive-kafka-test-1-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -- [Consumer clientId=consumer-test-1-1, groupId=test-1] Committed offset 2955 for partition commit-test-1-0
16:29:08.392 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-2955
16:29:08.395 [reactive-kafka-test-1-1] ERROR TestMain -- error on consume
java.lang.RuntimeException: prefix error !!
	at TestMain.errorByPrefix(TestMain.java:51)
...

size = 20

...
// poll 10
08:36:58.748 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3800,commit-test-1-0-3801,commit-test-1-0-3802,commit-test-1-0-3803,commit-test-1-0-3804,commit-test-1-0-3805,commit-test-1-0-3806,commit-test-1-0-3807,commit-test-1-0-3808,commit-test-1-0-3809
...
// poll: 10, commit: 0 -> commit: 21
08:36:58.757 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3821
08:36:58.757 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 1-99908b38-4cce-43c9-8e1e-2bdadfc8918a
// poll: 10, commit: 21 -> poll: 20
08:36:58.758 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3810,commit-test-1-0-3811,commit-test-1-0-3812,commit-test-1-0-3813,commit-test-1-0-3814,commit-test-1-0-3815,commit-test-1-0-3816,commit-test-1-0-3817,commit-test-1-0-3818,commit-test-1-0-3819
...
// poll: 20, commit: 21 -> commit: 22
08:36:58.760 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3822
08:36:58.760 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 2-f156abbd-16b5-4f5e-a951-1936df768fff
...
08:36:58.761 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 10-46020f53-66bb-4c4e-9a8a-00d646d781d0
08:36:58.761 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 11-6000179f-5bea-44ba-86ff-c1dcb044cbb3
...
08:36:58.762 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 20-ab35889b-aa01-4b71-a47c-42389d5f3173
...
// poll: 20, commit:22 -> commit: 32
08:36:58.764 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3832
// poll: 20, commit:32 -> poll: 30
08:36:58.765 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3820,commit-test-1-0-3821,commit-test-1-0-3822,commit-test-1-0-3823,commit-test-1-0-3824,commit-test-1-0-3825,commit-test-1-0-3826,commit-test-1-0-3827,commit-test-1-0-3828,commit-test-1-0-3829
08:36:58.765 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0
08:36:58.765 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 21-e78a711a-02d3-4615-88a4-46a9fc4e001f
...
08:36:58.765 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 30-1af38ef1-1ea5-4147-874a-465aa9a49151
...
// poll: 30, commit: 32 -> commit: 43
08:36:58.767 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3843
// poll:30, commit: 43 -> poll: 40
08:36:58.767 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3830,commit-test-1-0-3831,commit-test-1-0-3832,commit-test-1-0-3833,commit-test-1-0-3834,commit-test-1-0-3835,commit-test-1-0-3836,commit-test-1-0-3837,commit-test-1-0-3838,commit-test-1-0-3839
08:36:58.767 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0
08:36:58.768 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 31-7a4b04d2-eb7c-4b59-a241-ca5d4055b2b4
...
08:36:58.768 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 40-3b6b0504-695f-44c3-9182-8daa1b3406c3
...
// poll: 40, commit: 43 -> commit: 54
08:36:58.770 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3854
// poll: 40, commit: 54 -> poll: 50
08:36:58.770 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3840,commit-test-1-0-3841,commit-test-1-0-3842,commit-test-1-0-3843,commit-test-1-0-3844,commit-test-1-0-3845,commit-test-1-0-3846,commit-test-1-0-3847,commit-test-1-0-3848,commit-test-1-0-3849
08:36:58.770 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0
08:36:58.770 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- onRequest.toAdd 1, paused false
08:36:58.770 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 41-7dbe2748-ff08-4035-bd4f-c183f3e15018
...
08:36:58.771 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 50-f350dc4b-70b6-4746-b625-653103dfd6f6
...
// poll: 50, commit: 54 -> commit: 65
08:36:58.772 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onCommit: commit-test-1-0-3865
// poll: 50, commit: 65 -> poll: 60
08:36:58.773 [reactive-kafka-test-1-1] INFO LoggingCommitInterceptor -- onConsume: commit-test-1-0-3850,commit-test-1-0-3851,commit-test-1-0-3852,commit-test-1-0-3853,commit-test-1-0-3854,commit-test-1-0-3855,commit-test-1-0-3856,commit-test-1-0-3857,commit-test-1-0-3858,commit-test-1-0-3859
08:36:58.773 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- Emitting 10 records, requested now 0
08:36:58.773 [reactive-kafka-test-1-1] DEBUG reactor.kafka.receiver.internals.ConsumerEventLoop -- onRequest.toAdd 1, paused false
08:36:58.773 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 51-3aabfb11-860a-419e-b712-88fd9d268e73
08:36:58.773 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 52-0257c5c5-5d9f-4f09-a49f-7a9e71fd6bff
08:36:58.773 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 53-a28607e5-bf88-4434-b41a-273335ad257c
08:36:58.773 [reactive-kafka-test-1-1] INFO TestMain -- key: null, value: 54-9d74c852-2a00-41d4-82c8-78565b1d52fb
08:36:58.775 [reactive-kafka-test-1-1] ERROR TestMain -- error on consume
java.lang.RuntimeException: prefix error !!
	at TestMain.errorByPrefix(TestMain.java:51)
	at TestMain.lambda$subscribeAtMostOnce$4(TestMain.java:39)
...

 

