반응형
개요
Consum 과정은 polling(fetch) + commit offset으로 볼 수 있다.
Broker로부터 Date를 가져와서 처리 후 처리된 offset만큼 commit하는 과정이다.
- kafkaConsumer가 initialize 하면서 Broker로 부터 커밋된 offset 정보를 받아온다.
- polling을 통해 Data를 받는다.
- commit을 통해 offset을 올린다.
1. Fetch Offset
1. poll 호출을 받으면 우선 모든 구독된 토픽/파티션에 offset이 있는지 확인한다.
2. 만약 하나라도 offset 정보가 없다면 resfresh offset 로직이 동작한다.
3. ConsumerCoordinator는 OffsetFetch Reuqest를 만들고, future에 대한 handler를 등록한다.
4. Request 객체로 ConsumerNetworkClient에게 send 요청을 한다.
5. 그 후 ConsumerCoordinator에게 리턴하고, ConsumerCoordinator는 NetworkClient의 response가 올 때 까지 block 됩니다.
6. networkClient는 Broker와 통신을 하고 응답을 등록된 handler를 통해 future 객체에 offset을 세팅합니다.
7. coordinator는 future에서 offset을 받아서 구독 정보에 해당 offset 정보를 세팅하고 리턴합니다.
8. 이후 polling 프로세스로 넘어갑니다.
OffsetFetchRequestData (SCHEMA_8)
name | type | Desc |
groups | List<Group> | Each group we would like to fetch offsets for |
group.group_id | String | The group ID. |
group.topics | List<Topic> | Each topic we would like to fetch offsets for, or null to fetch offsets for all topics. |
Topic.name | String | The topic name. |
Topic.partition_indexes | List<Integer> | The partition indexes we would like to fetch offsets for. |
require_stable | boolean | Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions. |
2-1. Polling
1. kafkaConsumer는 poll이 호출되면, Fetcher에게 Fetch된 데이터를 달라고 한다.
2. Fetcher는 이미 Fetch 된 데이터를 maxPollRecords 만큼 반환한다.
- 부족하거나 없다면 Empty 반환
3. kafkaConsumer는 리턴된 Fetch를 확인하여, empty인 경우 Fetcher에게 Fetch(sendFetch)를 호출한다.
4. Fetcher는 sendFetch가 호출되면, 설정 및 파티션 할당 정보로 Fetch Reuquest를 구성하여 ConsumernetworkClient의 send()를 호출
- Fetcher는 send()호출을 하면서, future에 success 및 fail응답에 대한 listener를 작성
- KafkaConsumer는 ConsumerNetworkClient에게 poll()을 호출하여, 응답을 받을 때 까지 Block
5. ConsumerNetworkClient는 Broker에게 Fetch request 요청
- ConsumerNetworkClient는 요청 후 fetchRequest(network)가 실행되는 쓰레드가 block 됨
6. Broker에게 응답을 받으면, Fetcher가 작성한 handler가 실행 됨
2-2. Fetch
주요 옵션
Option | Desc |
fetch.min.bytes | fetch 시 브로커에게 최소한 fetch.min.bytes 값만큼 데이터를 요청 반환할 만큼 데이터가 충분하지 않다면 브로커는 데이터가 누적되길 기다린다. default 1 |
fetch.max.wait.ms | 브로커가 Fetch API 요청을 받았을 때 fetch.min.bytes 값만큼 데이터가 없는 경우 응답을 주기까지 최대로 기다릴 시간 default 500ms |
fetch.max.bytes | Fetch API 요청에 대해 브로커가 반환해야 하는 최대 데이터 크기이다. 다만 첫 번째 파티션의 첫 번째 메시지가 이 값보다 크다면 컨슈머가 계속 진행될 수 있도록 데이터가 반환된다. 브로커가 허용하는 최대 메시지 크기는 message.max.bytes와 max.message.bytes를 통해 설정한다. default 52428800(50MiB) |
FetchRequestData (SCHEMA_13)
name | type | Desc |
cluster_id | COMPACT_NULLABLE_STRING | The clusterId if known. This is used to validate metadata fetches prior to broker registration. |
replica_id | INT32 | The broker ID of the follower, of -1 if this request is from a consumer. |
max_wait_ms | INT32 | The maximum time in milliseconds to wait for the response. |
min_bytes | INT32 | The minimum bytes to accumulate in the response. |
max_bytes | INT32 | The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored. |
isolation_level | INT8 | This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records |
session_id | INT32 | The fetch session ID. |
session_epoch | INT32 | The fetch session epoch |
topics | [{topic_id, partitionDatas}, …] | The topics to fetch. |
forgotten_topics_data | [{topic_id, partitionDatas}, …] | In an incremental fetch request the partitions to remove. |
rack_id | COMPACT_STRING | Rack ID of the consumer making this request |
partitionData.partition | INT32 | The partition index. |
partitionData.current_leader_epoch | INT32 | The current leader epoch of the partition. |
partitionData.fetch_offset | INT64 | The message offset. |
partitionData.last_fetched_epoch | INT32 | The epoch of the last fetched record or -1 if there is none |
partitionData.log_start_offset | INT64 | The earliest available offset of the follower replica. The field is only used when the request is sent by the follower. |
partitionData.partition_max_bytes | INT32 | The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored. |
FetchResponseData (SCHEMA_13)
name | type | Desc |
throttle_time_ms | INT32 | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
error_code | INT16 | The top level response error code. |
session_id | INT32 | The fetch session ID, or 0 if this is not part of a fetch session. |
responses | [{topic_id, partitionDatas}, …] | The response topics. |
partitionData에 Records 가 있다.
3. Commit Offset
1. commit 발생 시 ConsumerCoordinator의 commitOffset()을 호출 한다.
2. ComsumerCoordinator는 request 객체를 생성하고, response에 대한 handler를 등록한다.
3. ComsumerCoordinaor는 ConsumerNetworkClient의 send()를 호출하고, poll()을 통해 Broker response가 올 때 까지 Block 된다.
4. Response가 오면, handler를 통해 offset이 extract하고 interceptors의 onCommit()을 실행한다.
SyncCommit vs AsyncCommit
Sync는 실패시 기본적으로 retry정책에 따라 retry 수행
Async는 retry 정책을 onFailure에 직접 구현해야 함
OffsetCommitRequestData (SCHEMA_8)
name | type | Desc |
group_id | String | The unique group identifier. |
generation_id | int | The generation of the group. |
member_id | String | The member ID assigned by the group coordinator. |
group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
topics | List<Topic> | The topics to commit offsets for. |
name | String | The topic name. |
partitions | List<Partitions> | Each partition to commit offsets for. |
partition_index | int | The partition index. |
committed_offset | int | The message offset to be committed. |
committed_leader_epoch | int | The leader epoch of this partition. |
committed_metadata | String | Any associated metadata the client wants to keep. |
REFERENCE
- https://developer.confluent.io/learn-kafka/architecture/broker/
- https://d2.naver.com/helloworld/0974525
반응형
'개발 일지' 카테고리의 다른 글
mysql-connector with mariaDB 버전 호환성 이슈 (0) | 2023.07.26 |
---|---|
Kafka Consumer #4 Offset Commit Strategy (0) | 2023.06.22 |
Kafka Consumer #2 Partition Assignor Strategy (0) | 2023.06.15 |
Kafka Consumer #1 Startup / Rebalance Flow (0) | 2023.06.14 |
Expand Kafka Cluster (0) | 2023.06.13 |