개요
와우 경매장 데이터를 한 시간마다 데이터를 GET해서 kafka로 publish 하며, publish 된 데이터를 sink connector를 통해 MongoDB에 raw data를 적재하는 프로세스에 대한 설계 및 구현
project 소스: https://github.com/p-bear/data-collect-server
프로젝트
- spring boot project
- spring-kafka를 통한 producer 구현
- Logback을 통한 로깅
- Java HttpClient를 통한 REST 통신
CI / CD
- Jenkinsfile을 통한 빌드 관리
- Dockerfile을 통한 이미지 빌드
Connector
- Kafka-connect를 통한 connector 추가 및 관리
- MongoSinkConnector
DB
- MongoDB
구현사항
data-collect-server
gradle.build
plugins {
id 'java'
id 'org.springframework.boot' version '2.7.10'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}
group = 'com.pbear'
version = '1.1.4'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'com.fasterxml.jackson.core:jackson-core:2.11.4'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.4'
implementation 'org.springframework.kafka:spring-kafka:2.9.6'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
implementation 'com.google.code.gson:gson:2.8.9'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
tasks.named('test') {
useJUnitPlatform()
}
서버역할을 할것은 아니라서 spring-boot-starter만 사용
spring-kafka를 통해 손쉬운 카프카 producer 구현
application.yml
spring:
kafka:
producer:
bootstrap-servers: {ip}:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
max:
request:
size: 5008588
compression:
type: snappy
server:
name: data-collect-server
blizzard:
auth:
uri: https://oauth.battle.net
api:
uri: https://kr.api.blizzard.com
kafka 키는 String, value는 json
request size를 조금 키우고
snappy로 압축을 쫌 진행한다 (속도가 중요하진 않기 때문에)
Blizzard 토큰 발급
블리자드 developer 페이지의 가이드로, client_credential 토큰을 발급받아 추후 데이터 관련 API를 사용한다.
와우 경매장 데이터 GET
Connected Realm Id를 받아와서 그 값을 통해 경매장 데이터를 조회한다.
와우 내 경매장은 여러 서버가 묶인 Realm 단위로 경매장이 묶여있기에 개별적으로 GET 해야 한다.
데이터 Publish
public void publishWowAuctionData() {
GetRealmRes getRealmRes;
try {
getRealmRes = this.blizzardApiService.getRealm("hyjal");
} catch (IOException | InterruptedException e) {
log.error("fail to get hyjal realm from blizzard", e);
throw new RuntimeException(e);
}
String connectedRealm = getRealmRes.getConnectedRealm().getHref()
.split("/connected-realm/")[1]
.split("\\?")[0];
GetAuctionsRes getAuctionsRes;
try {
getAuctionsRes = this.blizzardApiService.getAuctions(Long.parseLong(connectedRealm));
} catch (IOException | InterruptedException e) {
log.error("fail to get Auction data from blizzard", e);
throw new RuntimeException(e);
}
String eventId = UUID.randomUUID().toString();
Long issueTimestamp = new Date().getTime();
getAuctionsRes.getAuctions().stream()
.filter(Objects::nonNull)
.map(auctionData -> EventMessage.builder()
.id(eventId)
.timestamp(issueTimestamp)
.version(1L)
.data(auctionData)
.build())
.forEach(eventMessage -> this.kafkaEventProduceService.sendSimpleMessage(TOPIC_COLLECT_WOW_AUCTION, eventMessage));
}
1. hyjal이 속한 realm id를 받아온다
2. Auction 데이터를 받아온다
3. Auction에서 아이템 별로 쪼개서 publish 한다.
이벤트 데이터 설계
@Getter
@Setter
@Builder
public class EventMessage<T> {
private String id;
private String issuer;
private Long timestamp;
private Long version;
private T data;
}
- id: 이벤트의 id, transactionId의 역할도 한다.
- issuer: 이벤트의 이슈 발급자
- timestamp: 이벤트 생성 일시
- version: 이벤트 메시지의 버전
- data: 이벤트 데이터
Scheduler
@SpringBootApplication
@EnableScheduling
public class DataCollectServerApplication {
public static void main(String[] args) {
SpringApplication.run(DataCollectServerApplication.class, args);
}
}
@Scheduled(cron = "0 10 * * * ?")
public void collectWowAuctionData() {
log.info("collect wow auctionData start");
this.wowDataCollectService.publishWowAuctionData();
log.info("collect wow auctionData end");
}
스케쥴링 활성화 후 매시 10분에 데이터 collect 및 publish를 트리거링
Dockerfile
FROM openjdk:11
RUN ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime
ARG JAR_FILE=./build/libs/*.jar
COPY ${JAR_FILE} ./app.jar
ENTRYPOINT ["java","-jar","-Dspring.profiles.active=release", "-Duser.timezone=Asia/Seoul","/app.jar"]
- openjdk 11 버전 사용
- timezone 아시아 서울로 맞춤
- spring profile을 release로 배포
Jenkinsfile
pipeline {
agent any
options {
buildDiscarder(logRotator(numToKeepStr: '3'))
}
environment {
DOCKERHUB_CREDENTIALS = credentials('dockerhub')
repository = "pbear41/data-collect-server"
dockerImage = ''
isDockerBuildSuccess = false;
version_value = ''
version = ''
imageName = ''
}
stages {
stage('get version from gradle') {
steps {
script {
version_value = sh(returnStdout: true, script: "cat build.gradle | grep -o 'version = [^,]*' | tr -d \\'").trim()
sh "echo Project in version value: $version_value"
version = version_value.split(/=/)[1].trim()
sh "echo final version: $version"
imageName = repository + ":" + version
}
}
}
stage('get release properties file') {
steps {
sh 'cp /mnt/datadisk/data/properties/data-collect-server/application-release.yml ./src/main/resources/application-release.yml'
}
}
stage('Gradle Build') {
steps {
sh './gradlew clean build'
}
}
stage('Docker Build') {
steps {
script {
sh 'whoami'
sh 'echo imageName: ' + imageName
dockerImage = docker.build imageName
isDockerBuildSuccess = true
}
}
}
stage('Login') {
steps {
sh 'echo $DOCKERHUB_CREDENTIALS_PSW | docker login -u $DOCKERHUB_CREDENTIALS_USR --password-stdin'
}
}
stage('Push') {
steps {
sh 'docker push ' + imageName
}
}
}
post {
always {
sh 'docker logout'
}
success {
script {
if (isDockerBuildSuccess == true) {
sh 'docker rmi ' + imageName
}
}
}
}
}
CI는 Jenkinsfile로 구성
1. gradle.build로부터 project 버전을 가져와서 image 태그를 설정
2. github에 올릴 수 없는 민감정보를 application-release.yml로 미리 서버에 배포한 파일 가져오기
3. gradle build
4. docker build (이미지 빌드)
5. dockerhub login
6. docker push
7. 마지막으로 도커 로그아웃 및 이미지 파일 삭제
CD pipeline
pipeline {
agent any
stages {
stage('docker pull') {
steps {
sh 'docker pull pbear41/data-collect-server:$docker_image_tag'
}
}
stage('stop prev container') {
steps {
script {
try {
sh 'docker stop data-collect-server'
sh 'docker rm data-collect-server'
} catch (Exception e) {
echo 'no prev container'
}
}
}
}
stage('run container') {
steps {
sh 'docker run -d --name data-collect-server -v /mnt/datadisk/data/logs:/logs pbear41/data-collect-server:$docker_image_tag'
}
}
}
}
매개변수를 받아서 배포할 버전 선택
1. docker pull
2. stop & remove current container
3. container 가동
Connector
미리 이미지에 세팅한 Mongo Connector를 생성헀다.
{
"name": "WowAuctionDataMongoSinkConnector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "collect.wow.auction",
"connection.uri": "mongodb://{ip}:27017",
"database": "rawData",
"collection": "wowAuction",
"value.converter.schemas.enable": "false"
}
}
topic을 그대로 가져와서 insert 하는 sink connector이다.
add connector를 통해 손쉽게 connector를 구성할 수 있다.
동작 확인
20230409 21:10:00.001 [scheduling-1] INFO c.p.d.s.s.FixedScheduleService - collect wow auctionData start
20230409 21:10:01.167 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@346a8ce
20230409 21:10:01.193 [scheduling-1] INFO o.a.k.c.p.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
...
20230409 21:10:01.271 [scheduling-1] INFO o.a.k.c.p.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
20230409 21:10:01.304 [scheduling-1] INFO o.a.k.c.u.AppInfoParser - Kafka version: 3.1.2
20230409 21:10:01.305 [scheduling-1] INFO o.a.k.c.u.AppInfoParser - Kafka commitId: f8c67dc3ae0a3265
20230409 21:10:01.305 [scheduling-1] INFO o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1681042201301
20230409 21:10:01.521 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition collect.wow.auction-0 to 0 since the associated topicId changed from null to KCGd5qDVQT6QHU4QRsWXbQ
20230409 21:10:01.523 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.Metadata - [Producer clientId=producer-1] Cluster ID: aBUTkXopQjuLHE0Iqs-rIw
20230409 21:10:01.524 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1] ProducerId set to 1019 with epoch 0
20230409 21:10:01.577 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@2aa7b44a
20230409 21:10:01.578 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@297456bd
20230409 21:10:01.578 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@2bef2d7
20230409 21:10:01.579 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@152c032e
20230409 21:10:01.579 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@5aedd051
...
20230409 21:10:04.339 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@4e7a5ce8
20230409 21:10:04.339 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@7037639b
20230409 21:10:04.340 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@7e1d9bfa
20230409 21:10:04.340 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@4bbd62b1
20230409 21:10:04.340 [scheduling-1] INFO c.p.d.s.k.KafkaEventProduceService - send message, topic: collect.wow.auction, message class: com.pbear.datacollectserver.data.blizzard.call.response.GetAuctionsRes$Auction, message: com.pbear.datacollectserver.data.kafka.EventMessage@5b412b76
20230409 21:10:04.341 [scheduling-1] INFO c.p.d.s.s.FixedScheduleService - collect wow auctionData end
정상 동작 확인
insert 된 데이터 확인
주저리주저리
구성하면서 쫌 힘든 구간들이 있었지만 최대한 워크어라운드 없이 정공법으로 해결했다.
문제들을 확인하면서, 카프카는 자유도와 가용성을 최대한 열어주어서 에코시스템이 구성될 수 있었지만
그만큼 고려해야 할 사항이 너무 많았다...
단순하게 메시지만 큐잉할 목적으로는 그냥 RabbitMQ를 사용하는 게 좋아 보인다.
이제 남은 건 데이터를 알맞은 형태로 제공하는 API 서버와
데이터를 사용자에게 보여줄 수 있는 client다 ㅎㅎ
추후 데이터가 쫌 더 모인 시점에 해당 데이터를 분석하는 프로세스를 구성해 보려 한다.
REFERENCE
1. https://openjdk.org/groups/net/httpclient/recipes.html
4. https://develop.battle.net/documentation/world-of-warcraft/game-data-apis
'[완] 개인서버 개발 > 공통 서비스 개발(완)' 카테고리의 다른 글
#7 3rd OAuth 연동 (google) (0) | 2023.05.19 |
---|---|
#6 Webflux기반 OAuth2서버 + gateway 구축 (0) | 2023.05.14 |
#4 MongoDB 설치 (0) | 2023.03.30 |
#3 Confluent Platform 설치 (0) | 2023.03.29 |
#2 Jenkins를 통한 CI/CD 환경 구축 with docker (0) | 2023.03.17 |