Overview
A detailed investigation of the Kafka Consumer
The rebalance flow between the Kafka client and the Kafka broker, and the request / response data involved
Kafka Consumer assignors
Kafka Consumer STW (stop-the-world) behavior
Consumer Startup
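What is the Group Coordinator?
A component in the Kafka broker that manages consumer groups
Conceptually, there is one per consumer group
It is hosted on one of the brokers in the cluster (the broker leading the `__consumer_offsets` partition that the group.id maps to)
It detects membership changes within the consumer group and drives rebalancing
It maintains the group's metadata in a Kafka topic (`__consumer_offsets`)
The negotiation between the coordinator and a consumer happens in three phases.
1. start group: the consumer locates its coordinator
2. join group: the consumer joins the group through the coordinator
3. sync group: topic / partition assignments are synced within the joined group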
1. Start Group
Typically, when a consumer starts up it sends a FindCoordinator request keyed by its group.id.
The broker uses the group.id to determine which broker hosts the group coordinator, and responds with that coordinator's endpoint.
FindCoordinatorRequestData (SCHEMA_4)
name | type | Desc |
key_type | INT8 | The coordinator key type. (Group, transaction, etc.) |
coordinator_keys | list<String> | The coordinator keys. |
FindCoordinatorResponseData (SCHEMA_4)
name | type | Desc |
throttle_time_ms | INT32 | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
coordinators | list | Each coordinator result in the response |
coordinatorData.key | String | The coordinator key. |
coordinatorData.node_id | INT32 | The node id. |
coordinatorData.host | String | The host name. |
coordinatorData.port | INT32 | The port. |
coordinatorData.error_code | INT16 | The error code, or 0 if there was no error. |
coordinatorData.error_message | String | The error message, or null if there was no error. |
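As a rough illustration of how a group.id can map to a coordinator, the sketch below hashes the group.id onto a partition of `__consumer_offsets` and returns that partition's leader. It assumes the broker computes the non-negative Java `String.hashCode()` of the group.id modulo the partition count of the offsets topic (50 by default via `offsets.topic.num.partitions`). The function names and the `partition_leaders` map are hypothetical, not part of any Kafka API.

```python
def java_string_hash(s):
    """Mimic Java's String.hashCode(): 31-based polynomial over UTF-16
    code units, wrapped to a signed 32-bit integer."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed int.
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(group_id, offsets_topic_partitions=50):
    """Partition of __consumer_offsets that is assumed to own this group."""
    h = java_string_hash(group_id)
    # Assumption: Integer.MIN_VALUE is mapped to 0, like an overflow-safe abs().
    non_negative = 0 if h == -0x80000000 else abs(h)
    return non_negative % offsets_topic_partitions


def find_coordinator(group_id, partition_leaders):
    """Return the broker id leading the group's offsets partition.

    partition_leaders is a hypothetical {partition: broker_id} map that
    stands in for the cluster metadata a real broker would consult.
    """
    partition = coordinator_partition(group_id, len(partition_leaders))
    return partition_leaders[partition]
```

For example, with 50 offsets partitions spread over three brokers, `find_coordinator("hello", {p: p % 3 for p in range(50)})` resolves the group to whichever broker leads partition `coordinator_partition("hello")`. Every consumer using the same group.id therefore lands on the same coordinator.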
2. Join Group
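Consumers send a JoinGroup request, including their subscription information, to the group coordinator.
The group coordinator selects the first consumer in the group to send a join request as the leader.
(On first creation) it creates the group-related data, such as the consumer group's offset information.
It responds to each consumer with its memberId; the leader additionally receives the member list and the members' subscription information.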
JoinGroupRequestData (SCHEMA_8)
name | type | Desc |
group_id | String | The group identifier. |
session_timeout_ms | int | The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds. |
rebalance_timeout_ms | int | The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group. |
member_id | String | The member id assigned by the group coordinator. |
group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
protocol_type | String | The unique name for the class of protocols implemented by the group we want to join. (consumer, connect) |
reason | String | The reason why the member (re-)joins the group. |
protocols | list<Protocol> | The list of protocols that the member supports. (Partition assignment) |
protocol.name | String | The protocol name. (Assignor name) |
protocol.metadata | bytes | The protocol metadata. (The serialized Subscription object) |
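To make the `protocol.metadata` bytes less opaque, here is a minimal sketch of how a Subscription could be serialized. It assumes the version 0 layout of the consumer protocol: an INT16 version, an INT32-counted array of INT16-length-prefixed topic names, and INT32-length-prefixed user data with -1 meaning null. Newer versions carry extra fields such as owned partitions, which are not modeled here. The function names are hypothetical.

```python
import struct


def encode_subscription_v0(topics, user_data=None):
    """Serialize a subscription in the assumed v0 layout (big-endian)."""
    buf = struct.pack(">h", 0)             # version
    buf += struct.pack(">i", len(topics))  # topic array length
    for topic in topics:
        raw = topic.encode("utf-8")
        buf += struct.pack(">h", len(raw)) + raw
    if user_data is None:
        buf += struct.pack(">i", -1)       # null bytes marker
    else:
        buf += struct.pack(">i", len(user_data)) + user_data
    return buf


def decode_subscription_v0(data):
    """Inverse of encode_subscription_v0; returns (version, topics, user_data)."""
    version, count = struct.unpack_from(">hi", data, 0)
    offset = 6
    topics = []
    for _ in range(count):
        (length,) = struct.unpack_from(">h", data, offset)
        offset += 2
        topics.append(data[offset:offset + length].decode("utf-8"))
        offset += length
    (ud_len,) = struct.unpack_from(">i", data, offset)
    offset += 4
    user_data = None if ud_len < 0 else data[offset:offset + ud_len]
    return version, topics, user_data
```

The coordinator treats these bytes as opaque; only the group leader decodes them when it runs the assignor.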
JoinGroupResponseData (SCHEMA_9)
name | type | Desc |
throttle_time_ms | int | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
error_code | int | The error code, or 0 if there was no error. |
generation_id | int | The generation ID of the group. |
protocol_type | String | The group protocol name. |
protocol_name | String | The group protocol selected by the coordinator. |
leader | String | The leader of the group. |
skip_assignment | boolean | True if the leader must skip running the assignment. |
member_id | String | The member ID assigned by the group coordinator. |
members | List<Member> | |
member.member_id | String | The group member ID. |
member.group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
member.metadata | bytes | The group member metadata. |
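The join phase described in this section can be sketched with a toy in-memory coordinator. This is only an illustration of the "first joiner becomes leader, only the leader receives the member list" behavior; `ToyCoordinator`, `JoinResponse`, and the member-id format are assumptions made for the example, not the broker's implementation.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class JoinResponse:
    member_id: str
    leader: str
    generation_id: int
    # member_id -> subscribed topics; filled in only for the leader.
    members: dict = field(default_factory=dict)


class ToyCoordinator:
    """Hypothetical stand-in for a group coordinator's join handling."""

    def __init__(self):
        self.generation_id = 0
        self.members = {}   # insertion-ordered: member_id -> topics
        self.leader = None

    def join(self, client_id, topics):
        """Register a joining consumer and hand back its member id."""
        member_id = f"{client_id}-{uuid.uuid4()}"
        self.members[member_id] = list(topics)
        if self.leader is None:
            # The first consumer to join becomes the group leader.
            self.leader = member_id
        return member_id

    def complete_join(self):
        """Close the join phase and build one response per member."""
        self.generation_id += 1
        responses = {}
        for member_id in self.members:
            is_leader = member_id == self.leader
            responses[member_id] = JoinResponse(
                member_id=member_id,
                leader=self.leader,
                generation_id=self.generation_id,
                members=dict(self.members) if is_leader else {},
            )
        return responses
```

A follower can tell it is not the leader by comparing `member_id` with `leader` in its response, which is why it receives an empty member list.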
3. SyncGroup
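The consumer group leader runs the partition assignment strategy.
It sends the partitions assigned to each consumer member to the coordinator.
The other consumers also send SyncGroup requests to the coordinator and receive the partitions assigned to them in the response.
The consumer partition assignment strategies are covered in detail in the next post.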
SyncGroupRequestData (SCHEMA_5)
name | type | Desc |
group_id | String | The unique group identifier. |
generation_id | int | The generation of the group. |
member_id | String | The member ID assigned by the group. |
group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
protocol_type | String | The group protocol type. |
protocol_name | String | The group protocol name. |
assignments | List | Each assignment. (The list of assignment results per member) |
assignments.member_id | String | The ID of the member to assign. |
assignments.assignment | bytes | The member assignment. (The serialized assignment result) |
SyncGroupResponseData (SCHEMA_5)
name | type | Desc |
throttle_time_ms | int | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
error_code | int | The error code, or 0 if there was no error. |
protocol_type | String | The group protocol type. (consumer, connect) |
protocol_name | String | The group protocol name. = assignment.getName() |
assignment | bytes | The member assignment. |
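The sync phase can be sketched as follows: the leader computes an assignment and submits it, and every member then reads back its own slice. The round-robin function is a simplified placeholder rather than one of Kafka's built-in assignors, and `ToySyncCoordinator` is a hypothetical in-memory model.

```python
def round_robin_assign(member_ids, partitions):
    """Spread (topic, partition) pairs over members in sorted order."""
    ordered = sorted(member_ids)
    if not ordered:
        return {}
    assignment = {m: [] for m in ordered}
    for i, tp in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment


class ToySyncCoordinator:
    """Hypothetical model of the coordinator's role in SyncGroup."""

    def __init__(self):
        self.assignments = None

    def sync(self, member_id, assignments=None):
        """Leader passes the full assignment; followers pass nothing."""
        if assignments:
            self.assignments = assignments
        if self.assignments is None:
            # Nothing from the leader yet. A real coordinator would hold
            # the response; this toy version returns None instead.
            return None
        return self.assignments.get(member_id, [])
```

Usage: the leader calls `sync(leader_id, round_robin_assign(members, partitions))`, while followers call `sync(member_id)` and get only their own partitions back.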
Consumer Rebalance
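Rebalance conditions
1. An instance fails to send a heartbeat within the timeout and is removed from the group
2. An instance is added to the group
3. Partitions are added to a subscribed topic
4. While subscribing with a wildcard (*) pattern, a topic matching the pattern is added
5. The consumer group starts up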
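The conditions above boil down to "the member set changed" or "the set of subscribed partitions changed". The sketch below checks both against simple snapshots. The function names, the reason strings, and the snapshot shapes are all hypothetical and only meant to make the conditions concrete.

```python
def expired_members(last_heartbeat_ms, now_ms, session_timeout_ms):
    """Members whose last heartbeat is older than the session timeout."""
    return sorted(
        member
        for member, seen_at in last_heartbeat_ms.items()
        if now_ms - seen_at > session_timeout_ms
    )


def rebalance_reasons(old_members, new_members, old_partitions, new_partitions):
    """Compare two group snapshots and list why a rebalance would be needed.

    old_members / new_members are sets of member ids;
    old_partitions / new_partitions map topic name -> partition count.
    """
    reasons = []
    if old_members - new_members:
        reasons.append("member left or timed out")
    if new_members - old_members:
        reasons.append("member joined")
    for topic in sorted(new_partitions):
        if topic not in old_partitions:
            reasons.append(f"new matching topic: {topic}")
        elif new_partitions[topic] > old_partitions[topic]:
            reasons.append(f"partitions added: {topic}")
    return reasons
```

An empty list means the snapshots are equivalent and no rebalance is needed. A freshly started group corresponds to `old_members` being empty.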
Rebalance notification
The coordinator notifies consumers of a rebalance through the heartbeat response or the offsetFetch response.
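On the consumer side, the notification arrives as an error code on the response. The sketch below shows one plausible way to react to it. The numeric values are the Kafka protocol error codes for these conditions, while the action names returned by `next_action` are made up for this example.

```python
# Kafka protocol error codes relevant to group membership.
NONE = 0
ILLEGAL_GENERATION = 22
UNKNOWN_MEMBER_ID = 25
REBALANCE_IN_PROGRESS = 27


def next_action(error_code):
    """Map a heartbeat response error code to a (hypothetical) next step."""
    if error_code == NONE:
        return "continue"            # still a healthy member
    if error_code == REBALANCE_IN_PROGRESS:
        return "rejoin"              # group is rebalancing: send JoinGroup again
    if error_code in (ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID):
        return "reset_and_rejoin"    # membership is stale: drop state, then rejoin
    return "fail"                    # anything else is treated as fatal here
```

In this model a consumer simply keeps heartbeating while it gets `"continue"` and re-enters the join group phase as soon as it sees anything else that is recoverable.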
REFERENCE
- https://kafka.apache.org/documentation/#majordesignelements
- https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/consumer/package-summary.html
- https://velog.io/@hyun6ik/Apache-Kafka-Partition-Assignment-Strategy
- https://developer.confluent.io/learn-kafka/architecture/consumer-group-protocol/
- https://github.com/apache/kafka/tree/3.4