개발 일지

Kafka Consumer #3 Consume 로직

북극곰은콜라 2023. 6. 21. 18:15
반응형


개요

Consum 과정은 polling(fetch) + commit offset으로 볼 수 있다.
Broker로부터 Date를 가져와서 처리 후 처리된 offset만큼 commit하는 과정이다.
 - kafkaConsumer가 initialize 하면서 Broker로 부터 커밋된 offset 정보를 받아온다.
 - polling을 통해 Data를 받는다.
 - commit을 통해 offset을 올린다.

 


1. Fetch Offset

1. poll 호출을 받으면 우선 모든 구독된 토픽/파티션에 offset이 있는지 확인한다.
2. 만약 하나라도 offset 정보가 없다면 resfresh offset 로직이 동작한다.
3. ConsumerCoordinator는 OffsetFetch Reuqest를 만들고, future에 대한 handler를 등록한다.
4. Request 객체로 ConsumerNetworkClient에게 send 요청을 한다.
5. 그 후 ConsumerCoordinator에게 리턴하고, ConsumerCoordinator는 NetworkClient의 response가 올 때 까지 block 됩니다.
6. networkClient는 Broker와 통신을 하고 응답을 등록된 handler를 통해 future 객체에 offset을 세팅합니다.
7. coordinator는 future에서 offset을 받아서 구독 정보에 해당 offset 정보를 세팅하고 리턴합니다.
8. 이후 polling 프로세스로 넘어갑니다.

OffsetFetchRequestData (SCHEMA_8)

name type Desc
groups List<Group> Each group we would like to fetch offsets for
group.group_id String The group ID.
group.topics List<Topic> Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
Topic.name String The topic name.
Topic.partition_indexes List<Integer> The partition indexes we would like to fetch offsets for.
require_stable boolean Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions.

 


2-1. Polling

1. kafkaConsumer는 poll이 호출되면, Fetcher에게 Fetch된 데이터를 달라고 한다.
2. Fetcher는 이미 Fetch 된 데이터를 maxPollRecords 만큼 반환한다.
  - 부족하거나 없다면 Empty 반환
3. kafkaConsumer는 리턴된 Fetch를 확인하여, empty인 경우 Fetcher에게 Fetch(sendFetch)를 호출한다.
4. Fetcher는 sendFetch가 호출되면, 설정 및 파티션 할당 정보로 Fetch Reuquest를 구성하여 ConsumernetworkClient의 send()를 호출
  - Fetcher는 send()호출을 하면서, future에 success 및 fail응답에 대한 listener를 작성
  - KafkaConsumer는 ConsumerNetworkClient에게 poll()을 호출하여, 응답을 받을 때 까지 Block
5. ConsumerNetworkClient는 Broker에게 Fetch request 요청
  - ConsumerNetworkClient는 요청 후 fetchRequest(network)가 실행되는 쓰레드가 block 됨
6. Broker에게 응답을 받으면, Fetcher가 작성한 handler가 실행 됨

 


2-2. Fetch

주요 옵션

Option Desc
fetch.min.bytes fetch 시 브로커에게 최소한 fetch.min.bytes 값만큼 데이터를 요청
반환할 만큼 데이터가 충분하지 않다면 브로커는 데이터가 누적되길 기다린다.
default 1
fetch.max.wait.ms 브로커가 Fetch API 요청을 받았을 때 fetch.min.bytes 값만큼 데이터가 없는 경우 응답을 주기까지 최대로 기다릴 시간
default 500ms
fetch.max.bytes Fetch API 요청에 대해 브로커가 반환해야 하는 최대 데이터 크기이다.
다만 첫 번째 파티션의 첫 번째 메시지가 이 값보다 크다면 컨슈머가 계속 진행될 수 있도록 데이터가 반환된다.
브로커가 허용하는 최대 메시지 크기는 message.max.bytes max.message.bytes를 통해 설정한다.
default 52428800(50MiB)

FetchRequestData (SCHEMA_13)

name type Desc
cluster_id COMPACT_NULLABLE_STRING The clusterId if known. This is used to validate metadata fetches prior to broker registration.
replica_id INT32 The broker ID of the follower, of -1 if this request is from a consumer.
max_wait_ms INT32 The maximum time in milliseconds to wait for the response.
min_bytes INT32 The minimum bytes to accumulate in the response.
max_bytes INT32 The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored.
isolation_level INT8 This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
session_id INT32 The fetch session ID.
session_epoch INT32 The fetch session epoch
topics [{topic_id, partitionDatas}, …] The topics to fetch.
forgotten_topics_data [{topic_id, partitionDatas}, …] In an incremental fetch request the partitions to remove.
rack_id COMPACT_STRING Rack ID of the consumer making this request
partitionData.partition INT32 The partition index.
partitionData.current_leader_epoch INT32 The current leader epoch of the partition.
partitionData.fetch_offset INT64 The message offset.
partitionData.last_fetched_epoch INT32 The epoch of the last fetched record or -1 if there is none
partitionData.log_start_offset INT64 The earliest available offset of the follower replica. The field is only used when the request is sent by the follower.
partitionData.partition_max_bytes INT32 The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored.

FetchResponseData (SCHEMA_13)

name type Desc
throttle_time_ms INT32 The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
error_code INT16 The top level response error code.
session_id INT32 The fetch session ID, or 0 if this is not part of a fetch session.
responses [{topic_id, partitionDatas}, …] The response topics.

partitionData에 Records 가 있다.

 


3. Commit Offset

1. commit 발생 시 ConsumerCoordinator의 commitOffset()을 호출 한다.
2. ComsumerCoordinator는 request 객체를 생성하고, response에 대한 handler를 등록한다.
3. ComsumerCoordinaor는 ConsumerNetworkClient의 send()를 호출하고, poll()을 통해 Broker response가 올 때 까지 Block 된다.
4. Response가 오면, handler를 통해 offset이 extract하고 interceptors의 onCommit()을 실행한다.

SyncCommit vs AsyncCommit

Sync는 실패시 기본적으로 retry정책에 따라 retry 수행
Async는 retry 정책을 onFailure에 직접 구현해야 함

OffsetCommitRequestData (SCHEMA_8)

name type Desc
group_id String The unique group identifier.
generation_id int The generation of the group.
member_id String The member ID assigned by the group coordinator.
group_instance_id String The unique identifier of the consumer instance provided by end user.
topics List<Topic> The topics to commit offsets for.
name String The topic name.
partitions List<Partitions> Each partition to commit offsets for.
partition_index int The partition index.
committed_offset int The message offset to be committed.
committed_leader_epoch int The leader epoch of this partition.
committed_metadata String Any associated metadata the client wants to keep.

 


REFERENCE

 

 

 

반응형