[완] 개인서버 개발/공통 서비스 개발(완)

#3 Confluent Platform 설치

북극곰은콜라 2023. 3. 29. 23:28
반응형


개요

Confluent Platform은 kafka 기반의 관리 플랫폼이며, 데이터 파이프라인 구성, 스트리밍 애플리케이션을 쉽게 구축 및 관리할 수 있게 해 준다.
아파치 카프카의 최초 개발자들이 만든 회사이며, 카프카를 활용한 시스템을 구축할 때 최고로 평가받고 있다.
이를 활용해서 event 기반 아키텍처를 구성해보려 한다.

 


Confluent Platform 설치

Confluent Hub 설치

curl http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz --output confluent-hub-client-latest.tar.gz
tar xzfv confluent-hub-client-latest.tar.gz

sudo vi ~/.bashrc

// 맨 밑에 추가
PATH=$PATH:/{ConfluentHub 설치경로}/bin

hub 파일 다운로드 및 환경변수 설정


docker-compose.yml 준비

curl --silent --output docker-compose.yml \
  https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.2-post/cp-all-in-one/docker-compose.yml

위 명령어로 docker-compose.yml 파일을 받습니다.

 

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
  
  ....

자동 재기동을 위해서 restart: always를 각 모듈마다 추가해 줍니다.

또한 필요 없는 ksql-datagen 모듈을 삭제합니다.

cp-server-connect 모듈 설정을 합니다.

 

참고용으로 제 설정은

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    restart: always

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    restart: always

  kafka-connect:
    image: confluentinc/cp-server-connect:7.3.2
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars,/mnt/datadisk/kafkaConnector
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run :
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.3.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    restart: always

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    restart: always

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - kafka-connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
    restart: always

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.3.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - kafka-connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    restart: always

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    restart: always

입니다.

추후 mongoDB를 활용한 source 및 sink connector를 사용할 것이기에 해당 connector를 설치한 상태로 kafka-connect를 올렸습니다.

수정사항:
mongoDB connector 최신버전 sink connector 부분에 해결이 애매한 이슈가 있어
1.3.0버전으로 차용

문제 로그

더보기
[2023-03-30 02:17:05,413] ERROR Uncaught exception in REST call to /connector-plugins/com.mongodb.kafka.connect.MongoSinkConnector/config/validate (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:107)
	at org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource.validateConfigs(ConnectorPluginsResource.java:134)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.Server.handle(Server.java:516)
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:178)
	at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Collectors.java:1136)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.mongodb.kafka.connect.util.config.ConfigSoftValidator.logIncompatibleProperties(ConfigSoftValidator.java:79)
	at com.mongodb.kafka.connect.sink.SinkConfigSoftValidator.logIncompatibleProperties(SinkConfigSoftValidator.java:143)
	at com.mongodb.kafka.connect.sink.MongoSinkConfig$1.validateAll(MongoSinkConfig.java:206)
	at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:533)
	at org.apache.kafka.connect.connector.Connector.validate(Connector.java:144)
	at com.mongodb.kafka.connect.MongoSinkConnector.validate(MongoSinkConnector.java:83)
	at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:564)
	at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$4(AbstractHerder.java:442)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more

 

Confluent Platform 실행

docker compose up -d

docker compose를 통해서 모든 모듈을 올립니다.

// -d 옵션은 백그라운드에서 처리하겠다는 옵션

// 저는 도커를 마운트 한 대용량 하드에 설치했기 때문에, 도커에 설치되는 모든 컨테이너 및 이미지의 부담을 줄였습니다.

 


설치 확인

설치된 서버의 9021 포트로 접근하면 control center 페이지가 나옵니다.

전체적인 클러스터 상황, 토픽, 커넥터 등을 파악 및 핸들링할 수 있습니다.

 

MongoDB 관련 connector도 설치 및 등록되어있는 것을 확인할 수 있습니다.

 


주저리...

이번 작업은 시행착오가 꽤나 많았습니다.

우선 Confluent Platform의 사용이 처음이라 초기 설정부터 많이 힘들었었네요

처음에 그냥 설치하려 했지만 우분투 22 버전을 원활하게 지원하지 못하는 상황이 발생해서 도커에 설치를 했습니다.

그리고 커넥터를 docker compose를 통해 connect 이미지에 밀어 넣는 부분이 쫌 힘들었습니다.

 

사실 connector가 정상적으로 사용 가능할지는 아직 모르겠습니다.

그래도 이제 confluent platform이랑 인사하고 자기소개 정도 끝난 상태에 만족해보려 합니다.


REFERENCE

1. https://docs.confluent.io/platform/current/platform-quickstart.html#step-1-download-and-start-cp

2. https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-zero-to-hero/docker-compose.yml#L89-L101

3. https://docs.confluent.io/platform/current/installation/docker/development.html#create-a-docker-image-containing-local-connectors

4. https://www.confluent.io/hub/mongodb/kafka-connect-mongodb

5. https://docs.confluent.io/platform/current/installation/docker/image-reference.html#docker-image-reference-for-cp

6. https://hub.docker.com/u/confluentinc/

 

 

 

반응형