Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현)

2021. 3. 23. 01:43 JAVA/Reactor,RxJava,Reactive Streams

본격적인 리액티브 프레임워크를 사용하기 전에,
옵저버 패턴 - 발행구독 패턴으로 만들어진 Spring Event와 SSE를 통해 리액티브 어플리케이션을 만들어보자

cf) 옵저버 패턴 포스팅 : https://sjh836.tistory.com/180

1. Spring 에서 Event 를 다룬다?

  • 스프링 프레임워크에서는 EventListener를 통해 발행-구독(Pub/Sub) 패턴, 이벤트 버스(EventBus) 등을 지원한다.
    • ApplicationContext 레벨에서 지원
  • 도메인 서비스간 강한 결합, 강한 의존성을 Event 기반으로 풀어내면, 느슨한 결합 등을 얻을 수 있다.
    • 개인적으론, 이것은 상황에 따라 장점이 될수도 있고, 단점이 될 수도 있다고 본다.
    • 핵심 비즈니스 이외, 부가적인 비즈니스 로직들(핵심로직 후 Web Push 처리, 회원가입 후 환영메일전송 등)과 같은 상황에선 괜찮을 것 같다.

1-1. @EventListener

  • Spring 4.2 이전에는 ApplicationEvent 를 상속받은 CustomEvent를 ApplicationEventPublisher 을 통해 publishEvent(e) 하고 ApplicationListener<CustomEvent>.onApplicationEvent 를 구현해야 했다.
  • 4.2 버전부터는 이것이 편해져서, CustomEvent 도 POJO로 작성하고, 이벤트 핸들러도 @EventListener만 선언하면 된다.
    • ApplicationEventPublisher 는 동일
  • @Order : 해당 어노테이션을 통해 이벤트 핸들러 간에 우선순서를 제어할 수 있다.
  • @Async : 기본적으로 Spring Event 는 동기적이다. 하지만 @Async 를 통해 비동기로 동작할 수 있다.
    • Executor 쓰레드풀을 사용
  • (@TransactionalEventListener 에 대해선, 다음 기회에 다루어보겠다..)

2. SSE(Server-Sent Events)

  • 서버에서 클라이언트로 일방적으로 데이터를 전달할 수 있는 간단한 방법으로 websocket과 sse가 있다.
    • 이걸 활용하지 않는다면 주기적으로 polling 해야하는데, 리액티브 디자인에 맞지 않음
    • 둘 중에, 구현이 더 간단한 sse를 활용
    • cf) spring websocket : https://sjh836.tistory.com/166
      • 1-4. http streaming 중 하나가 SSE 다.
  • 장점 : 단방향이라 구현이 간단, EventSource 는 접속에 문제가 있으면 자동으로 재연결 시도
  • 단점 : IE에선 sse를 지원하지 않음, client 에서 연결을 끊어도(=페이지를 떠나도) 서버단에서 감지가 어려움

3. Spring MVC의 비동기 HTTP 통신 기법

  • 서블릿3.0 부터는 비동기로 HTTP 요청을 처리할 수 있어졌다.
  • 예를들면, 서블릿 쓰레드(nio-8080-exec-1)가 요청을 받고, 다른 스레드(executor-1)에게 작업을 주고, nio-8080-exec-1는 해제되어 버림. executor-1가 작업을 끝내면 서블릿 쓰레드(nio-8080-exec-2)가 다시 받아서 응답을 보내주는 개념
  • 즉, 서블릿 리소스 입장에서 비동기 통신인 것이지, 결국 특정 task 에 대해 단위시간 당 어떤 쓰레드는 동기/블로킹 되고 있다.
    • 서블릿 쓰레드의 고갈이 주요한 화두였던 듯?

3-1. 컨트롤러의 반환 타입

  • Callable<T> : 컨트롤러의 메소드는 곧바로 Callable로 wrapping된 값을 반환하여, 서블릿 Request 쓰레드를 해제시킨다. (응답은 계속 열려있음) 이후, Spring 컨테이너가 관리하는 TaskExecutor 가 Callable 내 작업을 끝마치고, 다시 서블릿 컨테이너에 진짜 값을 넣어주는 방식이다.
    • 내부적으로는 DispatcherServlet가 WebAsyncManager에게 컨트롤러한테 받은 Callable 을 넘겨주고 끝남. WebAsyncManager가 작업끝내고, DispatcherServlet에게 다시 dispatch 하여 응답하는 형식
    • timeout, executor 를 설정하고 싶으면, WebAsyncTask<T> 를 사용
  • DeferredResult<T> : Callable과 비슷하지만, 다른 쓰레드에게 할당하고 그런게 아니라, 단순히 DeferredResult.setResult(결과값) 을 해야 응답이 내려간다.
    • setResult 으로 시점을 제어할 수 있다는 점에서, long polling 에서 많이 사용하는 모양이다.
    • 요청을 Queue<DeferredResult>에 쌓아두고, event 가 발생하면 queue.forEach(dr -> dr.setResult(event)) 으로 한번에 응답하는 형식
  • ListenableFuture<T> : 내부적으론, ListenableFuture의 callback 에서 DeferredResult.setResult 해주는 식으로 동작한다.
    • Future 인터페이스 확장, AsyncRestTemplate 의 기본 반환형이다.
  • ResponseBodyEmitter : 요청에 대해 하나 이상의 비동기 응답을 리턴할 때 주로 쓰인다.
    • DeferredResult 와 동작은 비슷하나, emitter.send 를 여러번 할 수 있다는게 다른 듯
  • SseEmitter : SSE에 쓰이며, ResponseBodyEmitter 를 상속받음
    • ContentType이 text/event-stream 이다.
  • StreamingResponseBody : OutputStream 전송에 쓰이며, ResponseBodyEmitter 를 상속받음
  • CompletableFuture

4. Spring EventListener와 SSE 로 리액티브 어플리케이션 만들기

  • 상황정의 : 유저가 회원가입을 했을 때, Signup Event를 전파하여, 특정 엔드포인트를 구독하고 있는 유저들에게 비동기적으로 알려주는 어플리케이션이다.

4-1. Controller

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
    private static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;

    private Set<SseEmitter> emitterSet = new CopyOnWriteArraySet<>();

    @GetMapping(value = "/notice", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter signup(HttpServletRequest request) {
        log.info("SSE stream 접근 : {}", request.getRemoteAddr());

        SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT);
        emitterSet.add(emitter);

        emitter.onTimeout(() -> emitterSet.remove(emitter));
        emitter.onCompletion(() -> emitterSet.remove(emitter));

        return emitter;
    }

    @Async
    @EventListener
    public void onSignupEvent(final SignupEvent signupEvent) {
        log.info("신규 회원 = {}, 이벤트 구독자 수 = {}", signupEvent.getUser(), emitterSet.size());

        List<SseEmitter> deadEmitters = new ArrayList<>();
        emitterSet.forEach(emitter -> {
            try {
                emitter.send(signupEvent, MediaType.APPLICATION_JSON);
            } catch (Exception ignore) {
                deadEmitters.add(emitter);
            }
        });
        emitterSet.removeAll(deadEmitters);
    }
}

 

4-2. Service

@Service
@RequiredArgsConstructor
public class UserService {
    private final ApplicationEventPublisher applicationEventPublisher;

    public void signup(final User user) {
        // 회원가입 핵심 비즈니스 로직...

        // 부가적 비즈니스 수행 등을 위하여 이벤트 전파
        applicationEventPublisher.publishEvent(new SignupEvent(user));
    }
}

 

4-3. SignupEvent

@Getter
@AllArgsConstructor
public class SignupEvent {
    private User user;
}

 

4-4. 회원가입 발생을 위한 스케줄러

@Component
@RequiredArgsConstructor
public class SignupScheduler {
    private final UserService userService;

    private Random random = new Random();

    @Scheduled(fixedDelay = 2000)
    public void signupTask() {
        userService.signup(createTestUser());
    }

    private User createTestUser() {
        return new User("빨간색소년", random.nextInt(30));
    }
}

 

4-5. Config

@Configuration
public class Config implements AsyncConfigurer, SchedulingConfigurer {
    @Override
    public Executor getAsyncExecutor() { // AsyncConfigurer 구현을 통해 공통Pool 생성
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("ljh-executor-");
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(5);
        executor.initialize();
        return executor;
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(4);
        scheduler.setThreadNamePrefix("ljh-scheduler-");
        scheduler.initialize();
        scheduledTaskRegistrar.setTaskScheduler(scheduler);
    }
}

 

4-6. 결과 및 정리

구독자가 없다가, 엔드포인트에 접근하자 구독자가 생겼다.

브라우저 페이지를 종료하니 구독자가 사라졌다

  • 회원가입이 발생할 때, 구독자들에게 이벤트가 잘 전달된다는 점에서 반응성(리액티브)를 가졌다고 볼 순 있다.
  • 그러나, 구독자가 없을 때도 계속 이벤트가 발생되고 있다. (자원 낭비)
  • Spring ApplicationContext 에 의존성이 있어서, 테스트 등이 쉽지않겠다
    • Guava EventBus 등이 대안이 될 순 있을 듯
  • 비동기 처리라곤 하지만, 결국 AsyncExecutor 라는 다른 쓰레드풀에서 처리하는 것임
  • 어플리케이션 내 이벤트버스들은 다 갖고있는 문제일텐데, 스케일아웃 이슈가 있겠다.

5. RxJava 1.x 와 SSE 로 똑같이 만들어보자

  • 위에서는 Spring Event 를 통해 Event를 전파하고 수신받아서 구현했다면, 이번에는 RxJava의 발행 - 구독 패턴으로 만들어보자

5-1. Controller

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/user")
public class UserController2 {
    private static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;

    private final SignupScheduler2 signupScheduler2;

    @GetMapping(value = "/notice2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public RxSseEmitter signup(HttpServletRequest request) {
        log.info("SSE stream 접근 : {}", request.getRemoteAddr());

        RxSseEmitter<User> emitter = new RxSseEmitter<>(SSE_SESSION_TIMEOUT);

        // 회원가입 스트림 소스를 emitter가 구독한다.
        signupScheduler2.signupUserStream().subscribe(emitter.getSubscriber());

        return emitter;
    }
}

 

5-2. RxSseEmitter

public class RxSseEmitter<T> extends SseEmitter {
    private Subscriber<T> subscriber;

    public RxSseEmitter(long timeout) {
        super(timeout);

        this.subscriber = new Subscriber<T>() {
            @Override
            public void onNext(T user) {
                try {
                    // 4-1번에서는 emitter.send(signupEvent) 하는 코드 부분이겠다.
                    RxSseEmitter.this.send(user);
                } catch (Exception e) {
                    unsubscribe();
                }
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }
        };

        onCompletion(subscriber::unsubscribe);
        onTimeout(subscriber::unsubscribe);
    }

    public Subscriber<T> getSubscriber() {
        return subscriber;
    }
}

 

5-3. 회원가입 발생을 위한 스케줄러

@Component
@RequiredArgsConstructor
public class SignupScheduler2 {
    private final UserService2 userService2;

    private Observable<User> hotStream;
    private Random random = new Random();

    @PostConstruct
    private void signupTask() {
        // 콜드스트림에서 핫스트림으로 만든다. 구독자가 1명이상 있을때, 2초마다 회원가입을 발생시켜준다.
        // 다만, 핫스트림이므로 구독자가 N명이어도 동일한 데이터를 전파한다.
        this.hotStream = Observable.interval(2000, TimeUnit.MILLISECONDS)
                .map(tick -> createTestUser(tick))
                .share(); // share() = publish() + refCount()
        //signupUserStream().subscribe(userService2::signup); // 구독자 2개 상황 재현용
    }

    public Observable<User> signupUserStream() {
        return this.hotStream;
    }

    private User createTestUser(long tick) {
        System.out.println(tick);
        return new User("빨간색소년", random.nextInt(30));
    }
}

 

5-4. Service

@Slf4j
@Service
public class UserService2 {
    public void signup(final User user) {
        log.info("회원가입을 시도합니다. user = {}", user);

        // 회원가입 핵심 비즈니스 로직...
    }
}

 

5-5. 결과 및 정리

구독자가 없으니, 데이터가 전달되고 있지 않았다. (자원 낭비X)

브라우저로 접근하니 sse가 구독을 시작하면서 회원가입한 유저 데이터가 전달되기 시작

5-3번의 주석을 해제하여 bean 생성시점에 구독 1개 + sse 접근으로 구독 1개 = 총 구독 2개 상황에도 핫스트림 동작

콜드스트림에서의 동작. 아예 새로운 회원가입 유저스트림이 발생

  • 구독자가 없으니, 데이터가 전달되고 있지 않았다. (자원 낭비X)
  • 구독자가 N개 되는 상황에서도, 핫 스트림을 사용하니, 중간데이터부터 구독을 한다. (형광펜 부분)
    • 여러 브라우저에서 동시에 요청을 와도 마찬가지다. (4번에서는 컨트롤러 내 컬렉션에 모두 저장했다가, emitterSet.forEach - send 하는 방식으로 구현했음)
  • 만약, 콜드 스트림이었다면, 아예 스트림 소스부터 새로만들어졌으므로, 아래와 같이 나온다. (share연산자 제거하면 재현된다)
    • 우리가 정의한 상황과는 맞지 않으므로 핫스트림을 쓰는게 맞음



출처: https://sjh836.tistory.com/181?category=680970 [빨간색코딩]