
Overview
A detailed look at the Kafka consumer
The rebalance flow between the Kafka client and the Kafka broker, and the request/response data exchanged
Kafka consumer partition assignors
Kafka consumer stop-the-world (STW) behavior
Consumer Startup
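What is the Group Coordinator?

A component inside the Kafka broker that manages consumer groups
Each consumer group is managed by exactly one coordinator
The coordinator runs on a specific broker in the cluster: the leader of the __consumer_offsets partition that the group.id hashes to
Detects membership changes in the consumer group and orchestrates rebalances
Persists group metadata, along with committed offsets, in the internal Kafka topic __consumer_offsets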
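The following is a rough sketch of how a broker decides where a group's coordinator lives. In Kafka 3.x, the group is mapped to a partition of `__consumer_offsets` by hashing `group.id` (this models `GroupMetadataManager.partitionFor`), and the leader of that partition acts as the coordinator. This is a simplified stand-alone model, not Kafka's source. The `partitionLeaders` map is a hypothetical stand-in for cluster metadata.

```java
import java.util.Map;

/** Simplified model of mapping a group.id to its coordinator broker. */
final class CoordinatorLookup {

    // Mirrors the behavior of Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    /** Partition of __consumer_offsets that stores this group's metadata and offsets. */
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    /**
     * The coordinator is the broker leading that partition.
     * partitionLeaders (partition -> broker id) is a hypothetical metadata view;
     * this sketch assumes the leader is known (a missing entry would throw on unboxing).
     */
    static int coordinatorFor(String groupId, int offsetsTopicPartitions,
                              Map<Integer, Integer> partitionLeaders) {
        return partitionLeaders.get(partitionFor(groupId, offsetsTopicPartitions));
    }
}
```

For example, with the default `offsets.topic.num.partitions` of 50, the group id `"a"` (hash code 97) maps to partition 47. The result depends only on `group.id` and the partition count, so every broker computes the same answer. That is why any broker can answer a FindCoordinator request.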
The coordinator and the consumer negotiate in three phases:
1. Start group: the consumer finds its coordinator
2. Join group: the consumer joins the group through the coordinator
3. Sync group: the members of the joined group sync their topic/partition assignments
1. Start Group

When a consumer starts up, it typically sends a FindCoordinator request keyed by its group.id to any broker in the cluster.
That broker works out which broker hosts the group coordinator for that group.id and responds with the coordinator's endpoint.
FindCoordinatorRequestData (SCHEMA_4)
| name | type | Desc |
|---|---|---|
| key_type | INT8 | The coordinator key type: 0 = group, 1 = transaction. |
| coordinator_keys | list<String> | The coordinator keys (for a consumer, its group.id). |
FindCoordinatorResponseData (SCHEMA_4)
| name | type | Desc |
|---|---|---|
| throttle_time_ms | INT32 | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
| coordinators | list | Each coordinator result in the response. |
| coordinatorData.key | String | The coordinator key. |
| coordinatorData.node_id | INT32 | The node id. |
| coordinatorData.host | String | The host name. |
| coordinatorData.port | INT32 | The port. |
| coordinatorData.error_code | INT16 | The error code, or 0 if there was no error. |
| coordinatorData.error_message | String | The error message, or null if there was no error. |
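Here is a minimal sketch of how a client might interpret the `coordinators` list in the FindCoordinator response. The record mirrors the table's fields. The error codes are Kafka protocol codes: 14 COORDINATOR_LOAD_IN_PROGRESS, 15 COORDINATOR_NOT_AVAILABLE and 16 NOT_COORDINATOR. Treating exactly those three as retriable is a simplifying assumption, not the real client's full error handling.

```java
import java.util.List;
import java.util.Optional;

/** Simplified mirror of one entry in FindCoordinatorResponseData.coordinators (v4+). */
record CoordinatorData(String key, int nodeId, String host, int port,
                       short errorCode, String errorMessage) {}

final class FindCoordinatorHandling {
    // Kafka protocol error codes relevant to coordinator lookup
    static final short NONE = 0;
    static final short COORDINATOR_LOAD_IN_PROGRESS = 14;
    static final short COORDINATOR_NOT_AVAILABLE = 15;
    static final short NOT_COORDINATOR = 16;

    /**
     * Returns "host:port" of the coordinator for groupId, or empty if the client
     * should back off and send FindCoordinator again. Other errors are thrown.
     */
    static Optional<String> resolve(String groupId, List<CoordinatorData> coordinators) {
        for (CoordinatorData c : coordinators) {
            if (!c.key().equals(groupId)) {
                continue; // entry for a different key in a batched (v4+) lookup
            }
            switch (c.errorCode()) {
                case NONE:
                    return Optional.of(c.host() + ":" + c.port());
                case COORDINATOR_LOAD_IN_PROGRESS:
                case COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR:
                    return Optional.empty(); // retriable in this sketch: look up again later
                default:
                    throw new IllegalStateException(
                        "FindCoordinator failed for " + groupId + ": " + c.errorMessage());
            }
        }
        throw new IllegalStateException("no coordinator entry for " + groupId);
    }
}
```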
2. Join Group

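Each consumer sends a JoinGroup request, including its subscription information, to the group coordinator
The coordinator makes the first consumer to join the group the group leader
(When the group is first created) the coordinator creates the group's metadata (state, generation, members)
The coordinator returns a memberId to every consumer; only the leader also receives the member list and each member's subscription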
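The JoinGroup bookkeeping described above can be modeled roughly as follows. This is a toy, single-threaded sketch. The `clientId-counter` member id scheme is hypothetical (the real coordinator appends a UUID to the client id). The sketch also skips waiting for all members to rejoin within `rebalance_timeout_ms`.

```java
import java.util.*;

/** Toy model of the coordinator's JoinGroup bookkeeping (not Kafka's real code). */
final class JoinGroupSketch {
    record JoinResponse(String memberId, String leaderId, int generationId,
                        Map<String, byte[]> members) {}

    // memberId -> serialized subscription, in join order
    private final Map<String, byte[]> subscriptions = new LinkedHashMap<>();
    private String leaderId;
    private int generationId = 0;
    private int nextId = 0;

    /** Registers a joining member; the first member to join becomes the leader. */
    String join(String clientId, byte[] subscriptionMetadata) {
        String memberId = clientId + "-" + (nextId++); // hypothetical id scheme
        subscriptions.put(memberId, subscriptionMetadata);
        if (leaderId == null) {
            leaderId = memberId;
        }
        return memberId;
    }

    /** Called once everyone has joined: bumps the generation and builds each member's response. */
    Map<String, JoinResponse> completeJoin() {
        generationId++;
        Map<String, JoinResponse> responses = new LinkedHashMap<>();
        for (String memberId : subscriptions.keySet()) {
            Map<String, byte[]> members;
            if (memberId.equals(leaderId)) {
                // Only the leader gets every member's subscription, so it can run the assignor
                members = Collections.unmodifiableMap(new LinkedHashMap<>(subscriptions));
            } else {
                members = Map.of(); // followers receive an empty member list
            }
            responses.put(memberId, new JoinResponse(memberId, leaderId, generationId, members));
        }
        return responses;
    }
}
```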
JoinGroupRequestData (SCHEMA_8)
| name | type | Desc |
|---|---|---|
| group_id | String | The group identifier. |
| session_timeout_ms | int | The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds. |
| rebalance_timeout_ms | int | The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group. |
| member_id | String | The member id assigned by the group coordinator. |
| group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
| protocol_type | String | The unique name for the class of protocols implemented by the group we want to join. (consumer, connect) |
| reason | String | The reason why the member (re-)joins the group. |
| protocols | list<Protocol> | The list of protocols that the member supports. (one entry per supported partition assignor) |
| protocol.name | String | The protocol name. (the assignor name, e.g. range) |
| protocol.metadata | bytes | The protocol metadata. (the serialized Subscription object) |
JoinGroupResponseData (SCHEMA_9)
| name | type | Desc |
|---|---|---|
| throttle_time_ms | int | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
| error_code | int | The error code, or 0 if there was no error. |
| generation_id | int | The generation ID of the group. |
| protocol_type | String | The group protocol type. (consumer, connect) |
| protocol_name | String | The group protocol selected by the coordinator. |
| leader | String | The leader of the group. |
| skip_assignment | boolean | True if the leader must skip running the assignment. |
| member_id | String | The member ID assigned by the group coordinator. |
| members | List<Member> | The group members. (populated only in the leader's response; followers receive an empty list) |
| member.member_id | String | The group member ID. |
| member.group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
| member.metadata | bytes | The group member metadata. |
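The `protocol_name` in the JoinGroup response is the assignor the coordinator picked from every member's `protocols` list. Below is a sketch of that choice, modeled on Kafka's `GroupMetadata.selectProtocol`. Only protocols supported by all members are candidates. Each member votes for its most preferred candidate, and the protocol with the most votes wins. Breaking ties in favor of the first protocol encountered is an assumption of this sketch.

```java
import java.util.*;

/** Sketch of assignor selection by member vote (modeled on GroupMetadata.selectProtocol). */
final class ProtocolSelection {

    /** Each inner list is one member's protocol names in its own preference order. */
    static String select(List<List<String>> memberProtocols) {
        // Candidates: protocols that every member supports
        Set<String> candidates = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> protocols : memberProtocols) {
            candidates.retainAll(protocols);
        }
        if (candidates.isEmpty()) {
            // Kafka instead rejects an incompatible member's JoinGroup with INCONSISTENT_GROUP_PROTOCOL
            throw new IllegalStateException("no protocol supported by all members");
        }
        // Each member votes for the first candidate in its own preference order
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> protocols : memberProtocols) {
            for (String p : protocols) {
                if (candidates.contains(p)) {
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; ties go to the protocol voted for first (sketch assumption)
        String winner = null;
        int best = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > best) {
                winner = e.getKey();
                best = e.getValue();
            }
        }
        return winner;
    }
}
```

For example, if two members prefer `range` and one prefers `cooperative-sticky`, and all three support both, `range` is selected.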
3. Sync Group

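The consumer group leader runs the partition assignment strategy
The leader sends each member's partition assignment to the coordinator in its SyncGroup request
The other (follower) consumers send SyncGroup requests with no assignments; the coordinator holds each request until the leader's assignment arrives, then returns that member's own partitions
Consumer partition assignment strategies are covered in detail in the next post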
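The SyncGroup exchange behaves like a barrier. This hedged sketch models it with `CompletableFuture`: a follower's pending SyncGroup completes only once the leader submits the assignments. Assignments are plain strings here instead of serialized bytes. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the coordinator's SyncGroup barrier (hypothetical names). */
final class SyncGroupSketch {
    // One pending response per member, created by whichever side arrives first
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    private CompletableFuture<String> futureFor(String memberId) {
        return pending.computeIfAbsent(memberId, id -> new CompletableFuture<>());
    }

    /** Follower: SyncGroup with an empty assignment list; completes once the leader has synced. */
    CompletableFuture<String> followerSync(String memberId) {
        return futureFor(memberId);
    }

    /** Leader: submits every member's assignment, releasing all waiting followers. */
    CompletableFuture<String> leaderSync(String leaderId, Map<String, String> assignments) {
        assignments.forEach((memberId, assignment) -> futureFor(memberId).complete(assignment));
        return futureFor(leaderId);
    }
}
```

Usage: call `followerSync("m2")` first and the returned future stays incomplete. After `leaderSync("m1", Map.of("m1", "t-0", "m2", "t-1"))`, the follower's future completes with `"t-1"`, its own assignment only.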
SyncGroupRequestData (SCHEMA_5)
| name | type | Desc |
|---|---|---|
| group_id | String | The unique group identifier. |
| generation_id | int | The generation of the group. |
| member_id | String | The member ID assigned by the group. |
| group_instance_id | String | The unique identifier of the consumer instance provided by end user. |
| protocol_type | String | The group protocol type. |
| protocol_name | String | The group protocol name. |
| assignments | List | Each assignment. (the leader sends one entry per member; followers send an empty list) |
| assignments.member_id | String | The ID of the member to assign. |
| assignments.assignment | bytes | The member assignment. (the serialized assignment result) |
SyncGroupResponseData (SCHEMA_5)
| name | type | Desc |
|---|---|---|
| throttle_time_ms | int | The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. |
| error_code | int | The error code, or 0 if there was no error. |
| protocol_type | String | The group protocol type. (consumer, connect) |
| protocol_name | String | The group protocol name. (the selected assignor's name, i.e. ConsumerPartitionAssignor.name()) |
| assignment | bytes | The member assignment. |
Consumer Rebalance
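Rebalancing conditions
1. A member fails to send a heartbeat within the session timeout and is removed from the group
2. A member joins the group
3. Partitions are added to a subscribed topic
4. The consumer subscribes with a pattern (e.g. *) and a new topic matching the pattern is created
5. The consumer group starts up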
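Conditions 1 and 2 are detected on the coordinator side. Below is a simplified sketch that takes timestamps as explicit parameters so the logic is deterministic. A member whose last heartbeat is older than `session.timeout.ms` is evicted, and any membership change marks the group as needing a rebalance. The class is hypothetical. Conditions 3 and 4 come from topic metadata changes and are not modeled here.

```java
import java.util.*;

/** Hypothetical coordinator-side tracker for membership-driven rebalances. */
final class MembershipTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private boolean rebalanceNeeded = false;

    MembershipTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** A previously unknown member joining changes membership (condition 2). */
    void join(String memberId, long nowMs) {
        if (lastHeartbeatMs.put(memberId, nowMs) == null) {
            rebalanceNeeded = true;
        }
    }

    /** Records a heartbeat; heartbeats from unknown members are ignored. */
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.computeIfPresent(memberId, (id, prev) -> nowMs);
    }

    /** Evicts members whose session expired (condition 1) and returns their ids. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        if (!expired.isEmpty()) {
            rebalanceNeeded = true;
        }
        return expired;
    }

    boolean rebalanceNeeded() {
        return rebalanceNeeded;
    }

    /** Clears the flag once a new generation has been formed. */
    void rebalanceCompleted() {
        rebalanceNeeded = false;
    }
}
```

For example, take `session.timeout.ms` = 45000 (the default since Kafka 3.0). A member whose last heartbeat was at t = 0 is evicted by a check at t = 50000, while a member that heartbeated at t = 40000 stays.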
Rebalance notification

The coordinator notifies consumers of a rebalance by returning a REBALANCE_IN_PROGRESS error in the heartbeat response (or in the offset commit response).
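On the client side, the reaction to a heartbeat response can be sketched as a mapping from error code to action. The codes are Kafka protocol error codes: 22 ILLEGAL_GENERATION, 25 UNKNOWN_MEMBER_ID and 27 REBALANCE_IN_PROGRESS. The `Action` enum and method name are hypothetical. The real consumer handles more cases, such as rediscovering the coordinator.

```java
/** Hypothetical client-side mapping from heartbeat error code to the consumer's next step. */
final class HeartbeatHandling {
    static final short NONE = 0;
    static final short ILLEGAL_GENERATION = 22;
    static final short UNKNOWN_MEMBER_ID = 25;
    static final short REBALANCE_IN_PROGRESS = 27;

    enum Action { CONTINUE, REJOIN, RESET_AND_REJOIN, OTHER }

    static Action onHeartbeatResponse(short errorCode) {
        switch (errorCode) {
            case NONE:
                return Action.CONTINUE;          // group is stable, keep consuming
            case REBALANCE_IN_PROGRESS:
                return Action.REJOIN;            // send JoinGroup again on the next poll
            case ILLEGAL_GENERATION:
            case UNKNOWN_MEMBER_ID:
                return Action.RESET_AND_REJOIN;  // local generation/membership is stale
            default:
                return Action.OTHER;             // e.g. coordinator errors, not modeled here
        }
    }
}
```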