Overview
Confluent Platform is a Kafka-based management platform that makes it easy to build and operate data pipelines and streaming applications.
Confluent was founded by the original creators of Apache Kafka, and its platform is considered a leading choice when building systems around Kafka.
I'm going to use it to put together an event-driven architecture.
Installing Confluent Platform
Installing the Confluent Hub client
curl http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz --output confluent-hub-client-latest.tar.gz
tar xzvf confluent-hub-client-latest.tar.gz
vi ~/.bashrc
# add this line at the bottom
export PATH=$PATH:/{confluent-hub-install-path}/bin
Download the hub client archive and set the PATH environment variable.
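With the PATH updated, a quick sanity check is to reload the shell config and ask the client for its help text; confluent-hub help should list the available subcommands:

source ~/.bashrc
confluent-hub help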
Preparing docker-compose.yml
curl --silent --output docker-compose.yml \
https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.2-post/cp-all-in-one/docker-compose.yml
The command above downloads the docker-compose.yml file.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    ...
Add restart: always to every service so the containers restart automatically.
Also delete the ksql-datagen service, since we don't need it.
Then configure the cp-server-connect (kafka-connect) service.
For reference, here is my full configuration:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    restart: always

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    restart: always

  kafka-connect:
    image: confluentinc/cp-server-connect:7.3.2
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars,/mnt/datadisk/kafkaConnector
    # If you want to use the Confluent Hub installer to d/l components, but make them available
    # when running this offline, spin up the stack once and then run:
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.3.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    restart: always

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    restart: always

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - kafka-connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://kafka-connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
    restart: always

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.3.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - kafka-connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    restart: always

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    restart: always
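Before bringing the stack up, it can help to let Compose validate the edited file; docker compose config parses and resolves the whole YAML, so an indentation mistake or unknown key fails immediately (this assumes the Docker Compose v2 CLI):

docker compose config --quiet && echo "docker-compose.yml is valid"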
Since I'll be using MongoDB source and sink connectors later on, I brought kafka-connect up with that connector already installed.
Revision:
The latest version of the MongoDB connector has an issue on the sink connector side with no clear fix, so I pinned it to version 1.3.0.
The error log:
[2023-03-30 02:17:05,413] ERROR Uncaught exception in REST call to /connector-plugins/com.mongodb.kafka.connect.MongoSinkConnector/config/validate (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:107)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource.validateConfigs(ConnectorPluginsResource.java:134)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:178)
at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Collectors.java:1136)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at com.mongodb.kafka.connect.util.config.ConfigSoftValidator.logIncompatibleProperties(ConfigSoftValidator.java:79)
at com.mongodb.kafka.connect.sink.SinkConfigSoftValidator.logIncompatibleProperties(SinkConfigSoftValidator.java:143)
at com.mongodb.kafka.connect.sink.MongoSinkConfig$1.validateAll(MongoSinkConfig.java:206)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:533)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:144)
at com.mongodb.kafka.connect.MongoSinkConnector.validate(MongoSinkConnector.java:83)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:564)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$4(AbstractHerder.java:442)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Running Confluent Platform
docker compose up -d
Bring all the services up with docker compose; the -d flag runs the containers in the background (detached mode).
(I keep Docker's storage on a large mounted drive, so all the containers and images installed here put no strain on the main disk.)
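Once the command returns, it is worth checking that every container actually stayed up; with restart: always, a misconfigured service will keep cycling instead of settling into the Up state:

docker compose ps                      # every service should report Up
docker compose logs -f kafka-connect   # watch the worker install the MongoDB connector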
Verifying the installation
Browse to port 9021 on the host and the Control Center page comes up.
From there you can monitor and manage the overall cluster state, topics, connectors, and more.
You can also confirm that the MongoDB connector has been installed and registered.
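The same check works from the command line: the Kafka Connect REST API lists every plugin found on the worker's plugin path. The jq filter below is optional and assumes jq is installed:

curl -s http://localhost:8083/connector-plugins | jq '.[].class'
# com.mongodb.kafka.connect.MongoSinkConnector and MongoSourceConnector should appear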
Rambling...
This task involved quite a bit of trial and error.
It was my first time using Confluent Platform, so even the initial setup gave me a hard time.
I first tried installing it directly on the host, but it didn't run smoothly on Ubuntu 22, so I moved to Docker.
Pushing the connector into the connect image through docker compose was also a bit of a struggle.
Honestly, I'm not sure yet whether the connector will actually work.
Still, for now I'm content that Confluent Platform and I have been introduced and exchanged greetings.
REFERENCE
1. https://docs.confluent.io/platform/current/platform-quickstart.html#step-1-download-and-start-cp
2. https://www.confluent.io/hub/mongodb/kafka-connect-mongodb
3. https://hub.docker.com/u/confluentinc/