読者です 読者をやめる 読者になる 読者になる

うさがにっき

読書感想文とプログラムのこと書いてきます

Retrolamda, RxAndroidを使ったJava8っぽいAndroidコーディングはへっぽこアンドロイダーにも恩恵をもたらすのか?(RxJava詳細編)

RxJava Android Retrolamda

概要

Retrolamda, RxAndroidを使ったJava8っぽいAndroidコーディングはへっぽこアンドロイダーにも恩恵をもたらすのか?(RxJava編) - うさがにっき
の続き

RxJavaはできることがかなりたくさんある
使っていく上で効率的に、かつ、間違いがないようRxJavaについてもう少し深く掘り下げる

RxJava学習のベストプラクティスっぽいもの // Speaker Deck
をベースに考える、感謝
逆に上記のスライド見てわかる人には意味のない記事なのでスルーするが吉

詳細

そもそもrxとは何を指すのか?

Rx (Reactive Extensions) - Home
より

>The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

Reactive Extensions(Rx)はobservableシーケンスそしてLINQスタイルクエリオペレータを使った非同期、イベントベースのライブラリのこと
Rxを使うと開発者はObservablesを非同期データストリームで表すことができ、LINQオペレータを使って非同期データストリームにクエリを指定でき、非同期データストリームの平行処理をSchedulersでパラメータ化できる
簡単に言うと、Rx = Observable + LINQ + Schedulers

???

一つ一つ整理

Observableを非同期データストリームで表す

RxJavaではObservable(監視対象)をデータストリームに加工して監視する
それがintであろうとStringであろうとオブジェクトであろうとデータストリームにする

そのデータストリームを非同期に監視している

LINQスタイル

Observableの値に対して、フィルタリングしたり、値を加工したりする記述方法
先日のブログでいうoperatorのことmapとかfilterとか

Schedulers

動作するスレッドを選択できるということ
先日のブログでいうとsubscribeOn, observeOnのこと

つまり
RxはObservableをデータストリームとして、非同期的に監視している
また、Observable(データストリーム)に対してLINQスタイルを使って値を加工できる
さらに、Observableの処理の実行スレッドを選択できる機能
ということ

ではどのようなものがObserbableとなりえて、どのようなイベントとなるのか
なんでもなるのだが、例えば

  • Event:ユーザが操作した時にイベント発火
  • Async:非同期処理が終了した時にイベント発火
  • Array:各要素の間隔(時間)が限りなく0に近いイベントとして扱われる

では実際にJavaに落とし込んで見る

Arrayの場合

// ストリーム化
Observable<Integer> counter = Observable.from(new Integer[]{1, 2, 3, 4});
// 観測する
counter.subscribe(new Action1<Integer>(){
	@override
	public void call(Integer integer) {
		// dosomething
	}

});

callメソッドの引数integerにストリーム化したnew Integer[]{1, 2, 3, 4}が順番に(間隔ほぼ0で)入って来る

Eventの場合

// ストリーム化
Observable<TouchEvent> touches = Observable.from(view.touches());
// 観測し、MOVEイベントのみを取得する
Observable<TouchEvent> touchesEventMove =
	touches.filter(new Func1<TouchEvent, Boolean>() {
		@override
		public void call(TouchEvent touchEvent) {
			return touchEvent.type == MOVE;
		}		
	})

ではRxJavaは何に、どのように使ったらいいのか?

コレクション操作に使う

ObservableはリッチなIteratorとして扱える

例えばAdapter内のリスト一覧のObservableを取得したい時には・・・

public class XXXAdapter extends ArrayAdapter<T> {
	public Observable<T> items() {
		return Observable
			.range(0, getCount())
			.map(i -> getItem())
			.window(getCount())
			.toBlocking()
			.single();
	}
}

window, toBlocking, singleはObservableから別のObservable生やして、それをブロッキングして(BlockingObservable)、その中からとりだしている感じ
これをしないとObservable内でadapterをremoveすると落ちるのでwindowで一回別のObservableを生やしている

http://reactivex.io/RxJava/javadoc/rx/Observable.html#window(int)
http://reactivex.io/RxJava/javadoc/rx/Observable.html#toBlocking()
BlockingObservable (RxJava Javadoc 1.0.11)
http://reactivex.io/RxJava/javadoc/rx/observables/BlockingObservable.html#single()

これがあるとこんな感じでadapterを持っている場所から操作が手軽にできる
チェックしたものだけをadapterから削除

adapter.items()
	.filter(Todo::isChecked)
	.subscribe(adapter::remove)

他にもいろいろリッチなIterator的なものが作れる

Observable<Recipe> history = Observable.from(historyList);

Observable<String> ids = history
	// Recipeからidを取得
	.map(new Func1<Recipe, Long>() {
		@override
		public Long call(Recipe recipe) {
			return recipe.getId();
		}
	})
	// 最大10件
	.limit(10)
	// ,(カンマ)で結合
	.reduce("" , new Func2<String, Long, String>() {
		@override
		public Long call(String s, Long id) {
			return (s == null ? "" , ",") + id.toString();
		}	
	});

// カンマで結合したidを取り出す
String ids = ids.toBlocking().single();

ここらへんはCollection操作だけなので低リスクに導入できる
最初に導入を試すならCollection操作からが良さげ

非同期の通信処理を行う(Promise)

Promiseは1つの値をもつObservable

public Observable<Recipe> recipe(final long recipeId) {
	return Observable.create(new Observable.OnSubscribe<Recipe>() {
		@override
		public void call(Subscriber<? super Recipe> subscriber) {
			try {
				Recipe recipe = loadRecipe(recipeId);
				subscriber.onNext(recipeId);
				subscriber.onComplete();
			}catch(Exception e) {
				subscriber.onError(e);
			}
		}
	});
}

Subscriber (RxJava Javadoc 1.0.11)

schedulerをセット

Observable<Recipe> recipeObservable = recipe(id);

recipeObservable
	// Observableの実行スレッド
	.subScribeOn(Scheduler.io())
	// Observerの実行スレッド
	.observeOn(AndroidSchedulers.mainThread());

非同期処理を実行

recipeObservable.subscribe(
	// success
	(Recipe recipe) -> {
		showRecipe(recipe);
	},
	// exception
	(Throwable throwable) -> {
		showError(throwable);
	},
	// complete
	() -> {
		hideProgress();
	}
);

これでRecipeIdからRecipeを取得するloadRecipeは別スレッドで処理されて、取得した後のshowRecipeはメインスレッドで処理される
エラーハンドリングが難しいAsynkTaskに比べて、より良い設計が出来そう
ReactiveExtensions - 【翻訳】AsyncTask と AsyncTaskLoader を rx.Observable に置き換える - RxJava Android Patterns - Qiita

次回はこれをふまえて、RxAndroidに話を展開をしていく