[Toby Spring Reactive Programming] Reactive Streams 시작 (1)
서론
토비의 스프링에 있는 토비님의 라이브 유튜브 강의를 보고 정리한 내용입니다.
본론
Iterable vs Observable
Iterable
public static void main(String[] args) {
Iterable<Integer> iter = () -> new Iterator<>() {
int i = 0;
final static int MAX = 10;
public boolean hasNext() {
return i < MAX;
}
public Integer next() {
return ++i;
}
};
iter.forEach(System.out::println);
}
Iterable 인터페이스를 구현하면, for-each를 사용할 수 있다.
주로, Collection을 사용할 때 많이 접한다.
Observable
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for (int i = 0; i <= 10; i++) {
setChanged(); // 변경되었다는 사실을 알려주는 함수
notifyObservers(i); // 데이터를 전달하는 함수
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
// main이 아닌 별도 스레드에서 동작한다.
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
Java 9부터 Depercated됐다.
Observable - 이벤트를 발송하는 주체. addObserver를 통해 여러 Observer에 이벤트를 발송할 수 있다.
Observer - 이벤트를 수신하는 곳. 파라미터로는, Observable - 발송한 곳 / Object - Observable에서 받은 이벤트
Duality (상대성)
Iterable | Observable | |
event | Pull | Push |
fuction call | T next() | void notifyObservers(T) |
return type | O | X |
기능은 동일하지만, 반대방향을 표현한다.
Observer 문제점
1. Complete를 알 수 없다.
2. Error를 알 수 없다.
그래서 나온 해결책
Ractive Streams - https://www.reactive-streams.org/
non-blocking, backPressure를 이용하여 비동기 스트림 처리의 표준을 제공하는 Java 진영 표준 스펙.
리액티브 스트림즈는 동일한 목적을 위해 최소한의 인터페이스를 정의했다.
Publisher - 구독자의 수요에 따라 구독자에게 데이터를 게시하는 데이터 공급자. (==Observable)
Subscriber - 데이터의 소비자로서 Publisher를 구독한 후 데이터를 수신할 수 있다. (==Observer)
Subscription - 발행자(Publisher)와 구독자(Subscriber)사이의 연결을 나타낸다. 이를 통해 구독자는 데이터의 흐름을 제어할 수 있다. (백프레셔 기능) Observer 패턴에서는 이 개념이 없다.
Processor - Publisher, Subscriber 역할을 동시에 수행하는 요소로, 중간에서 데이터를 받고 변환하여 다른 구독자에게 전달한다. Subscription과 동일하게 Observer에는 이런 개념이 없다.
Java9에서는 Flow API로 위 스펙을 추가했고, reactive-streams 란 라이브러리도 제공하고 있다.
위에 설명한 4가지 인터페이스는 거의 동일하게 정의되어 있다.
구현체로는 RxJava(ReactiveX), Reactor 등이 있다.
Java9의 FLow API를 이용하여 만들어보기.
public static void main(String[] args) throws InterruptedException {
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService es = Executors.newCachedThreadPool();
// Publisher <- Observable
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
es.execute(() -> {
int i = 0;
try {
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
// Subscriber <- Observer
Subscriber<Integer> subscriber = new Subscriber<>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " onNext = " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError = " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
p.subscribe(subscriber);
es.awaitTermination(10, java.util.concurrent.TimeUnit.HOURS);
es.shutdown();
}
onSubscribe
pool-1-thread-1 onNext = 1
pool-1-thread-2 onNext = 2
pool-1-thread-3 onNext = 3
pool-1-thread-4 onNext = 4
pool-1-thread-3 onNext = 5
onComplete