코딩하는 일용직 노동자

ReactiveX ? RxAndroid ? 본문

안드로이드

ReactiveX ? RxAndroid ?

bacass 2020. 11. 14. 15:17

# ReactiveX란?
프로그래밍에는 다양한 패러다임이 있습니다.
오래전 순차처리부터 OOP(객체지향), Function(함수형) 등..

기존의 프로그래밍은 클라이언트가 서버에게 정보를 요청하면 서버는 요청받은 작업을 처리한 후 돌려줄때까지
클라이언트는 계속해서 결과를 기다리는 동기식 처리가 많았습니다.
하지만 프로그래밍의 규모가 커지고 복잡해지면서 동시에 다양한 작업을 하면서 비동기식 처리의 수요가 많아지게 되었습니다.
ReactiveX란 비동기식 처리를 가능하게 하는 프로그래밍 이라고 생각하면 될듯합니다.
상태를 관찰하고 그에 반응하는 처리를 한다고 생각하면 이해가 쉬울것입니다.
다양한 언어에서 ReactiveX를 이용할수 있습니다. RxJava, RxKotlin, RxSwift, RxScala..
자세한 스펙은 이곳에서 확인하실 수 있습니다.



우선 ReactiveX를 구성하는 요소들을 알아봅시다.

1. Observable(옵저버블)
- 데이터의 흐름입니다.
- onNext(), onComplte(), onError() 로 구성됩니다.
- onNext() : 새로운 데이터를 전달
- onComplte() : 데이터 흐름의 종료
- onError() : 에러 신호를 전달

2. Subscribe(구독)
- 옵저버블을 등록하고 전달된 데이터를 처리합니다.

3. Operator(연산자)
- 전달되는 데이터를 중간에서 필터링하거나 가공합니다.

4. Scheduler
- Subscribe가 어느 쓰레드에서 실행될지 결정할 수 있습니다.
- Main : 메인 쓰레드에서 동작
- IO : 네트워크나 입출력 쓰레드에서 동작



# 준비작업
RxAndroid를 사용하기 위해 gradle에 의존성 추가를 해줍니다.

// RxJava
implementation 'io.reactivex.rxjava3:rxjava:3.0.7'
// RxKotlin
implementation 'io.reactivex.rxjava3:rxkotlin:3.0.1'
// RxAndroid
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

// Android 에서 사용하는 뷰들과 연결해주는 작업을 위해 RxBinding 추가.
implementation 'com.jakewharton.rxbinding3:rxbinding:3.1.0'



# RxAndroid 
ReactiveX를 아주 쉽게 설명하자면
Observable에서 보낸 이벤트를 Operator에서 가공을 하고 Subscribe에서 받는 것입니다.
이것은 비동기적으로 처리되는 데이터의 흐름입니다.

우선 RxAndroid의 간단한 예제를 이용해보겠습니다.
observable 에서 보낸 "Hello World"를 onNext() 에서 받는것입니다.

var observer = object : DisposableObserver<String>() {
    override fun onComplete() {
        Log.d("Rx", "onComplete")
    }

    override fun onNext(t: String) {
        Log.d("Rx", "onNext : $t")
    }

    override fun onError(e: Throwable) {
        Log.d("Rx", "onError")
    }
}

val observable = Observable.create<String> {
    it.onNext("Hello World")
    it.onComplete()
}
observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)



# Operator(연산자)
ReactiveX에는 다양한 연산자를 제공해줍니다.
이것을 이용해 데이터를 가공할수 있습니다.

- 데이터의 흐름을 생성하는 함수 : create(), just(), range() 등
- 데이터를 변환을 하는 함수 : map()
- 원하는 데이터만 구하는 함수 : filter(), first(), last(), take() 등
- 쓰레드 관리를 제어하는 함수 : subscribeOn(), observeOn() 

아래는 Operator를 이용한 예입니다.

var observer = object : DisposableObserver<Int>() {
        override fun onComplete() {
            Log.d("Rx", "onComplete")
        }

        override fun onNext(t: Int) {
            Log.d("Rx", "onNext : $t")
        }

        override fun onError(e: Throwable) {
            Log.d("Rx", "onError")
        }
    }

var observable = Observable.create<Int> {
    it.onNext(1)
    it.onNext(2)
    it.onNext(3)
    it.onNext(4)
    it.onNext(5)
    it.onNext(6)
    it.onComplete()
}.filter {
    it % 2 == 0
}
observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)

it.onNext() 를 여러번 사용하지 않고 just() 를 이용할 수 있습니다.

var observer = object : DisposableObserver<Int>() {
    override fun onComplete() {
        Log.d("Rx", "onComplete")
    }

    override fun onNext(t: Int) {
        Log.d("Rx", "onNext : $t")
    }

    override fun onError(e: Throwable) {
        Log.d("Rx", "onError")
    }
}

Observable
        .just(1, 2, 3, 4, 5, 6)
        .filter {
            it % 2 == 0
        }.subscribe(observer)

 

map() 을 이용해 전달받은 값을 처리해줄 수 있습니다.

var observer = object : DisposableObserver<Int>() {
    override fun onComplete() {
        Log.d("Rx", "onComplete")
    }

    override fun onNext(t: Int) {
        Log.d("Rx", "onNext : $t")
    }

    override fun onError(e: Throwable) {
        Log.d("Rx", "onError")
    }
}

var observable = Observable.create<Int> {
    it.onNext(1)
    it.onNext(2)
    it.onNext(3)
    it.onNext(4)
    it.onNext(5)
    it.onNext(6)
    it.onComplete()
}.map {
    it * 2
}
observable.subscribe(observer)

 

# Scheduler
비동기식으로 처리되다 보니 쓰레드의 관리가 중요한데 이것을 어느 쓰레드에서 실행될지 설정해주는것이 Scheduler입니다.

스케쥴러는 subscribeOn(), observeOn()에서 각각 지정할 수 있는데 
subscribeOn()은 Observable의 작업을 시작하는 쓰레드를 지정 할 수 있습니다.
observeOn()은 Operator와 Subscribe에 작업되는 쓰레드를 지정 할 수 있습니다.
안드로이드에서 주로 사용되는 Scheduler는 
Schedulers.io(), AndroidSchedulers.mainThread()가 있습니다.

예제를 확인해보겠습니다.
subscribeOn()에는 Schedulers.io()를
observeOn()에는 AndroidSchedulers.mainThread()를 지정했습니다.
결과를 살펴보면 신호를 보낼때 onDoNext()가 작동된 쓰레드와
결과를 처리후 신호를 받을때 onNext()가 작동된 쓰레드의 ID가 다릅니다.

var observable = Observable.create<Int> {
    it.onNext(1)
    it.onNext(2)
    it.onNext(3)
    it.onNext(4)
    it.onNext(5)
    it.onNext(6)
    it.onComplete()
}.doOnNext {
    Log.d("Rx", "doOnNext : $it")
}.filter {
    it % 2 == 0
}
observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)

쓰레드 ID가 다르다.


# 마치며
RxAndroid를 이용하면 다양한 비동기식 처리를 가능하게 해주고 중간에 데이터를 가공하기 쉽습니다.
다만 학습에 난이도가 있고 이로인해 코드가 더 복잡해 보일수있습니다.