Coroutine, Webflux<Mono>
Spring + Webflux
Webflux (Reactive) 를 이해하기 위해 sync, async, blocking, non-blocking 차이를 이해해야 했고,
async 한 코딩을 위해 CoroutineScope 를 사용하면 편리하다는 점을 알게 되었습니다.
Webflux 를 이해하기 위해 일반 sync + blocking 코드를 async + non-blocking 코드로 변환해가며 이해하고자 노력하였습니다.
#1 Sync 코드
아래 코드를 보면, 1~3 까지 출력을 두번 합니다.
첫번째 loop문이 끝나야 두번째 loop문을 실행합니다.
순서대로 코드가 실행되며 (sync), 앞의 loop문은 뒤의 loop문을 blocking 한다고 볼 수 있습니다.
(1..3).forEach {
println("#A - $it") // blocking 라인
delay(1000) // 1초가 걸리는 blocking 라인
} // blocking 라인
(1..3).forEach {
println("#B - $it") // blocking 라인
delay(1000) // 1초가 걸리는 blocking 라인
}
1~3 까지 총 3초가 걸리는 loop 두개를 실행하여 총 6초가 걸립니다.
#2 Async 코드
두개의 loop를 Async 로 변경해 봅니다.
이때 Coroutine을 사용합니다.
Coroutine 의 특징 중 하나로 작업을 양보(yield) 하는 기능이 있습니다.
아래 코드에서는 delay() 함수가 yield 를 대신합니다.
runBlocking { // coroutine을 사용하기 위해 CoroutineScope를 지정해야 한다.
async {
(1..3).forEach {
println("#A - $it") // blocking 라인
delay(1000) // 1초가 걸리는 blocking 라인
} // non-blocking 라인
}
async { // 이 부분은 async 하게 돌 것이다.
(1..3).forEach {
println("#B - $it") // blocking 라인
delay(1000) // 1초가 걸리는 blocking 라인
} // non-blocking 라인
}
}
첫번째 loop가 끝나지 않았지만 뒷 loop도 같이 실행이 됩니다. loop가 끝날떄까지 대기하지 않으니 3초가 걸립니다.
Deferred, yield
async 블록은 아래와 같이 이쁘게 (deferred) 정리가 가능합니다.
runBlocking {
deferred("A")
deferred("B")
val c = deferred("C")
c.start()
}
private fun CoroutineScope.deferred(name: String): Deferred<Unit> {
val deffered = async {
(1..3).forEach {
println("#$name - $it") // blocking 코드
yield() // 현 thread의 작업을 양보한다.
}
}
return deffered
}
#3 Async + Blocking
RestTemplate 은 Thread-safe한 대표적은 Blocking 호출입니다.
runBlocking {
deferred(name = "A", userId = 1, apiDelay = 1)
deferred(name = "B", userId = 1, apiDelay = 1)
}
private fun CoroutineScope.deferred(name: String, userId: Int, apiDelay: Int): Deferred<Unit> {
val deffered = async {
(1..3).forEach {
val result = RestTemplate()
.getForObject("$BASE_URL/user/$userId/$apiDelay", JSONObject::class.java)!!
println("$name: ${result.get("header")}")
}
}
return deffered
}
async지만 AAA, BBB 순서로 동작을 한다.
Async하게 코딩을 하였지만, 너무 오래 걸립니다.
yield()를 추가해 줍니다.
val deffered = async {
(1..3).forEach {
val result = RestTemplate()
.getForObject("$BASE_URL/user/$userId/$apiDelay", JSONObject::class.java)!!
println("$name: ${result.get("header")}")
yield() // <--- yield 추가
}
}
A, B 순차로 실행이 되지만 여전히 느리다.
A, B 번갈아 실행이 되지만, yield 를 추가해도 여전히 느립니다.
A가 응답을 받을 때까지 B는 대기하고 있고, 반대로 B도 A를 기다리는것으로 보입니다.
RestTemplate는 Blocking 호출을 하기 때문에 응답을 받을때까지 Thread가 대기를 해야 합니다.
⚠️ ⛔️ blocking이 들어가는 순간 single-thread,
Coroutine 등의 효과는 사라진다는 것을 의미합니다. ⛔️ ⚠️
그렇다면 blocking call을 하는 RestTemplate 대신 async, non-blocking을 하는 WebClient 를 사용해 봅니다.
(1..3).forEach {
val result = WebClient.create(BASE_URL)
.get()
.uri("/user/$userId/$apiDelay")
.retrieve()
.bodyToMono(JSONObject::class.java)
result.subscribe { // async 하기 때문에 실행된 후의 행동을 subscribe 에 등록해야 한다.
println("$name: ${it!!.get("header")}")
}
result.block()
}
하지만 실행결과가 똑같다. 문제는 block() 코드
하지만 실행결과가 똑같이 느립니다.
이유는 block()의 위치입니다.
처음에는 이해하기 어려운 개념인데 Mono, Flux 등 의 방식은 Lazy 실행방식입니다.
최대한 한번에 실행할 수 있는 Mono들을 묶은 다음(Flux)에 한번에 block(실행) 해야 원하는 효과가 나타납니다.
#4 Async + Non-blocking
runBlocking {
val fluxA = createFlux(name = "A", userId = 1, apiDelay = 1)
val fluxB = createFlux(name = "B", userId = 1, apiDelay = 1)
Flux.merge(fluxA, fluxB).blockLast() // fluxA, fluxB를 묶어서 한번에 실행시킨다.
}
private fun createFlux(name: String, userId: Int, apiDelay: Int): Flux<JSONObject> {
return Flux.range(1, 3) // 1~3까지 반복
.concatMap { // Mapping된 결과를 합친다.
val result: Mono<JSONObject> = WebClient.create(BASE_URL) // Mono로 받을 WebClient 생성
.get()
.uri("/user/$userId/$apiDelay")
.retrieve()
.bodyToMono(JSONObject::class.java)
result.subscribe { // async 하기 때문에 실행된 후의 행동을 subscribe 에 등록해야 한다.
println("$name: ${it!!.get("header")}")
}
return@concatMap result
}
}
원하는대로 Async하게 호출되면서 서로 호출을 대기하지 않고 4초대로 끝났습니다.
정확하게 확인을 위해 더 많이 호출을 해보고, ElapsedTime도 추가해 봅니다.
val elapsed = measureTimeMillis {
runBlocking {
Flux.merge(
createFlux(name = "A", userId = 1, apiDelay = 1),
createFlux(name = "B", userId = 2, apiDelay = 1),
createFlux(name = "C", userId = 3, apiDelay = 1),
createFlux(name = "D", userId = 4, apiDelay = 1),
createFlux(name = "E", userId = 5, apiDelay = 1),
createFlux(name = "F", userId = 6, apiDelay = 1),
createFlux(name = "G", userId = 7, apiDelay = 1)
).blockLast()
}
}
println("Elapsed Time: ${elapsed}ms")
꽤 많이 호출하였지만 결과는 똑같습니다.
Async + nonBlocking 의 경우 얻는 성능 이점이 꽤 크다는 것을 알 수 있습니다.
Coroutine의 특징은 하나의 Thread로 열심히 일을 시키는 것입니다.
Thread가 가능한 한 놀고 있지 않는 상태로 코드를 짜는게 가장 Reactive한 방식이라 생각됩니다.
#번외 - await, flow-emit
Async - Await
만약 여러개의 async brace 가 있지만, 특정 작업을 다 끝마치고 다음 작업을 하고 싶다면 await 를 사용하면 됩니다.
NodeJS 등에서도 볼 수 있는 async-await 기능입니다.
await() 이 걸린 부분먼저 실행을 한 후에, 다음 작업을 진행합니다.
A가 먼저 실행된 후 B,C 가 동시에 실행됩니다.
runBlocking {
deferred("A").await()
deferred("B")
deferred("C")
}
#A - 1
#A - 2
#A - 3
#B - 1
#C - 1
#B - 2
#C - 2
#B - 3
#C - 3
Flow - Emit
특정 값을 추출하려는 용도로 적합합니다.
collect 를 호출해야 flow 블록이 실행되며, collect brace 안에서 emit된 값들이 차례대로 실행됩니다.
runBlocking {
deferred("A")
deferred("B")
flow {
for (i in 1..3) {
emit(i)
yield()
}
}.collect {
println("#C - $it")
}
}
#C - 1
#A - 1
#B - 1
#C - 2
#A - 2
#B - 2
#C - 3
#A - 3
#B - 3
[출처] Coroutine, Webflux<Mono>|작성자 joebak
'Spring Framework > Spring webflux' 카테고리의 다른 글
Webflux 가 빠른 이유, MVC 와 비교했을 때 장단점[출처] Webflux 가 빠른 이유, MVC 와 비교했을 때 장단점|작성자 joebak (0) | 2022.10.21 |
---|---|
spring webflux와 armeria 살펴보기 (Mono, Flux, gRPC, Thrift) (0) | 2021.03.23 |