[Toby Spring Reactive Programming] Operators (2)
서론
https://www.youtube.com/watch?v=DChIxy9g19o
토비님 유튜브 강의를 보고 정리한 내용입니다.
본론
Operator
https://kdhyo98.tistory.com/135 첫 번째 강의에서
Publisher, Subscriber를 하나씩 사용해서 리액티브 스트림즈를 간단하게 알아봤다.
Operator는 기존 Publisher -> Subscriber에서 중간에 연산자를 두어 결과를 변경하거나
10개를 1개만 최종으로 보내거나 하는 녀석을 말한다.
Publisher (DataA) -> (DataA) Operator (DataB) -> (DataB) Subscriber 느낌이다.
Java 8의 스트림을 생각하면 이해하기 쉽다. 스트림의 map, reduce, filter 등 기능을 똑같이 할 수 있다.
Operator를 구현하는 실습을 해봤다.
기본 틀
public class PubSub {
static Logger log = Logger.getLogger(PubSub.class.getName());
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
pub.subscribe(logSub());
}
private static Publisher<Integer> iterPub(Iterable<Integer> iter) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s::onNext);
s.onComplete();
} catch (Exception e) {
s.onError(e);
}
}
@Override
public void cancel() {}
});
}
};
}
private static <T> Subscriber<T> logSub() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe: ");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T i) { log.info("onNext: " + i); }
@Override
public void onError(Throwable t) { log.info("onError: " + t); }
@Override
public void onComplete() { log.info("onComplete"); }
};
}
}
기본 구성이다.
첫 번째 예시와 거의 동일하고, println() 이 log.info()로 바뀐 정도다.
DelegateSub
public class DelegateSub<T, R> implements Subscriber<T> {
private final Subscriber sub;
public DelegateSub(Subscriber<? super R> sub) { this.sub = sub; }
@Override
public void onSubscribe(Subscription s) { sub.onSubscribe(s); }
@Override
public void onNext(T i) { sub.onNext(i); }
@Override
public void onError(Throwable t) { sub.onError(t); }
@Override
public void onComplete() { sub.onComplete(); }
}
Operator에서 사용할 SubScriber다.
주 기능은 Publisher에서 구현할거고, DeleagteSub은 다시 넘겨주는 역할을 한다.
map
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<String> mapPub = mapPub(pub, s -> "[ " + s + " ]");
mapPub.subscribe(logSub());
}
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
});
}
};
}
iterPub ->[ mapPub -> DelegatorSub ] -> logSub 흐름으로 진행되고,
Operator인 mapPub -> DelegatorSub 를 넣어서 Integer -> String 결과로 바꿔주었다.
sumPub
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> sumPub = sumPub(pub);
sumPub.subscribe(logSub());
}
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub<Integer, Integer>(sub) {
Integer sum = 0;
@Override
public void onNext(Integer i) {
sum += i;
}
@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
});
}
};
}
mapPub과 다르게 DelegateSub의 onNext()에서 logSub의 onNext()를 호출하지 않고,
onComplete()에서 호출했다.
이처럼 10개의 데이터를 기존처럼 모두 보내는 게 아닌, 중간 sumPub에서 데이터를 가공해 1번만 보내도록 변경할 수도 있다.
reducePub
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
(a, b) -> a.append(b + ","));
reducePub.subscribe(logSub());
}
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T i) {
result = bf.apply(result, i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
sumPub과 동일하게 중간에 데이터를 가공해 한 번만 보내지만, reduce 개념을 사용하여
BiFunction을 받아서 처리하도록 했다.
Reactor
public class ReactorEx {
public static void main(String[] args) {
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
}).log()
.map(s -> s * 10)
.log()
.reduce(0, Integer::sum)
.log()
.subscribe(System.out::println);
}
}
우리가 직접 만들어본 map, reduce와 같은 것들은 직접 만들 필요 없이, Reactor라는 라이브러리에서 이미 다 구현을 했다.
그래도 한 번 직접 따라서 만들어보니 Reactor를 해볼 때 좀 더 이해가 잘 될 것 같다.
Spring Response
@SpringBootApplication
public class TobyReactiveApplication {
@RestController()
public static class Controller {
@RequestMapping("/hello")
public Publisher<String> hello(String name) {
return new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onNext("Hello "+ name);
s.onComplete();
}
@Override
public void cancel() {
}
});
}
};
}
}
public static void main(String[] args) {
SpringApplication.run(TobyReactiveApplication.class);
}
}
Spring MVC 핸들러에서는 이미 구현이 되어 있어서 리턴타입이 Publisher인걸 보고 자동으로 맵핑해준다.
onComplete(); 가 호출이 되지 않으면, 무한히 response를 보내기에 .. 꼭 호출해주어야 한다.
onNext()를 여러 번 호출해도 결국 마지막 데이터만 전송되는 걸 확인했다.