개발 일지

ksqlDB 란

북극곰은콜라 2024. 5. 27. 21:48
반응형


ksqlDB란?

2017년 Confluent에서 오픈소스로 공개한 이벤트 스트리밍 데이터베이스이다.
Kafka Streams + Kafka Connect로 구성되며, SQL을 인터페이스로 채용했다.
이는 스트림처리 및 데이터 통합을 단순화시키는 효과를 가진다.

특징

대화형 작업환경을 제공
SQL로 토폴로지를 표현하여 유지보수성 향상
SQL 인터페이스로 인한 “상대적”으로 낮은 진입 장벽
connect와 streams를 한번에 관리하여 아키텍처를 단순화
Streams를 활용한 기능적 확장성을 제공
간편한 설치 및 클라우드 지원
데이터 탐색에 최적화

License

ksqlDB is licensed under the Confluent Community License.

Confluent Community License: https://www.confluent.io/ko-kr/confluent-community-license/

ksqlDB는 Confluent Community이며, 요약하자면 다음과 같다
1. 무단 변경 금지
2. 라이센스의 변경 금지
3. sublicensing 금지
기본적으로 무료 라이선스에 가깝다고 볼 수 있다.

배경 및 Idea

1. Kafka Streams +  SQL

최초 ksqlDB는 Kafka Streams DSL의 가능성을 보고 시작되었으며 KSQL로 명명하여 시작된 프로젝트 었다.
핵심 아이디어는 전통적인 SQL 언어의 이점으로 Streams를 통해 data handling을 구성하는 것이다.
이는 Kafka에서 stream 되는 data를 무한 데이터로 볼 수 있다는 배경이 있다.
SQL을 차용한 배경에는 data handling에 최적화된 언어로, 오랜 기간 사용되며 우수성을 증명한 언어이기 때문이라는 이유가 있다.
최초 개발 당시 KSQL은 무한 데이터를 쿼리 하는 것에 초점이 맞추어져 있었다.
이에 SQL 문법에 방언(dialect)을 적용하는 방식으로 해결하고자 했다.
DDL 및 DML에 추가적인 키워드를 정의했으며, 이를 통해 data stream을 구성하고, 데이터를 쿼리 할 수 있도록 했다.
# DDL EX)
CREATE STREAM XXX (id bigint, date varchar, ...);
DROP STREAM IF EXISTS XXX;

# DML EX)
SELECT id FROM XXX;
INSERT INTO XXX (id, date) VALUES (1, '20240101');

2. ksqlDB로 발전

Kafka Streams가 발전하면서, KSQL로 구성할 수 있는 기능이 더 많아지게 되었다.
이에 따라 KSQL에 많은 기능이 추가되었으며 ksqlDB로 명칭을 변경했다.
그중 핵심이 되는 기능으로, Streams의 StateStore 기능이다.
Streams는 메모리 또는 RocksDB로 topic에 대한 Key-Value Store를 구성할 수 있게 되었다.

ksqlDB는 stateStore를 통해 data의 현재 상태를 쿼리 할 수 있는 기능이 추가되었고 query에 대한 type을 정의했다.
 - push query: 무한 데이터를 조회하는 방식으로 업데이트되는 데이터를 지속적으로 푸시해 준다.
 - pull query: 현재 상태를 조회하는 방식으로 키 기반으로 데이터를 조회할 수 있다.
# PULL QUERY EX)
CREATE TABLE YYY AS SELECT id, COUNT(id) AS FROM XXX GROUP BY date;
SELECT * FROM YYY;

3. Kafka Connect 통합

많은 비즈니스 요구사항은 data Streaming 뿐 아니라, sink와 source도 필요했다.
ksqlDB는 Kafka Connect와 병행되어 사용되었고, 이에 connect에 대한 통합을 요구하는 피드백이 많아졌다.
ksqlDB의 핵심 사상은 고수준 인터페이스를 통해 사용성의 향상이다.
따라서 Kafka Connect의 통합은 자연스럽게 개발되었다.
kafka connect의 sink, source connector를 구성하는 기능이 추가되었으며, 이를 ksqlDB ETL기능으로 명명되었다.
# Connect EX)
CREATE SOURCE CONNECTOR `jdbc-connector` WITH ("connector.class"="io.confluent.connect.jdbc.JdbcSourceConnector", ...);

 


ksqlDB vs RDBMS

유사점

Name Desc
SQL Interface ksqlDB SQL 문법, 파서, 실행 엔진을 포함하고 있다.
DDL, DML ksqlDB SQL은 기존 SQL처럼 DDL DML로 나누어진다.
DDL table streams를 정의하고, DML Data handling을 할 수 있다.
Schema name, type으로 정의되는 schema를 갖는다
내장함수 & 연산자 ksqlDB 또한 연산자 및 함수를 제공한다.

차별점

Name Desc
Pull, Push Query DDL, DML ksqlDB는 쿼리를 PULL QUERY, PUSH QUERY로 나누어진다.
무한데이터를 쿼리하는 것인지, 상태를 쿼리하는 것인지로 나누어진다.
Not Simple Query 일반적이 RDBMS 처럼 쿼리를 위한 쿼리 기능은 지양한다.
쿼리는 하나의 stream이며, 목적성을 가진 쿼리만 작성한다.
Schema Management ksqlDB Kafka 생태계의 Schema Registry를 활용하여, schema의 진화를 관리할 수 있다.
High-Availability & Fault-Tolerant Kafka에서 보장하는 고가용성 및 장애 복구가 적용된다.
Eventually Consistency Data Model은 결과적인 일관성을 베이스로 한다.
event-driven architecture를 베이스로 설계되었기 때문

 


동작 원리

ksqlDB는 Kafka 생태계 메인제품인 Kafka Streams와 Kafka Connect를 활용하여 만들었다.
따라서 동작은 해당 제품의 원리를 따른다.
ksqlDB의 메인 동작은 SQL을 파싱 하여 각 제품의 기능을 사용해 사용자가 원하는 결과를 만드는 것이다.

구성요소

Name Desc
ksqlDB Server SQL Engine, Kafka Streams 등을 포함하며, 사용자의 요청을 수행하는 단위이다.
간단하게 ksqlDB Server 하나는 Kafka Streams Application 하나로 생각해도 무방하다.
ksql.service.id로 각 인스턴스는 아이디를 설정할 수 있는데, 같은 아이디끼리 하나로 묶여서 동작하며, 이를 클러스터라고 부른다. ksql id consumerId와 같은 동작을 한다.
클러스터 내의 각 인스턴스는 Streams처럼 파티션을 나눠 갖고, 실행된다.
SQL Engine SQL parser를 포함하고 있으며, 주요 기능은 SQL을 분석하고, Streams 토폴로지로 변환하는 것이다.
REST Service (Interface) SQL Engine의 인터페이스이다. 사용자는 REST를 통해 SQL Engine과 상호작용 할 수 있다.
ksqlDB CLI cli를 통해 사용자에게 SQL Engine과 상호작용 할 수 있는 Applicaiton이다.
ksqlDB UI Confluent에서 제공하는 Web UI

ksqlDB 모드

Interactive mode (대화형)

사용자가 실시간으로 쿼리를 제출하며, ksqlDB Server가 해당 쿼리를 parsing 및 execute 한다.
모든 쿼리는 command topic이라는 internal-topic으로 publish 되며, 이는 클러스터 내 한번 실행됨을 의미한다.
정리하면, 모든 ksqlDB는 command topic을 subscribe 하고 있으며, 각각의 consumer설정과 partition설정을 통해 가져가서 SQL Engine에게 넘겨준다.

Headless mode (사전정의형)

REST API 등 실시간 입력을 차단하고, 사전정의 된 Query만 실행하는 모드이다.
기동 시 properties로 실행할 sql의 위치를 지정해줘야 한다.
속성은 queries.file로 정의한다.

 


KSQL Language

keywords

참고: https://docs.ksqldb.io/en/latest/reference/sql/appendix/#keywords

Name Desc Example
CREATE create an object CREATE STREAM rock_songs (artist VARCHAR, title VARCHAR) …
DROP delete an object DROP CONNECTOR <connector-name>;
ADD add column ADD (COLUMN)? identifier type;
SELECT FROM query a stream or table SELECT * FROM metrics EMIT CHANGES;
WHERE filter records by a condition SELECT * FROM pageviews WHERE pageid < 'Page_20'
INSERT INTO insert new records in a stream/table INSERT INTO <stream-name> ...
DELETE remove a Kafka topic DROP TABLE <table-name> DELETE TOPIC;
GROUP BY group rows with the same values SELECT regionid, COUNT(*) FROM pageviews GROUP BY regionid
HAVING condition expression GROUP BY card_number HAVING COUNT(*) > 3
JOIN match records in streams/tables CREATE TABLE t AS SELECT * FROM l INNER JOIN r ON l.ID = r.ID;
KEY specify key column CREATE TABLE users (userId INT PRIMARY KEY, …
EXPLAIN show execution plan EXPLAIN <query-name>; or EXPLAIN <expression>;
AS alias a column, expression, or type  
CASE, WHEN, THEN, ELSE, END select a condition from expressions SELECT CASE WHEN condition THEN result [ WHEN … THEN … ] … END
IF EXIST test whether object exists DROP STREAM IF EXISTS <stream-name>;
PRINT output records in a topic PRINT <topic-name> FROM BEGINNING;
PROPERTIES list all properties LIST PROPERTIES; or SHOW PROPERTIES;
QUERIES list all queries LIST QUERIES; or SHOW QUERIES;
TOPIC specify a Kafka topic DROP TABLE <table-name> DELETE TOPIC;
TOPICS list all topics LIST TOPICS; or SHOW TOPICS;
PARTITION BY repartition a stream PARTITION BY <key-field>
PARTITIONS partitions to distribute keys over CREATE STREAM users_rekeyed WITH (PARTITIONS=6) AS …
CONNECTOR manage a connector CREATE SOURCE CONNECTOR 'jdbc-connector' WITH( …
CONNECTORS list all connectors SHOW CONNECTORS;

Data Types

참고: https://docs.ksqldb.io/en/latest/reference/sql/data-types/

Type Name Storage size range Desc Backing Java Type
Boolean boolean     value representing true or false java.lang.Boolean
Character varchar, string     variable-length string java.lang.String
Numeric bytes     variable-length byte array byte []
int 4 bytes -231 to 231-1 typical choice for integer Integer
bigint 8 bytes -263 to 263-1 large-range integer Long
double 8 bytes 2-1074† to (2-2-52)·21023 variable-precision, inexact Double
decimal value dependent n/a user-specified precision, exact BigDecimal
Time time     value representing a time of day in millisecond precision. java.sql.Time
date     value representing a calendar date independent of time zone. java.sql.Date
timestamp     value representing a point in time in millisecond precision without timezone information java.sql.Timestamp
Compound array     sequence of values of a single type Java native array
struct     a strongly typed structured data type org.apache.kafka.connect.data.Struct
map     a mapping of keys to values java.util.map

 


ksqlDB 활용

INSTALL

// ksqlDB-server 설치
docker run -d  -p 127.0.0.1:8088:8088  -e KSQL_BOOTSTRAP_SERVERS=xxx.xxx.xxx.xxx:9092  -e KSQL_LISTENERS=http://0.0.0.0:8088/  -e KSQL_KSQL_SERVICE_ID=ksql_service_x confluentinc/ksqldb-server:0.29.0

// ksqlCli 설치 및 실행
docker run -it --net host confluentinc/ksqldb-cli ksql http://127.0.0.1:8088
ksqlDB는 Kafka의 consumer/producer와 같은 레이어이며 로컬에서 테스트 가능하다.
도커로 간편하게 설치 가능하며 내부 포트, bootstrap server, serviceid 정도 세팅했다.

ksqlDB Stream Test

1. topic 확인

ksql> show topics;
 Kafka Topic                                            | Partitions | Partition Replicas 
 ...                
 quickstart-avro-config                                 | 1          | 1                  
 quickstart-avro-offsets                                | 25         | 1                  
 quickstart-avro-status                                 | 5          | 1                  
 schema-changes.mysql                                   | 1          | 1                  
 schema-changes.mysql.event_history                     | 1          | 1                  
 survey.debezium                                        | 1          | 1                  
 survey.debezium.survey.event_history                   | 1          | 1                  
 test.ksql.1                                            | 1          | 1
test.ksql.1 토픽으로 Stream을 생성할 것이다.
해당 토픽은 사전 생성했다.

2. Create Stream

ksql> CREATE STREAM TEST_STREAM_1 (id VARCHAR KEY, desc STRING) WITH (KAFKA_TOPIC='test.ksql.1', VALUE_FORMAT='json');
CREATE STREAM TEST_STREAM_1 (id VARCHAR KEY, desc STRING) WITH (KAFKA_TOPIC='test.ksql.1', VALUE_FORMAT='json')
 Message        
 Stream created
심플한 stream을 생성했다.
stream 이름은 TEST_STREAM_1이며 schema는 id와 desc가 문자열이다.

3. INSERT DATA

ksql> INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a');
INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a')
ksql> INSERT INTO TEST_STREAM_1 VALUES ('b', 'test b');
INSERT INTO TEST_STREAM_1 VALUES ('b', 'test b')
ksql> INSERT INTO TEST_STREAM_1 VALUES ('c', 'test c');
INSERT INTO TEST_STREAM_1 VALUES ('c', 'test c')
ksql> INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a');
INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a')
ksql> INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a');
INSERT INTO TEST_STREAM_1 VALUES ('a', 'test a')
간단하게 Data를 insert 할 수 있다.

4. PUSH Query

# no data (cause by offset properties)
ksql> SELECT * FROM TEST_STREAM_1 EMIT CHANGES;
+----------------------------------------+----------------------------------------+
|ID                                      |DESC                                    |
+----------------------------------------+----------------------------------------+
# set offset reset
ksql> set 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
# SELECT data
ksql> SELECT * FROM TEST_STREAM_1 EMIT CHANGES;
+----------------------------------------+----------------------------------------+
|ID                                      |DESC                                    |
+----------------------------------------+----------------------------------------+
|a                                       |test a                                  |
|b                                       |test b                                  |
|c                                       |test c                                  |
|a                                       |test a                                  |
|a                                       |test a                                  |
ksqlDB가 Streams를 사용하기 때문에, reset 정책도 당연하게 가져간다.
offset 정책을 earliest로 해서 모든 데이터를 초기 offset부터 읽을 수 있도록 설정할 수 있다.
결과 table이 기존 rdbms랑 다르게 밑줄이 없는 것은, data 가 streaming 되기 때문이다.

5. Query with GROUP BY

ksql> SELECT id, COUNT(*) AS CNT FROM TEST_STREAM_1 GROUP BY id EMIT CHANGES;
+-----------------------------------------+-----------------------------------------+
|ID                                       |CNT                                      |
+-----------------------------------------+-----------------------------------------+
|b                                        |1                                        |
|c                                        |1                                        |
|a                                        |3                                        |
STREAM에서 id를 기준으로 group by 하여 count 결과를 select 했다.
이는 PUSH Query로 조회했으며, 실시간으로 CNT가 반영되는 것을 볼 수 있다.

6. drop stream

ksql> drop stream TEST_STREAM_1;
 Message
----------------------------------------------------------
 Source `TEST_STREAM_1` (topic: test.ksql.1) was dropped.
----------------------------------------------------------
ksql> SHOW STREAMS;
 Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------
------------------------------------------------------------------
간단하게 STREAM을 삭제할 수 있다.

 


ksqlDB Table Test

1. CREATE Table

ksql> CREATE TABLE TEST_TABLE_1 (id VARCHAR PRIMARY KEY, desc STRING) WITH (KAFKA_TOPI
C='test.ksql.1', VALUE_FORMAT='json');
 Message
---------------
 Table created
---------------
ksql> SHOW TABLES;
 Table Name   | Kafka Topic | Key Format | Value Format | Windowed
-------------------------------------------------------------------
 TEST_TABLE_1 | test.ksql.1 | KAFKA      | JSON         | false
-------------------------------------------------------------------
Simple하게 테이블을 생성했다.

2. SELECT Data

ksql> set 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM TEST_TABLE_1 EMIT CHANGES;
+-----------------------------------------+-----------------------------------------+
|ID                                       |DESC                                     |
+-----------------------------------------+-----------------------------------------+
|b                                        |test b                                   |
|c                                        |test c                                   |
|a                                        |test a                                   |
STREAM과 다르게 pk로 데이터가 override 된 것을 확인할 수 있다.

 


Extend KSQL Guide

SubQuery

아쉽게도 ksql은 서브쿼리를 지원하지 않으며, 현재 이에 관련된 개발 로드맵은 없다.
ksqlDB는 subQuery를 통한 복잡도는 프로젝트 성격 상 맞지 않다고 판단했으며
SubQuery가 아닌 여러 Query를 Kafka로 통합하는 방향을 지향하고 있다.

 


Join

KSQL의 join은 기존 Join과 다른 양상을 보인다.
이는 Source가 되는 Record의 structure가 Stream 또는 Table로 나누어지기 때문이다.
ksqlDB 진영에서 JOIN의 의미는 UPSTREAM의 데이터를 특정 기준으로 합쳐서 DOWNSTREAM을 생성하는 것이다.
따라서 PULL Query로는 JOIN 할 수 없다.
// Pull queries don't support JOIN clauses.

종류

JOIN은 3종류가 있다.
INNER JOIN: 양쪽 Record의 키가 같을 때 사용할 수 있는 JOIN
LEFT JOIN: LEFT Record를 기준으로 downStream을 형성한다. 즉 FROM 절의 record가 들어올 때 downStream record를 생성하며, join 할 대상이 없는 경우 null이 된다.
FULL JOIN: 양쪽 모두의 Record Stream을 받아서 downStream을 형성한다. 또한 JOIN 할 대상이 없다면 null이 된다.

Full Flow Example 참고: https://docs.ksqldb.io/en/latest/developer-guide/joins/join-streams-and-tables/


제약사항

Support Combination Support JOIN Function Etc
Stream - Stream INNER JOIN
LEFT JOIN
FULL JOIN
Stream간의 JOIN Window 설정을 해야한다.
이는 Stream은 무한하기 때문에, JOIN 시점에 전체 데이터를 알 수 있는 기준이 없기 때문
Stream - Table INNER JOIN
LEFT JOIN
Table PK 컬럼만 JOIN에 사용할 수 있다. 이는 KTable repartition이 불가하기 때문이다.
또한 최근 지원되는 Multi-PK의 경우 그 중 하나만 선택해서 JOIN이 가능하다. JOIN … ON … AND는 불가하다.
Table - Table INNER JOIN
LEFT JOIN
FULL JOIN
 
JOIN의 큰 조건은 두 대상의 파티션 수가 같아야 하며, 동일 파티션 어싸이너를 가져야 한다.
이는 key를 통한 JOIN을 할 때 파티션 매칭을 위한 것이다.
또한 JOIN KEY는 모두 동일 데이터 타입이어야 한다.

N-Way Joins

CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B ON A.id = B.product_id
    JOIN C ON A.id = C.purchased_id;
최신 버전 ksqlDB는 여러 Stream / Table의 JOIN을 지원한다.
제약사항으로 JOIN 구성 중 STREAM, TABLE 모두 있다면 FULL JOIN은 지원하지 못한다.

 


ksqlDB Java Client

참고: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/

ksqlDB를 Java Applicaiton에서 사용할 수 있도록, Confluent에서 Java Client Library를 제공한다.

Gradle

implementation 'io.confluent.ksql:ksqldb-api-client:7.5.3'

Client init

ClientOptions options = ClientOptions.create()
    .setHost(KSQLDB_SERVER_HOST)
    .setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);
// Send requests with the client by following the other examples
// Terminate any open connections and close the client
client.close();
Client는 host, port로 간단하게 생성할 수 있다.

Client

public interface Client {
  /**
   * Executes a query (push or pull) and returns the results one row at a time.
   * 
   * <p>This method may be used to issue both push and pull queries, but the usage 
   * pattern is better for push queries. For pull queries, consider using the 
   * {@link Client#executeQuery(String)} method instead.
   *
   * <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
   * failed.
   *
   * @param sql statement of query to execute
   * @return a future that completes once the server response is received, and contains the query
   *         result if successful
   */
  CompletableFuture<StreamedQueryResult> streamQuery(String sql);
  ...
  /**
   * Executes a query (push or pull) and returns all result rows in a single batch, once the query
   * has completed.
   * 
   * <p>This method is suitable for both pull queries and for terminating push queries,
   * for example, queries that have a {@code LIMIT} clause. For non-terminating push queries,
   * use the {@link Client#streamQuery(String)} method instead.
   *
   * @param sql statement of query to execute
   * @return query result
   */
  BatchedQueryResult executeQuery(String sql);
  ...
}
여러 method를 지원한다.
그중 streamQuery()는 CompletableFuture로 실행결과에 대한 future를 받는 형태이며, PUSH Query에 권장되는 메서드이다.
executeQuery는 PULL Query에 적합하다. 또한 PUSH Query 중 LIMIT 등으로 끝이 있는 쿼리에도 사용할 수 있다.
그 외 INSERT 등 여러 메서드를 지원한다.
// 아쉽게도 Spring 진영에서 integration을 지원하고 있진 않다.

 


Conclusion

ksqlDB는 Kafka를 통한 Event-Driven System에서 고수준 인터페이스를 통해 Stream처리 및 source, sink를 구성할 수 있는 product이다.
기본적으로 SQL을 확장하여 인터페이스를 구성했으며, 상호작용을 위한 대화형 CLI, REST API를 제공한다.
Confluent에서 관리되고 있으며, kafka update에 맞춰 적극적인 업데이트가 이루어지고 있다.

활용방안

스팟 성 요구사항 대응

일시적으로 기존 비즈니스와 다른 요구사항이 발생했을 때 적은 비용으로 대응이 가능하다.
이는 Kafka의 장점을 손쉽게 활용할 수 있다.

Develop 환경에서 활용

개발 중 임시로 활용할 데이터 생성이나 기존 데이터를 복제하는 등 일시적인 stream 구성을 할 때 활용할 수 있다.
이는 개발 생산성을 올릴 수 있는 방안으로 생각된다.

잦은 변경이 예상되는 비즈니스

SQL 대화형 인터페이스 또는 UI로 Data Processing 로직 변경이 가능하기 때문에
잦은 변경이 예상되는 부분을 ksqlDB로 구성하여, 즉각적인 반영이 가능하다.

Data ETL

복잡한 비즈니스 없는 ETL 요구사항에 활용될 수 있다.
ksqlDB는 단순한 비즈니스를 구성하는데 큰 생산성 향상을 가져오며, 유지관리에도 용이한 장점이 있다.
이는 Kafka Connect + Schema Registry + Streams를 통합하여 제공하기 때문이며
데이터의 source + processing + sink를 한 번에 관리할 수 있기 때문이다.

 


REFERENCE

도서 - OREILLY의 Mastering Kafka Streams and ksqlDB

https://docs.ksqldb.io/en/latest/reference/sql/syntax/lexical-structure/

https://docs.ksqldb.io/en/latest/operate-and-deploy/installation/install-ksqldb-with-docker/

 

 

반응형

'개발 일지' 카테고리의 다른 글

VoltDB란  (0) 2024.06.03
Java Reactive Streams Publisher / Subscriber 분석 (projectreactor)  (0) 2024.03.26
Spring Webflux with EventListener  (0) 2024.03.20
[Kafka] Parallel Consumer  (0) 2024.03.19
TURN Protocol (+ STUN Message)  (1) 2024.03.18