JVM/Spring

[Toby Spring Reactive Programming] Schedulers (3)

Hyo Kim 2023. 10. 15. 18:37
728x90
반응형

서론

https://www.youtube.com/watch?v=Wlqu1xvZCak 

토비님의 유튜브 강의를 보고 정리한 내용입니다.


본론

https://kdhyo98.tistory.com/136 이전 코드까지는 모두 main스레드에서 동작했다.

하지만, IO나 큰 계산같은 작업들이 있다면 main스레드가 끝날 때까지 멈추게 되는데,

모바일 어플로 생각하면 버튼을 누를 때 작업을 완료될 때까지 멈추게 된다.

사용자가 작업 처리를 기다리지 않고 다른 이벤트를 받을 수 있도록 백그라운드에서 동작시켜야 하는데,

이걸 main이 아닌 다른스레드에 작업을 넘겨 처리할 수 있다.

 

Ractive Streams 구현

사용자가 다 확인할 필요가 없는 경우에는 백그라운드에서 동작을 시켜 main은 다른 이벤트를 받게 할 수 있다.

이걸 Flow에서는 subscribeOn, publishOn을 통해 비동기로 동작할 수 있게 한다.

 

subsrcibeOn

- 주로 데이터 생성 로직이 느릴 때 사용한다.

- 리액티브 스트림의 시작부분부터 작업을 수행하는 스레드를 변경할 수 있다.

 

publishOn

- 주로 데이터를 소비가 느릴 때 사용한다.

- 리액티브 스트림의 중간 또는 끝 부분에서 작업을 수행하는 스레드를 변경하는 데 사용한다.

- 연산자의 일부 작업을 다른 스레드에서 실행하게 하고 싶을 때 사용하는 편이다.

public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 5)
            .doOnNext(i -> System.out.println("Value " + i + " on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.parallel())
            .doOnNext(i -> System.out.println("Value " + i + " on thread after subscribeOn " + Thread.currentThread().getName()))
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(i -> System.out.println("Value " + i + " on thread after publishOn " + Thread.currentThread().getName()))
            .subscribe();

    TimeUnit.SECONDS.sleep(30);
}

 

Value 1 on thread parallel-1
Value 1 on thread after subscribeOn parallel-1
Value 2 on thread parallel-1
Value 2 on thread after subscribeOn parallel-1
Value 3 on thread parallel-1
Value 3 on thread after subscribeOn parallel-1
Value 4 on thread parallel-1
Value 4 on thread after subscribeOn parallel-1
Value 5 on thread parallel-1
Value 5 on thread after subscribeOn parallel-1
Value 1 on thread after publishOn boundedElastic-1
Value 2 on thread after publishOn boundedElastic-1
Value 3 on thread after publishOn boundedElastic-1
Value 4 on thread after publishOn boundedElastic-1
Value 5 on thread after publishOn boundedElastic-1

만들어보기

@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {

                    @Override
                    public void request(long n) {
                        sub.onNext(1);
                        sub.onNext(2);
                        sub.onNext(3);
                        sub.onNext(4);
                        sub.onNext(5);
                        sub.onComplete();
                    }

                    @Override
                    public void cancel() {
                    }
                });
            }
        };

        Publisher<Integer> subOnPub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                    @Override
                    public String getThreadNamePrefix() {
                        return "subOn";
                    }
                });
                es.execute(() -> pub.subscribe(
                        new Subscriber<Integer>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                sub.onSubscribe(s);
                            }

                            @Override
                            public void onNext(Integer integer) {
                                sub.onNext(integer);
                            }

                            @Override
                            public void onError(Throwable t) {
                                sub.onError(t);
                            }

                            @Override
                            public void onComplete() {
                                sub.onComplete();
                                es.shutdown();
                            }
                        }
                ));
            }
        };

        Publisher<Integer> pubOnPub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                subOnPub.subscribe(new Subscriber<Integer>() {
                    ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                        @Override
                        public String getThreadNamePrefix() {
                            return "pubOn";
                        }
                    });

                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        es.execute(() -> sub.onNext(integer));
                    }

                    @Override
                    public void onError(Throwable t) {
                        es.execute(() -> sub.onError(t));
                        es.shutdown();
                    }

                    @Override
                    public void onComplete() {
                        es.execute(() -> sub.onComplete());
                        es.shutdown();
                    }
                });
            }
        };

        pubOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer i) {
                log.info("onNext:{}", i);
            }

            @Override
            public void onError(Throwable t) {
                log.info("onError:{}", t.toString());
            }

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        });
    }
}
18:16:34.829 [subOn1] INFO three.IntervalEx -- onSubscribe
18:16:34.831 [pubOn1] INFO three.IntervalEx -- onNext:1
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:2
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:3
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:4
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:5
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onComplete

pubOnPub에서 onSubscribe를 제외하고 onNext, onError, onComplete는 pubOn 스레드를 주었기 때문에

onSubscribe는 subOnPub에서 만든 subOn 스레드를 타고 있다.


데몬쓰레드(Demon Thread) vs 사용자쓰레드(User Thread)

맨 처음 Flux예제에서 TimeUnit.SECONDS.sleep(30); 을 통해 30초 딜레이를 주었다.

만약 그 부분이 없다면 Flux 구문은 제대로 실행되지 않고 종료될텐데, 직접만든 예제에서는 딜레이가 없어도 정상적으로 끝까지 진행된다.

이 이유가 데몬쓰레드와 사용자쓰레드의 차이이다.

 

  사용자 쓰레드(User Thread) 데몬 쓰레드(Demon Thread)
장점 - 프로그램의 주요 작업을 수행한다.
- JVM은 모든 사용자 쓰레드가 종료될 때까지 실행을 유지한다.
- 애플리케이션의 시작부터 종료까지 실행될 수 있다.
- 백그라운드에서 작업을 수행한다.
- 모든 사용자 쓰레드가 종료되면 자동으로 종료된다.
- 주로 자원을 정리하거나 로깅과 같은 보조 작업에 사용된다.
단점 - 프로그램 종료 시 모든 사용자 쓰레드가 종료될 때까지 기다려야 한다.
- 종료를 관리하는 것이 복잡할 수 있다.
- 애플리케이션의 주요 작업을 수행하는 데에 적합하지 않다.
- 갑작스럽게 종료될 수 있으므로 주요 작업에 사용하기 어렵다.

 

리액티브 스트림즈의 별도 스레드들이 데몬스레드인 이유는 아마 리소스 관리와 애플리케이션 종료 과정을 단순화하기 위해서로 보인다.

또한, 비동기 작업이 주 스레드의 종료를 방해하지 않기 위해서 주로 데몬스레드를 사용하는 것 같다.

728x90
반응형