JVM/Spring

[Toby Spring Reactive Programming] Reactive Streams 시작 (1)

Hyo Kim 2023. 10. 13. 18:23
728x90
반응형

서론

https://youtu.be/8fenTR3KOJo

토비의 스프링에 있는 토비님의 라이브 유튜브 강의를 보고 정리한 내용입니다.


본론

Iterable vs Observable

Iterable

public static void main(String[] args) {
    Iterable<Integer> iter = () -> new Iterator<>() {
        int i = 0;
        final static int MAX = 10;

        public boolean hasNext() {
            return i < MAX;
        }

        public Integer next() {
            return ++i;
        }
    };

    iter.forEach(System.out::println);
}

Iterable 인터페이스를 구현하면, for-each를 사용할 수 있다.

주로, Collection을 사용할 때 많이 접한다.

Observable

static class IntObservable extends Observable implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <= 10; i++) {
            setChanged(); // 변경되었다는 사실을 알려주는 함수
            notifyObservers(i); // 데이터를 전달하는 함수
        }
    }
}
    
public static void main(String[] args) {
    Observer ob = new Observer() {

        @Override
        public void update(Observable o, Object arg) {
            // main이 아닌 별도 스레드에서 동작한다.
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };

    IntObservable io = new IntObservable();
    io.addObserver(ob);

    ExecutorService es = Executors.newSingleThreadExecutor();

    es.execute(io);

    System.out.println(Thread.currentThread().getName() + " EXIT");
    es.shutdown();
}

Java 9부터 Depercated됐다.

Observable - 이벤트를 발송하는 주체. addObserver를 통해 여러 Observer에 이벤트를 발송할 수 있다.

Observer - 이벤트를 수신하는 곳. 파라미터로는, Observable - 발송한 곳 / Object - Observable에서 받은 이벤트

Duality (상대성)

  Iterable Observable
event Pull Push
fuction call T next() void notifyObservers(T)
return type O X

기능은 동일하지만, 반대방향을 표현한다.

 

Observer 문제점

1. Complete를 알 수 없다.

2. Error를 알 수 없다.

 

그래서 나온 해결책

Ractive Streams - https://www.reactive-streams.org/

non-blocking, backPressure를 이용하여 비동기 스트림 처리의 표준을 제공하는 Java 진영 표준 스펙.

 

리액티브 스트림즈는 동일한 목적을 위해 최소한의 인터페이스를 정의했다.

Publisher - 구독자의 수요에 따라 구독자에게 데이터를 게시하는 데이터 공급자.  (==Observable)

Subscriber - 데이터의 소비자로서 Publisher를 구독한 후 데이터를 수신할 수 있다. (==Observer)

Subscription - 발행자(Publisher)와 구독자(Subscriber)사이의 연결을 나타낸다. 이를 통해 구독자는 데이터의 흐름을 제어할 수 있다. (백프레셔 기능) Observer 패턴에서는 이 개념이 없다.

Processor - Publisher, Subscriber 역할을 동시에 수행하는 요소로, 중간에서 데이터를 받고 변환하여 다른 구독자에게 전달한다. Subscription과 동일하게 Observer에는 이런 개념이 없다.

 

Java9에서는 Flow API로 위 스펙을 추가했고, reactive-streams 란 라이브러리도 제공하고 있다.

위에 설명한 4가지 인터페이스는 거의 동일하게 정의되어 있다.

 

구현체로는 RxJava(ReactiveX), Reactor 등이 있다.


Java9의 FLow API를 이용하여 만들어보기.

public static void main(String[] args) throws InterruptedException {
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
    ExecutorService es = Executors.newCachedThreadPool();
    
    // Publisher <- Observable
    Publisher p = new Publisher() {
        @Override
        public void subscribe(Subscriber subscriber) {
            Iterator<Integer> it = itr.iterator();
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    es.execute(() -> {
                        int i = 0;
                        try {
                            while (i++ < n) {
                                if (it.hasNext()) {
                                    subscriber.onNext(it.next());
                                } else {
                                    subscriber.onComplete();
                                    break;
                                }
                            }
                        } catch (Exception e) {
                            subscriber.onError(e);
                        }
                    });
                }

                @Override
                public void cancel() {

                }
            });
        }
    };

    // Subscriber <- Observer
    Subscriber<Integer> subscriber = new Subscriber<>() {
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("onSubscribe");
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println(Thread.currentThread().getName() + " onNext = " + item);
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError = " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    };

    p.subscribe(subscriber);
    es.awaitTermination(10, java.util.concurrent.TimeUnit.HOURS);
    es.shutdown();
}
onSubscribe
pool-1-thread-1 onNext = 1
pool-1-thread-2 onNext = 2
pool-1-thread-3 onNext = 3
pool-1-thread-4 onNext = 4
pool-1-thread-3 onNext = 5
onComplete

 

728x90
반응형