JVM/Spring

[Toby Spring Reactive Programming] Operators (2)

Hyo Kim 2023. 10. 15. 01:25
728x90
반응형

서론

https://www.youtube.com/watch?v=DChIxy9g19o

토비님 유튜브 강의를 보고 정리한 내용입니다.


본론

Operator

https://kdhyo98.tistory.com/135 첫 번째 강의에서

Publisher, Subscriber를 하나씩 사용해서 리액티브 스트림즈를 간단하게 알아봤다.

 

Operator는 기존 Publisher -> Subscriber에서 중간에 연산자를 두어 결과를 변경하거나

10개를 1개만 최종으로 보내거나 하는 녀석을 말한다.

 

Publisher (DataA) -> (DataA) Operator (DataB) -> (DataB) Subscriber  느낌이다.

Java 8의 스트림을 생각하면 이해하기 쉽다. 스트림의 map, reduce, filter 등 기능을 똑같이 할 수 있다.

Operator를 구현하는 실습을 해봤다.

 

기본 틀

public class PubSub {
    static Logger log = Logger.getLogger(PubSub.class.getName());

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
        pub.subscribe(logSub());
    }

    private static Publisher<Integer> iterPub(Iterable<Integer> iter) {
        return new Publisher<>() {
            @Override
            public void subscribe(Subscriber<? super Integer> s) {
                s.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s::onNext);
                            s.onComplete();
                        } catch (Exception e) {
                            s.onError(e);
                        }
                    }
                    @Override
                    public void cancel() {}
                });
            }
        };
    }

    private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe: ");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(T i) { log.info("onNext: " + i); }
            @Override
            public void onError(Throwable t) { log.info("onError: " + t); }
            @Override
            public void onComplete() { log.info("onComplete"); }
        };
    }
}

기본 구성이다.

첫 번째 예시와 거의 동일하고, println() 이 log.info()로 바뀐 정도다.

 

DelegateSub

public class DelegateSub<T, R> implements Subscriber<T> {

    private final Subscriber sub;

    public DelegateSub(Subscriber<? super R> sub) { this.sub = sub; }

    @Override
    public void onSubscribe(Subscription s) { sub.onSubscribe(s); }

    @Override
    public void onNext(T i) { sub.onNext(i); }

    @Override
    public void onError(Throwable t) { sub.onError(t); }

    @Override
    public void onComplete() { sub.onComplete(); }
}

Operator에서 사용할 SubScriber다.

주 기능은 Publisher에서 구현할거고, DeleagteSub은 다시 넘겨주는 역할을 한다.

 

map

public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
    Publisher<String> mapPub = mapPub(pub, s -> "[ " + s + " ]");
    mapPub.subscribe(logSub());
}
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
    return new Publisher<R>() {
        @Override
        public void subscribe(Subscriber<? super R> sub) {
            pub.subscribe(new DelegateSub<T, R>(sub) {
                @Override
                public void onNext(T i) {
                    sub.onNext(f.apply(i));
                }
            });
        }
    };
}

iterPub ->[ mapPub -> DelegatorSub ] -> logSub 흐름으로 진행되고,

Operator인 mapPub -> DelegatorSub 를 넣어서 Integer -> String 결과로 바꿔주었다.

 

sumPub

public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
    Publisher<Integer> sumPub = sumPub(pub);
    sumPub.subscribe(logSub());
}

private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
    return new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> sub) {
            pub.subscribe(new DelegateSub<Integer, Integer>(sub) {
                Integer sum = 0;

                @Override
                public void onNext(Integer i) {
                    sum += i;
                }

                @Override
                public void onComplete() {
                    sub.onNext(sum);
                    sub.onComplete();
                }
            });
        }
    };
}

mapPub과 다르게 DelegateSub의 onNext()에서 logSub의 onNext()를 호출하지 않고,

onComplete()에서 호출했다.

이처럼 10개의 데이터를 기존처럼 모두 보내는 게 아닌, 중간 sumPub에서 데이터를 가공해 1번만 보내도록 변경할 수도 있다.

 

reducePub

public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
    Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
            (a, b) -> a.append(b + ","));
    reducePub.subscribe(logSub());
}

private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Publisher<R>() {
        @Override
        public void subscribe(Subscriber<? super R> sub) {
            pub.subscribe(new DelegateSub<T, R>(sub) {
                R result = init;

                @Override
                public void onNext(T i) {
                    result = bf.apply(result, i);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);
                    sub.onComplete();
                }
            });
        }
    };
}

sumPub과 동일하게 중간에 데이터를 가공해 한 번만 보내지만, reduce 개념을 사용하여

BiFunction을 받아서 처리하도록 했다.

 

Reactor

public class ReactorEx {

    public static void main(String[] args) {
        Flux.<Integer>create(e -> {
                    e.next(1);
                    e.next(2);
                    e.next(3);
                    e.complete();
                }).log()
                .map(s -> s * 10)
                .log()
                .reduce(0, Integer::sum)
                .log()
                .subscribe(System.out::println);
    }
}

우리가 직접 만들어본 map, reduce와 같은 것들은 직접 만들 필요 없이, Reactor라는 라이브러리에서 이미 다 구현을 했다.

그래도 한 번 직접 따라서 만들어보니 Reactor를 해볼 때 좀 더 이해가 잘 될 것 같다.

 

Spring Response

@SpringBootApplication
public class TobyReactiveApplication {

    @RestController()
    public static class Controller {
        @RequestMapping("/hello")
        public Publisher<String> hello(String name) {
            return new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> s) {
                    s.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            s.onNext("Hello "+ name);
                            s.onComplete();
                        }

                        @Override
                        public void cancel() {

                        }
                    });
                }
            };
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(TobyReactiveApplication.class);
    }
}

Spring MVC 핸들러에서는 이미 구현이 되어 있어서 리턴타입이 Publisher인걸 보고 자동으로 맵핑해준다.

onComplete(); 가 호출이 되지 않으면, 무한히 response를 보내기에 .. 꼭 호출해주어야 한다.

onNext()를 여러 번 호출해도 결국 마지막 데이터만 전송되는 걸 확인했다.

728x90
반응형