RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현)
1. RxJava는?
- https://github.com/ReactiveX/RxJava
- 리액티브 프레임워크 중 하나이다.
- 다른 구현체로는 Akka, 리액터 등
- RxJava 1.3.8을 끝으로 EOL 되었지만, 개념을 익히는 용도로만 사용해보자
- RxJava 2.x 나 3.x 사용 권장
- 아래에서 사용하는 API들은 매우 기본적인 거라, 다른 버전에서도 호환됨
2. 스트림의 생산과 소비
@Test
public void 스트림의_생산과_소비() {
// 스트림 생산 : 이벤트 생성기
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, reactive world!");
sub.onNext("저는 빨간색소년입니다.");
sub.onCompleted();
}
}); // 구독자가 없으므로, 이벤트가 전파되지는 않는다.
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
log.info(s);
}
@Override
public void onCompleted() {
log.info("수행 완료!");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
};
// 실제 구독이 발생할 때, 위 코드들이 실행된다
observable.subscribe(subscriber);
// .forEach 는 subscriber 를 내가 만들진 않았지만, 내부적으로 만들어서 사용한다. (아래 메소드 참조)
observable.forEach(log::info);
}
// 내부 코드
public final void forEach(final Action1<? super T> onNext) {
생략...
subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
// 실행결과
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수행 완료!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.
3. 비동기 시퀀스
@Test
public void 비동기_시퀀스() throws Exception {
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(e -> log.info("수신 데이터 = {}", e));
Thread.sleep(5000);
subscription.unsubscribe();
Thread.sleep(5000);
}
// 실행결과
17:10:13.562 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 0
17:10:14.546 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 1
17:10:15.554 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 2
17:10:16.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 3
17:10:17.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 4
- 스트림의_생산과_소비() 와 무엇이 달라서, 비동기로 실행될까?(=main 스레드에서 실행되지 않을까?)
- interval() 메소드는 rxjava 내부의 computationScheduler를 사용한다. 해당 스케줄러는 코어 갯수와 같으며, 계산 전용으로 쓰길 권장한다.
public static Observable<Long> interval(long interval, TimeUnit unit) {
return interval(interval, interval, unit, Schedulers.computation());
}
4. 기본적인 연산자
- Observable과 Subscriber 만으로도 다양하게 워크플로우를 구현할 수 있다.
- 이 문구 때문에 아래에서 직접 구현을 해보았음..
- 연산자들을 스트림의 원소를 조정하거나 스트림 구조 자체를 변경한다.
- 아래에선 기본적인 연산자 3가지에 대해서만 살펴본다.
- https://rxmarbles.com/
4-1. map()
- <R> Observable<R> map(Function<? super T, ? extends R> func)
- T를 R로 변환하는 함수를 통해 Observable<T>를 Observable<R>로 변환한다.
4-1-1. 사용 예제
Observable.range(0, 5)
.map(i -> i * 2)
.forEach(i -> log.info("결과 = {}", i));
// 실행결과
18:24:55.143 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.144 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8
4-1-2. 코드 레벨에선 어떻게 동작하는 것일까?
// map 연산자
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// 현재 Observable 과 lambda Function을 조합해서 새로운 Observable 을 만든다.
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
// OnSubscribeMap 은 Observable.OnSubscribe 이다.
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
// 인자 o 는 forEach의 ActionSubscriber 가 넘어올 것이다. (이유는 위에 언급되어있음)
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
// 현재의 Observable (= Observable.range(0, 5)) 을 MapSubscriber 가 구독한다.
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual; // foreach의 ActionSubscriber 이다.
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t); // i -> i * 2 수행
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result); // ActionSubscriber의 onNext를 호출한다. log.info를 찍게해두었으니, 실행결과처럼 콘솔이 찍히겠지..
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
}
}
- 정리하면, Observable.range(0, 5)를 MapSubscriber 가 구독하는데, MapSubscriber 내부적으로 연산 후에 ActionSubscriber를 호출하고 있다.
- 나머지 연산자들도 같은 방식으로 동작함
4-1-3. 직접 구현
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// OnSubscribeRange 에서도 반복문에서 onNext(index); 하다가 if (index == endIndex) 면 onCompleted 하고 끝낸다.
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
Function<Integer, Integer> transformer = i -> i * 2;
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer i) {
i = transformer.apply(i); // OnSubscribeMap 에서도 subscriber.onNext(transformer.apply(t))
log.info("결과 = {}", i);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
observable.subscribe(subscriber);
// 람다 사용
Observable.create(sub -> {
for (int i = 0; i < 5; i++) {
sub.onNext(i);
}
sub.onCompleted();
}).subscribe(i -> {
i = transformer.apply((Integer) i); // 타입추론을 할 수 없네...
log.info("결과 = {}", i);
});
// 실행 결과
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8
4-2. filter()
- Observable<T> filter(Predicate<? super T> predicate)
4-2-1. 사용 예제
Observable.range(0, 5)
.filter(i -> i % 2 == 0)
.forEach(i -> log.info("결과 = {}", i));
// 실행 결과
18:35:10.314 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
4-3. reduce()
- Observable<T> reduce(BiFunction<T, U, R> accumulator)
4-3-1. 사용 예제
Observable.range(0, 5)
.reduce((x, y) -> x + y)
.forEach(i -> log.info("결과 = {}", i));
// 실행결과
19:07:59.245 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10
4-3-2. 직접 구현
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
BiFunction<Integer, Integer, Integer> accumulator = (x, y) -> x + y;
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Integer result = null;
@Override
public void onNext(Integer i) {
Integer temp = result;
if (temp == null) {
result = i;
} else {
result = accumulator.apply(temp, i);
}
}
@Override
public void onCompleted() {
log.info("결과 = {}", this.result);
}
@Override
public void onError(Throwable e) {
}
};
observable.subscribe(subscriber);
// 실행 결과
19:07:59.250 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10
출처: https://sjh836.tistory.com/183?category=679845 [빨간색코딩]
'JAVA > Reactor,RxJava,Reactive Streams' 카테고리의 다른 글
Spring5 리액티브 (Web flux) (0) | 2021.11.12 |
---|---|
java, stream api에서 NPE 발생 주의 (0) | 2021.07.30 |
Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현) (0) | 2021.03.23 |
Reactive와 Spring 4 (C10K, 리액티브 선언문, 리액티브 스프링 등장 전) (0) | 2021.03.23 |
Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현) (0) | 2021.03.21 |
Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식) (0) | 2021.03.21 |