RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현)

2021. 3. 21. 02:41 JAVA/Reactor,RxJava,Reactive Streams

1. RxJava는?

  • https://github.com/ReactiveX/RxJava
  • 리액티브 프레임워크 중 하나이다.
    • 다른 구현체로는 Akka, 리액터 등
  • RxJava 1.3.8을 끝으로 EOL 되었지만, 개념을 익히는 용도로만 사용해보자
    • RxJava 2.x 나 3.x 사용 권장
    • 아래에서 사용하는 API들은 매우 기본적인 거라, 다른 버전에서도 호환됨

 

2. 스트림의 생산과 소비

@Test
public void 스트림의_생산과_소비() {
    // 스트림 생산 : 이벤트 생성기
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, reactive world!");
            sub.onNext("저는 빨간색소년입니다.");
            sub.onCompleted();
        }
    }); // 구독자가 없으므로, 이벤트가 전파되지는 않는다.

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onCompleted() {
            log.info("수행 완료!");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    // 실제 구독이 발생할 때, 위 코드들이 실행된다
    observable.subscribe(subscriber);

    // .forEach 는 subscriber 를 내가 만들진 않았지만, 내부적으로 만들어서 사용한다. (아래 메소드 참조)
    observable.forEach(log::info);
}

// 내부 코드
public final void forEach(final Action1<? super T> onNext) {
    생략...
    subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

// 실행결과
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수행 완료!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.

 

3. 비동기 시퀀스

@Test
public void 비동기_시퀀스() throws Exception {
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(e -> log.info("수신 데이터 = {}", e));
    Thread.sleep(5000);
    subscription.unsubscribe();
    Thread.sleep(5000);
}

// 실행결과
17:10:13.562 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 0
17:10:14.546 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 1
17:10:15.554 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 2
17:10:16.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 3
17:10:17.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 4
  • 스트림의_생산과_소비() 와 무엇이 달라서, 비동기로 실행될까?(=main 스레드에서 실행되지 않을까?)
  • interval() 메소드는 rxjava 내부의 computationScheduler를 사용한다. 해당 스케줄러는 코어 갯수와 같으며, 계산 전용으로 쓰길 권장한다.
public static Observable<Long> interval(long interval, TimeUnit unit) {
    return interval(interval, interval, unit, Schedulers.computation());
}

 

4. 기본적인 연산자

  • Observable과 Subscriber 만으로도 다양하게 워크플로우를 구현할 수 있다.
    • 이 문구 때문에 아래에서 직접 구현을 해보았음..
  • 연산자들을 스트림의 원소를 조정하거나 스트림 구조 자체를 변경한다.
  • 아래에선 기본적인 연산자 3가지에 대해서만 살펴본다.
  • https://rxmarbles.com/

 

4-1. map()

  • <R> Observable<R> map(Function<? super T, ? extends R> func)
  • T를 R로 변환하는 함수를 통해 Observable<T>를 Observable<R>로 변환한다.

 

4-1-1. 사용 예제

Observable.range(0, 5)
    .map(i -> i * 2)
    .forEach(i -> log.info("결과 = {}", i));

// 실행결과
18:24:55.143 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.144 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8

 

4-1-2. 코드 레벨에선 어떻게 동작하는 것일까?

// map 연산자
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    // 현재 Observable 과 lambda Function을 조합해서 새로운 Observable 을 만든다.
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

// OnSubscribeMap 은 Observable.OnSubscribe 이다.
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    // 인자 o 는 forEach의 ActionSubscriber 가 넘어올 것이다. (이유는 위에 언급되어있음)
    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);

        // 현재의 Observable (= Observable.range(0, 5)) 을 MapSubscriber 가 구독한다.
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual; // foreach의 ActionSubscriber 이다.
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t); // i -> i * 2 수행
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result); // ActionSubscriber의 onNext를 호출한다. log.info를 찍게해두었으니, 실행결과처럼 콘솔이 찍히겠지..
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

    }
}
  • 정리하면, Observable.range(0, 5)를 MapSubscriber 가 구독하는데, MapSubscriber 내부적으로 연산 후에 ActionSubscriber를 호출하고 있다.
  • 나머지 연산자들도 같은 방식으로 동작함

 

4-1-3. 직접 구현

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // OnSubscribeRange 에서도 반복문에서 onNext(index); 하다가 if (index == endIndex) 면 onCompleted 하고 끝낸다.
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});

Function<Integer, Integer> transformer = i -> i * 2;

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer i) {
        i = transformer.apply(i); // OnSubscribeMap 에서도 subscriber.onNext(transformer.apply(t))
        log.info("결과 = {}", i);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

// 람다 사용
Observable.create(sub -> {
    for (int i = 0; i < 5; i++) {
        sub.onNext(i);
    }
    sub.onCompleted();
}).subscribe(i -> {
    i = transformer.apply((Integer) i); // 타입추론을 할 수 없네...
    log.info("결과 = {}", i);
});

// 실행 결과
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8

 

4-2. filter()

  • Observable<T> filter(Predicate<? super T> predicate)

 

4-2-1. 사용 예제

Observable.range(0, 5)
    .filter(i -> i % 2 == 0)
    .forEach(i -> log.info("결과 = {}", i));

// 실행 결과
18:35:10.314 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4

 

4-3. reduce()

  • Observable<T> reduce(BiFunction<T, U, R> accumulator)

 

4-3-1. 사용 예제

Observable.range(0, 5)
        .reduce((x, y) -> x + y)
        .forEach(i -> log.info("결과 = {}", i));

// 실행결과
19:07:59.245 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10

 

4-3-2. 직접 구현

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});

BiFunction<Integer, Integer, Integer> accumulator = (x, y) -> x + y;

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    private Integer result = null;

    @Override
    public void onNext(Integer i) {
        Integer temp = result;
        if (temp == null) {
            result = i;
        } else {
            result = accumulator.apply(temp, i);
        }
    }

    @Override
    public void onCompleted() {
        log.info("결과 = {}", this.result);
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

// 실행 결과
19:07:59.250 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10



출처: https://sjh836.tistory.com/183?category=679845 [빨간색코딩]