Kafka Consumer #1 Startup / Rebalance Flow

2023. 6. 14. 19:22


Kafka Consumer에 대한 세부 조사
Kafka Client와 Kafka Broker의 rebalance flow 및 requset / response data 조사
Kafka Consumer Assignor 관련 조사
Consumer Startup

Group Coordinator 란

Kafka Broker에서 Consumer Group을 관리하기 위한 컴포넌트
기본적으로 Consumer Group 별로 존재
Cluster내 임의의 broker에 생성 됨
Consumer Group내 member의 변동을 감지하고 Rebalance를 관장
kafka topic을 이용해서 group의 metadata를 유지
총 3가지 단계로 coordinator와 consumer간의 협상이 일어난다.
 1. start group: coordinator를 찾는 단계
 2. join group: coordinator를 통해 consumer가 group에 join 하는 단계
 3. sync group: join된 group에서 토픽 / 파티션을 sync하는 단계

1. Start Group

일반적으로 Consumer가 Startup 할 시 group.id 를 기반으로 findCoordinator 요청을 한다.
broker는 group.id를 기반으로 group coordinator를 생성할 브로커를 선정 및 생성하고 해당 coordinator의 endpoint를 응답

findCoordinatorRequestData (SCHEMA_4)

name type Desc
key_type INT8 The coordinator key type. (Group, transaction, etc.)
coordinator_keys String The coordinator keys.

findCoordinatorResponseData (SCHEMA_4)

name type Desc
throttle_time_ms INT32 The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
coordinators list Each coordinator result in the response
coordinatorData.key String The coordinator key.
coordinatorData.node_id INT32 The node id.
coordinatorData.host String The host name.
coordinatorData.port INT32 The port.
coordinatorData.error_code INT16 The error code, or 0 if there was no error.
coordinatorData.error_message String The error message, or null if there was no error.

2. Join Group

consumer들은 group coordinator로 join 요청 (구독 정보 포함)
group coordinator는 그룹에서 최초로 join 요청을 보낸 consumer를 leader로 선정
(최초 생성 시) Consumer Group의 offset 정보 등 group 관련 데이터 생성
consumer에게 memberId를 응답, leader에게는 memberList 및 subscription 정보 전달

JoinGroupRequestData (SCHEMA_8)

name type Desc
group_id String The group identifier.
session_timeout_ms int The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.
rebalance_timeout_ms int The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.
member_id String The member id assigned by the group coordinator.
group_instance_id String The unique identifier of the consumer instance provided by end user.
protocol_type String The unique name the for class of protocols implemented by the group we want to join.
(consumer, connect)
reason String The reason why the member (re-)joins the group.
protocols list<Protocol> The list of protocols that the member supports. (Partition Assignment)
protocol.name String The protocol name.
(Assigner Name)
protocol.metadata bytes The protocol metadata.
(Subscription 객체를 serialize)

JoinGroupResponseData (SCHEMA_9)

name type Desc
throttle_time_ms int The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
error_code int The error code, or 0 if there was no error.
generation_id int The generation ID of the group.
protocol_type String The group protocol name.
protocol_name String The group protocol selected by the coordinator.
leader String The leader of the group.
skip_assignment boolean True if the leader must skip running the assignment.
member_id String The member ID assigned by the group coordinator.
members List<Member>  
member.member_id String The group member ID.
member.group_instance_id String The unique identifier of the consumer instance provided by end user.
member.metadata bytes The group member metadata.

3. SyncGroup

Consumer Group Leader는 구독할 파티션에 대한 전략을 수행
각 consumer member가 구독할 파티션 정보를 coordinator로 전달
다른 consumer들은 coordinator로 지속적으로 자신이 구독할 파티션에 대한 sync 요청을 보냄
consumer partition assign 전략은 다음편에 자세히 설명

SyncGroupRequestData (SCHEMA_5)

name type Desc
group_id String The unique group identifier.
generation_id int The generation of the group.
member_id String The member ID assigned by the group.
group_instance_id String The unique identifier of the consumer instance provided by end user.
protocol_type String The group protocol type.
protocol_name String The group protocol name.
assignments List Each assignment.
( member assign 된 결과 리스트)
assignments.member_id Strring The ID of the member to assign.
assignments.assignment bytes The member assignment.
(assignment 결과를 serialize)

SyncGroupResponseData (SCHEMA_5)

name type Desc
throttle_time_ms int The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
error_code int The error code, or 0 if there was no error.
protocol_type String The group protocol type.
(consumer, connect)
protocol_name String The group protocol name.
= assignment.getName()
assignment bytes The member assignment.


Consumer Rebalance

Rebalancing 조건

1. 인스턴스가 timeout안에 heartbeat 실패하여 group에서 제외 되었을 때
2. 인스턴스가 group에 추가되었을 때
3. 구독중인 토픽에 파티션이 추가 되었을 때
4. *로 topic을 subscribe 중에, 조건에 맞는 topic이 추가되었을 때
5. consumer group이 시작 될 때

Rebalance noti

heartbeat response 또는 offsetFetch response를 통해 consumer에게 notify한다.






