메인 코드
public class L11_FlowableSample2Simple {
@Test
void flowableTest() throws InterruptedException {
// 1. 람다 메소드 방식 */
Flowable<String> flowable = Flowable.create(stringFlowable, BackpressureStrategy.BUFFER);
// 2. 인스턴스 생성 방식 */
// Flowable<String> flowable = Flowable.create(new StringFlowable(), BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.computation())
.subscribe(new StringSubscriber());
Thread.sleep(1000L);
}
/* 1. 람다 메소드 정의 */
FlowableOnSubscribe<String> stringFlowable = (emitter) -> {
String[] datas = {"Hello, world!", "Hi, RxJava!"};
for (String data:datas) {
if(emitter.isCancelled()) return;
emitter.onNext(data);
}
emitter.onComplete();
};
/* 2. 클래스 정의 */
class StringFlowable implements FlowableOnSubscribe<String> {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
String[] datas = {"Hello, world!", "Hi, RxJava!"};
for (String data:datas) {
if(emitter.isCancelled()) return;
emitter.onNext(data);
}
emitter.onComplete();
}
};
class StringSubscriber implements Subscriber<String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1L);
}
@Override
public void onNext(String data) {
//실행 중인스레드 이름을 얻는다.
String threadName = Thread.currentThread().getName();
//스레드 이름과 받은 데이터를 출력한다.
System.out.println(threadName + " : " + data);
//다음에 받을 데이티 개수를 요청한다.
this.subscription.request(1L);
}
@Override
public void onError(Throwable ex) {//실행 중인스레드 이름을 얻는다.
ex.printStackTrace();
}
@Override
public void onComplete() {
//실행 중인스레드 이름을 얻는다.
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " : 완료");
}
}
}
Flowable<String> flowable = Flowable.create(stringFlowable, BackpressureStrategy.BUFFER);
Flowable<T>은 Publisher<T> 인터페이스를 구현해야 한다.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
Flowable<T>에는 아래와 같이 subscribe 메소드를 구현하고 있다.
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport("none")
public final void subscribe(@NonNull Subscriber<? super T> subscriber) {
if (subscriber instanceof FlowableSubscriber) {
this.subscribe((FlowableSubscriber)subscriber);
} else {
Objects.requireNonNull(subscriber, "subscriber is null");
this.subscribe((FlowableSubscriber)(new StrictSubscriber(subscriber)));
}
}
- 이미지1을 보면 FlowableOnSubscribe를 인자로 넘기고 있기 때문에
- this.subscribe((FlowableSubscriber)(new StrictSubscriber(subscriber))); 이 코드가 수행된다.
- New StrictSubscriber(subscriber)로 람다 메소드를 인자로 받아 StrictSubscriber를 생성한 후에
- 이를 다시 subscribe(FlowableSubscriber) 메소드의 인자로 호출하고 있다.
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport("none")
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
try {
Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
this.subscribeActual(flowableSubscriber);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}
- subscriber 를 참조로 보관 후
- this.subscribeActual(인자)을 호출한다.
this.subscribeActual은 어디에?
- 이미지1에서 Flowable<T>를 생성할 때 FlowableCreate<T> 클래스의 인스턴스를 생성했다.
- FlowableCreate<T>에는 subscribeActual 메소드가 구현되어 있다.
- backpressure에 따라 emiiter를 여러 방법으로 생성한다.
- 본 분석 소스에서는 BackpressureStrategy.BUFFER 로 선언되어 있다.
- default: 하단의 아래 코드로 Emitter를 생성한다.
BufferAsyncEmitter static 클래스는 FlowableCreate의 내부에 존재한다.
- static 으로 선언되어 바깥 클래스와 독립적으로 인스턴스가 생성된다.
- 바깥 클래스의 멤버 클래스가 아니기 때문에 바깥 클래스의 멤버를 참조할 수 없다.
- GC에서도 이점이 있으며, 인스턴스 생성에서도 메모리를 추가로 차지하지 않고, 생성도 빠르다.
선언된 곳 FlowableCreate. |
FlowableOnSubscribe<T> | Emitter<T> | FlowableEmitter<T> | Subscription |
람다 메소드 주입 | void subscribe(FlowableEmitter<T>) | |||
BufferAsyncEmitter | void onNext(T value) | |||
BaseEmitter | void onError(Throwable error) | |||
BufferASyncEmitter | void onComplete | |||
BaseEmitter | void setDisposable(Disposable d) | |||
BaseEmitter | void set Cannellable(Cancellable c) | |||
BaseEmitter | long requested() | |||
BaseEmitter | FlowableEmitter<T> serialize() | |||
BaseEmitter | boolean tryOnError(Throwable t) | |||
BaseEmitter | void Request(Long var1) | |||
BaseEmitter | void cancel() |
'프로그래밍 > OOP_Pattern_TDD' 카테고리의 다른 글
Java로 DSL(Domain Specific Language) 개발 (0) | 2023.08.09 |
---|---|
ThreadPool로 초당 API 호출 건수 제한 - ExecutorService (0) | 2023.08.02 |
RxJava Single 구조 파해치기 (0) | 2023.04.18 |
자바 DSL - 중첩된 함수(NestedFunction) + Lambda (0) | 2023.03.08 |
자바 DSL - 중첩된 함수(NestedFunction) 패턴 (0) | 2023.03.08 |