Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현)
- 참조문서
리액티브 스트림을 기본적으로 이해해야, Reactor 도 수월하게 이해할 수 있다.
cf) 리액티브 스트림 포스팅 : https://sjh836.tistory.com/182
예제코드는 io.projectreactor:reactor-core:3.4.2 기준으로 작성되었다.
1. Reactor의 탄생
- reactor 1.0은 13년 7월에 출시되었다. spring 프레임워크의 개발팀인 pivotal 에서 만든 오픈소스이다.
- 리액터 패턴, 함수형 프로그래밍, 메세지 기반 등의 설계와 모범사례들을 통합
- 리액터 패턴 포스팅 : https://sjh836.tistory.com/184
- 당시, 스프링 개발팀은 대용량 데이터 개발을 단순화하는 Spring XD 프로젝트를 개발 중이었다.
- 17년 7월부로 deprecated 되었고, Spring Cloud Data Flow 로 넘어갔다.
- Spring XD 에 비동기 논블로킹을 지원하기 위해 만들어진 프로젝트가 Reactor 이다.
- 장점 : spring 과 완벽한 통합, netty 도 지원, 비동기 논블로킹 메세지 처리, 고성능
- 단점 : Back Pressure 기능 없음, 복잡한 오류처리
2. Reactive Stream과 Reactor 3.x
- reactor 2.0 는 위 단점들을 해결하고, 15년 3월에 나왔다.
- onOverflowBuffer(), onOverflowDrop() 등을 통해서 Back Pressure 를 지원
- 15년 4월에는 출시된 리액티브 스트림 표준 스펙이 발표되었다.
- API 및 규칙만 정의하고, 사용할 수 있는 라이브러리를 제공하진 않았다.
- reactor 는 리액티브 스트림 표준 스펙을 2.5 마일스톤 버전부터 지원했다. (16년 2월)
- 하지만, 릴리즈는 3.0으로 했다. (16년 8월)
- 비슷한 시기에 리액티브 스트림 표준을 지원하는 RxJava 2.0 도 출시되었는데, 가장 큰 차이점은 RxJava는 안드로이드 포함한 jdk6 부터 지원했고, reactor는 jdk8부터 지원했다.
3. Publisher<T>의 구현체
- Flux와 Mono가 있다. 이 둘은 대략적인 카디널리티 정보를 담는 식으로 타입을 구분한다.
- 아이템이 0개 아니면 1개라면 Mono를 써서 표현력을 좀 더 좋게 해주는 식이다.
- 연산자도 그에 맞는것만 지원하고..
- Mono의 연산자들은 버퍼 중복, 값비싼 동기화 작업 등이 생략되어 있다.
- Flux와 Mono는 서로 쉽게 변환할 수 있다.
- Flux<T>.collectList() = Mono<List<T>>
- Mono<T>.flux() = Flux<T>
3-1. Flux<T>
- Flux는 여러 요소(0 ~ N)를 생성할 수 있는 리액티브 스트림이다.
- 메모리 부족을 야기하지 않고도 무한대의 리액티브 스트림을 만들 수 있다.
- Flux.range(0, 5).repeat()
- 다만, 이것을 수집하려고 시도하면 OOM이 날 것이다. .collectList().block()
3-2. Mono<T>
- Mono는 최대 하나의 요소(0 ~ 1)를 생성할 수 있는 리액티브 스트림이다.
- CompletableFuture<T>와 의미론적으로 동일해서, 비슷한 용도로 사용할 수 있다. 그러나 CompletableFuture는 값을 반환해야하고, 즉시 시작한다는 차이가 있다. Mono 는 구독자가 없으면 가만히 있고.
- Mono<Void>를 통해, 뭔가 완료되었다고 알리는데 활용할 수 있다.
4. 리액티브 스트림 시퀀스 생성하기
- Flux와 Mono는 많은 팩토리 메소드를 제공한다.
4-1. Flux
- Flux.just("A", "B")
- Flux.fromArray(new Integer[] { 2, 4, 8 })
- Flux.fromIterable(Arrays.asList(3, 6, 9))
- Flux.range(1, 5);
- Flux.from(publisher) : 다른 Publisher를 Flux로 변환
- Flux.empty()
- Flux.never() : onComplete, onError 신호까지 보내지 않음
- Flux.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
- Flux.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성
4-2. Mono
- Mono.just("devljh")
- Mono.justOrEmpty(null) : nullable, Optional.empty 모두 가능
- Mono.fromCallable(this::httpRequest)
- Mono.fromRunnable(runnable)
- Mono.fromSupplier(supplier)
- Mono.fromFuture(future)
- Mono.fromCompletionStage(completionStage)
- Mono.from(publisher) : 다른 Publisher를 Mono로 변환
- Mono.empty()
- Mono.never() : onComplete, onError 신호까지 보내지 않음
- Mono.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
- Mono.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성
4-2-1. just VS fromSupplier VS defer
- 위 3개는 Mono 시퀀스를 만드는 데, 어떤 차이가 있을까?
- just는 즉시 시퀀스를 만든다.
- fromSupplier과 defer는 구독시점에 시퀀스를 생성한다. (lazy 처리)
- fromSupplier 는 Supplier<? extends T> supplier 를 인자로 받는다. Mono가 아닌 값에 사용하면 된다.
- defer 는 Supplier<? extends Mono<? extends T>> supplier 를 인자로 받는다. 이미 Mono로 반환되는 메소드에 사용하는 게 좋다.
- 즉, 아래 경우에는 fromSupplier 가 더 적절하고 깔끔해보이는 상황이다.
@Test
public void defer_테스트() throws Exception {
long start = System.currentTimeMillis();
// just
Mono<Long> clock1 = Mono.just(System.currentTimeMillis());
Thread.sleep(5000);
long result1 = clock1.block() - start;
log.info("흐른 시간 = {}", result1); // 5초가 지났으나, 위에서 즉발실행되어서 0 출력
// fromSupplier
Mono<Long> clock2 = Mono.fromSupplier(System::currentTimeMillis);
Thread.sleep(5000);
long result2 = clock2.block() - start;
log.info("흐른 시간 = {}", result2); // 10초 지남
// defer
Mono<Long> clock3 = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
Thread.sleep(5000);
long result3 = clock3.block() - start;
log.info("흐른 시간 = {}", result3); // 15초 지남
}
// 실행결과
23:41:03.091 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 0
23:41:08.106 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 10108
23:41:13.128 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 15123
5. 리액티브 스트림 구독하기
- Flux와 Mono를 구독하려면 subscribe() 메소드를 사용하면 된다. 메소드 시그니처는 아래와 같다.
- 내부적으로는 new LambdaSubscriber<>(...) 를 통해 Subscriber를 생성해서 구독한다.
- LambdaSubscriber 내부 코드를 보면, Subscription.request를 조정하지 않는다면, 기본적으로 s.request(Long.MAX_VALUE) 이다.
- Disposable subscribe() : 구독은 하지만, onNext, onError, onComplete 처리는 하지 않는다.
- Disposable subscribe(Consumer<? super T> consumer) : onNext 만 처리한다.
- Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) : onNext, onError 만 처리한다.
- errorConsumer 는 NotNull 이다.
- Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) : onNext, onError, onComplete 를 처리한다.
- Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer) : onNext, onError, onComplete 를 처리하고, Subscription(요청량 조절, 구독취소) 도 직접 제어한다.
- 해당 메소드는 Reactor 3.5 부터 제거될 예정이다. request 제어를 잊는 경우가 많아서 삭제된다고 한다.. 밑의 subscribeWith 를 쓰라고 한다.
- <E extends Subscriber<? super T>> E subscribeWith(E subscriber) : 밑의 메소드와 동일한데, 반환을 해주는 차이가 있다.
- void subscribe(Subscriber<? super T> actual) : 리액티브 스트림 표준 Publisher 인터페이스의 subscribe() 를 재정의했기 때문에, 이것도 사용할 수 있다. 다양한 조절을 할 순 있지만, 이걸 직접쓸 일은 거의 없긴 하다.
5-1. Disposable 인터페이스
- Reactor 코어에서 제공하는 인터페이스이며, @FunctionalInterface 이다.
- Subscription을 취소할 수 있다.
@Test
public void 구독취소_테스트() throws Exception {
Disposable disposable = Flux.interval(Duration.ofMillis(500))
.subscribe(i -> log.info("{}", i));
Thread.sleep(2000);
disposable.dispose();
}
// 실행결과
01:24:46.144 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 0
01:24:46.652 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 1
01:24:47.158 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 2
01:24:47.657 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 3
5-2. Custom Subscriber 로 구독하기
5-2-1. 권장하지 않는 방법
- 리액티브 스트림 표준 Subscriber 인터페이스를 직접 구현하는 것은 쉬운 일이 아니다.
- 제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야한다.
- 아래는 동작은 하지만, 잘못된 구현이다.
@Test
public void Subscriber_잘못된_구현() {
Subscriber<String> subscriber = new Subscriber<String>() {
/*
* 저번 포스팅의 리액티브 스트림 파이프라인 병렬화에서 보았듯이,
* 발행과 구독은 각각 다른 쓰레드에서 처리될 수 있다.
* 따라서, volatile 키워드를 사용한다.
*/
private volatile Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // 구독 후, 최초 요청
}
@Override
public void onNext(String s) {
log.info("onNext = {}", s);
this.subscription.request(1); // 데이터 수신 후, 추가 요청
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
log.info("onComplete");
}
};
Flux.just("A", "B", "C").subscribe(subscriber);
}
// 실행 결과
01:48:54.609 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A
01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B
01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C
01:48:54.613 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete
- this.subscription.request(n) 에서 숫자가 0 이하인지, 구독(subscription) 상태는 정상인지 등을 확인하지 않고, 1개씩 계속 요청했기 때문이다.
5-2-2. 안전한 구현 방법
- 리액터 프레임워크에서 미리 만들어둔, BaseSubscriber 추상클래스를 상속해서 구현한다.
- Subscriber의 onSubscribe, onNext, onError, onComplete 는 모두 재정의할 수 없게 final 로 선언되어 있다.
- 제공하는 hook 을 통해서, 상황에 따라 얼마든지 조정이 가능하다.
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable {
protected void hookOnNext(T value){
// NO-OP
}
@Override
public final void onNext(T value) {
Objects.requireNonNull(value, "onNext");
try {
hookOnNext(value);
}
catch (Throwable throwable) {
onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
}
}
// 생략...
}
- 아래는 BaseSubscriber 로 동일하게 다시 구현한 코드이다.
@Test
public void Subscriber_안전한_구현() {
Subscriber<String> subscriber = new BaseSubscriber<String>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
request(1); // 구독 후, 최초 요청
}
@Override
public void hookOnNext(String s) {
log.info("onNext = {}", s);
request(1); // 데이터 수신 후, 추가 요청
}
@Override
public void hookOnComplete() {
log.info("onComplete");
}
};
Flux.just("A", "B", "C").subscribe(subscriber);
}
// 실행 결과
02:03:22.931 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A
02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B
02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C
02:03:22.933 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete
출처: https://sjh836.tistory.com/185?category=679845 [빨간색코딩]
'JAVA > Reactor,RxJava,Reactive Streams' 카테고리의 다른 글
Spring5 리액티브 (Web flux) (0) | 2021.11.12 |
---|---|
java, stream api에서 NPE 발생 주의 (0) | 2021.07.30 |
Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현) (0) | 2021.03.23 |
Reactive와 Spring 4 (C10K, 리액티브 선언문, 리액티브 스프링 등장 전) (0) | 2021.03.23 |
RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현) (0) | 2021.03.21 |
Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식) (0) | 2021.03.21 |