[Toby Spring Reactive Programming] Schedulers (3)
서론
https://www.youtube.com/watch?v=Wlqu1xvZCak
토비님의 유튜브 강의를 보고 정리한 내용입니다.
본론
https://kdhyo98.tistory.com/136 이전 코드까지는 모두 main스레드에서 동작했다.
하지만, IO나 큰 계산같은 작업들이 있다면 main스레드가 끝날 때까지 멈추게 되는데,
모바일 어플로 생각하면 버튼을 누를 때 작업을 완료될 때까지 멈추게 된다.
사용자가 작업 처리를 기다리지 않고 다른 이벤트를 받을 수 있도록 백그라운드에서 동작시켜야 하는데,
이걸 main이 아닌 다른스레드에 작업을 넘겨 처리할 수 있다.
Ractive Streams 구현
사용자가 다 확인할 필요가 없는 경우에는 백그라운드에서 동작을 시켜 main은 다른 이벤트를 받게 할 수 있다.
이걸 Flow에서는 subscribeOn, publishOn을 통해 비동기로 동작할 수 있게 한다.
subsrcibeOn
- 주로 데이터 생성 로직이 느릴 때 사용한다.
- 리액티브 스트림의 시작부분부터 작업을 수행하는 스레드를 변경할 수 있다.
publishOn
- 주로 데이터를 소비가 느릴 때 사용한다.
- 리액티브 스트림의 중간 또는 끝 부분에서 작업을 수행하는 스레드를 변경하는 데 사용한다.
- 연산자의 일부 작업을 다른 스레드에서 실행하게 하고 싶을 때 사용하는 편이다.
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 5)
.doOnNext(i -> System.out.println("Value " + i + " on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.parallel())
.doOnNext(i -> System.out.println("Value " + i + " on thread after subscribeOn " + Thread.currentThread().getName()))
.publishOn(Schedulers.boundedElastic())
.doOnNext(i -> System.out.println("Value " + i + " on thread after publishOn " + Thread.currentThread().getName()))
.subscribe();
TimeUnit.SECONDS.sleep(30);
}
Value 1 on thread parallel-1
Value 1 on thread after subscribeOn parallel-1
Value 2 on thread parallel-1
Value 2 on thread after subscribeOn parallel-1
Value 3 on thread parallel-1
Value 3 on thread after subscribeOn parallel-1
Value 4 on thread parallel-1
Value 4 on thread after subscribeOn parallel-1
Value 5 on thread parallel-1
Value 5 on thread after subscribeOn parallel-1
Value 1 on thread after publishOn boundedElastic-1
Value 2 on thread after publishOn boundedElastic-1
Value 3 on thread after publishOn boundedElastic-1
Value 4 on thread after publishOn boundedElastic-1
Value 5 on thread after publishOn boundedElastic-1
만들어보기
@Slf4j
public class IntervalEx {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
}
};
Publisher<Integer> subOnPub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
@Override
public String getThreadNamePrefix() {
return "subOn";
}
});
es.execute(() -> pub.subscribe(
new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
es.shutdown();
}
}
));
}
};
Publisher<Integer> pubOnPub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
subOnPub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
@Override
public String getThreadNamePrefix() {
return "pubOn";
}
});
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
es.execute(() -> sub.onError(t));
es.shutdown();
}
@Override
public void onComplete() {
es.execute(() -> sub.onComplete());
es.shutdown();
}
});
}
};
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer i) {
log.info("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.info("onError:{}", t.toString());
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
}
}
18:16:34.829 [subOn1] INFO three.IntervalEx -- onSubscribe
18:16:34.831 [pubOn1] INFO three.IntervalEx -- onNext:1
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:2
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:3
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:4
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onNext:5
18:16:34.832 [pubOn1] INFO three.IntervalEx -- onComplete
pubOnPub에서 onSubscribe를 제외하고 onNext, onError, onComplete는 pubOn 스레드를 주었기 때문에
onSubscribe는 subOnPub에서 만든 subOn 스레드를 타고 있다.
데몬쓰레드(Demon Thread) vs 사용자쓰레드(User Thread)
맨 처음 Flux예제에서 TimeUnit.SECONDS.sleep(30); 을 통해 30초 딜레이를 주었다.
만약 그 부분이 없다면 Flux 구문은 제대로 실행되지 않고 종료될텐데, 직접만든 예제에서는 딜레이가 없어도 정상적으로 끝까지 진행된다.
이 이유가 데몬쓰레드와 사용자쓰레드의 차이이다.
사용자 쓰레드(User Thread) | 데몬 쓰레드(Demon Thread) | |
장점 | - 프로그램의 주요 작업을 수행한다. - JVM은 모든 사용자 쓰레드가 종료될 때까지 실행을 유지한다. - 애플리케이션의 시작부터 종료까지 실행될 수 있다. |
- 백그라운드에서 작업을 수행한다. - 모든 사용자 쓰레드가 종료되면 자동으로 종료된다. - 주로 자원을 정리하거나 로깅과 같은 보조 작업에 사용된다. |
단점 | - 프로그램 종료 시 모든 사용자 쓰레드가 종료될 때까지 기다려야 한다. - 종료를 관리하는 것이 복잡할 수 있다. |
- 애플리케이션의 주요 작업을 수행하는 데에 적합하지 않다. - 갑작스럽게 종료될 수 있으므로 주요 작업에 사용하기 어렵다. |
리액티브 스트림즈의 별도 스레드들이 데몬스레드인 이유는 아마 리소스 관리와 애플리케이션 종료 과정을 단순화하기 위해서로 보인다.
또한, 비동기 작업이 주 스레드의 종료를 방해하지 않기 위해서 주로 데몬스레드를 사용하는 것 같다.