Justin의 개발 로그
article thumbnail

메인 코드

public class L11_FlowableSample2Simple {

    @Test
    void flowableTest() throws InterruptedException {
        // 1. 람다 메소드 방식 */
        Flowable<String> flowable = Flowable.create(stringFlowable, BackpressureStrategy.BUFFER);

        // 2. 인스턴스 생성 방식 */
        // Flowable<String> flowable = Flowable.create(new StringFlowable(), BackpressureStrategy.BUFFER);
        flowable.observeOn(Schedulers.computation())
                .subscribe(new StringSubscriber());

        Thread.sleep(1000L);
    }

    /* 1. 람다 메소드 정의 */
    FlowableOnSubscribe<String> stringFlowable = (emitter) -> {
        String[] datas = {"Hello, world!", "Hi, RxJava!"};

        for (String data:datas) {
            if(emitter.isCancelled()) return;

            emitter.onNext(data);
        }

        emitter.onComplete();
    };

    /* 2. 클래스 정의 */
    class StringFlowable implements FlowableOnSubscribe<String> {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
            String[] datas = {"Hello, world!", "Hi, RxJava!"};

            for (String data:datas) {
                if(emitter.isCancelled()) return;

                emitter.onNext(data);
            }

            emitter.onComplete();
        }
    };


    class StringSubscriber implements Subscriber<String> {
        private Subscription subscription;
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            subscription.request(1L);
        }

        @Override
        public void onNext(String data) {
            //실행 중인스레드 이름을 얻는다.
            String threadName = Thread.currentThread().getName();

            //스레드 이름과 받은 데이터를 출력한다.
            System.out.println(threadName + " : " + data);

            //다음에 받을 데이티 개수를 요청한다.
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable ex) {//실행 중인스레드 이름을 얻는다.
            ex.printStackTrace();

        }

        @Override
        public void onComplete() {
            //실행 중인스레드 이름을 얻는다.
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " : 완료");

        }
    }
}

 

 

 

Flowable<String> flowable = Flowable.create(stringFlowable, BackpressureStrategy.BUFFER);

이미지1

 

Flowable<T>은 Publisher<T> 인터페이스를 구현해야 한다.

이미지2

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> var1);
}

 

Flowable<T>에는 아래와 같이 subscribe 메소드를 구현하고 있다.

@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport("none")
public final void subscribe(@NonNull Subscriber<? super T> subscriber) {
    if (subscriber instanceof FlowableSubscriber) {
        this.subscribe((FlowableSubscriber)subscriber);
    } else {
        Objects.requireNonNull(subscriber, "subscriber is null");
        this.subscribe((FlowableSubscriber)(new StrictSubscriber(subscriber)));
    }

}

 

  • 이미지1을 보면 FlowableOnSubscribe를 인자로 넘기고 있기 때문에 
  • this.subscribe((FlowableSubscriber)(new StrictSubscriber(subscriber)));  이 코드가 수행된다.
  • New StrictSubscriber(subscriber)로 람다 메소드를 인자로 받아 StrictSubscriber를 생성한 후에 
  • 이를 다시 subscribe(FlowableSubscriber) 메소드의 인자로 호출하고 있다.

 

@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport("none")
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber is null");

    try {
        Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        this.subscribeActual(flowableSubscriber);
    } catch (NullPointerException var4) {
        throw var4;
    } catch (Throwable var5) {
        Exceptions.throwIfFatal(var5);
        RxJavaPlugins.onError(var5);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(var5);
        throw npe;
    }
}

 

  • subscriber 를 참조로 보관 후 
  • this.subscribeActual(인자)을 호출한다.

 

this.subscribeActual은 어디에?

  • 이미지1에서 Flowable<T>를 생성할 때 FlowableCreate<T> 클래스의 인스턴스를 생성했다.
  • FlowableCreate<T>에는 subscribeActual 메소드가 구현되어 있다.

  • backpressure에 따라 emiiter를 여러 방법으로 생성한다.
  • 본 분석 소스에서는 BackpressureStrategy.BUFFER 로 선언되어 있다.
  • default: 하단의 아래 코드로 Emitter를 생성한다.

 

BufferAsyncEmitter static 클래스는 FlowableCreate의 내부에 존재한다.

  • static 으로 선언되어 바깥 클래스와 독립적으로 인스턴스가 생성된다. 
  • 바깥 클래스의 멤버 클래스가 아니기 때문에 바깥 클래스의 멤버를 참조할 수 없다. 
  • GC에서도 이점이 있으며, 인스턴스 생성에서도 메모리를 추가로 차지하지 않고, 생성도 빠르다.

 

선언된 곳
FlowableCreate.
FlowableOnSubscribe<T> Emitter<T> FlowableEmitter<T> Subscription
람다 메소드 주입 void subscribe(FlowableEmitter<T>)      
BufferAsyncEmitter   void onNext(T value)    
BaseEmitter   void onError(Throwable error)    
BufferASyncEmitter   void onComplete    
BaseEmitter     void setDisposable(Disposable d)  
BaseEmitter     void set Cannellable(Cancellable c)  
BaseEmitter     long requested()  
BaseEmitter     FlowableEmitter<T> serialize()  
BaseEmitter     boolean tryOnError(Throwable t)  
BaseEmitter       void Request(Long var1)
BaseEmitter       void cancel()

 

 

 

profile

Justin의 개발 로그

@라이프노트

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!