Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spring webflux] 리액티브 프로그래밍 #53

Open
backtony opened this issue May 18, 2023 · 0 comments
Open

[spring webflux] 리액티브 프로그래밍 #53

backtony opened this issue May 18, 2023 · 0 comments

Comments

@backtony
Copy link
Owner

리액티브 시스템과 리액티브 프로그래밍

리액티브 프로그래밍

  • 리액티브 시스템을 구축하는데 필요한 프로그래밍 모델
  • blocking i/o가 아닌 non-blocking i/o 방식
    • blocking i/o
      • 해당 스레드가 작업을 처리할 때까지 남아 있는 작업들은 해당 작업이 끝날 때까지 차단되어 대기
    • non-blocking i/o
      • 스레드가 차단되지 않음

리액티브 프로그래밍 코드 구성

크게 publisher, subscriber, data source, operator 등으로 구성된다.

  • publisher
    • 발행인, 발행자
    • 입력으로 들어오는 데이터 제공
  • subscriber
    • 구독자
    • publisher가 제공하는 데이터를 전달받아 사용하는 주체
  • data source
    • publisher의 입력으로 들어오는 데이터
    • data stream
  • operator
    • publisher로부터 전달된 데이터가 순수하게 처리를 거치지 않고 그대로 subscriber에 전달되는 경우는 거의 없다.
    • 그 사이에 적절한 가공 처리가 이뤄지는데 이를 담당한다.

리액티브 스트림즈(reactive streams)

  • Non-Blocking 이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
  • 구현체로는 RxJava, Reactor, Akka Streams 등이 있는데 Spring과 잘 맞는 것은 Reactor

리액티브 스트림즈 구성 요소

  • Publisher
    • 데이터를 생성하고 발행하는 역할
  • Subscriber
    • 구독한 Publisher로부터 발행된 데이터를 전달받아 처리하는 역할
  • Subscription
    • Publisher에 요청할 데이터 개수를 지정하고, 데이터의 구독을 취소하는 역할
  • Processor
    • Publisher와 Subscriber의 기능을 모두 갖는다.
    • Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다.

흐름도

  1. 먼저 subscriber는 전달받을 데이터를 구독한다.(subscribe)
  2. 다음으로 Publisher는 데이터 발행할 준비가 되었음을 Subscriber에게 알린다.(onSubscribe)
  3. Publisher가 데이터를 통지할 준비가 되었다는 알림을 받은 Subscriber는 전달받기를 원하는 데이터의 개수를 Publisher에게 요청한다.(Subscription.request)
  4. 다음으로 Publisher는 Subscriber로부터 요청받은 만큼의 데이터를 통지한다.(onNext)
  5. 이렇게 Publisher와 Subscriber 간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알립니다.(onComplete). 만약에 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다.(onError)

subscriber가 subscription.request를 통해 데이터의 요청 개수를 지정하는 이유는 뭘까? 흐름도를 보면 publisher와 subscriber가 마치 같은 스레드에서 동기적으로 상호작용하는 것처럼 보이지만 보통 비동기로 상호작용하는 경우가 대부분이다.
이럴 경우 publisher가 통지하는 속도가 publisher로부터 통지된 데이터를 subscriber가 처리하는 속도보다 더 빠르면 처리를 기다리는 데이터는 쌓이게 되고, 결과적으로 시스템 부하가 커진다. 이러한 문제를 방지하기 위해서 subscription.request를 통해 데이터 개수를 제어한다.

코드로 살펴보기

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

publisher는 인터페이스로, subscribe 메서드를 구현하면 된다. subscribe 메서드는 파라미터로 전달받은 subscriber를 등록하는 역할을 한다.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
  • onSubscribe
    • 구독 시작 시점에 publisher에게 요청할 데이터 개수를 지정하거나 구독을 해지하는 역할을 한다.
    • 인자로 들어온 Subscription 객체를 통해 이뤄진다.
  • onNext
    • publisher가 통지한 데이터를 처리하는 역할
  • onError
    • publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생한 경우 처리하는 역할
  • onComplete
    • Publisher가 데이터 통지를 완료했음을 알릴 때 호출
    • 데이터 통지가 정상적으로 완료된 경우, 후처리를 한다면 여기서 하면 된다.

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscriber가 구독한 데이터의 개수를 요청하거나 또는 데이터 요청의 취소(구독 해지) 역할을 한다.

  • request
    • 데이터의 개수 요청
  • cancel
    • 구독 해지

코드 관점 흐름도 다시보기

  1. publisher가 subscriber 인터페이스 구현체를 subscribe 메서드의 파라미터로 받는다.
  2. publisher 내부에서는 전달받은 subscriber 구현체의 onSubscribe 메서드를 호출하면서 subscriber의 구독을 의미하는 subscription 구현체를 Subscriber에게 전달한다.
  3. 호출된 subscriber 구현체의 onSubscribe 메서드에서 전달받은 Subscription 객체를 통해 전달받은 데이터의 개수를 Publisher에게 요청한다.
  4. Publisher는 Subscriber로부터 전달받은 요청 개수만큼 데이터를 onNext 메서드를 호출해 Subscriber에게 전달한다.
  • Publisher는 통지할 데이터가 더 없을 경우 onComplete 메서드를 호출해 Subscriber에게 종료되었음을 알린다.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

processor는 별도로 구현하는 메서드가 없다. 다른 인터페이스와 다른점은 subscriber와 publisher를 상속한다. 이는 두 기능을 모두 가지고 있기 때문이다.

리액티브 스트림즈 관련 용어

  • Signal
    • Publisher와 Subscriber 간에 주고받는 상요 작용
    • onSubscribe, onNext, onComplete, onError, request, cancel 메서드를 Signal이라고 한다.
    • Subscriber의 인터페이스에 정의된 메서드들은 실제로 호출하는 주체는 Publisher이므로 Publisher가 Subscriber에게 보내는 Signal이라고 볼 수 있다.
    • request와 cancel은 실제로 사용하는 주체는 subscriber이므로 Subscriber가 publisher에게 보내는 signal이다.
  • Demand
    • subscriber가 publisher에게 요청하는 데이터를 의미한다.
    • 구체적으로는 publishcer가 아직 subscriber에게 전달하지 않은 subscriber가 요청한 데이터를 의미
  • Emit
    • publisher가 subscriber에게 데이터를 전달하는 것을 의미한다.
  • Upstream/DownStream
    • stream은 흐르다. upStream은 위로 흐르다. downStream은 아래로 흐르다.
public static void main(String[] args) {
    Flux
        .just(1, 2, 3, 4, 5, 6)
        .filter(n -> n % 2 == 0)
        .map(n -> n * 2)
        .subscribe(System.out::println);
}

데이터 관점으로 볼 때, just 메서드 호출을 통해 반환된 Flux 입장에서는 filter 메서드 호출을 통해 반환된 Flux가 자신보다 더 하위에 있기 때문에 downStream이 된다.

반면에 filter 메서드 호출을 통해 반환된 Flux 입장에서는 just 메서드 호출을 통해 반환된 Flux가 자신보다 상위에 있기 때문에 upstream이 된다.


  • sequence
    • Publisher가 emit 하는 데이터의 연속적인 흐름
    • 앞선 예시처럼 flux 데이터 생성, emit, filter, map 순차적으로 데이터 처리하는 과정을 sequence라고 한다.
    • 쉽게 생각하면 다양한 operator로 데이터의 연속적인 흐름을 정의한 것
  • operator
    • filter, just, map 같은 연산자
  • source
    • data source, source publisher, source flux 등이 보통인데 대부분 최초의 라는 의미로 사용된다.

리액트 스트림즈 구현 규칙

publisher 구현 규칙

  • publisher가 subscriber에게 보내는 onNext signal 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
    • subscriber가 요청한 개수보다 publisher는 더 많이 보낼 수 없다.
  • publisher는 요청된 것보다 작은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
  • publisher는 데이터 처리가 실패하면 onError signal을 보낸다.
  • publisher 데이터 처리가 성공적으로 종료되면 onComplete signal을 보낸다.
  • publisher가 subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 subscriber의 구독은 취소된 것으로 간주되어야 한다.
  • 일단 종료 상태에서 signal을 받으면 더이상 signal일 발생되지 않아야 한다.
  • 구독 취소되면 subscriber는 signal 받는 것을 중지해야 한다.

subscriber 구현 규칙

  • subscriber는 publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)을 통해 Demand Signal을 publisher에게 보내야 한다.
  • subscriber.onComplete() 및 subscriber.onError는 subscription 또는 publisher의 메서드를 호출해서는 안된다.
  • subscriber.onComplete() 및 subscriber.onError는 signal을 수신한 후 구독이 취소된 것으로 간주되어야 한다.
  • 구독이 더이상 필요하지 않은 경우 subscriber는 subscription.cancel()을 호출해야 한다.
    • 리소스를 적절한 시기에 안전하게 해제
  • subscriber.onSubscribe()는 지정된 subscriber에 대해 최대 한 번만 호출되어야 한다.
    • 동일한 구독자가 최대 한 번만 구독할 수 있다는 의미

subscription 구현 규칙

  • 구독은 subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 subscription.request를 호출하도록 허용해야 한다.
  • 구독이 취소된 후 추가적으로 호출되는 subscription.request(n)은 효력이 없어야 한다.
  • 구독이 취소된 후 추가적으로 호출되는 subscription.cancel()은 효력이 없어야 한다.
  • 구독이 취소되지 않은 동안 subscription.request(n)의 매개변수가 0보다 작거나 같으면 IlligalArgumentException과 함께 onError signal을 보내야 한다.
  • 구독이 취소되지 않은 동안 subscription.cancel은 publisher가 subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
  • 구독이 취소되지 않은 동안 subscription.cancel은 publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
  • subscription.cancel, request 호출에 대한 응답으로 예외를 던지는 것은 허용하지 않는다.
  • 구독은 무제한 수의 request 호출을 지원해야 하고 2^63-1 개의 Demand를 지원한다.

Blocking I/O, Non-Blocking I/O

I/O는 일반적으로 컴퓨터 시스템이 외부의 입출력 장치들과 데이터를 주고받는 것을 의미하며, I/O작업의 대표적인 예로는 디스크에 저장된 파일을 읽어들여 메모리에 올리는 것이 있다. 이외에도 DB 데이터 조회, 추가 역시 I/O라고 하며 이는 DB I/O라고 한다. 그리고 웹 애플리케이션에서 다른 웹 애플리케이션으로 네트워크 통신을 한다면 네트워크 I/O가 발생하는 것이다.

Blocking I/O

하나의 스레드가 I/O에 의해 차단되어 대기하는 것을 Blocking I/O라고 한다. 이를 보완하기 위해 멀티스레딩 기법으로 추가 스레드를 할당하여 차단된 그 시간을 효율적으로 사용할 수 있다. 그런데 Blocking I/O방식에서 CPU 대비 많은 스레드 수를 할당하는 멀티스레딩 기법은 문제가 존재한다.

  • 컨텍스트 스위칭으로 인한 스레드 전환 비용
    • 컨텍스트 스위칭 : 프로그램을 번갈아 실행하는 과정에서 기존에 실행되고 있는 프로세스의 정보를 PCB 공간에 저장하고 다시 실행시켜야 할 피로세스 정보를 PCB로부터 불러오는 과정
    • 프로세스의 정보를 PCB에 저장, reload 하는 동안에는 CPU가 다른 작업을 하지 못하고 대기하게 된다.
    • 컨텍스트 스위칭이 많을수록 CPU 전체 대기 시간이 길어진다. -> 성능 저하
  • 과다한 메모리 사용으로 오버헤드 발생
    • 일반적으로 새로운 스레드가 실행되면 JVM에서는 해당 스레드를 위한 스택 영역의 일부를 할당하며, 새로운 스레드의 정보는 스택 영역에 개별 프레임의 형태로 저장
    • JVM 디폴트 스택 사이즈는 64비트인 경우, 1024kb이므로 만약 6만 4천명이 동시 접속을 한다면 64GB 정도의 메모리가 추가로 필요하다.
    • 서블릿 컨테이너 기반 JAVA 웹애플리케이션은 요청당 하나의 스레드를 할당하므로 각 스레드 내부에서 또 다른 처리를 위해 스레드를 추가로 할당하게 되면 감당하기 힘들 정도의 메모리 사용량이 늘어날 수 있다.
  • 스레드 풀에서 응답 지연 발생
    • 스프링부트는 톰캣이라는 서블릿 컨테이너를 내장한다.
    • 톰캣은 요청을 효율적으로 처리하기 위해 스레드 풀을 사용한다.
    • 대량의 요청이 발생하게 되어 스레드 풀에 사용 가능한 유휴 스레드가 없다면 유휴 스레드가 생길 때까지 응답 지연이 발생한다.

Non-Blocking I/O

작업 스레드의 종료 여부와 관계없이 요청한 스레드는 차단되지 않는다. 따라서 하나의 스레드로 많은 수의 요청을 처리할 수 있다. Blocking I/O 방식보다 더 적은 수의 스레드를 사용하기 때문에 Blocking I/O에서 멀티스레딩 기법을 사용할 때 발생한 문제들이 생기지 않는다. 하지만 Non-Blocking I/O 방식에도 단점이 있다.

  • 스레드 내부에서 CPU를 많이 사용하는 작업이 포함된 경우 성능에 악영향 발생
  • 사용자 요청에서 응답 과정 사이에 Blocking I/O 요소가 포함된 경우에는 Non-Blocking의 이점을 발휘하기 어렵다.

spring에서의 Blocking I/O, Non-Blocking I/O

  • spring mvc
    • blocking I/O 기반
  • spring webflux
    • non-blocking I/O 기반
// mvc 
// 책 정보 제공하는 api
@ResponseStatus(HttpStatus.OK)
@GetMapping("/{book-id}")
public ResponseEntity<Book> getBook(@PathVariable("book-id") long bookId)
        throws InterruptedException {

    // 5초 지연            
    Thread.sleep(5000);

    Book book = bookMap.get(bookId);

    return ResponseEntity.ok(book);
}
// mvc
@Bean
public CommandLineRunner run() {
    return (String... args) -> {
        log.info("# 요청 시작 시간: {}", LocalTime.now());

        for (int i = 1; i <= 5; i++) {
            Book book = this.getBook(i);
            log.info("{}: book name: {}", LocalTime.now(), book.getName());
        }
    };
}

private Book getBook(long bookId) {
    RestTemplate restTemplate = new RestTemplate();

    URI getBooksUri = UriComponentsBuilder.fromUri(baseUri)
            .path("/{book-id}")
            .build()
            .expand(bookId)
            .encode()
            .toUri(); 
    
    // blocking -> 5번 호출로 인해 25초 지연 발생
    ResponseEntity<Book> response =
            restTemplate.getForEntity(getBooksUri, Book.class);
    Book book = response.getBody();

    return book;
}

mvc의 경우 blocking으로 인해 25초 지연이 발생한다.

// webflux
// 책 정보 제공 api
@ResponseStatus(HttpStatus.OK)
@GetMapping("/{book-id}")
public Mono<Book> getBook(@PathVariable("book-id") long bookId)
        throws InterruptedException {
    Thread.sleep(5000);

    Book book = bookMap.get(bookId);
    log.info("# book for response: {}, {}", book.getBookId(), book.getName());
    return Mono.just(book);
}
// webflux
@Bean
public CommandLineRunner run() {
    return (String... args) -> {
        log.info("# 요청 시작 시간: {}", LocalTime.now());

        for (int i = 1; i <= 5; i++) {
            int a = i;
            this.getBook(i)
                .subscribe(
                        book -> {
                            // 전달 받은 도서를 처리.
                            log.info("{}: book name: {}",
                                    LocalTime.now(), book.getName());
                        }
                );
        }
    };
}

mvc랑은 다르게 응답 데이터를 바로 처리하지 않고 subscribe에서 응답 데이터를 전달받은 후에 처리하고 있다. 이렇게 처리하는 이유는 API로부터 전달 받은 응답이 Mono 타입이고 Mono는 Reactor에서 지원하는 Publisher 타입 중 하나이기 때문이다. Publisher 인터페이스는 subscribe()를 호출해서 전달받은 데이터를 처리하도록 정의되어 있는데, Mono 역시 publisher이므로 subscribe를 호출해서 전달받은 데이터를 처리한다.

HTTP 통신을 통해서 Publisher 타입(Mono)을 전달받든 애플리케이션 내부에서 메서드 호출을 통해 전달받든, 수신한 데이터를 처리하기 위해서는 전달받은 Publisher의 subscirbe를 호출하는 과정이 필요하다는 점을 기억해야 한다.

실행해보면 조회 시간이 5초정도밖에 걸리지 않는다. webflux는 스레드가 차단되지 않기 때문에 이와 같은 결과가 나온다.


cf) Mono
Mono는 Reactor에서 지원하는 Publisher 타입 중 하나로, 단 하나의 데이터만 emit하는 Publisher 타입이다.

일반적으로 HTTP 요청에 대한 응답으로 JSON 형식의 응답을 많이 사용하는데 JSON 형식으로 전달되는 응답 내부에는 여러 가지 결과 값이 포함될 수 있지만, JSON 형식의 응답 자체는 하나의 문자열로 구성된 데이터이기 때문에 Mono를 사용하기 가장 적합하다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant