Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services-queue/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ dependencies {
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springframework.boot:spring-boot-starter-webflux")
testImplementation("io.projectreactor:reactor-test")

testImplementation("io.mockk:mockk:1.13.8")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.yourssu.morupark.queue.business

data class PlatformInfo(
val platformId: Long,
val platformName: String,
val tps: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,36 @@ class QueueService(
) {

fun enqueue(accessToken: String) {
if (!authAdapter.isTokenValid(accessToken)) {
throw IllegalStateException("Access token is invalid")
}
// Validate token by getting user info
authAdapter.getUserInfo(accessToken)
kafkaProducer.send(accessToken)
}

fun getWaitingToken(accessToken: String): String {
if (!authAdapter.isTokenValid(accessToken)) {
throw IllegalStateException("Access token is invalid")
}
val userInfo = authAdapter.getUserInfo(accessToken)
val platformId = userInfo.platform.platformId

if (!queueAdapter.isInQueue(accessToken)) {
if (!queueAdapter.isInQueue(accessToken, platformId)) {
throw IllegalStateException("Access token is not in queue")
}

return authAdapter.getWaitingToken(accessToken)
}

fun getTicketStatusResult(accessToken: String, waitingToken: String): Any {
val ticketStatus = queueAdapter.getTicketStatus(accessToken)
val userInfo = authAdapter.getUserInfo(accessToken)
val platformId = userInfo.platform.platformId

val ticketStatus = queueAdapter.getTicketStatus(accessToken, platformId)
if (ticketStatus == TicketStatus.ALLOWED) {
queueAdapter.deleteFromAllowedQueue(accessToken)
queueAdapter.deleteFromAllowedQueue(accessToken, platformId)
return getAllowedStatusResult(waitingToken)
}
return getWaitingStatusResult(accessToken)
return getWaitingStatusResult(accessToken, platformId)
}

private fun getWaitingStatusResult(accessToken: String): ReadWaitingStatusResult {
val rank = queueAdapter.getRank(accessToken)!!
private fun getWaitingStatusResult(accessToken: String, platformId: Long): ReadWaitingStatusResult {
val rank = queueAdapter.getRank(accessToken, platformId)!!
val estimatedWaitingTime = waitingTimeEstimator.estimateWaitingTime(rank)
return ReadWaitingStatusResult(TicketStatus.WAITING, rank, estimatedWaitingTime)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.yourssu.morupark.queue.business

data class UserInfo(
val userId: Long,
val platform: PlatformInfo
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.yourssu.morupark.queue.implement

import com.yourssu.morupark.queue.business.UserInfo
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient

Expand All @@ -10,22 +11,21 @@ class AuthAdapter(
private val AUTH_SERVICE_URL = "http://localhost:8081"
private val webClientAuth = webClient.baseUrl(AUTH_SERVICE_URL).build()

fun isTokenValid(accessToken: String): Boolean {
fun getUserInfo(accessToken: String): UserInfo {
return webClientAuth.get()
.uri { uriBuilder ->
uriBuilder.path("/auth/token")
uriBuilder.path("/auth/me")
.queryParam("accessToken", accessToken)
.build()
}
.retrieve()
.bodyToMono(Boolean::class.java)
.block() == true
.bodyToMono(UserInfo::class.java)
.block() ?: throw IllegalStateException("Failed to get user info")
}

fun getWaitingToken(accessToken: String): String {
return webClientAuth.get()
.uri { uriBuilder ->
// TODO: 레오에게 이거 엔드포인트 뭔지 물어보기
uriBuilder.path("/auth/waiting-token")
.queryParam("accessToken", accessToken)
.build()
Expand All @@ -38,8 +38,7 @@ class AuthAdapter(
fun getExternalServerToken(waitingToken: String): String {
return webClientAuth.get()
.uri { uriBuilder ->
// TODO: 레오에게 이거 엔드포인트 뭔지 물어보기
uriBuilder.path("/auth/external-sever-token")
uriBuilder.path("/platforms/url")
.queryParam("waitingToken", waitingToken)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import org.springframework.stereotype.Component
@Component
class KafkaConsumer(
private val queueAdapter: QueueAdapter,

) {
private val authAdapter: AuthAdapter,
) {

companion object {
private const val TAG = "WAITING"
Expand All @@ -21,8 +21,11 @@ class KafkaConsumer(
accessToken: String,
@Header(KafkaHeaders.TIMESTAMP) timestamp: Long,
) {

queueAdapter.addToWaitingQueue(accessToken, timestamp)
val userInfo = authAdapter.getUserInfo(accessToken)
val platformId = userInfo.platform.platformId
val tps = userInfo.platform.tps
ServerTPSMap.put(platformId, tps)
queueAdapter.addToWaitingQueue(accessToken, timestamp, platformId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,69 @@ package com.yourssu.morupark.queue.implement

import com.yourssu.morupark.queue.business.TicketStatus
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ScanOptions
import org.springframework.stereotype.Component

@Component
class QueueAdapter(
private val redisTemplate: RedisTemplate<String, String>
) {

private val QUEUE_WAITING_KEY = "queue:waiting"
private val QUEUE_ALLOWED_KEY = "queue:allowed"
private fun getWaitingKey(platformId: Long) = "queue:$platformId:waiting"
private fun getAllowedKey(platformId: Long) = "queue:$platformId:allowed"

fun addToWaitingQueue(accessToken: String, timestamp: Long) {
redisTemplate.opsForZSet().add(QUEUE_WAITING_KEY, accessToken, timestamp.toDouble())
fun addToWaitingQueue(accessToken: String, timestamp: Long, platformId: Long) {
redisTemplate.opsForZSet().add(getWaitingKey(platformId), accessToken, timestamp.toDouble())
}

fun addToAllowedQueue(accessTokens: Set<String>) {
redisTemplate.opsForSet().add(QUEUE_ALLOWED_KEY, *accessTokens.toTypedArray())
fun addToAllowedQueue(accessTokens: Set<String>, platformId: Long) {
redisTemplate.opsForSet().add(getAllowedKey(platformId), *accessTokens.toTypedArray())
}

fun popFromWaitingQueue(count: Long): Set<String>? {
val items = redisTemplate.opsForZSet().range(QUEUE_WAITING_KEY, 0, count - 1)
redisTemplate.opsForZSet().remove(QUEUE_WAITING_KEY, items)
fun popFromWaitingQueue(count: Long, platformId: Long): Set<String>? {
val waitingKey = getWaitingKey(platformId)
val items = redisTemplate.opsForZSet().range(waitingKey, 0, count - 1)
if (!items.isNullOrEmpty()) {
redisTemplate.opsForZSet().remove(waitingKey, *items.toTypedArray())
}
return items
}

fun deleteFromAllowedQueue(accessToken: String) {
redisTemplate.opsForSet().remove(QUEUE_ALLOWED_KEY, accessToken)
fun deleteFromAllowedQueue(accessToken: String, platformId: Long) {
redisTemplate.opsForSet().remove(getAllowedKey(platformId), accessToken)
}

fun isInQueue(accessToken: String): Boolean {
val score = redisTemplate.opsForZSet().score(QUEUE_WAITING_KEY, accessToken)
fun isInQueue(accessToken: String, platformId: Long): Boolean {
val score = redisTemplate.opsForZSet().score(getWaitingKey(platformId), accessToken)
if (score != null) {
return true
}
return redisTemplate.opsForSet().isMember(QUEUE_ALLOWED_KEY, accessToken)!!
return redisTemplate.opsForSet().isMember(getAllowedKey(platformId), accessToken)!!
}

fun getTicketStatus(accessToken: String): TicketStatus {
val rank = redisTemplate.opsForZSet().rank(QUEUE_WAITING_KEY, accessToken)
fun getTicketStatus(accessToken: String, platformId: Long): TicketStatus {
val rank = redisTemplate.opsForZSet().rank(getWaitingKey(platformId), accessToken)
if (rank != null) return TicketStatus.WAITING
val allowed = redisTemplate.opsForSet().isMember(QUEUE_ALLOWED_KEY, accessToken)
val allowed = redisTemplate.opsForSet().isMember(getAllowedKey(platformId), accessToken)
if (allowed != null && allowed) return TicketStatus.ALLOWED
throw IllegalStateException("현재 대기열에 존재하지 않습니다.")
}

fun getRank(accessToken: String): Long? {
return redisTemplate.opsForZSet().rank(QUEUE_WAITING_KEY, accessToken)
fun getRank(accessToken: String, platformId: Long): Long? {
return redisTemplate.opsForZSet().rank(getWaitingKey(platformId), accessToken)
}

fun getAllPlatformWaitingKeys(): Set<String> {
val keys = mutableSetOf<String>()
val scanOptions = ScanOptions.scanOptions().match("queue:*:waiting").count(100).build()
redisTemplate.scan(scanOptions).use { cursor ->
cursor.forEach { key -> keys.add(key) }
}
return keys
}

fun extractPlatformIdFromKey(key: String): Long {
val parts = key.split(":")
return parts[1].toLong()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.yourssu.morupark.queue.implement

object ServerTPSMap {
private val tpsMap = mutableMapOf<Long, Long>()

fun put(platformId: Long, tps: Long) {
tpsMap[platformId] = tps
}

fun get(platformId: Long): Long? {
return tpsMap[platformId]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@ class StatusOperator(

/**
* 시간당 상태를 Waiting -> Allowed로 바꿔준다.
* 모든 플랫폼의 대기열을 Redis 키 패턴 스캔으로 찾아서 처리한다.
* 각 플랫폼의 TPS 설정에 따라 처리량을 조절한다.
*/
@Scheduled(fixedDelayString = "\${queue.processing-interval}")
fun changeStatus() {
val accessTokens = queueAdapter.popFromWaitingQueue(maxSize)
if (!accessTokens.isNullOrEmpty())
queueAdapter.addToAllowedQueue(accessTokens)
val waitingKeys = queueAdapter.getAllPlatformWaitingKeys()

for (key in waitingKeys) {
val platformId = queueAdapter.extractPlatformIdFromKey(key)
val tps = ServerTPSMap.get(platformId) ?: maxSize
val accessTokens = queueAdapter.popFromWaitingQueue(tps, platformId)
if (!accessTokens.isNullOrEmpty()) {
queueAdapter.addToAllowedQueue(accessTokens, platformId)
}
}
}
}
Loading