[완] 개인서버 개발/공통 서비스 개발(완)

#5 와우 경매장 데이터 적재

북극곰은콜라 2023. 4. 9. 21:23
반응형


개요

와우 경매장 데이터를 한 시간마다 데이터를 GET해서 kafka로 publish 하며, publish 된 데이터를 sink connector를 통해 MongoDB에 raw data를 적재하는 프로세스에 대한 설계 및 구현

project 소스: https://github.com/p-bear/data-collect-server

프로젝트

 - spring boot project

 - spring-kafka를 통한 producer 구현

 - Logback을 통한 로깅

 - Java HttpClient를 통한 REST 통신

CI / CD

 - Jenkinsfile을 통한 빌드 관리

 - Dockerfile을 통한 이미지 빌드

Connector

 - Kafka-connect를 통한 connector 추가 및 관리

 - MongoSinkConnector

DB

 - MongoDB

 


구현사항

data-collect-server

gradle.build

plugins {
    id 'java'
    id 'org.springframework.boot' version '2.7.10'
    id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.pbear'
version = '1.1.4'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'com.fasterxml.jackson.core:jackson-core:2.11.4'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.4'

    implementation 'org.springframework.kafka:spring-kafka:2.9.6'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    implementation 'com.google.code.gson:gson:2.8.9'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

서버역할을 할것은 아니라서 spring-boot-starter만 사용

spring-kafka를 통해 손쉬운 카프카 producer 구현

 

application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: {ip}:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        max:
          request:
            size: 5008588
        compression:
          type: snappy

server:
  name: data-collect-server

blizzard:
  auth:
    uri: https://oauth.battle.net
  api:
    uri: https://kr.api.blizzard.com

kafka 키는 String, value는 json

request size를 조금 키우고

snappy로 압축을 쫌 진행한다 (속도가 중요하진 않기 때문에)

 

Blizzard 토큰 발급

블리자드 developer 페이지의 가이드로, client_credential 토큰을 발급받아 추후 데이터 관련 API를 사용한다.

 

와우 경매장 데이터 GET

Connected Realm Id를 받아와서 그 값을 통해 경매장 데이터를 조회한다.

와우 내 경매장은 여러 서버가 묶인 Realm 단위로 경매장이 묶여있기에 개별적으로 GET 해야 한다.

 

데이터 Publish

public void publishWowAuctionData() {
    GetRealmRes getRealmRes;
    try {
      getRealmRes = this.blizzardApiService.getRealm("hyjal");
    } catch (IOException | InterruptedException e) {
      log.error("fail to get hyjal realm from blizzard", e);
      throw new RuntimeException(e);
    }

    String connectedRealm = getRealmRes.getConnectedRealm().getHref()
        .split("/connected-realm/")[1]
        .split("\\?")[0];
    GetAuctionsRes getAuctionsRes;
    try {
      getAuctionsRes = this.blizzardApiService.getAuctions(Long.parseLong(connectedRealm));
    } catch (IOException | InterruptedException e) {
      log.error("fail to get Auction data from blizzard", e);
      throw new RuntimeException(e);
    }

    String eventId = UUID.randomUUID().toString();
    Long issueTimestamp = new Date().getTime();
    getAuctionsRes.getAuctions().stream()
        .filter(Objects::nonNull)
        .map(auctionData -> EventMessage.builder()
            .id(eventId)
            .timestamp(issueTimestamp)
            .version(1L)
            .data(auctionData)
            .build())
        .forEach(eventMessage -> this.kafkaEventProduceService.sendSimpleMessage(TOPIC_COLLECT_WOW_AUCTION, eventMessage));
  }

1. hyjal이 속한 realm id를 받아온다

2. Auction 데이터를 받아온다

3. Auction에서 아이템 별로 쪼개서 publish 한다.

 

이벤트 데이터 설계

@Getter
@Setter
@Builder
public class EventMessage<T> {
  private String id;
  private String issuer;
  private Long timestamp;
  private Long version;
  private T data;
}

 - id: 이벤트의 id, transactionId의 역할도 한다.

 - issuer: 이벤트의 이슈 발급자

 - timestamp: 이벤트 생성 일시

 - version: 이벤트 메시지의 버전

 - data: 이벤트 데이터

 

Scheduler

@SpringBootApplication
@EnableScheduling
public class DataCollectServerApplication {
  public static void main(String[] args) {
    SpringApplication.run(DataCollectServerApplication.class, args);
  }
}


@Scheduled(cron = "0 10 * * * ?")
    public void collectWowAuctionData() {
    log.info("collect wow auctionData start");
    this.wowDataCollectService.publishWowAuctionData();
    log.info("collect wow auctionData end");
}

스케쥴링 활성화 후 매시 10분에 데이터 collect 및 publish를 트리거링

 

Dockerfile

FROM openjdk:11

RUN ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime

ARG JAR_FILE=./build/libs/*.jar
COPY ${JAR_FILE} ./app.jar
ENTRYPOINT ["java","-jar","-Dspring.profiles.active=release", "-Duser.timezone=Asia/Seoul","/app.jar"]

 - openjdk 11 버전 사용

 - timezone 아시아 서울로 맞춤

 - spring profile을 release로 배포

 

Jenkinsfile

pipeline {
  agent any
  options {
    buildDiscarder(logRotator(numToKeepStr: '3'))
  }
  environment {
    DOCKERHUB_CREDENTIALS = credentials('dockerhub')
    repository = "pbear41/data-collect-server"
    dockerImage = ''
    isDockerBuildSuccess = false;
    version_value = ''
    version = ''
    imageName = ''
  }
  stages {
    stage('get version from gradle') {
      steps {
        script {
          version_value = sh(returnStdout: true, script: "cat build.gradle | grep -o 'version = [^,]*' | tr -d \\'").trim()
          sh "echo Project in version value: $version_value"
          version = version_value.split(/=/)[1].trim()
          sh "echo final version: $version"
          imageName = repository + ":" + version
        }
      }
    }
    stage('get release properties file') {
      steps {
        sh 'cp /mnt/datadisk/data/properties/data-collect-server/application-release.yml ./src/main/resources/application-release.yml'
      }
    }
    stage('Gradle Build') {
      steps {
        sh './gradlew clean build'
      }
    }
    stage('Docker Build') {
      steps {
        script {
        sh 'whoami'
            sh 'echo imageName: ' + imageName
            dockerImage = docker.build imageName
            isDockerBuildSuccess = true
        }
      }
    }
    stage('Login') {
      steps {
        sh 'echo $DOCKERHUB_CREDENTIALS_PSW | docker login -u $DOCKERHUB_CREDENTIALS_USR --password-stdin'
      }
    }
    stage('Push') {
      steps {
        sh 'docker push ' + imageName
      }
    }
  }
  post {
    always {
      sh 'docker logout'
    }
    success {
      script {
        if (isDockerBuildSuccess == true) {
          sh 'docker rmi ' + imageName
        }
      }
    }
  }
}

CI는 Jenkinsfile로 구성

1. gradle.build로부터 project 버전을 가져와서 image 태그를 설정

2. github에 올릴 수 없는 민감정보를 application-release.yml로 미리 서버에 배포한 파일 가져오기

3. gradle build

4. docker build (이미지 빌드)

5. dockerhub login

6. docker push

7. 마지막으로 도커 로그아웃 및 이미지 파일 삭제

 

CD pipeline

pipeline {
    agent any
    
    stages {
        stage('docker pull') {
            steps {
                sh 'docker pull pbear41/data-collect-server:$docker_image_tag'
            }
        }
        stage('stop prev container') {
            steps {
                script {
                    try {
                        sh 'docker stop data-collect-server'
                        sh 'docker rm data-collect-server'
                    } catch (Exception e) {
                        echo 'no prev container'
                    }
                }
            }
        }
        stage('run container') {
            steps {
                sh 'docker run -d --name data-collect-server -v /mnt/datadisk/data/logs:/logs pbear41/data-collect-server:$docker_image_tag'
            }
        }
    }
}

매개변수를 받아서 배포할 버전 선택

1. docker pull

2. stop & remove current container

3. container 가동

 

Connector

미리 이미지에 세팅한 Mongo Connector를 생성헀다.

{
  "name": "WowAuctionDataMongoSinkConnector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "collect.wow.auction",
    "connection.uri": "mongodb://{ip}:27017",
    "database": "rawData",
    "collection": "wowAuction",
    "value.converter.schemas.enable": "false"
  }
}

topic을 그대로 가져와서 insert 하는 sink connector이다.

 

add connector를 통해 손쉽게 connector를 구성할 수 있다.

 


동작 확인

20230409 21:10:00.001 [scheduling-1] INFO  c.p.d.s.s.FixedScheduleService - collect wow auctionData start
20230409 21:10:01.167 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@346a8ce
20230409 21:10:01.193 [scheduling-1] INFO  o.a.k.c.p.ProducerConfig - ProducerConfig values:
        acks = -1
        batch.size = 16384
...
20230409 21:10:01.271 [scheduling-1] INFO  o.a.k.c.p.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
20230409 21:10:01.304 [scheduling-1] INFO  o.a.k.c.u.AppInfoParser - Kafka version: 3.1.2
20230409 21:10:01.305 [scheduling-1] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId: f8c67dc3ae0a3265
20230409 21:10:01.305 [scheduling-1] INFO  o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1681042201301
20230409 21:10:01.521 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition collect.wow.auction-0 to 0 since the associated topicId changed from null to KCGd5qDVQT6QHU4QRsWXbQ
20230409 21:10:01.523 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.Metadata - [Producer clientId=producer-1] Cluster ID: aBUTkXopQjuLHE0Iqs-rIw
20230409 21:10:01.524 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1] ProducerId set to 1019 with epoch 0
20230409 21:10:01.577 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@2aa7b44a
20230409 21:10:01.578 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@297456bd
20230409 21:10:01.578 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@2bef2d7
20230409 21:10:01.579 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@152c032e
20230409 21:10:01.579 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@5aedd051

...
20230409 21:10:04.339 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@4e7a5ce8
20230409 21:10:04.339 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@7037639b
20230409 21:10:04.340 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@7e1d9bfa
20230409 21:10:04.340 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@4bbd62b1
20230409 21:10:04.340 [scheduling-1] INFO  c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@5b412b76
20230409 21:10:04.341 [scheduling-1] INFO  c.p.d.s.s.FixedScheduleService - collect wow auctionData end

정상 동작 확인

insert 된 데이터 확인

 


주저리주저리

구성하면서 쫌 힘든 구간들이 있었지만 최대한 워크어라운드 없이 정공법으로 해결했다.

문제들을 확인하면서, 카프카는 자유도와 가용성을 최대한 열어주어서 에코시스템이 구성될 수 있었지만

그만큼 고려해야 할 사항이 너무 많았다...

단순하게 메시지만 큐잉할 목적으로는 그냥 RabbitMQ를 사용하는 게 좋아 보인다.

 

이제 남은 건 데이터를 알맞은 형태로 제공하는 API 서버와

데이터를 사용자에게 보여줄 수 있는 client다 ㅎㅎ

 

추후 데이터가 쫌 더 모인 시점에 해당 데이터를 분석하는 프로세스를 구성해 보려 한다.


REFERENCE

1. https://openjdk.org/groups/net/httpclient/recipes.html

2. https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/mongodb-connection/

3. https://docs.confluent.io/platform/current/installation/docker/config-reference.html#required-kconnect-long-settings

4. https://develop.battle.net/documentation/world-of-warcraft/game-data-apis

 

 

반응형