Single을 활용한 간단한 샘플 소스를 가지고, Single이 어떻게 동작하는지 구조를 파해쳐 보도록 하겠다.
단계가 복잡하게 얽혀 전체를 파악하기 쉽지 않지만, 찬찬히 따라가다 보면 적어도 이렇게 구현되어 있구나라는 점은 파악할 수 있으리라.
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
public class L30_SingleSample {
@Test
void SingleSampleTest() {
Single<DayOfWeek> single = Single.create(getDayOfWeekSingleOnSubscribe); //1번
single.subscribe(new SingleObserver<DayOfWeek>() { //2번
@Override
public void onSubscribe(@NonNull Disposable d) {
//Do Nothing
}
@Override
public void onSuccess(@NonNull DayOfWeek dayOfWeek) {
System.out.println(dayOfWeek);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
});
}
@NotNull
private SingleOnSubscribe<DayOfWeek> getDayOfWeekSingleOnSubscribe = emitter -> {
emitter.onSuccess(LocalDateTime.now().getDayOfWeek());
};
}
Single 인스턴스를 만드는 과정
a. Single은 추상 클래스이며, SingleSource 인터페이스를 구현해야 한다.
b. SingleSource
- 1번 소스코드에서 Single.create() 메서드를 통해 Single 객체를 생성했다.
c. Single.create()
- c-1 : create 메서드는 SingleOnSubscribe를 파라미터로 받는다.
- c-2 : create() 메서드의 내부 로직에서 RxJavaPlugins.onAssembly 메서드를 통해 Single 객체를 생성 후 리턴한다.
c-1-1. SingleOnSubscribe
- SingleOnSubscribe는 함수형 인터페이스로 subscribe 메서드를 메서드 참조로 사용한다.
- subscribe 메서드는 void subscribe(SingleEmitter<T> emitter)로서 리턴값은 없으며, emitter를 소비하는 메서드이다.
- java.util.Objects 패키지의 Consumer<T> 함수형 인터페이스와 닮아있다.
C-1-1-1. SingleEmitter
SingleEmitter는 인터페이스이며, 6개의 메서드가 정의되어 있다.
public interface SingleEmitter<@NonNull T> {
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable t);
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
boolean tryOnError(@NonNull Throwable t);
}
즉, 위의 코드가 동작하려면 SingleEmitter가 로직 내에서 객체로 제공되어야 하는데, 아직까지는 SingleEmitter 객체를 생성하는 로직은 보이지 않는다.
c. Single.create()로 돌아가 보면
- Single<DayOfWeek> single = Single.create(getDayOfWeekSingleOnSubscribe);
에서 SingleOnSubscribe 함수형 인터페이스를 사용해 메서드 참조를 파라미터로 넣도록 되어 있고, - 아래와 같이 함수형 인터페이스의 메서드를 참조로 제공했다.
@NotNull
private SingleOnSubscribe<DayOfWeek> getDayOfWeekSingleOnSubscribe = emitter -> {
emitter.onSuccess(LocalDateTime.now().getDayOfWeek());
};
- 문제는 여기에서 SingleOnSubscribe 람다함수(함수형 인터페이스, subscribe 메서드 형식을 따름)에서 SingleEmitter를 소비하는데 SingleEmitter를 구현한 구현체가 소스코드에 보이지 않는다.
void subscribe(@NonNull SingleEmitter<T> emitter) throws Throwable;
c-2-1. RxJavaPlugins.onAssembly 메소드
- onAssembly는 static 메소드이며, 선언부는 다음과 같다.
- onAssembly는 파라미터로 Single 객체를 입력받는다.
- C. Single.create()에서 아래와 같이 new SingleCreate 클래스를 생성해서 Single 객체를 만들었다.
C-2-1-1. onAssembly 메서드
- Function f에 onSingleAssembly 메서드 참조를 연결하고,
- f가 not Null이면, apply 메서드 호출 결과를 리턴한다.
- f가 Null이면, source를 리턴한다.
- 지금 소스 상으로는 f에 해당하는 onSingleAssembly에 메서드 참조를 대입하는 로직이 없었기 때문에 f는 null이다.
- 즉, onAssembly는 파라미터로 주입받은 source를 그대로 리턴한다.
- 즉, new SingleCreate<>(source)의 결과를 리턴한다.
C-2-2. SingleCreate
- SingleCreate는 SingleOnSubscribe 메서드 참조를 파라미터로 받았다.
- SingleCreate는 source를 멤버변수(메소드 참조형)에 담기만 하고 끝.
끝..???
여전히 SingleEmitter를 생성하는 로직은 찾지 못했다.
다시 처음으로 돌아가서 샘플 코드의 1번 라인을 보자.
c. Single.create()에 의해 Single 객체가 만들어진 것은 위에서 확인했다.
Single
Single 에는 SingleSource 함수형 인터페이스가 구현되어 있다.
d. Single에 구현된 subscribe 메서드
d-1. RxJavaPlugins.onSubscribe(this, observer)
- 어디에서 본듯한 코드 구조가 눈에 띈다. 바로 위에서 분석한 c-2-1. RxJavaPlugins.onAssembly 메소드에서 본 구조와 유사하다.
- BiFunction 라인을 보면, onSingleSubscribe를 주입한 코드는 없으니, f는 null이다.
- 즉, 이 경우에는 파라미터로 받은 observer를 리턴하는 단순 로직이다.
d-2. subscribeActual(observer)
- d. Single에 구현된 subscribe 메서드 하단부를 보면, subscribeActual 메서드를 호출하고 있으며, 파라미터로 observer를 넘기고 있다.
D-2-1. subscribeActual(observer)
- subscribeActual은 아래와 같이 Single의 추상 메서드이다.
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
- 그렇다면 Single 추상클래스를 상속받은 클래스에서 subscribeActual을 오버라이드했다는 것이다.
- Single을 상속받은 클래스는 이번 분석 초반에 c.Single.create()를 확인하면 출처를 알 수 있다.
- Single 클래스의 create static 메서드에서는 아래와 같이 SingleCreate 클래스의 객체를 new로 생성하고 있다.
- 드디어 실타래가 풀리려고 한다~~~
D-3. 다시 SingleCreate
- SingleCreate 클래스를 보면, 이전에 C-2-2. SingleCreate를 살펴볼 때는 눈에 띄지 않던 오버라이드된 subscribeActual 메서드가 눈에 띈다.
- 코드 안에 드디어 Emitter 객체를 생성한 로직이 보인다.
Emitter<T> parent = new Emitter<>(observer); - 그리고, 조금 아래로 코드를 내려보면 Emitter 클래스가 이너 클래스로 SingleEmitter 인터페이스를 구현하고 있는 것을 찾을 수 있다.
- 그리고,
d-4. Emitter 클래스
- 코드가 길어 생략했지만, Emitter에는 SingleEmitter에 정의된 모든 메서드가 override 되어있다.
public interface SingleEmitter<@NonNull T> {
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable t);
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
boolean tryOnError(@NonNull Throwable t);
}
- Emitter는 Observer를 생성자로 받아서 downstream 멤버변수에 담고 있으며, override 메서드에서 downstream을 사용하고 있다.
- 이 중에서 onSuccess 메서드를 보면 observer의 onSuccess 메서드 콜백을 통해 값(value)을 통지하는 것을 알 수 있다.
d-4-1. onSuccess 메서드 콜백
D-5. 다시 SingleCreate
- Emitter가 언제 만들어졌고, SingleEmitter interface가 어디에 구현되었느지는 d-3, d-4를 통해 확인했다.
- 그럼 observer에게 통지는 누가 언제 하는 것일까?
- 다시 SingleCreate를 살펴보자.
- [박스3]에서 SingleCreate의 생성자에서 SingleOnSubScribe 함수형 인터페이스를 인자 받아 source 멤버객체에 넣어 두었다.
- [박스4]에서 observer에게 통지할 준비가 되었음을 onSubscribe(parent)로 알린 후
- [박스 5]에서 드디어 source(람다함수)를 호출했다. source.subscribe(parent)에서 parent 인자는 Emitter이다.
- 이렇게 호출된 메소드 참조는 아래 코드였고, 이렇게 해서 observer(emitter의 downstream 멤버변수에 보관됨)의 onSuccess 메서드를 통해 값(now().getDayOfWeek())가 콜백된다.
[SingleCreate 클래스의 내부 클래스로 정의된 Emitter 코드 발췌]
[RxJava Single Class Diagram]
[RxJava Single Sequence Diagram]
'프로그래밍 > OOP_Pattern_TDD' 카테고리의 다른 글
ThreadPool로 초당 API 호출 건수 제한 - ExecutorService (0) | 2023.08.02 |
---|---|
Flowable.crete 구조 파악 (0) | 2023.07.17 |
자바 DSL - 중첩된 함수(NestedFunction) + Lambda (0) | 2023.03.08 |
자바 DSL - 중첩된 함수(NestedFunction) 패턴 (0) | 2023.03.08 |
자바 DSL - 메서드 체인 패턴 (0) | 2023.03.08 |