Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sprinb webflux] operator #56

Open
backtony opened this issue May 20, 2023 · 0 comments
Open

[sprinb webflux] operator #56

backtony opened this issue May 20, 2023 · 0 comments

Comments

@backtony
Copy link
Owner

operators

Sequence 생성을 위한 operator

justOrEmpty

justOrEmpty는 just의 확장 operator로서, just와 달리 emit할 데이터가 null일 경우 NPE이 발생하지 않고 onComplete Signal을 전송한다. 그리고 emit할 데이터가 null이 아닐 경우 해당 데이터를 emit하는 Mono를 생성한다.

public static void main(String[] args) {
    Mono
        .justOrEmpty(null)
        // emit할 데이터가 null이지만 NPE가 발생하지 않고(onError가 아니라) onComplete signal이 발생한다.
        .subscribe(data -> {},
                error -> {},
                () -> log.info("# onComplete"));
}

fromIterable

fromIterable은 iterable에 포함된 데이터를 emit하는 Flux를 생성한다.

public static void main(String[] args) {
    Flux
            .fromIterable(
                    Arrays.asList(
                    Tuples.of("BTC", 52_000_000),
                    Tuples.of("ETH", 1_720_000),
                    Tuples.of("XRP", 533),
                    Tuples.of("ICX", 2_080),
                    Tuples.of("EOS", 4_020),
                    Tuples.of("BCH", 558_000))
            )
            .subscribe(coin ->
                    log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2())
            );
}

fromStream

stream에 포함된 데이터를 emit하는 Flux를 생성한다.

public static void main(String[] args) {
    Flux
        .fromStream(() -> Arrays.asList("BTC", "ETH", "XRP", "ICX", "EOS", "BCH").stream())
        .filter(coin -> coin.equals("BTC") || coin.equals("ETH"))
        .subscribe(data -> log.info("{}", data));
}

range

n부터 1씩 증가한 연속된 수를 m개 emit하는 Flux를 생성한다. for문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용된다.

public static void main(String[] args) {
    Flux
        .range(5, 10)
        .subscribe(data -> log.info("{}", data));
}

defer

operator를 선언한 시점에 데이터를 emit하는 것이 아니라 구독하는 시점에 데이터를 emit하는 Flux 또는 Mono를 생성한다. defer는 데이터 emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit하여 불필요한 프로세스를 줄인다.

public static void main(String[] args) throws InterruptedException {
    log.info("# start: {}", LocalDateTime.now());
    Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
    Mono<LocalDateTime> deferMono = Mono.defer(() ->
                                                Mono.just(LocalDateTime.now()));

    Thread.sleep(2000);

    justMono.subscribe(data -> log.info("# onNext just1: {}", data)); 
    deferMono.subscribe(data -> log.info("# onNext defer1: {}", data));

    Thread.sleep(2000);

    justMono.subscribe(data -> log.info("# onNext just2: {}", data));
    deferMono.subscribe(data -> log.info("# onNext defer2: {}", data));
}

// 출력 -> justMono는 같은 시간을 출력한 것을 볼 수 있다. == just는 hot publisher로 구독과 상관 없이 데이터가 emit되고 replay된다.
// defer는 구독이 발생되기 전까지 emit을 지연시키기 때문에 구독이 발생할때 emit되어 각각 시간이 다르다.
23:14:53.097 [main] INFO - # onNext just1: 2023-05-18T23:14:51.000760
23:14:53.098 [main] INFO - # onNext defer1: 2023-05-18T23:14:53.098494
23:14:55.108 [main] INFO - # onNext just2: 2023-05-18T23:14:51.000760
23:14:55.109 [main] INFO - # onNext defer2: 2023-05-18T23:14:55.109637
public static void main(String[] args) throws InterruptedException {
    log.info("# start: {}", LocalDateTime.now());
    Mono
        .just("Hello")
        .switchIfEmpty(sayDefault())
//            .switchIfEmpty(Mono.defer(() -> sayDefault()))
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(3500);
}

private static Mono<String> sayDefault() {
    log.info("# Say Hi");
    return Mono.just("Hi");
}

just로 데이터가 존재하기 때문에 switchIfEmpty안의 sayDefault 함수가 호출되지 않을 것 같지만 실제로는 호출이 된다. 즉, 불필요하게 호출되는 것이다. 이때 defer로 감싸주면 호출되지 않는다.

using

using은 파라미터로 전달받은 resource를 emit하는 Flux를 생성한다. 첫 번째 파라미터는 읽어 올 resource이고, 두 번째 파라미터는 읽어 온 resource를 emit하는 Flux다. 세 번째 마라미터는 종료 signal(onComplete 또는 onError)이 발생할 경우, resource를 해제하는 등의 후처리를 할 수 있게 해준다.

public static void main(String[] args) {
    Path path = Paths.get("D:\\resources\\using_example.txt");

    Flux
        .using(() -> 
            Files.lines(path), // 파일의 한 라인 씩 읽는다.
            Flux::fromStream, // stream 형태로 emit한다.
            Stream::close) // 라인 문자열 데이터 emit이 끝나면 종료 signal 후 처리  -> 이 3 과정을 파일 라인이 끝날때까지 반복한다.
        .subscribe(log::info);
}

generate

프로그래밍방식으로 signal 이벤트를 발생시키며 특히 동기적으로 데이터를 하나씩 순차적으로 emit하고자 할 경우 사용된다.
첫 번째 파라미터는 emit할 숫자의 초기값을 지정한다. 두 번째 파라미터는 람다 표현식으로 초기값으로 지정한 숫자부터 emit하고 emit한 숫자를 1씩 증가시켜서 다시 emit하는 작업을 반복하는데 이렇게 1씩 증가하는 숫자를 상태(state) 값으로 정의한다.

public static void main(String[] args) {
    Flux
        .generate(
            // 초기값
            () -> 0,
            // 람다식 (state, synschronousSink) synschronousSink는 최대 하나의 상태값만 동기적으로 emit하는 인터페이스
            (state, sink) -> {
                // 상태값을 emit
                sink.next(state);
                // 특정 상태값 도달 시 onComplete
                if (state == 10)
                    sink.complete();
                // 상태값 조정
                return ++state;
            })
        .subscribe(data -> log.info("# onNext: {}", data));
}
public static void main(String[] args) {
    // 3단 구구단
    final int dan = 3;
    Flux
        .generate(() -> Tuples.of(dan, 1), (state, sink) -> {
            // emit
            sink.next(state.getT1() + " * " +
                    state.getT2() + " = " + state.getT1() * state.getT2());
            if (state.getT2() == 9)
                sink.complete();
            // state 업데이트    
            return Tuples.of(state.getT1(), state.getT2() + 1);
        }, state -> log.info("# 구구단 {}단 종료!", state.getT1()))
        .subscribe(data -> log.info("# onNext: {}", data));
}

create

generate처럼 프로그래밍 방식으로 signal 이벤트를 발생시키나 한번에 여러 건의 데이터를 비동기적으로 emit한다.

public class Example {
    static int SIZE = 0;
    static int COUNT = -1;
    final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    public static void main(String[] args) {
        log.info("# start");
        Flux.create((FluxSink<Integer> sink) -> {
            sink.onRequest(n -> {
                try {
                    Thread.sleep(1000L);
                    for (int i = 0; i < n; i++) {
                        if (COUNT >= 9) {
                            sink.complete();
                        } else {
                            COUNT++;
                            sink.next(DATA_SOURCE.get(COUNT));
                        }
                    }
                } catch (InterruptedException e) {}
            });

            sink.onDispose(() -> log.info("# clean up"));
        })
        .subscribe(new BaseSubscriber<>() {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2);
            }

            @Override
            protected void hookOnNext(Integer value) {
                SIZE++;
                log.info("# onNext: {}", value);
                if (SIZE == 2) {
                    request(2);
                    SIZE = 0;
                }
            }

            @Override
            protected void hookOnComplete() {
                log.info("# onComplete");
            }
        });
    }
}
  1. 구독이 발생하면 hookOnSubscribe 메서드가 호출된다. 여기서는 request(2) 이므로 2개의 데이터를 요청하게 된다.
  2. subscriber에서 request 메서드를 호출하면 create operator 내부에서 sink.onRequest 메서드가 호출된다. request(2)이므로 n은 2가 된다.
  3. subscriber가 요청한 데이터 개수만큼 emit한다.
  4. hookOnNext에서 emit된 데이터를 받아서 처리한다. 해당 메서드 내부에 request(2)가 있으므로 publisher는 다시 2개의 데이터를 emit한다.
  5. 반복하다가 모두 emit되면 onComplete signal을 발생시킨다.(sink.complete())
  6. hookOnComplete 메서드가 호출된다.
  7. sink.onDispose에서 후처리가 진행된다.

create의 경우 subscriber가 request 메서드를 통해 요청을 보내면 publisher가 해당 요청 개수만큼의 데이터를 emit하는 일종의 pull 방식으로 데이터를 처리한다.

그런데 create는 subscriber의 요청과 상관없이 비동기적으로 데이터를 emit하는 push 방식 역시 사용할 수 있다.

public class CryptoCurrencyPriceEmitter {
    private CryptoCurrencyPriceListener listener;

    public void setListener(CryptoCurrencyPriceListener listener) {
        this.listener = listener;
    }

    public void flowInto() {
        listener.onPrice(SampleData.btcPrices);
    }

    public void complete() {
        listener.onComplete();
    }
}
public interface CryptoCurrencyPriceListener {
    void onPrice(List<Integer> priceList);
    void onComplete();
}


public static void main(String[] args) throws InterruptedException {
    CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter();

    Flux.create((FluxSink<Integer> sink) ->
                    priceEmitter.setListener(new CryptoCurrencyPriceListener() {
        @Override
        public void onPrice(List<Integer> priceList) {
            priceList.stream().forEach(price -> {
                sink.next(price);
            });
        }

        @Override
        public void onComplete() {
            sink.complete();
        }
    }))
    .publishOn(Schedulers.parallel())
    .subscribe(
        data -> log.info("# onNext: {}", data),
        error -> {},
        () -> log.info("# onComplete"));

    Thread.sleep(3000L);

    priceEmitter.flowInto();

    Thread.sleep(2000L);
    priceEmitter.complete();
}
  1. 구독이 발생하면 create operator의 파라미터인 람다 표현식이 실행된다.
  2. 암호화폐 가격이 변동하면 변동된 가격 데이터를 emit할 수 있도록 cryptoCurrencyPriceEmitter가 CryptoCurrencyPriceListener를 Listener로 등록한다. 화폐 가겨 변동이 있을 때마다 리스너의 onPrice메서드가 호출된다.
  3. 가격 변동 전에 스레드 3초 쉬고 가격 변동 시키는 메서드 호출한다.
  4. 처리후 종료한다.

subscriber에서 request 요청을 보내는 것과 상관 없이 listener를 통해 들어오는 데이터를 리스닝하고 있다가 실제로 들어오는 데이터가 있을 경우에만 데이터를 emit하는 일종의 push 방식으로 데이터를 처리한다.


create는 한 번에 여러 건의 데이터를 emit할 수 있기 때문에 backpressure 전략이 필요하다.

public class Exam {
    static int start = 1;
    static int end = 4;

    public static void main(String[] args) throws InterruptedException {
        Flux.create((FluxSink<Integer> emitter) -> {
            emitter.onRequest(n -> {
                log.info("# requested: " + n);
                try {
                    Thread.sleep(500L);
                    for (int i = start; i <= end; i++) {
                        emitter.next(i);
                    }
                    start += 4;
                    end += 4;
                } catch (InterruptedException e) {}
            });

            emitter.onDispose(() -> {
                log.info("# clean up");
            });
        }, FluxSink.OverflowStrategy.DROP)
        .subscribeOn(Schedulers.boundedElastic())
        // 구독이 발생하면 2만큼의 데이터를 요청한다. -> 2개를 요청했는데 create에서는 4개를 만들고 있으므로 backpressure 전략에 따라 2개는 drop된다.
        // oncomplete 문이 없기 때문에 계속 반복하다가 메인 스레드가 종료되면 종료된다.
        .publishOn(Schedulers.parallel(), 2)
        .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(3000L);
    }
}

정리

  • just : hot publisher이기 때문에 subscriber 구독과는 관계없이 데이터를 emit하며, 구독이 발생하면 emit된 데이터를 다시 replay한다.
  • defer : 구독이 발생하기 전까지 데이터의 emit을 지연시킨다. 따라서 just를 감싸게 되면 실제 구독이 발생해야 데이터를 emit한다.
  • using : 파라미터로 전달받은 resource를 emit하는 Flux를 생성한다.
  • generator : 프로그래밍 방식으로 signal 이벤트를 발생시키며 동기적으로 하나씩 순차적으로 emit한다.
  • create : 프로그래밍 방식으로 signal 이벤트를 발생시키며 한 번에 여러 건을 비동기로 emit할 수 있다.

Sequence 필터링 operator

filter

filter는 upstream에서 emit된 데이터 중에서 조건에 일치하는 데이터만 downstream으로 emit한다.

public static void main(String[] args) {
    Flux
        .range(1, 20)
        .filter(num -> num % 2 != 0)
        .subscribe(data -> log.info("# onNext: {}", data));
}
public static void main(String[] args) throws InterruptedException {
    Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
                                                            getCovidVaccines();
    Flux
        .fromIterable(SampleData.coronaVaccineNames)  // 백신명 emit        
        // innerSequence로 조건에 맞는 데이터인지를 비동기적으로 테스트한 후 결과가 true라면 downstream으로 emit
        .filterWhen(vaccine -> Mono
                                .just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
                                .publishOn(Schedulers.parallel()))
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(1000);
}

// 출력
00:09:10.661 [parallel-2] INFO - # onNext: AstraZeneca
00:09:10.693 [parallel-3] INFO - # onNext: Moderna

skip

upstream에서 emit된 데이터 중에서 파라미터로 입력받은 숫자만큼 건너뛴 후, 나머지 데이터를 downstream으로 emit

public static void main(String[] args) throws InterruptedException {
    Flux
        // interval에서 0부터 1씩 증가한 숫자를 1초 간격으로 emit하는데 skip2이므로 0,1을 skip한다.
        .interval(Duration.ofSeconds(1))
        .skip(2)// 개수가 아니라 duration을 줄 수도 있다.
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(5500L);
}

take

upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 downstream으로 emit한다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .interval(Duration.ofSeconds(1))
        .take(3) // 숫자가 아니라 duration을 줄 수도 있다.
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(4000L);
}
public static void main(String[] args) {
    Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        .takeLast(2) // emit된 데이터 중 가장 마지막에 emit된 데이터만 개수만큼 downstream으로 emit한다.
        .subscribe(tuple -> log.info("# onNext: {}, {}",
                                        tuple.getT1(), tuple.getT2()));
}
public static void main(String[] args) {
    Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        // predicate이 treu가 될 때까지 emit된 데이터를 downstream으로 emit한다.
        // upstream에서 emit된 데이터는 predicate를 평가할 때 사용한 데이터가 포함된다.
        // Tuples.of(2018, 19_521_543L),
        // Tuples.of(2019, 15_761_568L),
        // Tuples.of(2020, 22_439_002L), -> 여기까지 emit된다.
        // Tuples.of(2021, 63_364_000L)
        .takeUntil(tuple -> tuple.getT2() > 20_000_000) 
        .subscribe(tuple -> log.info("# onNext: {}, {}",
                                        tuple.getT1(), tuple.getT2()));
}
public static void main(String[] args) {
    Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        // predicate이 true가 되는 동안에만 downstream으로 emit한다.
        // predicate을 평가할 때 사용한 데이터가 downstream으로 emit되지 않는다.
        // 평가시 사용된 Tuples.of(2017, 22_483_583L)는 emit되지 않는다.
        .takeWhile(tuple -> tuple.getT2() < 20_000_000)
        .subscribe(tuple -> log.info("# onNext: {}, {}",
                                            tuple.getT1(), tuple.getT2()));
}

next

upstream에서 emit되는 데이터 중에서 첫 번째 데이터만 downstream으로 전달한다. 만약 empty라면 empty mono를 emit한다.

public static void main(String[] args) {
    Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        .next()
        .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
}

Sequence 변환 operator

map

upstream에서 emit된 데이터를 mapper function을 사용하여 가공한 후 downstream으로 emit한다.

public static void main(String[] args) {
    Flux
        .just("1-Circle", "3-Circle", "5-Circle")
        .map(circle -> circle.replace("Circle", "Rectangle"))
        .subscribe(data -> log.info("# onNext: {}", data));
}

flatMap

upstream에서 emit된 한 건의 데이터는 flatMap의 inner sequence에서 여러 건의 데이터로 변환되고 평탄화 작업을 통해서 하나의 sequence로 병합되어 downstream으로 emit된다.

public static void main(String[] args) {
    Flux
        .just("Good", "Bad")
        .flatMap(feeling -> Flux
                                .just("Morning", "Afternoon", "Evening")
                                .map(time -> feeling + " " + time))
        .subscribe(log::info);
}

upstream인 just에서 두 개의 데이터를 emit하면 flatMap 내부의 innser sequence에서 3개의 데이터를 emit한다. upstream 2 * innser sequence 3 = 6개의 데이터가 subscriber에게 전달된다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .range(2, 8)
        .flatMap(dan -> Flux
                            .range(1, 9)
                            // scheduler를 사용해 비동기적으로 진행 
                            // 각 단마다 다른 스레드에서 돈다.
                            // 결과를 보면 2단부터가 아니라 뒤죽박죽인데 flatMap의 inner sequence를 비동기로 동작시키면 순서가 보장되지 않는다는 것을 확인할 수 있다.
                            .publishOn(Schedulers.parallel())
                            .map(n -> dan + " * " + n + " = " + dan * n))
        .subscribe(log::info);

    Thread.sleep(100L);
}

concat

파라미터로 입력되는 publisher의 sequence를 연결해서 데이터를 순차적으로 emit한다. 먼저 입력된 publisher의 sequence가 종료될 때까지 나머지 publisher의 sequence는 subscribe되지 않고 대기하는 특성을 가진다.

public static void main(String[] args) {
    Flux
        .concat(Flux.just(1, 2, 3), Flux.just(4, 5))
        .subscribe(data -> log.info("# onNext: {}", data));
}

merge

입력되는 publisher의 sequence에서 emit된 데이터를 인터리빙(interleave)방식으로 병합한다.

merge는 concat처럼 먼저 입력된 publisher의 sequence가 종료될 까지 나머지 publisher의 sequence가 subscribe되지 않고 대기하는 것이 아니라 모든 publisher의 sequence가 즉시 subscribe된다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .merge(
                // 두 flux중에서 먼저 emit되는 아무 요소나 먼저 emit된다.
                Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)),
                Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L))
        )
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(2000L);
}

zip

zip은 파라미터로 입력되는 publisher sequence에서 emit된 데이터를 결합하는데 각 publisher가 데이터를 하나씩 emit할 때까지 기다렸다가 결합한다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .zip(
                Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
                Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L))
        )
        .subscribe(tuple2 -> log.info("# onNext: {}", tuple2));

    Thread.sleep(2500L);
}
public static void main(String[] args) throws InterruptedException {
    Flux
        .zip(
                Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
                Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)),
                // 세번째 파라미터로 BiFunction 함수형 인터페이스를 추가해서 emit 데이터 쌍을 변환작업을 거친 후 데이터를 전달할 수 도 있다.
                (n1, n2) -> n1 * n2
        )
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(2500L);
}

and

Mono의 Complete signal과 파라미터로 입력된 publisher의 complete signal을 결합하여 새로운 Mono를 반환한다. 즉, Mono와 파라미터로 입력된 publisher의 sequence가 모두 종료되었음을 subscriber에게 알릴 수 있다.

public static void main(String[] args) throws InterruptedException {
    Mono
            .just("Task 1")
            .delayElement(Duration.ofSeconds(1)) // 1초 지연 후 task1 emit
            .doOnNext(data -> log.info("# Mono doOnNext: {}", data))
            .and(
                    Flux
                        .just("Task 2", "Task 3")
                        // 0.6초에 한번씩 emit
                        .delayElements(Duration.ofMillis(600))
                        .doOnNext(data -> log.info("# Flux doOnNext: {}", data))
            )
            // subscriber에게는 next가 호출되지 않고 onComplete만 호출됨. 즉, data가 subscriber에게 가지 않음
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error),
                    () -> log.info("# onComplete")
            );

    Thread.sleep(5000);
}
public static void main(String[] args) throws InterruptedException {
    restartApplicationServer()
            .and(restartDBServer())
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error),
                    // 최종적으로 후처리 작업에 적합한 operator라고 볼 수 있다.
                    () -> log.info("# sent an email to Administrator: " +
                            "All Servers are restarted successfully")
            );

    Thread.sleep(6000L);
}

private static Mono<String> restartApplicationServer() {
    return Mono
            .just("Application Server was restarted successfully.")
            .delayElement(Duration.ofSeconds(2))
            .doOnNext(log::info);
}

private static Publisher<String> restartDBServer() {
    return Mono
            .just("DB Server was restarted successfully.")
            .delayElement(Duration.ofSeconds(4))
            .doOnNext(log::info);
}

collectList

Flux에서 emit된 데이터를 모아서 List로 변환 후 변환된 List를 emit하는 Mono를 반환한다. 만약 upstream sequence가 비어있다면 비어있는 List를 downstream으로 emit한다.

public static void main(String[] args) {
    Flux
        .just("...", "---", "...")
        .map(code -> transformMorseCode(code))
        .collectList()
        .subscribe(list -> log.info(list.stream().collect(Collectors.joining())));
}

// 출력
01:02:02.030 [main] INFO - sos

CollectMap

Flux에서 emit된 데이터를 기반으로 key와 value를 생성하여 Map의 Element로 추가한 후, 최종적으로 Map을 emit하는 Mono를 반환한다. 만약 upstream이 비어있다면 비어있는 Map을 downstream으로 emit한다.

public static void main(String[] args) {
    Flux
        .range(0, 26)
        .collectMap(key -> SampleData.morseCodes[key],
                value -> transformToLetter(value))
        .subscribe(map -> log.info("# onNext: {}", map));
}

Sequence의 내부 동작 확인을 위한 operator

reactor에서는 upstream publisher에서 emit되는 데이터의 변경 없이 부수 효과만 수행하기 위한 operator들이 있다. doOnXXX로 시작하는 operator들이 그러하다.

doOnXXX로 시작하는 operator는 consumer 또는 runnable 타입의 함수형 인터페이스를 파라미터로 가지기 때문에 별도의 리턴 값이 없다. 따라서 upstream publisher로부터 emiㅅ되는 데이터를 이용해 upstream publisher의 내부 동작을 엿볼 수 있으며 로그를 출력하는 등의 디버깅 용도로 많이 사용된다. 또한 emit과정에서 error가 발생하면 해당 에러에 대한 알림을 전송하는 로직을 추가하는 등 부수 효과를 위한 다양한 로직을 적용할 수 있다.

  • doOnSubscribe
    • publisher가 구독 중일 때 트리거되는 동작 추가
  • doOnRequest
    • publisher가 요청을 수신할 때 트리거되는 동작 추가
  • doOnNext
    • publisher가 데이터를 emit할 때 트리거되는 동작 추가
  • doOnComplete
    • publisher가 성공적으로 완료되었을 때 트리거되는 동작 추가
  • doOnError
    • publisher가 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가
  • doOnCancel
    • publisher가 취소되었을 때 트리거되는 동작 추가
  • doOnTerminate
    • publisher가 성공적으로 완료되었을 때 또는 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가
  • doOnEach
    • publisher가 데이터를 emit할 때, 성공적으로 완료되었을 때, 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가
  • doOnDiscard
    • updtream에 있는 전체 operator 체인의 동작 중에서 operator에 의해 폐기되는 요소를 조건부로 정리
  • doAlterTerminate
    • downstream을 성공적으로 완료한 직후 또는 에러가 발생하여 publisher가 종료된 직후 트리거되는 동작 추가
  • doFirst
    • publisher가 구독되기 전에 트리거되는 동작 추가
  • doFinally
    • 에러를 포함해서 어떤 이유이든 간에 publisher가 종료된 후 트리거되는 동작을 추가
public static void main(String[] args) {
    Flux.range(1, 5)
        .doFinally(signalType -> log.info("# doFinally 1: {}", signalType))
        .doFinally(signalType -> log.info("# doFinally 2: {}", signalType))
        .doOnNext(data -> log.info("# range > doOnNext(): {}", data))
        .doOnRequest(data -> log.info("# doOnRequest: {}", data))
        .doOnSubscribe(subscription -> log.info("# doOnSubscribe 1"))
        .doFirst(() -> log.info("# doFirst()"))
        .filter(num -> num % 2 == 1)
        .doOnNext(data -> log.info("# filter > doOnNext(): {}", data))
        .doOnComplete(() -> log.info("# doOnComplete()"))
        .subscribe(new BaseSubscriber<>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            @Override
            protected void hookOnNext(Integer value) {
                log.info("# hookOnNext: {}", value);
                request(1);
            }
        });
}

// 출력
12:52:07.347 [main] INFO - # doFirst()
12:52:07.351 [main] INFO - # doOnSubscribe 1
12:52:07.351 [main] INFO - # doOnRequest: 1
12:52:07.352 [main] INFO - # range > doOnNext(): 1
12:52:07.352 [main] INFO - # filter > doOnNext(): 1
12:52:07.352 [main] INFO - # hookOnNext: 1
12:52:07.353 [main] INFO - # doOnRequest: 1
12:52:07.353 [main] INFO - # range > doOnNext(): 2
12:52:07.353 [main] INFO - # range > doOnNext(): 3
12:52:07.354 [main] INFO - # filter > doOnNext(): 3
12:52:07.354 [main] INFO - # hookOnNext: 3
12:52:07.354 [main] INFO - # doOnRequest: 1
12:52:07.355 [main] INFO - # range > doOnNext(): 4
12:52:07.355 [main] INFO - # range > doOnNext(): 5
12:52:07.355 [main] INFO - # filter > doOnNext(): 5
12:52:07.355 [main] INFO - # hookOnNext: 5
12:52:07.355 [main] INFO - # doOnRequest: 1
12:52:07.356 [main] INFO - # doOnComplete()
12:52:07.356 [main] INFO - # doFinally 2: onComplete
12:52:07.357 [main] INFO - # doFinally 1: onComplete

doFirst가 제일 먼저 동작하고 doFinally는 가장 마지막에 동작하는 것을 볼 수 있다. 구독이 발생하면 doOnSubscribe가 동작하고 subscriber의 요청이 있을 때 doOnRequest가 동작하며 upstream에서 데이터가 emit될 때마다 doOnNext가 동작하는 것을 볼 수 있다. doFinally는 여러 번 호출될 경우 선언한 시점의 역순으로(아래 선언한 것이 우선) 동작한다.

에러 처리를 위한 operator

error

error는 파라미터로 지정된 에러로 종료하는 Flux를 생성한다. 마치 java의 throw 키워드를 사용해서 예외를 의도적으로 던지는 것과 같은 역할을 하며 주로 check exception을 캐치해서 다시 던져야하는 경우 사용한다.

public static void main(String[] args) {
    Flux
        .range(1, 5)
        .flatMap(num -> {
            if ((num * 2) % 3 == 0) {
                return Flux.error(
                        new IllegalArgumentException("Not allowed multiple of 3"));
            } else {
                return Mono.just(num * 2);
            }
        })
        .subscribe(data -> log.info("# onNext: {}", data),
                error -> log.error("# onError: ", error));
}

// 출력
12:56:17.670 [main] INFO - # onNext: 2
12:56:17.672 [main] INFO - # onNext: 4
12:56:17.676 [main] ERROR- # onError: 
java.lang.IllegalArgumentException: Not allowed multiple of 3
	at chapter14.operator_5_error.Example14_43.lambda$main$0(Example14_43.java:19)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
	at chapter14.operator_5_error.Example14_43.main(Example14_43.java:25)
public static void main(String[] args) {
    Flux
        .just('a', 'b', 'c', '3', 'd')
        .flatMap(letter -> {
            // 명시적으로 error 이벤트를 발생시켜야 하는 경우
            // flatMap처럼 Inner Sequence가 존재하는 경우 
            // 체크 예외 발생 시 Flux로 래핑해서 onError Signal을 전송할 수 있다.
            try {
                return convert(letter);
            } catch (DataFormatException e) {
                return Flux.error(e);
            }
        })
        .subscribe(data -> log.info("# onNext: {}", data),
                error -> log.error("# onError: ", error));
}

private static Mono<String> convert(char ch) throws DataFormatException {
    if (!Character.isAlphabetic(ch)) {
        // checked exception
        throw new DataFormatException("Not Alphabetic");
    }
    return Mono.just("Converted to " + Character.toUpperCase(ch));
}

onErrorReturn

onErrorReturn는 에러 이벤트가 발생했을 때, 에러 이벤트를 downstream으로 전파하지 않고 대체 값을 emit한다. java에서 try-catch문의 catch 블록에서 예외에 해당하는 값을 다른 것으로 처리하는 것과 같다고 보면 된다.

public static void main(String[] args) {
    getBooks()
            // null이면 NPE이 발생할텐데
            .map(book -> book.getPenName().toUpperCase())
            // 다른 값으로 emit
            .onErrorReturn("No pen name")
            // error 타입을 지정 가능
            // .onErrorReturn(NullPointerException.class, "no pen name")
            // .onErrorReturn(IllegalFormatException.class, "Illegal pen name")
            .subscribe(log::info);
}

onErrorResume

에러 이벤트가 발생했을 때, downstream으로 전파하지 않고 대체 publisher를 리턴한다.

/**
 *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, 대체 Publisher로 데이터를 emit하고자 할 경우
 *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
 */
public static void main(String[] args) {
    final String keyword = "DDD";
    getBooksFromCache(keyword)
            .onErrorResume(error -> getBooksFromDatabase(keyword))
            .subscribe(data -> log.info("# onNext: {}", data.getBookName()),
                    error -> log.error("# onError: ", error));
}

public static Flux<Book> getBooksFromCache(final String keyword) {
    return Flux
            .fromIterable(SampleData.books)
            .filter(book -> book.getBookName().contains(keyword))
            .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
}

public static Flux<Book> getBooksFromDatabase(final String keyword) {
    List<Book> books = new ArrayList<>(SampleData.books);
    books.add(new Book("DDD: Domain Driven Design",
            "Joy", "ddd-man", 35000, 200));
    return Flux
            .fromIterable(books)
            .filter(book -> book.getBookName().contains(keyword))
            .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
}

private static class NoSuchBookException extends RuntimeException {
    NoSuchBookException(String message) {
        super(message);
    }
}

onErrorContinue

에러가 발생했을 때, 에러 영역 내에 있는 데이터를 제거하고, upstream의 후속 데이터를 emit하는 방식으로 에러를 복구할 수 있는 기능을 제공해준다.
파라미터로 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 진행할 수 있다.
하지만 공식 문서에는 명확하지 않은 sequence의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 신중한 사용을 권고하며, 대부분의 에러는 doOnError로 로그를 기록하고 onErrorResume등으로 처리할 수 있다고 명시한다.

public static void main(String[] args) {
    Flux
        .just(1, 2, 4, 0, 6, 12)
        .map(num -> 12 / num)
        .onErrorContinue((error, num) ->
                log.error("error: {}, num: {}", error, num))
        .subscribe(data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError: ", error));
}

// 출력 -> onError는 발생하지 않았고 onErrorContinue 로그만 찍힌 것을 볼 수 있다.
13:08:11.142 [main] INFO - # onNext: 12
13:08:11.143 [main] INFO - # onNext: 6
13:08:11.143 [main] INFO - # onNext: 3
13:08:11.145 [main] ERROR- error: java.lang.ArithmeticException: / by zero, num: 0
13:08:11.145 [main] INFO - # onNext: 2
13:08:11.145 [main] INFO - # onNext: 1

retry

retry는 publisher가 데이터를 emit하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수만큼 원본 Flux의 sequence를 다시 구독한다. 만약 파라미터로 Long.Max_Value를 입력하면 재구독을 무한 반복한다.

public static void main(String[] args) throws InterruptedException {
    final int[] count = {1};
    Flux
        .range(1, 3)
        .delayElements(Duration.ofSeconds(1))
        .map(num -> {
            try {
                if (num == 3 && count[0] == 1) {
                    count[0]++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {}

            return num;
        })
        // 지연시간동안 응답을 받지 못하면 timeoutException에러 발생
        .timeout(Duration.ofMillis(1500))
        // 재요청
        .retry(1)
        .subscribe(data -> log.info("# onNext: {}", data),
                (error -> log.error("# onError: ", error)),
                () -> log.info("# onComplete"));

    Thread.sleep(7000);
}

Sequence의 동작 시간 측정을 위한 operator

elapsed

emit된 데이터 사이의 경과 시간을 측정해서 tuple<Long, T> 형태로 downstream에 emit한다. emit된 데이터는 onSubscribe Signal과 데이터 사이의 시간을 기준으로 측정한다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .range(1, 5)
        .delayElements(Duration.ofSeconds(1))
        .elapsed()
        .subscribe(data -> log.info("# onNext: {}, time: {}",
                                            data.getT2(), data.getT1()));

    Thread.sleep(6000);
}

// 출력
11:35:08.489 [parallel-1] INFO - # onNext: 1, time: 1096
11:35:09.521 [parallel-2] INFO - # onNext: 2, time: 1032
11:35:10.524 [parallel-3] INFO - # onNext: 3, time: 1003
11:35:11.543 [parallel-4] INFO - # onNext: 4, time: 1019
11:35:12.550 [parallel-5] INFO - # onNext: 5, time: 1007

Flux Sequence 분할을 위한 operator

window

window는 upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다. reactor에서는 이렇게 분할된 flux를 윈도우라고 부른다.

public static void main(String[] args) {
    Flux.range(1, 11)
            .window(3) // 3개씩 분할된 Flux -> Flux<Flux<Integer>>
            .flatMap(flux -> { // flatMap으로 펼치기 -> Flux<Integer> 을 return
                log.info("======================"); 
                return flux;
            })
            .subscribe(new BaseSubscriber<>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    subscription.request(2);
                }

                @Override
                protected void hookOnNext(Integer value) {
                    log.info("# onNext: {}", value);
                    request(2);
                }
            });
}

// 출력
11:40:55.107 [main] INFO - ======================
11:40:55.112 [main] INFO - # onNext: 1
11:40:55.113 [main] INFO - # onNext: 2
11:40:55.113 [main] INFO - # onNext: 3
11:40:55.113 [main] INFO - ======================
11:40:55.113 [main] INFO - # onNext: 4
11:40:55.113 [main] INFO - # onNext: 5
11:40:55.113 [main] INFO - # onNext: 6
11:40:55.114 [main] INFO - ======================
11:40:55.114 [main] INFO - # onNext: 7
11:40:55.114 [main] INFO - # onNext: 8
11:40:55.114 [main] INFO - # onNext: 9
11:40:55.114 [main] INFO - ======================
11:40:55.114 [main] INFO - # onNext: 10
11:40:55.115 [main] INFO - # onNext: 11

buffer

upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit한다. 마지막 버퍼에 포함된 데이터 개수는 maxSize보다 적거나 같다. maxSize가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, maxSize가 될때까지 무한정 기다리게 된다. 따라서 bufferTimeout을 사용하는 것이 좋은 선택이다.

public static void main(String[] args) {
    Flux.range(1, 95)
            .buffer(10) // 최대 10개가 담기면 List 버퍼 형태로 emit
            .subscribe(buffer -> log.info("# onNext: {}", buffer));
}

// 출력
11:45:18.439 [main] INFO - # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
11:45:18.441 [main] INFO - # onNext: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
11:45:18.441 [main] INFO - # onNext: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
11:45:18.441 [main] INFO - # onNext: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
11:45:18.441 [main] INFO - # onNext: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
11:45:18.441 [main] INFO - # onNext: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
11:45:18.441 [main] INFO - # onNext: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
11:45:18.441 [main] INFO - # onNext: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
11:45:18.442 [main] INFO - # onNext: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
11:45:18.442 [main] INFO - # onNext: [91, 92, 93, 94, 95]

bufferTimeout

upstream에서 emit되는 첫번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한 번에 emit한다. maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit된 데이터를 List버퍼로 emit한다.

public static void main(String[] args) {
    Flux
        .range(1, 20)
        .map(num -> {
            try {
                if (num < 10) {
                    Thread.sleep(100L);
                } else {
                    Thread.sleep(300L);
                }
            } catch (InterruptedException e) {}
            return num;
        })
        .bufferTimeout(3, Duration.ofMillis(400L))
        .subscribe(buffer -> log.info("# onNext: {}", buffer));
}

groupBy

emit되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화한 groupedFlux를 리턴하며, 이를 통해 그룹별로 작업을 수행할 수 있다.

public static void main(String[] args) {
    Flux.fromIterable(SampleData.books)
            // 작가이름 별로 group -> Flux<GroupedFlux<String, Book>>
            .groupBy(book -> book.getAuthorName())
            .flatMap(groupedFlux ->
                    groupedFlux
                            .map(book -> book.getBookName() +
                                    "(" + book.getAuthorName() + ")")
                            .collectList()
            )
            .subscribe(bookByAuthor ->
                    log.info("# book by author: {}", bookByAuthor));
}

// 출력
12:02:24.405 [main] INFO - # book by author: [Advance Kotlin(Kevin), Getting started Kotlin(Kevin)]
12:02:24.415 [main] INFO - # book by author: [Advance Javascript(Mike), Getting started Javascript(Mike)]
12:02:24.416 [main] INFO - # book by author: [Advance Java(Tom), Getting started Java(Tom)]
12:02:24.416 [main] INFO - # book by author: [Advance Python(Grace), Getting started Python(Grace)]
12:02:24.416 [main] INFO - # book by author: [Advance Reactor(Smith), Getting started Reactor(Smith)]

groupBy의 경우 keyMapper를 통해 생성되는 key를 기준으로 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공할 수도 있다.

public static void main(String[] args) {
    Flux.fromIterable(SampleData.books)
            // Flux<GroupedFlux<String, String>>
            .groupBy(book ->
                    book.getAuthorName(), // keyMapper
                    book -> book.getBookName() + "(" + book.getAuthorName() + ")") // valueMapper 
            // value들을 List로 묶기 -> Flux<List<String>>
            // flatMap에는 GroupFlux<String, String> 하나씩 들어간다고 보면 된다.
            .flatMap(groupedFlux -> groupedFlux.collectList())
            .subscribe(bookByAuthor ->
                    log.info("# book by author: {}", bookByAuthor));
}
// 데이터
public static final List<Book> books =
        Arrays.asList(
                new Book("Advance Java", "Tom",
                        "Tom-boy", 25000, 100),
                new Book("Advance Python", "Grace",
                        "Grace-girl", 22000, 150),
                new Book("Advance Reactor", "Smith",
                        "David-boy", 35000, 200),
                new Book("Getting started Java", "Tom",
                        "Tom-boy", 32000, 230),
                new Book("Advance Kotlin", "Kevin",
                        "Kevin-boy", 32000, 250),
                new Book("Advance Javascript", "Mike",
                        "Tom-boy", 32000, 320),
                new Book("Getting started Kotlin", "Kevin",
                        "Kevin-boy", 32000, 150),
                new Book("Getting started Python", "Grace",
                        "Grace-girl", 32000, 200),
                new Book("Getting started Reactor", "Smith",
                        null, 32000, 250),
                new Book("Getting started Javascript", "Mike",
                        "David-boy", 32000, 330)
        );

public static void main(String[] args) {
    Flux.fromIterable(SampleData.books)
            // 작가 이름을 key로 묶기 -> Flux<GroupedFlux<String, Book>>
            .groupBy(book -> book.getAuthorName())
            .flatMap(groupedFlux -> // groupedFlux<String, Book> 형태로 하나씩 들어간다.
                Mono
                    .just(groupedFlux.key())
                    // zipWith로 두 개의 Mono를 하나로 합친다.
                    .zipWith(
                        groupedFlux
                            // 저자가 얻을 수 있는 인세 계산 -> value로 여러 Book들이 있고, 그것들의 인세 계산된다.
                            .map(book ->
                                (int)(book.getPrice() * book.getStockQuantity() * 0.1))
                            // 인세들을 더하고 BiFunction으로 최종 value 세팅
                            .reduce((y1, y2) -> y1 + y2),
                                (authorName, sumRoyalty) ->
                                    authorName + "'s royalty: " + sumRoyalty)
            )
            .subscribe(log::info);
}

// 출력
12:38:04.288 [main] INFO - Kevin's royalty: 1280000
12:38:04.298 [main] INFO - Mike's royalty: 2080000
12:38:04.298 [main] INFO - Tom's royalty: 986000
12:38:04.298 [main] INFO - Grace's royalty: 970000
12:38:04.298 [main] INFO - Smith's royalty: 1500000

다수의 Subscriber에게 Flux를 멀티캐스팅하기 위한 Operator

subscriber가 구독하면 upstream에서 emit된 데이터가 구독 중인 모든 subscriber에게 멀티캐스팅되는데 이를 가능케 해주는 operator들은 cold Sequence를 hot sequence로 동작하게 하는 특징이 있다.

publish

publish operator는 구독을 하더라도 구독 시점에 즉시 데이터를 emit하지 않고, connect를 호출하는 시점에 비로소 데이터를 emit한다. 그리고 hot sequence로 변환되기 때문에 구독 시점 이후에 emit된 데이터만 전달받을 수 있다.

public static void main(String[] args) throws InterruptedException {
    ConnectableFlux<Integer> flux =
            Flux
                .range(1, 5)
                .delayElements(Duration.ofMillis(300L)) 
                .publish(); // 0.3초에 한번씩 emit하는 ConnectableFlux<Integer>을 리턴, 아직 connect 호출이 없어 emit되는 데이터는 없다.

    Thread.sleep(500L);    
    flux.subscribe(data -> log.info("# subscriber1: {}", data));

    Thread.sleep(200L);
    flux.subscribe(data -> log.info("# subscriber2: {}", data));

    // connect 호출이 발생하면서 데이터 emit -> 이 시점부터 0.3초에 한번씩 emit
    flux.connect();

    // 이미 emit된 데이터에 대해서는 전달받지 못함 : hot sequence
    Thread.sleep(1000L);
    flux.subscribe(data -> log.info("# subscriber3: {}", data));

    Thread.sleep(2000L);
}

// 출력
13:16:25.225 [parallel-1] INFO - # subscriber1: 1
13:16:25.257 [parallel-1] INFO - # subscriber2: 1
13:16:25.566 [parallel-2] INFO - # subscriber1: 2
13:16:25.570 [parallel-2] INFO - # subscriber2: 2
13:16:25.873 [parallel-3] INFO - # subscriber1: 3
13:16:25.874 [parallel-3] INFO - # subscriber2: 3
13:16:26.180 [parallel-4] INFO - # subscriber1: 4
13:16:26.182 [parallel-4] INFO - # subscriber2: 4
13:16:26.183 [parallel-4] INFO - # subscriber3: 4
13:16:26.487 [parallel-5] INFO - # subscriber1: 5
13:16:26.490 [parallel-5] INFO - # subscriber2: 5
13:16:26.491 [parallel-5] INFO - # subscriber3: 5

autoConnect

publish operator는 구독이 발생하더라도 connect를 직접 호출하기 전까지는 데이터 emit하지 않기 때문에 코드 상에서 connect 호출이 필요하다. 반면 autoConnect는 파라미터로 지정하는 숫자만큼 구독이 발생하는 시점에 upstream 소스에 자동으로 연결되기 때문에 별도의 connect 호출이 필요하지 않다.

public static void main(String[] args) throws InterruptedException {
    Flux<String> publisher =
            Flux
                .just("Concert part1", "Concert part2", "Concert part3")
                .delayElements(Duration.ofMillis(300L))
                .publish()
                .autoConnect(2);

    Thread.sleep(500L);
    publisher.subscribe(data -> log.info("# audience 1 is watching {}", data));

    Thread.sleep(500L);
    publisher.subscribe(data -> log.info("# audience 2 is watching {}", data));

    Thread.sleep(500L);
    publisher.subscribe(data -> log.info("# audience 3 is watching {}", data));

    Thread.sleep(1000L);
}

// 출력
13:19:09.507 [parallel-1] INFO - # audience 1 is watching Concert part1
13:19:09.529 [parallel-1] INFO - # audience 2 is watching Concert part1
13:19:09.836 [parallel-2] INFO - # audience 1 is watching Concert part2
13:19:09.839 [parallel-2] INFO - # audience 2 is watching Concert part2
13:19:09.840 [parallel-2] INFO - # audience 3 is watching Concert part2
13:19:10.150 [parallel-3] INFO - # audience 1 is watching Concert part3
13:19:10.152 [parallel-3] INFO - # audience 2 is watching Concert part3
13:19:10.153 [parallel-3] INFO - # audience 3 is watching Concert part3

refCount

파라미터로 입력된 숫자만큼 구독이 발생하는 시점에 upstream소스에 연결되며, 모든 구독이 취소되거나 upstream의 데이터 emit이 종료되면 연결이 해제된다. 이는 주로 무한 스트림 상황에서 모든 구독이 취소될 경우 연결을 해제하는 데 사용된다.

public static void main(String[] args) throws InterruptedException {
    Flux<Long> publisher =
            Flux
                .interval(Duration.ofMillis(500))
                .publish().refCount(1);

    // 구독                   
    Disposable disposable =
            publisher.subscribe(data -> log.info("# subscriber 1: {}", data));

    Thread.sleep(2100L);
    // 구독 취소 -> 모든 구독 취소되었으므로 연결 해제
    disposable.dispose();

    // 구독 발생 -> upstream 소스에 다시 연결 -> upstream은 0부터 다시 emit
    publisher.subscribe(data -> log.info("# subscriber 2: {}", data));

    Thread.sleep(2500L);
}

// 출력
13:24:58.590 [parallel-1] INFO - # subscriber 1: 0
13:24:59.089 [parallel-1] INFO - # subscriber 1: 1
13:24:59.591 [parallel-1] INFO - # subscriber 1: 2
13:25:00.088 [parallel-1] INFO - # subscriber 1: 3
13:25:00.705 [parallel-2] INFO - # subscriber 2: 0
13:25:01.203 [parallel-2] INFO - # subscriber 2: 1
13:25:01.702 [parallel-2] INFO - # subscriber 2: 2
13:25:02.202 [parallel-2] INFO - # subscriber 2: 3
13:25:02.708 [parallel-2] INFO - # subscriber 2: 4

만약 autoCommit이었다면 연결이 끊어진 것이 아니기 때문에 다시 구독이 발생하면 0부터가 아니라 이어서 데이터가 emit되었을 것이다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant