티스토리 뷰

개발/RxJava

[RxJava] RxJava와 Android

맨날치킨 2018. 7. 6. 12:04
반응형

요즘 Reactive Programming 에 대한 관심이 높습니다. 서버 백엔드부터 Front 클라이언트들까지 많은 곳에서 이야기되고 있습니다.

Reactive Programming 은 위키에서는 다음과 같이 정의하고 있습니다.

"퓨팅으로써 반응형 프로그래밍은 데이터의 흐름과 변화에 대한 전달을 기반으로 하는 프로그래밍 패러다임이다."

끊임없이 요청/변경되는 데이터에 반응하기 위해나온 Reactive Programming 은 데이터를 처리함에 있어서 비동기적으로 데이터를 처리할 때 효율적으로 할 수 있도록 하기 위한 방법입니다.

다음 그림을 보면 좀 더 쉽게 이해할 수 있습니다.

 

일반적인 동기화 방식의 프로그래밍에서는 메소드 콜이 발생했을 때 데이터 처리가 끝날때까지 쓰레드를 대기시키거나 콜백을 받아서 처리하기 때문에 컴퓨팅 리소스를 충분히 사용할 수 없습니다. 반면 Messaging 기반의 Reactive Programming 에서는 필요한 경우에만 쓰레드를 생성 후 메세지 형태로 전달하기 때문에 더 효율적으로 컴퓨팅 리소스를 사용할 수 있습니다.

RxJava 란 무엇인가?
MS 에서.NET 의 Reactive Extension 의 Java 버전으로 데이터베이스의 데이터 처리 과정에서 비동기 처리를 효율적으로 하기 위해서 만든 뒤 큰 호응을 얻어 차차 다른 언어로 개발되어 지금에 이르러 RxSwift, RxJS, RxJava 등등 다양한 언어로 사용할 수 있게 된 것입니다.

RxJava의 역사와 로드맵은 다음과 같습니다.
□ 2014년 11월 - 1.0 배포
□ 2016년 10월 - 2.0 배포
□ 2017년 07월 - 1.0 기능개발 중단, 버그 수정만 함
□ 2018년 03월 - 1.0 유지보수 종료

2017년 여름부터 1.0의 추가 기능 개발은 멈췄으며, 현재는 유지보수도 종료된 상태입니다. 앞으로 새롭게 RxJava 를 배울 분들은 RxJava 2.0 을 배우시길 권장합니다.


RxJava 의 기초 이해하기
RxJava 는 데이터를 가공/변형/처리를 하는 라이브러리입니다. RxJava 에서는 Nothing, One, More, Unlimited Datas 등의 데이터 형태를 처리할 수 있습니다. 그에 따라서 Completable, Maybe, Single, Observable, Processor 로 각각 처리할 수 있습니다.

데이터의 형태에 따라 다음과 같이 분류 할 수 있습니다. 
Completable : Nothing
Maybe : One
Single : One
Observable : One, More, Unlimited
Subject : One, More, Unlimited


기존 코드를 RxJava 로 바꿔보기
일반적으로 사용하는 Observable 보도록 하겠습니다.
다음과 같은 요구사항을 전통적인 방법과 RxJava를 통해 각각 프로그래밍 해보겠습니다. 

0~10까지의 Int 형 배열이 있습니다.
List<Integer> datas = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10);
Int 형 배열을 a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.

기존 방식에서는 다음과 같이 작성 할 수 있습니다.

for (int data : datas) {
  char value = (char) data + 'a';
  print(value);
}


위의 코드를 RxJava 로 변경하면 다음과 같이 바꿀 수 있습니다.

Observable.fromIteratable(datas)
    .subscribe(data -> {
        char value = (char) data + 'a';
        print(value);
  });


위의 RxJava 코드는 기존의 코드에 비해서 장점을 찾기 어렵습니다.

RxJava 의 map 을 활용하여 좀 더 고쳐보겠습니다.

Observable.fromIteratable(datas)
    .map(data -> (char) data + 'a')
    .subscribe(value -> print(value));


map 은 최초로 들어온 데이터를 다른 형태로 변경 시킬 수 있습니다.

map도 써 봤으니 이제 기존의 요구사항에 하나를 더 추가하겠습니다.

0~10까지의 Int 형 배열이 있습니다.
Int 형배열을a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.
추가 1 : 짝수에 해당하는 숫자는 출력하지 마세요.

수정된 기존 방식의 코드입니다.

for (int data : datas) {
    if ((data  % 2) != 0) {
        char value = (char) data + 'a';
        print(value);
    }
}


RxJava 도 수정하겠습니다.

Observable.fromIteratable(datas)
    .filter(data -> (data % 2) != 0)
    .map(data -> (char) data + 'a')
    .subscribe(value -> print(value));


RxJava 에서는 filter를 사용하였습니다.
filter 는 반환이 true 이면 값을 다음 단계로 전달하고, false 이면 전달하지 않습니다.

다시 요구사항을 추가해보겠습니다.

0~10까지의 Int 형 배열이 있습니다.
Int 형 배열을 a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.
추가 1 : 짝수에 해당하는 숫자는 출력하지 마세요.
추가 2 : 각 문자는 10번씩 반복되도록 한꺼번에 출력되도록 하세요.

for 형태의 코드를 수정하면 다음과 같이 될 것입니다.

List<Character> chars = new ArrayList<>();
for (int data : datas) {
    if ((data  % 2) != 0) {
        char value = (char) data + 'a';
        for (int idx = 0; idx < 10; idx++) {
            chars.add(value);
        }
    }
}
print(chars);


RxJava 는 다음과 같습니다.
Observable.fromIteratable(datas)
    .filter(data -> (data % 2) != 0)
    .map(data -> (char) data + 'a')
    .flatMap(value -> Observable.range(0, 10).map(index -> value))
    .collect(() -> new ArrayList<Character>(), (chars, value) -> chars.add(value))
    .subscribe(values -> print(values));


flatmap 은 1개의 데이터에 대해 n 개의 데이터로 변형할 수 있는 처리 기능입니다.
collect 는 주입된 데이터를 특정 Collection 에 담을 수 있도록 하는 처리 기능입니다.
Observable.range 는(n, m) 에 대해 n 부터 n + m -1 만큼 int 형데이터를 호출해주는 데이터로 Observable.range(0, 10).map(index -> value)) 는 value 라는값을 10번 반복하도록 하였습니다.
지금까지 알아본 Observable 은 정해진 데이터를 전달하는 경우에 사용되고 있습니다.
하지만 데이터가 언제 어떻게 시작될지 모르는 경우도 있습니다.

가장 일반적인 예시로 사용자가 화면을 터치하는 것은 언제 얼마나 할 지 알 수 없습니다.
이때 사용하는 Subject에 대해 알아보겠습니다.


Subject 알아보기
앞서 Observable 에 맞춰서 작성한 예제를 조금 수정해보겠습니다.
 
PublishSubject<Integer> subject = PublishSubject.<Integer>create();
subject
    .filter(data -> (data % 2) != 0)
    .map(data -> (char) data + 'a')
    .flatMap(value -> Observable.range(0, 10).map(index -> value))
    .collect(() -> new ArrayList<Character>(), (chars, value) -> chars.add(value))
    .subscribe(values -> print(values));


위와 같이 데이터가 들어왔을 때 동작할 것을 미리 정의를 한 후 다음과 같이 데이터를 넣으면 됩니다.
 
subject.onNext(0);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
// 중략...

Observable 과 Subject 의 가장 큰 차이는 내부에서 데이터를 주입하느냐 외부에서 주입받느냐의 차이로 이해하시면 됩니다.


비동기 처리 하기
RxJava 를 사용하는 가장 큰 이유는 비동기처리에 굉장히 유연하고 쉽게 사용할 수 있다는 것에 있습니다.
RxJava 에서 주로 쓰는 비동기 쓰레드 선언은 다음과 같습니다.

□ Schedulers.io()
Network IO 나 File IO 등을 처리하기 위한 쓰레드

□ Schedulers.computation()
단순 연산로직등에 사용되며 Event Looper 에 의해 동작하는 쓰레드

□ AndroidSchedulers.mainThread()
Android 에서 UI Thread 에서 처리하기 위한 쓰레드

이 외에도 trampoline(), immediately() 등이 있으나 주로 쓰는건 위의 3개로 이해하시면 됩니다.

RxJava 에서 비동기쓰레드 선언은 데이터 주입과 데이터의 처리 시점을 위하여 있습니다.

□ 데이터 주입 - subscribeOn(쓰레드)
데이터를 주입하는 시점에 대한 쓰레드 선언이며 모든 stream 내에서 최종적으로 선언한 쓰레드가 할당됩니다.

□ 데이터처리 - observeOn(쓰레드)
쓰레드를선언한 다음부터 새로운 쓰레드가 선언되기 전까지 데이터 처리에 동작할 쓰레드를 할당합니다.

이제 비동기 처리된 코드를 보도록 하겠습니다. 

Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onNext(4);
    emitter.onComplete();
})
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .filter(data -> (data % 2) != 0)
    .map(data -> (char) data + 'a')
    .observeOn(Schedulers.computation())
    .flatMap(value -> Observable.range(0, 10).map(index -> value))
    .collect(() -> new ArrayList<Character>(),(chars, value) -> chars.add(value)).observeOn(AndroidSchedulers.mainThread())
    .subscribe(values -> print(values));



위의 코드가 수행되는 쓰레드는 각각 다음과 같습니다.

□ Observable.create() -Schedulers.computation()
□ filter() - Schedulers.io()
□ map() - Schedulers.io()
□ flatMap() - Schedulers.computation()
□ collect() - Schedulers.computation()
□ subscribe() - AndroidSchedulers.mainThread()

간단요약
RxJava 는 데이터 비동기 처리를 효율적으로 하기 위해 나온 라이브러리입니다.
데이터 단위로 처리해야 할 기능을 선언하고 비동기 선언이 쉽게 될 수 있도록 지원하고 있습니다.

Observable 은 1개이상의 무제한 데이터를 처리하는 기능을 지원합니다.
Subject 는 1개 이상의 무제한 데이터를 처리할 수 있지만 Observable 과의 차이는 외부에서 데이터를 주입할 수 있습니다.

쓰레드 선언은subscribeOnobserveOn 으로 나뉘며 각각 데이터 주입과 데이터 처리 시점을 위해 선언됩니다.
주로 사용되는 쓰레드는 Schedulers.io()Schedulers.computation() 입니다. Android 의 UI Thread 는 AndroidSchedulers.mainThread() 를 사용하시면 됩니다.

RxJava 는 다양한 처리 기능을 제공합니다. 앞에서 예로 든 filter, map, flatMap 등은 극히 일부입니다. 자세한 정보는 아래 링크에서 Rx 문서를 참고하시길 바랍니다.

http://reactivex.io/documentation/operators.html#alphabetical


반응형
댓글
공지사항
최근에 올라온 글