Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현)

2021. 3. 21. 02:55 JAVA/Reactor,RxJava,Reactive Streams

리액티브 스트림을 기본적으로 이해해야, Reactor 도 수월하게 이해할 수 있다.
cf) 리액티브 스트림 포스팅 : https://sjh836.tistory.com/182

 

예제코드는 io.projectreactor:reactor-core:3.4.2 기준으로 작성되었다.

1. Reactor의 탄생

  • reactor 1.0은 13년 7월에 출시되었다. spring 프레임워크의 개발팀인 pivotal 에서 만든 오픈소스이다.
    • 리액터 패턴, 함수형 프로그래밍, 메세지 기반 등의 설계와 모범사례들을 통합
    • 리액터 패턴 포스팅 : https://sjh836.tistory.com/184
  • 당시, 스프링 개발팀은 대용량 데이터 개발을 단순화하는 Spring XD 프로젝트를 개발 중이었다.
    • 17년 7월부로 deprecated 되었고, Spring Cloud Data Flow 로 넘어갔다.
  • Spring XD 에 비동기 논블로킹을 지원하기 위해 만들어진 프로젝트가 Reactor 이다.
  • 장점 : spring 과 완벽한 통합, netty 도 지원, 비동기 논블로킹 메세지 처리, 고성능
  • 단점 : Back Pressure 기능 없음, 복잡한 오류처리

 

2. Reactive Stream과 Reactor 3.x

  • reactor 2.0 는 위 단점들을 해결하고, 15년 3월에 나왔다.
    • onOverflowBuffer(), onOverflowDrop() 등을 통해서 Back Pressure 를 지원
  • 15년 4월에는 출시된 리액티브 스트림 표준 스펙이 발표되었다.
    • API 및 규칙만 정의하고, 사용할 수 있는 라이브러리를 제공하진 않았다.
  • reactor 는 리액티브 스트림 표준 스펙을 2.5 마일스톤 버전부터 지원했다. (16년 2월)
  • 하지만, 릴리즈는 3.0으로 했다. (16년 8월)
  • 비슷한 시기에 리액티브 스트림 표준을 지원하는 RxJava 2.0 도 출시되었는데, 가장 큰 차이점은 RxJava는 안드로이드 포함한 jdk6 부터 지원했고, reactor는 jdk8부터 지원했다.

 

3. Publisher<T>의 구현체

  • Flux와 Mono가 있다. 이 둘은 대략적인 카디널리티 정보를 담는 식으로 타입을 구분한다.
  • 아이템이 0개 아니면 1개라면 Mono를 써서 표현력을 좀 더 좋게 해주는 식이다.
    • 연산자도 그에 맞는것만 지원하고..
    • Mono의 연산자들은 버퍼 중복, 값비싼 동기화 작업 등이 생략되어 있다.
  • Flux와 Mono는 서로 쉽게 변환할 수 있다.
    • Flux<T>.collectList() = Mono<List<T>>
    • Mono<T>.flux() = Flux<T>

 

3-1. Flux<T>

  • Flux는 여러 요소(0 ~ N)를 생성할 수 있는 리액티브 스트림이다.
  • 메모리 부족을 야기하지 않고도 무한대의 리액티브 스트림을 만들 수 있다.
    • Flux.range(0, 5).repeat()
    • 다만, 이것을 수집하려고 시도하면 OOM이 날 것이다. .collectList().block()

 

3-2. Mono<T>

  • Mono는 최대 하나의 요소(0 ~ 1)를 생성할 수 있는 리액티브 스트림이다.
  • CompletableFuture<T>와 의미론적으로 동일해서, 비슷한 용도로 사용할 수 있다. 그러나 CompletableFuture는 값을 반환해야하고, 즉시 시작한다는 차이가 있다. Mono 는 구독자가 없으면 가만히 있고.
  • Mono<Void>를 통해, 뭔가 완료되었다고 알리는데 활용할 수 있다.

 

4. 리액티브 스트림 시퀀스 생성하기

  • Flux와 Mono는 많은 팩토리 메소드를 제공한다.

 

4-1. Flux

  • Flux.just("A", "B")
  • Flux.fromArray(new Integer[] { 2, 4, 8 })
  • Flux.fromIterable(Arrays.asList(3, 6, 9))
  • Flux.range(1, 5);
  • Flux.from(publisher) : 다른 Publisher를 Flux로 변환
  • Flux.empty()
  • Flux.never() : onComplete, onError 신호까지 보내지 않음
  • Flux.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
  • Flux.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성

 

4-2. Mono

  • Mono.just("devljh")
  • Mono.justOrEmpty(null) : nullable, Optional.empty 모두 가능
  • Mono.fromCallable(this::httpRequest)
  • Mono.fromRunnable(runnable)
  • Mono.fromSupplier(supplier)
  • Mono.fromFuture(future)
  • Mono.fromCompletionStage(completionStage)
  • Mono.from(publisher) : 다른 Publisher를 Mono로 변환
  • Mono.empty()
  • Mono.never() : onComplete, onError 신호까지 보내지 않음
  • Mono.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
  • Mono.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성

 

4-2-1. just VS fromSupplier VS defer

  • 위 3개는 Mono 시퀀스를 만드는 데, 어떤 차이가 있을까?
  • just는 즉시 시퀀스를 만든다.
  • fromSupplier과 defer는 구독시점에 시퀀스를 생성한다. (lazy 처리)
  • fromSupplier 는 Supplier<? extends T> supplier 를 인자로 받는다. Mono가 아닌 값에 사용하면 된다.
  • defer 는 Supplier<? extends Mono<? extends T>> supplier 를 인자로 받는다. 이미 Mono로 반환되는 메소드에 사용하는 게 좋다.
    • 즉, 아래 경우에는 fromSupplier 가 더 적절하고 깔끔해보이는 상황이다.
@Test
public void defer_테스트() throws Exception {
    long start = System.currentTimeMillis();

    // just
    Mono<Long> clock1 = Mono.just(System.currentTimeMillis());

    Thread.sleep(5000);
    long result1 = clock1.block() - start;
    log.info("흐른 시간 = {}", result1); // 5초가 지났으나, 위에서 즉발실행되어서 0 출력


    // fromSupplier
    Mono<Long> clock2 = Mono.fromSupplier(System::currentTimeMillis);

    Thread.sleep(5000);
    long result2 = clock2.block() - start;
    log.info("흐른 시간 = {}", result2); // 10초 지남


    // defer
    Mono<Long> clock3 = Mono.defer(() -> Mono.just(System.currentTimeMillis()));

    Thread.sleep(5000);
    long result3 = clock3.block() - start;
    log.info("흐른 시간 = {}", result3); // 15초 지남
}

// 실행결과
23:41:03.091 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 0
23:41:08.106 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 10108
23:41:13.128 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 15123

 

5. 리액티브 스트림 구독하기

  • Flux와 Mono를 구독하려면 subscribe() 메소드를 사용하면 된다. 메소드 시그니처는 아래와 같다.
    • 내부적으로는 new LambdaSubscriber<>(...) 를 통해 Subscriber를 생성해서 구독한다.
    • LambdaSubscriber 내부 코드를 보면, Subscription.request를 조정하지 않는다면, 기본적으로 s.request(Long.MAX_VALUE) 이다.
  • Disposable subscribe() : 구독은 하지만, onNext, onError, onComplete 처리는 하지 않는다.
  • Disposable subscribe(Consumer<? super T> consumer) : onNext 만 처리한다.
  • Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) : onNext, onError 만 처리한다.
    • errorConsumer 는 NotNull 이다.
  • Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) : onNext, onError, onComplete 를 처리한다.
  • Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer) : onNext, onError, onComplete 를 처리하고, Subscription(요청량 조절, 구독취소) 도 직접 제어한다.
    • 해당 메소드는 Reactor 3.5 부터 제거될 예정이다. request 제어를 잊는 경우가 많아서 삭제된다고 한다.. 밑의 subscribeWith 를 쓰라고 한다.
  • <E extends Subscriber<? super T>> E subscribeWith(E subscriber) : 밑의 메소드와 동일한데, 반환을 해주는 차이가 있다.
  • void subscribe(Subscriber<? super T> actual) : 리액티브 스트림 표준 Publisher 인터페이스의 subscribe() 를 재정의했기 때문에, 이것도 사용할 수 있다. 다양한 조절을 할 순 있지만, 이걸 직접쓸 일은 거의 없긴 하다.

 

5-1. Disposable 인터페이스

  • Reactor 코어에서 제공하는 인터페이스이며, @FunctionalInterface 이다.
  • Subscription을 취소할 수 있다.
@Test
public void 구독취소_테스트() throws Exception {
    Disposable disposable = Flux.interval(Duration.ofMillis(500))
            .subscribe(i -> log.info("{}", i));
    Thread.sleep(2000);
    disposable.dispose();
}

// 실행결과
01:24:46.144 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 0
01:24:46.652 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 1
01:24:47.158 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 2
01:24:47.657 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 3

 

5-2. Custom Subscriber 로 구독하기

5-2-1. 권장하지 않는 방법

  • 리액티브 스트림 표준 Subscriber 인터페이스를 직접 구현하는 것은 쉬운 일이 아니다.
  • 제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야한다.
  • 아래는 동작은 하지만, 잘못된 구현이다.
  @Test
  public void Subscriber_잘못된_구현() {
      Subscriber<String> subscriber = new Subscriber<String>() {
          /*
          * 저번 포스팅의 리액티브 스트림 파이프라인 병렬화에서 보았듯이,
          * 발행과 구독은 각각 다른 쓰레드에서 처리될 수 있다.
          * 따라서, volatile 키워드를 사용한다.
          */
          private volatile Subscription subscription;

          @Override
          public void onSubscribe(Subscription subscription) {
              this.subscription = subscription;
              this.subscription.request(1); // 구독 후, 최초 요청
          }

          @Override
          public void onNext(String s) {
              log.info("onNext = {}", s);
              this.subscription.request(1); // 데이터 수신 후, 추가 요청
          }

          @Override
          public void onError(Throwable t) {

          }

          @Override
          public void onComplete() {
              log.info("onComplete");
          }
      };

      Flux.just("A", "B", "C").subscribe(subscriber);
  }

  // 실행 결과
  01:48:54.609 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A
  01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B
  01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C
  01:48:54.613 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete
  • this.subscription.request(n) 에서 숫자가 0 이하인지, 구독(subscription) 상태는 정상인지 등을 확인하지 않고, 1개씩 계속 요청했기 때문이다.

 

5-2-2. 안전한 구현 방법

  • 리액터 프레임워크에서 미리 만들어둔, BaseSubscriber 추상클래스를 상속해서 구현한다.
  • Subscriber의 onSubscribe, onNext, onError, onComplete 는 모두 재정의할 수 없게 final 로 선언되어 있다.
  • 제공하는 hook 을 통해서, 상황에 따라 얼마든지 조정이 가능하다.
  public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable {
      protected void hookOnNext(T value){
          // NO-OP
      }

      @Override
      public final void onNext(T value) {
          Objects.requireNonNull(value, "onNext");
          try {
              hookOnNext(value);
          }
          catch (Throwable throwable) {
              onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
          }
      }

      // 생략...
  }
  • 아래는 BaseSubscriber 로 동일하게 다시 구현한 코드이다.
  @Test
  public void Subscriber_안전한_구현() {
      Subscriber<String> subscriber = new BaseSubscriber<String>() {
          @Override
          public void hookOnSubscribe(Subscription subscription) {
              request(1); // 구독 후, 최초 요청
          }

          @Override
          public void hookOnNext(String s) {
              log.info("onNext = {}", s);
              request(1); // 데이터 수신 후, 추가 요청
          }

          @Override
          public void hookOnComplete() {
              log.info("onComplete");
          }
      };

      Flux.just("A", "B", "C").subscribe(subscriber);
  }

  // 실행 결과
  02:03:22.931 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A
  02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B
  02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C
  02:03:22.933 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete



출처: https://sjh836.tistory.com/185?category=679845 [빨간색코딩]