관리 메뉴

막내의 막무가내 프로그래밍 & 일상

[RxJava] 결합 연산자 정리 본문

안드로이드/RxJava

[RxJava] 결합 연산자 정리

막무가내막내 2020. 12. 22. 20:38
728x90

 

[2021-04-16 업데이트]

[2020.12.22 블로그 포스팅 스터디 네 번째 글]

 

[결합 연산자] : 결합 연산자는 다수의 Observable을 하나로 합치는 방법을 제공합니다. flatMap() , groupBy() 함수 등은 1개의 Observable을 확장해주는 반면 결합 연산자들은 여러 개의 Observable을 내가 원하는 Observable로 결합해줍니다.

 

 

1. zip()

zip() 함수는 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합하는데 있습니다. 예를 들어 A, B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있습니다. 그 전까지는 발행을 기다립니다.

위 다이어그램을 보면 두 개의 Observable이 짝을 이룰 때까지 발행이 늦어지고 각각 짝을 이루었을때 둘의 데이터를 결합해서 하나의 Observable로 방출함을 볼 수 있습니다.

 

참고로 zip() 함수는 최대 9개의 Observable을 결합할 수 있습니다.

 

간단한 예시를 살펴보면 다음과 같습니다. 3개의 Observable을 zip()함수를 사용해 합쳐주는 코드입니다.

Observable<Integer> source = Observable.zip(
			Observable.just(100, 200, 300),
			Observable.just(10, 20, 30),
			Observable.just(1, 2, 3),
			(a, b, c) -> a + b + c );
		source.subscribe(Log::i);

//출력
//111, 222, 333

 

 

 


 

 

2. combineLatest()

combineLatest() 함수는 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수입니다. 예를 들어 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 하면 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해줍니다.

 

다이어그램을 살펴보면 첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않습니다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나오고 그 이후부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값만 보여줍니다. 이것이 zip()하고의 큰 차이점입니다.

 

 

String[] data1 = {PUPPLE, ORANGE, SKY, YELLOW}; //6, 7, 4, 2
		String[] data2 = {DIAMOND, STAR, PENTAGON}; // <>, -S, -P
		
		Observable<String> source = Observable.combineLatest(
				Observable.fromArray(data1)  
						  .zipWith( //zipWith()로 깔끔하게 코드 정리
						      Observable.interval(100L, TimeUnit.MILLISECONDS), 
							  (shape, notUsed) -> Shape.getColor(shape)),	  
				Observable.fromArray(data2)
				          .zipWith(
				        	  Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),	  
				        	  (shape, notUsed) -> Shape.getSuffix(shape)),
				(v1, v2) -> v1 + v2);
		
		source.subscribe(Log::i);
		CommonUtils.sleep(1000);
		CommonUtils.exampleComplete();
        
 //출력
 6<>
 7<>
 4<>
 4-S
 2-S
 2-P

먼저 두 개의 Observable 데이터 개수가 다름을 볼 수 있습니다. 첫 번째 Observable 에서는 색상(6,7,4,2)를 얻어오고 두 번째 Observable 에서는 도형 모양을 얻어옵니다.(<>, -S, -P)  

첫 번쨰 Observable은 100ms 간격으로 값을 발행하고 두 번째 Observable은 최초에 150ms를 쉬고 200ms 간격으로 값을 발행합니다. zip() 함수와는 다르게 두 개의 값이 한 개씩 발행된 이후부터는 어느 1개의 값만 변경되어도 결과가 발행됩니다.  다이어그램으로 나타내면 다음과 같습니다.

 

 


 

 

3. merge()

merge() 함수는 zip() 함수나 combineLatest() 함수와 비교하면 가장 단순한 결합 함수입니다. 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행합니다. 마블 다이어그램을 보면 이해가 쉽습니다. ㅎㅎ

 

그냥 두 개의 Observable에서 발행되는 순서대로 발행됩니다.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
        
  //출처: https://blog.mindorks.com/rxjava-operator-concat-vs-merge

위와 같은 코드라면 A1, B1, A2, A3, B2, B3, A4 or A1, A2, B1, B2, A3, A4, B3 이런식으로 방출이 될 것입니다. 그리고 한가지 더 내용을 추가하자면 listFirst 와 listSecond 모두 개별 스레드에서 데이터가 발행이 됩니다. 

 

한마디로 merge()는 단순하게 두 개 이상의 Observable이 데이터가 오는대로 하나의 Observable을 통해 데이터를 방출해줍니다. 그리고 이것이 바로 뒤에서 살펴볼 concat()과의 차이점인데 이를 밑 사이트에서 설명을 잘 해놨습니다. concat()은 Observable들의 순서를 보장해줍니다. !

 

blog.mindorks.com/rxjava-operator-concat-vs-merge 

 

RxJava Operator - Concat Vs Merge

RxJava Operator - Concat Vs Merge. RxJava has so many operators. In order to use them correctly, we must know about them. Here, we will discuss the Concat and the Merge Operators.

blog.mindorks.com

 

 


 

 

 

3. concat()

concat()은 2개 이상의 Observable과 이어 붙여주는 함수입니다. 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독합니다. 

그리고 첫 번째 Observable에 onComplete 이벤트가 발생하지 않으면 두 번째 Observable은 영원히 대기하기 때문에 잠재적인 메모리 누수의 위험이 있습니다. 

 

다음은 비교적 빠른 로컬DB(Local)에서 데이터를 가져온 후 서버DB(Remote)에서 데이터를 가져오는 예시입니다. concat(Local, Remote) 

class CommunityRepositoryImpl(
    private val remote: CommunityRemoteDataSource,
    private val local: CommunityLocalDataSource
) : CommunityRepository {
    override fun requestMyReviews(): Single<List<ReservationProduct>> = remote.requestMyReviews()

    override fun requestReviews(cityCode: String): Flowable<List<UserReview>> {
        return local.getUserReviews()
            .observeOn(Schedulers.io())
            .onErrorReturn { listOf() }
            .flatMapPublisher { cachedItems ->
                if (cachedItems.isEmpty()) {
                    requestRemoteReviews(cityCode).toFlowable().onErrorReturn { listOf() }
                } else {
                    val local = Single.just(cachedItems)
                    val remote = requestRemoteReviews(cityCode).onErrorResumeNext { local }
                    Single.concat(local, remote)
                }
            }
    }

    override fun requestRemoteReviews(cityCode: String): Single<List<UserReview>> {
        return remote.requestReviews(cityCode).observeOn(Schedulers.io())
            .flatMap {
                local.insertUserReviews(it)
                    .andThen(Single.just(it))
            }
    }

    override fun updateReviewRecommend(userReview: UserReview): Completable =
        remote.updateReviewRecommend(userReview)


}

 

 

다음 포스팅에서도 concat을 사용한 여러 소스에서 데이터를 불러오는 방법에 대해 포스팅되어 있습니다.

medium.com/@jungil.han/rxjava%EB%A1%9C-%EC%97%AC%EB%9F%AC-%EC%86%8C%EC%8A%A4%EC%97%90%EC%84%9C-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%A1%9C%EB%94%A9%ED%95%98%EA%B8%B0-e19ed933e697

 

RxJava로 여러 소스에서 데이터 로딩하기

이 글은 Loading data from multiple sources with RxJava의 번역입니다.

medium.com

 

이상 결합연산자에 대해 살펴보았습니다.

기본 코드와 내용만 보면 이해가 가지만 실제 사용하려고하면 막막한 RxJava.. 입니다.


참고 :

github.com/yudong80/reactivejava/tree/master/src/main/java/com/yudong80/reactivejava/chapter04/combine

 

yudong80/reactivejava

Contribute to yudong80/reactivejava development by creating an account on GitHub.

github.com

reactivex.io/documentation/operators.html

 

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

 

 

 

댓글과 공감은 큰 힘이 됩니다. 감사합니다. !!

728x90
Comments