Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식)
- 참조문서
1. Reactive Streams 란?
- 리액티브 스트림 스펙을 정의하고, 인터페이스를 제공하는 reactive-streams.org 를 살펴보면, 다음과 같다.
- Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
- non-blocking back pressure 로 비동기적 스트림처리를 제공하는 표준
- 리액티브 스트림은 관찰자(Observer) 패턴, 반복자(Iterator) 패턴, 함수형 패러다임의 조합으로 정의되곤 한다.
1-1. 왜 표준 Reactive Streams가 필요했나?
- 비동기 요청을 위해 CompletableStage, ListenableFuture 등 다양한 라이브러리들을 사용할 수 있는데, 문제는 각각 호환이 안된다는 것이다. 상호 변환을 위해서는 유틸성 로직을 따로 작성해야 한다.
- ex) spring AsyncRestTemplate은 ListenableFuture 을 반환하는데, 이걸 상위에서는 CompletableFuture 로 쓰고 싶은 경우..
- 초기에 가장 인기있던 RxJava 1.x 에 의존하던 다른 라이브러리들이 기능들을 막 추가하면서, 충돌, 버전문제 등이 이슈가 됨
- RxJava 1.x 가 따라가지 못함
2. 관찰자 패턴의 문제점과 결합
- 먼저, 용어부터 정리해보자
- Publisher = Subject = Observable = Producer
- Subscriber = Observer = Consumer
- Publisher는 Subscriber에게 Event를 Push한다.
- cf) 옵저버 패턴 : https://sjh836.tistory.com/180
2-1. 관찰자 패턴의 한계와 리액티브 스트림에서의 해법
- Observer 가 준비가 되지 않았는데, 이벤트(=데이터)가 전달된다면?(=흐른다면?)
- 제대로 처리하지 못할 것이다
- 연속되는 데이터의 끝을 알려주지 못한다.
- 리액티브 스트림에서는 Iterator 패턴과 조합으로 해결
- Observer 가 이벤트를 처리하는 속도보다, Subject 가 이벤트를 발행하는 속도가 빠르다면?
- (빠른 프로듀서와 느린 컨슈머 이슈)
- Subject 는 이벤트를 발행하고 전달하는 데만 집중할 뿐이다.
- Observer 는 유실없이 처리하기 위해 Queue를 따로 두거나(이래도 결국 쌓여서 터짐), 별도 조치가 필요할 것이다.
- 이전 포스팅에서 언급한, Spring EventBus를 사용하는 방식이라면, AsyncTaskExecutor 쓰레드풀이 쉬지않을 것
- 무제한 Queue = OOM
- 크기제한 Queue = drop으로 인한 유실
- BlockingQueue = 비동기동작을 사실상 모두 무효화..
- 리액티브 스트림에서는 back pressure 를 통해, 이 문제를 해결한다.
2-2. 결합하기
// 관찰자 패턴의 주체
public interface Observable<T> {
void registerObserver(Observer<T> observer); // 리액티브 스트림에서는 Publisher - Subscriber 가 한 쌍이다. (복잡성, 동시성, 메모리누수 등을 해결)
void unregisterObserver(Observer<T> observer); // 리액티브 스트림에서는 Subscription.cancle 을 통해 구독을 취소할 수 있다.
void notifyObservers(T event); // 리액티브 스트림에선 Subscription를 통해 Subscriber.onNext, onComplete, onError 로 전달된다.
}
// 리액티브스트림의 발행자
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s); // 관찰자의 registerObserver
}
3. 반복자 패턴과 결합
- next()를 통해 데이터를 리턴받기 때문에, Pull이라고 볼 수 있다.
- 데이터의 끝을 hasNext()를 통해 알 수 있다.
- 이것을 결합해보면, 다음과 같다.
// 관찰자 패턴의 관찰자
public interface Observer<T> {
void observe(T event);
}
// 반복자 패턴의 반복자
public interface Iterator<E> {
boolean hasNext();
E next();
}
// 리액티브 스트림의 구독자
public interface Subscriber<T> {
public void onNext(T t); // 관찰자의 observe + 반복자의 next
public void onComplete(); // 반복자의 hasNext
public void onError(Throwable t); // next에서 Exception이 발생할 때, 전파를 위하여
}
4. Back Pressure (배압)
- 빠른 Publisher - 느린 Subscriber 문제를 해결하는 원리이다.
- Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것이다.
- 중간에 Queue 같은게 필요없어짐
- 이것을 dynamic pull 이라고 부른다.
- 리액티브 스트림에서는 이것을 Subscription 로 제어한다. request 메소드를 통해 요청량을 조절
- request 를 LONG.MAX 개씩 요청하면 순수 push 모델이 되는 것이고,
- request 를 onNext 당 1개씩 요청하면 pull 모델이 된다.
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s); // 전달받은 Subscription 은 Publisher와 Subscriber 사이의 통신 매개체가 된다.
// 나머지는 위와 동일해서 생략...
}
5. 최종적인 흐름
- Subscriber 가 subscribe 메소드를 통해 Publisher 에게 구독을 요청
- Publisher 는 onSubscribe 메소드로 Subscriber 에게 Subscription 를 전달
- Subscriber 는 Subscription.request 을 통해, 자신에게 데이터를 흘려줄 것을 요구
- Publisher 는 Subscription 를 통해 Subscriber.onNext로 데이터를 전달한다.
- Subscriber 는 내부에 Subscription를 set하였기 때문 (2번)
- 전달이 잘 끝났으면, onComplete, 오류났다면 onError 로 끝낸다.
6. Processor
- Publisher 와 Subscriber 를 혼합한 Processor 라는 것도 있다.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
- 이 둘 사이에서 몇 가지 처리 단계를 유연하게 추가할 수 있다.
- 하나의 subscriber 의 결과물을 다른 subscriber 에 그대로 전달하거나, 변형할 때도 사용할 수있다. 마치 새로운 Publisher 처럼 행동하는 것이다.
- ex1) 구독자 중 기준에 일치하는 구독자들에게만 전송한다던지..
- ex2) 멀티캐스팅
7. jdk9 Flow
- 리액티브 스트림은 jdk9 에서도 Flow API 로 반영이 되었다.
- 리액티브 스트림 타입들을 Flow 로도 쉽게 변환이 가능하다. (ex. FlowAdapters)
8. 리액티브 스트림은 어떻게 비동기 및 병렬을 구현할까?
8-1. 리액티브 스트림 규약
- reactive-streams.org 에서, Publisher 가 생성하고 Subscriber 가 소비하는 모든 신호는 non-blocking 이어야한다고 정의했다.
- 단순 인터페이스와 규칙만 정해준거니.. 논블로킹은 개발자의 책임인 듯? (사실 Publisher - Subscriber 를 직접 구현할 일은 거의 없긴함)
- 따라서, Publisher - Subscriber 를 구현할 때, block이 되지않도록 해야함
- 전제를 만족했다면, 멀티코어를 모두 활용하기 위해서 Subscriber.onNext 메소드를 병렬로 호출해야하는데, 이것도 규약에 막혀있다.
- on*** 호출은 스레드 안전성을 보장하여 신호를 보내야하며, 멀티 스레드에서 수행될 경우, 외부에서 동기화 요망
- 따라서, 순차적으로만 onNext 를 호출할 수 밖에 없다..
- Publisher 가 멀티스레딩하여 Subscriber.onNext 를 동시에 호출할수 없는 것..
- 그렇다면, 도대체 어떻게 병렬로 구현할 수 있는가?
8-2. 리액티브 스트림 파이프라인의 병렬화
- 파이프는 데이터 소스에서 몇가지 처리 또는 변환 단계를 거쳐 목적지까지 흘러간다.
- 이 때, 데이터 소스에서 메세지 생성이 오래걸리거나 리소스를 많이 소모시킬 수 있고,
- 메세지 생성은 금방되는데, 변환 처리가 오래걸릴 수도 있고,
- 목적지가 데이터를 수신하는데, 리소스를 많이소모하며 오래걸릴 수도 있다.
- 위에서 보듯이, 각 단계를 비동기적으로 메세지를 전달하는 것이다.
- 정리하면, 처리 단계별로. 별도의 스레드에 바인딩되는 컨셉이다.
- 하지만, 이것을 잘 균형있게 분할하고 배치하고 스레드에 바인딩하는 것은 까다로운 작업이다. 다행히, 리액티브 라이브러리들은 이런걸 스케줄러 API로 제공해준다.
출처: https://sjh836.tistory.com/182?category=679845 [빨간색코딩]
'JAVA > Reactor,RxJava,Reactive Streams' 카테고리의 다른 글
Spring5 리액티브 (Web flux) (0) | 2021.11.12 |
---|---|
java, stream api에서 NPE 발생 주의 (0) | 2021.07.30 |
Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현) (0) | 2021.03.23 |
Reactive와 Spring 4 (C10K, 리액티브 선언문, 리액티브 스프링 등장 전) (0) | 2021.03.23 |
Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현) (0) | 2021.03.21 |
RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현) (0) | 2021.03.21 |