[완] 개인서버 개발/FlagWar(중단)

#2 Session Connector Server 구현

북극곰은콜라 2023. 8. 3. 18:00
반응형


개요

FlagWar 관련 개발을 한동안 바빠서 소흘했었네요...
이제 슬슬 다시 재개하려 합니다.
이번 구현한 사항은 client와 session을 유지하며, 메시지를 보낼 client를 찾아서 서버 push를 보낼 수 있는 서버입니다.
EX) A, B가 X라는 reference로 서버와 session을 유지하고 있을 때, X로 향하는 메시지가 A, B 모두에게 전달될 수 있어야 함

 


요구사항 요약

1. 서버는 Client와의 Session을 유지할 수 있어야 한다.
2. 서버에 연결된 Client는 자신을 식별할 수 있는 TAG 정보를 서버에 저장 및 삭제할 수 있어야 한다.
3. 서버는 메시지를 수신하여 TAG 기반으로 Client에게 메시지를 전달 할 수 있어야 한다.
4. 서버는 Scale-out이 용이한 구조를 가진다.

 


설계

Infra Structure

기본적으로 Server <-> Client는 Nginx를 통해 연결된다.
세션 및 서버 PUSH를 위한 프로토콜은 Websocket을 사용한다.
Client가 Message를 받을 TAG CRUD는 REST, Websocket Message를 사용한다.
Server는 Kafka에서 Message 관련 Topic을 구독한다.
Message 전파를 위해서, Tag를 포함한 Message를 해당 Topic에 Publish 한다.

Kafka를 사용한 이유는 Scale-out에 용이한 구조를 가지기 위함이며, 모든 message 전달은 kafka를 통해서 전달하여 session server는 자신의 session에 해당되는 Tag를 기반으로 메시지를 전달한다.
// kafka 관련 인프라가 이미 구축되어 있기에 다른 메시지 큐 도입은 안하기로 결정

 


구현

- Kotlin 기반
- Spring Webflux
- Spring Websocket
- Reactor-Kafka
SessionManager: Session을 관리하는 클래스, websocket session을 wrapping한 객체를 가지고 있으며, 해당 객체는 websocket session 외 referenceTag 등을 추가로 가진다.
WebsocketConifig는 StringWebsocketHandler를 등록하며, 해당 handler에서 세션처리 및 메시지 처리를 한다.
SessionController는 REST 인터페이스를 제공
KafkaConsumeManager는 특정 Topic을 Consume 하며, SessionManager를 통해 websocket session에 message를 전달한다.

 


Session 기능 (Websocket)

@Configuration
@EnableWebSocket
class WebsocketConfig(val stringWebsocketHandler: StringWebsocketHandler): WebSocketConfigurer {
    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        registry
            .addHandler(stringWebsocketHandler, "/connect").setAllowedOrigins("*")
    }
}
Websocket Enable 및 handler, resource 설정
CORS를 위한 origin 설정
override fun afterConnectionEstablished(session: WebSocketSession) {
    val sessionWrapper = SessionWrapper(id = session.id, webSocketSession = session)
    session.uri?.rawQuery?.let { addReferenceTags(it, sessionWrapper) }
    this.sessionManager.addSession(sessionWrapper)
}

override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
    this.sessionManager.removeSession(session.id)
}
Websocket connection 연결/끊기 관련 코드
establish시 메모리 상 session을 추가
close 시 session 제거
class SessionWrapper(
    val id: String,
    var referenceTagSet: MutableSet<String> = HashSet(),
    val timestamp: Long = Date().time,
    @JsonIgnore
    val webSocketSession: WebSocketSession
)
메모리상의 Session Wrapper 객체
고유 id를 가지며, referenceTag를 Set으로 가지고 있는다.
referenceTag는 중복없는 단순 String 값이다.

Reference 관련 기능

fun addReferenceTag(sessionId: String, referenceTag: String) =
        this.sessionMap[sessionId]?.referenceTagSet?.add(referenceTag)

fun removeReferenceTag(sessionId: String, referenceTag: String) =
    this.sessionMap[sessionId]?.referenceTagSet?.remove(referenceTag)
Reference를 추가/제거하는 메서드
websocket message handle 및 REST Controller에서 호출된다.

SendMessage 기능

fun sendMessage(referenceTags: Set<String>, message: String) {
    val failList = this.getSessionsByReferenceTags(referenceTags)
        .map {
            log.info("send message, id: ${it.id}")
            try {
                it.webSocketSession.sendMessage(TextMessage(message))
                null
            } catch (e: Exception) {
                it
            }
        }
        .filterNotNull()
        .toList()
    failList.forEach { this.removeSession(it.id) }
}

fun getSessionsByReferenceTags(target: Set<String>): Set<SessionWrapper> {
    return this.sessionMap.entries
        .map { it.value }
        .filter { target.intersect(it.referenceTagSet).isNotEmpty() }
        .toSet()
}
message를 전달할 대상은 referenceTag를 기반으로 하며
메모리상의 session들에게서 referenceTag가 매칭되는 객체를 찾아서, websocket session에 메시지를 보낸다.
sendMessage에 실패하는 경우, 비활성화된 세션으로 간주하여, session에서 remove하고, close 처리를 한다.

Consume Kafka Topic

@PostConstruct
fun init() {
    val consumerConfig = HashMap<String, Any>();
    consumerConfig[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServer
    consumerConfig[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()
    consumerConfig[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    consumerConfig[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    consumerConfig[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] =
        OffsetResetStrategy.LATEST.name.lowercase(Locale.getDefault())
    consumerConfig[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10

    this.receiverOptions = ReceiverOptions.create(consumerConfig)
    this.connectorMessageReceiver = KafkaReceiver
        .create(
            this.receiverOptions
                .commitInterval(Duration.of(10, ChronoUnit.MILLIS))
                .commitBatchSize(1)
                .subscription(listOf("connector.message"))
        )

    this.consumeConnectorMessage()
}

fun consumeConnectorMessage() {
    this.connectorMessageReceiver
        .receiveAutoAck()
        .concatMap { it }
        .onErrorContinue { t, _ -> t.printStackTrace() }
        .subscribe { record ->
            try {
                val commonConnectorMessage = this.gson.fromJson(record.value(), CommonConnectorMessage::class.java)
                this.sessionManager.sendMessage(
                    commonConnectorMessage.referenceTags.toSet(),
                    this.gson.toJson(commonConnectorMessage.message)
                )
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
}
Server는 상시 message Topic을 구독하고 있으며
유효한 메시지를 기반으로 message 전파를 호출한다.
이미 지나간 메시지는 실시간 메시지 전파와 맞지 않기에, offset 전략은 latest
reactor 설정인 offset commit은 autoAck(비동기)로 설정

 


마치며

최소 기능만 충족하도록 구현했으며
추후 기능추가 및 리펙토링이 진행될 것으로 예상된다.

지금은 핵심 기능은 게임 엔진 설계에 집중하고 있다.
목표는 기물을 기반으로 한 보드게임을 어느정도 커버할 수 있는 엔진을 만들고
해당 엔진으로 서버 interface를 구성한 뒤
web 또는 app을 구현하려 한다.

 

 

반응형

'[완] 개인서버 개발 > FlagWar(중단)' 카테고리의 다른 글

#1 FlagWar 기획  (0) 2023.07.12