관리 메뉴

막내의 막무가내 프로그래밍 & 일상

[RxJava] Subject 클래스 간략 정리 (feat. Hot, Cold Observable차이) 본문

안드로이드/RxJava

[RxJava] Subject 클래스 간략 정리 (feat. Hot, Cold Observable차이)

막무가내막내 2020. 10. 3. 15:20
728x90

 

[2021-04-16 업데이트]

 

 안녕하세요ㅎㅎ

RxJava 책을 복습과 함께 틈틈히 정리하면 더 기억에 남을 것 같아 포스팅을 하게되었습니다.

 

RxJava의 Observable 에는  Cold, Hot 두 종류의 Obsevable이 있습니다.

둘의 큰 차이점은 Cold는 구독하기 전까지 데이터를 방출하지 않는 Lazy한 접근법이고

Hot은 구독자의 존재 여부 관계없이 데이터블 발행하는 Observable입니다. (그래서 데이터를 발행하는 속도와 구독자가 처리하는 속도가 크면 원치않는 문제가 생기기 때문에 배압에 주의해야합니다.)

 

Cold 를 사용하는 예시는 웹 요청, 데이터베이스 쿼리, 파일 읽기가 있습니다.

Hot 을 사용하는 예시는 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터 등이 있습니다. 

 

 

그 중 Hot Observable 의 대표적인 클래스 중 하나인 Subject 클래스가 있습니다.

Subject 클래스는 Cold Obseravable을 Hot Observable로 변환해주고 Observable 속성과 구독자의 속성이 모두 있습니다. 그래서 Observable처럼 데이터를 발행하거나 구독자처럼 발행된 데이터를 바로 처리할 수 있습니다.

Subject 클래스 종류에 대해 간략하게 정리해보는 포스팅을 갖고자합니다.

 

 

1. AsyncSubjct

이 클래스는 Observable에서 발행한 마지막 데이터를 얻어올 수  있습니다. 완료되기 전 마지막 데이터만 남고 이전 데이터들은 무시됩니다.

정상적으로 완료되었을때의 마블다이어그램
만약 완료되지못하고 에러가 나면 에러가 통지된다.

 

 AsyncSubject<Object> subject = AsyncSubject.create();
 
 TestObserver<Object> to1 = subject.test();

 to1.assertEmpty();

 subject.onNext(1);

 // AsyncSubject only emits when onComplete was called.
 to1.assertEmpty();

 subject.onNext(2);
 subject.onComplete();

 // onComplete triggers the emission of the last cached item and the onComplete event.
 to1.assertResult(2);

 TestObserver<Object> to2 = subject.test();

 // late Observers receive the last cached item too
 to2.assertResult(2)

 

 

2. BehaviorSubject 

구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨줍니다. 그래서 기본값을 처음에 설정해 줄 수 있습니다. 

-> ex) BehaviorSubject.creatDefault("0"),  BehaviorSubject.create("0"),

 

가장 최근값을 보내줌을 볼 수 있습니다.
만약 오류와 함께 종료되면 BehaviorSubject는 후속 관찰자에게 아이템을 방출하지 않고 오류 알림을 전달하기 만합니다.

 

  // observer will receive all 4 events (including "default").
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.subscribe(observer);
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive the "one", "two" and "three" events, but not "default" and "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive only onCompleted
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.onCompleted();
  subject.subscribe(observer);

  // observer will receive only onError
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.onError(new RuntimeException("error"));
  subject.subscribe(observer);
import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

// 결과
abcd
cd

 

 

 

3.  PublishSubject

구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작합니다. 이전 1, 2번 Subject들과 다르게 옵션같은것들이 없고 가장 평범하게 동작합니다. 구독한 순간(이후)부터의 데이터를 받을 수 있습니다.

 

 

subscribe() 한 이후의 데이터를 매우 평범하게 받는 것을 볼 수 있습니다.
마찬가지로 구독한 이후의 데이터를 받다가 에러가 나면 에러알림도 받습니다.

 

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onComplete events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onComplete
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onComplete();

  // late Observers only receive the terminal event
  subject.test().assertEmpty();

 

추가 예시로 안드로이드의 EditText의 자동검색에 유용하게 사용할 수 있을 것입니다.

private val searchSubject = PublishSubject.create<String>()

binding.etSearch.doOnTextChanged { text, _, _, _ ->
            searchSubject.onNext(text.toString())
        }
        compositeDisposable.add(
            searchSubject
                .observeOn(AndroidSchedulers.mainThread())
                .debounce(1000, TimeUnit.MILLISECONDS)
                .filter { it.isNotBlank() }
                .map { it.trim() }
                .subscribe { roomName ->
                    Timber.d("검색 -> %s", roomName)
                    viewModel.requestSearchRooms(roomName)
                }
        )

 

 

 

4. ReplaySubject 

구독자가 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해줍니다. (이름 그대로 Replay 해준다고 생각하면 이해하기 쉽습니다.)

그러나 모든 데이터를 저장하는 만큼 메모리 누수가 발생할 가능성이 있기 때문에 주의해야합니다. 

 

두번쨰 subscribe() 에서 지금까지의 데이터들을 모두 보내주는 것을 볼 수 있습니다.

  ReplaySubject<Object> subject = ReplaySubject.create();
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");
  subject.onComplete();

  // both of the following will get the onNext/onComplete calls from above
  subject.subscribe(observer1);
  subject.subscribe(observer2);

 

 

 

 

5. ConnectableObservable

이 클래스에 대해서는 설명이 좀 기므로 참고 링크를 남기겠습니다. ㅎㅎ 

github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators

 

ReactiveX/RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

medium.com/@satoshun/about-rxjava-connectable-observable-e3905a432f8e

 

RxJava: What’s Connectable Observable

Observers subscribes a Connectable Observable that waits until connect method is called. It begins emitting items to those Observers. publish + connect is a most simple usage of Connectable…

medium.com

chanhyeok.tistory.com/17

 

005. Connectable Observable

참조 : [https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators] Connectable Observable ConnectableObservable과 operator에 대해 아라보자. ConnectableObservable.connect( ) — Conne..

chanhyeok.tistory.com

 

 


출처 및 참고 

reactivex.io/documentation/subject.html

 

ReactiveX - Subject

If you have a Subject and you want to pass it along to some other agent without exposing its Subscriber interface, you can mask it by calling its asObservable method, which will return the Subject as a pure Observable. See Also

reactivex.io

RxJava 프로그래밍 책

 

 

 

 

댓글과 공감은 큰 힘이 됩니다. 감사합니다. !!!

728x90
Comments