일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 막내의막무가내 프로그래밍
- 주택가 잠실새내
- 2022년 6월 일상
- 주엽역 생활맥주
- 부스트코스에이스
- 막내의막무가내 안드로이드 코틀린
- 막내의막무가내 SQL
- 안드로이드 sunflower
- Fragment
- 막내의막무가내 rxjava
- 막내의막무가내 안드로이드
- 막내의막무가내 목표 및 회고
- 프로그래머스 알고리즘
- 안드로이드 Sunflower 스터디
- 막내의막무가내 알고리즘
- 프래그먼트
- 막내의막무가내 코틀린
- 막내의 막무가내
- 막내의막무가내 코볼 COBOL
- 막내의막무가내 일상
- 부스트코스
- 막내의막무가내 안드로이드 에러 해결
- flutter network call
- 막내의 막무가내 알고리즘
- 막내의막무가내 플러터
- 막내의막무가내 코틀린 안드로이드
- 안드로이드
- 막내의막무가내 플러터 flutter
- 막내의막무가내
- 막무가내
- Today
- Total
막내의 막무가내 프로그래밍 & 일상
[RxJava] 결합 연산자 정리 본문
[2021-04-16 업데이트]
[2020.12.22 블로그 포스팅 스터디 네 번째 글]
[결합 연산자] : 결합 연산자는 다수의 Observable을 하나로 합치는 방법을 제공합니다. flatMap() , groupBy() 함수 등은 1개의 Observable을 확장해주는 반면 결합 연산자들은 여러 개의 Observable을 내가 원하는 Observable로 결합해줍니다.
1. zip()
zip() 함수는 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합하는데 있습니다. 예를 들어 A, B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있습니다. 그 전까지는 발행을 기다립니다.
위 다이어그램을 보면 두 개의 Observable이 짝을 이룰 때까지 발행이 늦어지고 각각 짝을 이루었을때 둘의 데이터를 결합해서 하나의 Observable로 방출함을 볼 수 있습니다.
참고로 zip() 함수는 최대 9개의 Observable을 결합할 수 있습니다.
간단한 예시를 살펴보면 다음과 같습니다. 3개의 Observable을 zip()함수를 사용해 합쳐주는 코드입니다.
Observable<Integer> source = Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
Observable.just(1, 2, 3),
(a, b, c) -> a + b + c );
source.subscribe(Log::i);
//출력
//111, 222, 333
2. combineLatest()
combineLatest() 함수는 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수입니다. 예를 들어 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 하면 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해줍니다.
다이어그램을 살펴보면 첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않습니다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나오고 그 이후부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값만 보여줍니다. 이것이 zip()하고의 큰 차이점입니다.
String[] data1 = {PUPPLE, ORANGE, SKY, YELLOW}; //6, 7, 4, 2
String[] data2 = {DIAMOND, STAR, PENTAGON}; // <>, -S, -P
Observable<String> source = Observable.combineLatest(
Observable.fromArray(data1)
.zipWith( //zipWith()로 깔끔하게 코드 정리
Observable.interval(100L, TimeUnit.MILLISECONDS),
(shape, notUsed) -> Shape.getColor(shape)),
Observable.fromArray(data2)
.zipWith(
Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
(shape, notUsed) -> Shape.getSuffix(shape)),
(v1, v2) -> v1 + v2);
source.subscribe(Log::i);
CommonUtils.sleep(1000);
CommonUtils.exampleComplete();
//출력
6<>
7<>
4<>
4-S
2-S
2-P
먼저 두 개의 Observable 데이터 개수가 다름을 볼 수 있습니다. 첫 번째 Observable 에서는 색상(6,7,4,2)를 얻어오고 두 번째 Observable 에서는 도형 모양을 얻어옵니다.(<>, -S, -P)
첫 번쨰 Observable은 100ms 간격으로 값을 발행하고 두 번째 Observable은 최초에 150ms를 쉬고 200ms 간격으로 값을 발행합니다. zip() 함수와는 다르게 두 개의 값이 한 개씩 발행된 이후부터는 어느 1개의 값만 변경되어도 결과가 발행됩니다. 다이어그램으로 나타내면 다음과 같습니다.
3. merge()
merge() 함수는 zip() 함수나 combineLatest() 함수와 비교하면 가장 단순한 결합 함수입니다. 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행합니다. 마블 다이어그램을 보면 이해가 쉽습니다. ㅎㅎ
final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};
final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);
Observable.merge(observableFirst, observableSecond)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
//출처: https://blog.mindorks.com/rxjava-operator-concat-vs-merge
위와 같은 코드라면 A1, B1, A2, A3, B2, B3, A4 or A1, A2, B1, B2, A3, A4, B3 이런식으로 방출이 될 것입니다. 그리고 한가지 더 내용을 추가하자면 listFirst 와 listSecond 모두 개별 스레드에서 데이터가 발행이 됩니다.
한마디로 merge()는 단순하게 두 개 이상의 Observable이 데이터가 오는대로 하나의 Observable을 통해 데이터를 방출해줍니다. 그리고 이것이 바로 뒤에서 살펴볼 concat()과의 차이점인데 이를 밑 사이트에서 설명을 잘 해놨습니다. concat()은 Observable들의 순서를 보장해줍니다. !
blog.mindorks.com/rxjava-operator-concat-vs-merge
3. concat()
concat()은 2개 이상의 Observable과 이어 붙여주는 함수입니다. 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독합니다.
그리고 첫 번째 Observable에 onComplete 이벤트가 발생하지 않으면 두 번째 Observable은 영원히 대기하기 때문에 잠재적인 메모리 누수의 위험이 있습니다.
다음은 비교적 빠른 로컬DB(Local)에서 데이터를 가져온 후 서버DB(Remote)에서 데이터를 가져오는 예시입니다. concat(Local, Remote)
class CommunityRepositoryImpl(
private val remote: CommunityRemoteDataSource,
private val local: CommunityLocalDataSource
) : CommunityRepository {
override fun requestMyReviews(): Single<List<ReservationProduct>> = remote.requestMyReviews()
override fun requestReviews(cityCode: String): Flowable<List<UserReview>> {
return local.getUserReviews()
.observeOn(Schedulers.io())
.onErrorReturn { listOf() }
.flatMapPublisher { cachedItems ->
if (cachedItems.isEmpty()) {
requestRemoteReviews(cityCode).toFlowable().onErrorReturn { listOf() }
} else {
val local = Single.just(cachedItems)
val remote = requestRemoteReviews(cityCode).onErrorResumeNext { local }
Single.concat(local, remote)
}
}
}
override fun requestRemoteReviews(cityCode: String): Single<List<UserReview>> {
return remote.requestReviews(cityCode).observeOn(Schedulers.io())
.flatMap {
local.insertUserReviews(it)
.andThen(Single.just(it))
}
}
override fun updateReviewRecommend(userReview: UserReview): Completable =
remote.updateReviewRecommend(userReview)
}
다음 포스팅에서도 concat을 사용한 여러 소스에서 데이터를 불러오는 방법에 대해 포스팅되어 있습니다.
이상 결합연산자에 대해 살펴보았습니다.
기본 코드와 내용만 보면 이해가 가지만 실제 사용하려고하면 막막한 RxJava.. 입니다.
참고 :
reactivex.io/documentation/operators.html
댓글과 공감은 큰 힘이 됩니다. 감사합니다. !!
'안드로이드 > RxJava' 카테고리의 다른 글
[안드로이드] BaseViewModel 기본적인 Rx 서버 통신 작업을 위한 공통 함수 기록 (4) | 2021.06.03 |
---|---|
[RxJava] 스케줄러 정리 (0) | 2020.12.30 |
[RxJava] 변환 연산자 정리 (0) | 2020.12.12 |
[RxJava] 생성 연산자 정리 (4) | 2020.12.01 |
[RxJava] Subject 클래스 간략 정리 (feat. Hot, Cold Observable차이) (4) | 2020.10.03 |