Coroutine, Webflux<Mono>

2022. 10. 21. 15:54 Spring Framework/Spring webflux

Spring + Webflux

Webflux (Reactive) 를 이해하기 위해 sync, async, blocking, non-blocking 차이를 이해해야 했고,

async 한 코딩을 위해 CoroutineScope 를 사용하면 편리하다는 점을 알게 되었습니다.

Webflux 를 이해하기 위해 일반 sync + blocking 코드를 async + non-blocking 코드로 변환해가며 이해하고자 노력하였습니다.

#1 Sync 코드

​아래 코드를 보면, 1~3 까지 출력을 두번 합니다.

첫번째 loop문이 끝나야 두번째 loop문을 실행합니다.

순서대로 코드가 실행되며 (sync), 앞의 loop문은 뒤의 loop문을 blocking 한다고 볼 수 있습니다.

(1..3).forEach {
    println("#A - $it") // blocking 라인
    delay(1000) // 1초가 걸리는 blocking 라인
} // blocking 라인

(1..3).forEach {
    println("#B - $it") // blocking 라인 
    delay(1000) // 1초가 걸리는 blocking 라인
}

1~3 까지 총 3초가 걸리는 loop 두개를 실행하여 총 6초가 걸립니다.

#2 Async 코드

두개의 loop를 Async 로 변경해 봅니다.

이때 Coroutine을 사용합니다.

Coroutine 의 특징 중 하나로 작업을 양보(yield) 하는 기능이 있습니다.

아래 코드에서는 delay() 함수가 yield 를 대신합니다.

runBlocking { // coroutine을 사용하기 위해 CoroutineScope를 지정해야 한다. 
    async {
        (1..3).forEach {
            println("#A - $it") // blocking 라인
            delay(1000) // 1초가 걸리는 blocking 라인
        } // non-blocking 라인
    }

    async { // 이 부분은 async 하게 돌 것이다.
        (1..3).forEach {
            println("#B - $it") // blocking 라인
            delay(1000) // 1초가 걸리는 blocking 라인
        } // non-blocking 라인
    }
}

첫번째 loop가 끝나지 않았지만 뒷 loop도 같이 실행이 됩니다. loop가 끝날떄까지 대기하지 않으니 3초가 걸립니다.

​Deferred, yield

async 블록은 아래와 같이 이쁘게 (deferred) 정리가 가능합니다.

runBlocking {
    deferred("A")
    deferred("B")

    val c = deferred("C")
    c.start()
}

private fun CoroutineScope.deferred(name: String): Deferred<Unit> {
    val deffered = async {
        (1..3).forEach {
            println("#$name - $it") // blocking 코드
            yield() // 현 thread의 작업을 양보한다.
        }
    }
    return deffered
}

#3 Async + Blocking

RestTemplate 은 Thread-safe한 대표적은 Blocking 호출입니다.

runBlocking {
    deferred(name = "A", userId = 1, apiDelay = 1)
    deferred(name = "B", userId = 1, apiDelay = 1)
}

private fun CoroutineScope.deferred(name: String, userId: Int, apiDelay: Int): Deferred<Unit> {
    val deffered = async {
        (1..3).forEach {
            val result = RestTemplate()
                .getForObject("$BASE_URL/user/$userId/$apiDelay", JSONObject::class.java)!!
            println("$name: ${result.get("header")}")
        }
    }
    return deffered
}

async지만 AAA, BBB 순서로 동작을 한다.

Async하게 코딩을 하였지만, 너무 오래 걸립니다.

yield()를 추가해 줍니다.

val deffered = async {
    (1..3).forEach {
        val result = RestTemplate()
            .getForObject("$BASE_URL/user/$userId/$apiDelay", JSONObject::class.java)!!
        println("$name: ${result.get("header")}")
        yield() // <--- yield 추가
    }
}

A, B 순차로 실행이 되지만 여전히 느리다.

A, B 번갈아 실행이 되지만, yield 를 추가해도 여전히 느립니다.

A가 응답을 받을 때까지 B는 대기하고 있고, 반대로 B도 A를 기다리는것으로 보입니다.

RestTemplate는 Blocking 호출을 하기 때문에 응답을 받을때까지 Thread가 대기를 해야 합니다.

⚠️ ⛔️ blocking이 들어가는 순간 single-thread,

Coroutine 등의 효과는 사라진다는 것을 의미합니다. ⛔️ ⚠️

그렇다면 blocking call을 하는 RestTemplate 대신 async, non-blocking을 하는 WebClient 를 사용해 봅니다.

(1..3).forEach {
    val result = WebClient.create(BASE_URL)
        .get()
        .uri("/user/$userId/$apiDelay")
        .retrieve()
        .bodyToMono(JSONObject::class.java)
    result.subscribe { // async 하기 때문에 실행된 후의 행동을 subscribe 에 등록해야 한다.
        println("$name: ${it!!.get("header")}")
    }
    result.block()
}

하지만 실행결과가 똑같다. 문제는 block() 코드

하지만 실행결과가 똑같이 느립니다.

이유는 block()의 위치입니다.

처음에는 이해하기 어려운 개념인데 Mono, Flux 등 의 방식은 Lazy 실행방식입니다.

최대한 한번에 실행할 수 있는 Mono들을 묶은 다음(Flux)에 한번에 block(실행) 해야 원하는 효과가 나타납니다.

#4 Async + Non-blocking

runBlocking {
    val fluxA = createFlux(name = "A", userId = 1, apiDelay = 1)
    val fluxB = createFlux(name = "B", userId = 1, apiDelay = 1)
    Flux.merge(fluxA, fluxB).blockLast() // fluxA, fluxB를 묶어서 한번에 실행시킨다.
}

private fun createFlux(name: String, userId: Int, apiDelay: Int): Flux<JSONObject> {
    return Flux.range(1, 3) // 1~3까지 반복
        .concatMap { // Mapping된 결과를 합친다.
            val result: Mono<JSONObject> = WebClient.create(BASE_URL) // Mono로 받을 WebClient 생성
                .get()
                .uri("/user/$userId/$apiDelay")
                .retrieve()
                .bodyToMono(JSONObject::class.java)
            result.subscribe { // async 하기 때문에 실행된 후의 행동을 subscribe 에 등록해야 한다.
                println("$name: ${it!!.get("header")}")
            }
            return@concatMap result
        }
}

원하는대로 Async하게 호출되면서 서로 호출을 대기하지 않고 4초대로 끝났습니다.

정확하게 확인을 위해 더 많이 호출을 해보고, ElapsedTime도 추가해 봅니다.

val elapsed = measureTimeMillis {
    runBlocking {
        Flux.merge(
            createFlux(name = "A", userId = 1, apiDelay = 1),
            createFlux(name = "B", userId = 2, apiDelay = 1),
            createFlux(name = "C", userId = 3, apiDelay = 1),
            createFlux(name = "D", userId = 4, apiDelay = 1),
            createFlux(name = "E", userId = 5, apiDelay = 1),
            createFlux(name = "F", userId = 6, apiDelay = 1),
            createFlux(name = "G", userId = 7, apiDelay = 1)
        ).blockLast()
    }
}
println("Elapsed Time: ${elapsed}ms")

꽤 많이 호출하였지만 결과는 똑같습니다.

Async + nonBlocking 의 경우 얻는 성능 이점이 꽤 크다는 것을 알 수 있습니다.

Coroutine의 특징은 하나의 Thread로 열심히 일을 시키는 것입니다.

Thread가 가능한 한 놀고 있지 않는 상태로 코드를 짜는게 가장 Reactive한 방식이라 생각됩니다.

#번외 - await, flow-emit

Async - Await

만약 여러개의 async brace 가 있지만, 특정 작업을 다 끝마치고 다음 작업을 하고 싶다면 await 를 사용하면 됩니다.

NodeJS 등에서도 볼 수 있는 async-await 기능입니다.

await() 이 걸린 부분먼저 실행을 한 후에, 다음 작업을 진행합니다.

A가 먼저 실행된 후 B,C 가 동시에 실행됩니다.

 
runBlocking {
    deferred("A").await()
    deferred("B")
    deferred("C")
}
#A - 1
#A - 2
#A - 3
#B - 1
#C - 1
#B - 2
#C - 2
#B - 3
#C - 3

Flow - Emit

특정 값을 추출하려는 용도로 적합합니다.

collect 를 호출해야 flow 블록이 실행되며, collect brace 안에서 emit된 값들이 차례대로 실행됩니다.

runBlocking {
    deferred("A")
    deferred("B")

    flow {
        for (i in 1..3) {
            emit(i)
            yield()
        }
    }.collect {
        println("#C - $it")
    }
}
 
#C - 1
#A - 1
#B - 1
#C - 2
#A - 2
#B - 2
#C - 3
#A - 3
#B - 3
 

[출처] Coroutine, Webflux<Mono>|작성자 joebak