In a previous article I used RxJava with UI components, but it can just as easily be used to perform background work. RxJava provides a simple mechanism to define where (on which thread) an observable should be processed and where its results should be observed.

In this sample I'm going to show how different observables can be combined to solve these two tasks. I'm also going to show how to simulate long-running background operations.

Please note that this sample doesn't handle orientation changes, because that is a subject for a separate, larger article and I'm still working out the best way to handle it.

I will refer to observables that do their work on another thread as I/O observables, although the work can be any long-running operation on another thread. What's important is that such an operation can throw an error, which has to be handled appropriately. This can be done with one of the error handling operators or in Observer.onError.

On the other hand, UI observables normally do not throw errors (only in case of bugs, or if you specifically implemented them that way).

However, if we want to write declarative code that combines UI and I/O observables, the I/O observables must not propagate errors: an error terminates the whole observable, and it stops working (subscribers receive no new items) until it is created again. And because the code is declarative, our observable is created only once.
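
The termination problem described above can be reproduced off-device. The following is a minimal sketch, assuming RxJava 1.x on the classpath and Java 8 lambdas; `TerminationSketch`, `riskyOperation`, and the use of a `PublishSubject` as a stand-in for the click stream are illustrative assumptions, not part of the sample app.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

class TerminationSketch {

    // Hypothetical stand-in for an I/O call: fails for odd input.
    static Observable<Integer> riskyOperation(int input) {
        if (input % 2 != 0) {
            return Observable.error(new RuntimeException("Odd input: " + input));
        }
        return Observable.just(input * 10);
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        // The subject plays the role of the button click stream.
        PublishSubject<Integer> clicks = PublishSubject.create();

        clicks.flatMap(click -> riskyOperation(click))
              .subscribe(
                  value -> events.add("next:" + value),
                  error -> events.add("error:" + error.getMessage()));

        clicks.onNext(2); // succeeds
        clicks.onNext(1); // fails and terminates the whole chain
        clicks.onNext(4); // never reaches the subscriber
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:20, error:Odd input: 1]
    }
}
```

The third "click" is lost because the subscription ended with the error, which is exactly the situation the rest of this article tries to avoid.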

So, here is what we want to achieve:

  1. Run the I/O operation on a background thread and observe results on the main thread
  2. Handle errors of the I/O operation appropriately
  3. Prevent the observable from breaking because of I/O errors
  4. Combine UI and I/O observables

First of all, we are going to observe the results of a simulated long-running background operation, which calculates a random value.

private static Observable<Integer> observeBackgroundOperation() {
    // The random value is produced as soon as this method is called;
    // delay() only postpones its emission.
    return Observable.just(new Random().nextInt())
      .delay(3L, TimeUnit.SECONDS) // by default operates on the computation scheduler
      .doOnNext(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
              if (new Random().nextBoolean()) {
                  // simulate an error
                  throw new RuntimeException("Error calculating value");
              }
          }
      });
}

This code fragment uses the delay operator to simulate a real-life long-running operation; by default, delay works on the computation scheduler. The doOnNext operator is used to simulate an error that occurs randomly (it could be a network-related error or something else).
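
In the fragment above only the emission is delayed; the value itself is computed on the calling thread. For work that really blocks, one possible approach is to defer the work until subscription and pick the thread with subscribeOn. This is a minimal sketch, assuming RxJava 1.x on the classpath and Java 8 lambdas; `DeferredWorkSketch` and `slowCalculation` are hypothetical names used only for illustration.

```java
import java.util.Random;

import rx.Observable;
import rx.schedulers.Schedulers;

class DeferredWorkSketch {

    // Hypothetical blocking call standing in for real I/O.
    static int slowCalculation() {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Random().nextInt(100);
    }

    static Observable<String> observeBlockingOperation() {
        // defer() postpones the work until somebody subscribes...
        Observable<String> deferred = Observable.defer(() -> {
            int value = slowCalculation();
            return Observable.just(value + " on " + Thread.currentThread().getName());
        });
        // ...and subscribeOn() decides which thread that subscription runs on.
        return deferred.subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        // toBlocking() is used only to keep this console demo alive until the result arrives.
        System.out.println(observeBlockingOperation().toBlocking().single());
    }
}
```

On Android the chain would continue with observeOn(AndroidSchedulers.mainThread()) instead of blocking, as the activity code in this article does.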

The next step is to observe this value in the UI.

UI used to run background operation


Also, you can download the APK and open the sample on a device by clicking this link, or watch the video (431 KB).


And here is the code:

private Subscription mSubscription;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_sample_mix);
    final Button calculateBtn = (Button) findViewById(R.id.calculate);
    final EditText result = (EditText) findViewById(R.id.result);
    result.setEnabled(false);
    final ProgressBar progress = (ProgressBar) findViewById(android.R.id.progress);

    //NOTE: this sample doesn't handle orientation change and other activity restore cases
    mSubscription = ViewObservable.clicks(calculateBtn)
      .doOnNext(new Action1<OnClickEvent>() {
          @Override
          public void call(OnClickEvent onClickEvent) {
              calculateBtn.setEnabled(false);
              progress.setVisibility(View.VISIBLE);
              result.setText("");
          }
      })
      .flatMap(new Func1<OnClickEvent, Observable<Integer>>() {
          @Override
          public Observable<Integer> call(OnClickEvent onClickEvent) {
              return observeBackgroundOperation()
                .observeOn(AndroidSchedulers.mainThread())//interaction with UI must be performed on main thread
                .doOnError(new Action1<Throwable>() {//handle error before it will be suppressed
                    @Override
                    public void call(Throwable throwable) {
                        progress.setVisibility(View.GONE);
                        calculateBtn.setEnabled(true);
                        Toast.makeText(IOCombineSampleActivity.this, R.string.mix_error_message, Toast.LENGTH_SHORT).show();
                    }
                })
                .onErrorResumeNext(Observable.<Integer>empty());//prevent observable from breaking
          }
      })
      .subscribe(new Action1<Integer>() {
          //not handling errors, because they are handled for API calls, and normally no other errors should appear
          @Override
          public void call(Integer integer) {
              progress.setVisibility(View.GONE);
              calculateBtn.setEnabled(true);
              result.setText(integer.toString());
          }
      });
}

Here we have an observable of button click events, which is transformed into an observable that performs the background operation. observeOn is used to observe results on the main thread (to perform UI operations).

doOnError is used to handle errors as they occur, and onErrorResumeNext is used to suppress them. Note that errors are not passed to the observer, but are handled earlier. In case of an error we emit an empty sequence, but because we use flatMap (each click is transformed into another observable, whose items are emitted into the main observable), our main observable is not completed and waits for the next click event.
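
The effect of suppressing errors inside flatMap can be checked without Android. This is a minimal sketch, assuming RxJava 1.x on the classpath and Java 8 lambdas; `FlatMapRecoverySketch`, `riskyOperation`, and the `PublishSubject` standing in for the click stream are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

class FlatMapRecoverySketch {

    // Hypothetical stand-in for the background operation: fails for odd input.
    static Observable<Integer> riskyOperation(int input) {
        if (input % 2 != 0) {
            return Observable.error(new RuntimeException("Odd input: " + input));
        }
        return Observable.just(input * 10);
    }

    static List<Integer> run(final List<String> handledErrors) {
        final List<Integer> results = new ArrayList<>();
        PublishSubject<Integer> clicks = PublishSubject.create();

        clicks.flatMap(click -> riskyOperation(click)
                  // handle the error before it is suppressed
                  .doOnError(error -> handledErrors.add(error.getMessage()))
                  // replace the failed inner observable with an empty one
                  .onErrorResumeNext(Observable.<Integer>empty()))
              .subscribe(value -> results.add(value));

        clicks.onNext(1); // fails, handled inside flatMap
        clicks.onNext(2); // still delivered
        clicks.onNext(3); // fails, handled inside flatMap
        clicks.onNext(4); // still delivered
        return results;
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        System.out.println(run(errors)); // prints [20, 40]
        System.out.println(errors);      // prints [Odd input: 1, Odd input: 3]
    }
}
```

Only the inner observable created for a failing click ends; the outer click stream keeps its subscriber.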

To stop observing results after the activity has been stopped, we need to unsubscribe from our observable. Unsubscribing is also needed to avoid leaking a reference to the activity, because the operation is long-running and performed on another thread.

@Override
protected void onStop() {
    mSubscription.unsubscribe();//prevent leaking of activity
    super.onStop();
}

If you have multiple subscriptions, you can use CompositeSubscription: add the subscriptions to it and unsubscribe from all of them in one place.
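
A minimal sketch of that pattern, assuming RxJava 1.x on the classpath and Java 8 lambdas; `CompositeSubscriptionSketch` and the `PublishSubject` source are hypothetical and only illustrate the idea outside of an activity.

```java
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

class CompositeSubscriptionSketch {

    // Returns how many items each of the two subscribers received.
    static int[] run() {
        final int[] received = new int[2];
        PublishSubject<Integer> source = PublishSubject.create();
        CompositeSubscription subscriptions = new CompositeSubscription();

        // Collect every subscription in one place.
        subscriptions.add(source.subscribe(value -> received[0]++));
        subscriptions.add(source.subscribe(value -> received[1]++));

        source.onNext(1);            // delivered to both subscribers
        subscriptions.unsubscribe(); // e.g. from onStop()
        source.onNext(2);            // delivered to nobody
        return received;
    }

    public static void main(String[] args) {
        int[] received = run();
        System.out.println(received[0] + ", " + received[1]); // prints 1, 1
    }
}
```

Keep in mind that a CompositeSubscription that has been unsubscribed immediately unsubscribes anything added to it later; if the same instance has to be reused, clear() can be called instead.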

The operators used for handling errors can be refactored into a generic Observable.Transformer, which can then be reused wherever needed.

//Transformer to be used in flatMap
public static class IOErrorTransformer<T> implements Observable.Transformer<T, T> {
    private final Action1<Throwable> mErrorHandler;

    public IOErrorTransformer(Action1<Throwable> errorHandler) {
        mErrorHandler = errorHandler;
    }

    @Override
    public Observable<T> call(Observable<T> observable) {
        return observable
          .observeOn(AndroidSchedulers.mainThread())//interaction with UI must be performed on main thread
          .doOnError(mErrorHandler)
          .onErrorResumeNext(Observable.<T>empty());//prevent observable from breaking
    }
}
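
A transformer like this is applied with the compose operator. The following is a minimal sketch of how that might look, assuming RxJava 1.x on the classpath and Java 8 lambdas. To make it runnable off-device, it uses a hypothetical variant, `ErrorSuppressingTransformer`, that receives the scheduler as a constructor parameter; `ComposeSketch` and `riskyOperation` are likewise made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

class ComposeSketch {

    // Hypothetical variant of the transformer: the scheduler is injected,
    // so AndroidSchedulers.mainThread() can be passed on a device.
    static class ErrorSuppressingTransformer<T> implements Observable.Transformer<T, T> {
        private final Action1<Throwable> errorHandler;
        private final Scheduler observeScheduler;

        ErrorSuppressingTransformer(Action1<Throwable> errorHandler, Scheduler observeScheduler) {
            this.errorHandler = errorHandler;
            this.observeScheduler = observeScheduler;
        }

        @Override
        public Observable<T> call(Observable<T> observable) {
            return observable
                .observeOn(observeScheduler)
                .doOnError(errorHandler)
                .onErrorResumeNext(Observable.<T>empty());
        }
    }

    // Hypothetical stand-in for the background operation: fails for odd input.
    static Observable<Integer> riskyOperation(int input) {
        if (input % 2 != 0) {
            return Observable.error(new RuntimeException("Odd input: " + input));
        }
        return Observable.just(input * 10);
    }

    static List<Integer> run(final List<String> handledErrors) {
        final Action1<Throwable> handler = error -> handledErrors.add(error.getMessage());
        return Observable.just(1, 2, 3, 4)
            .flatMap(input -> riskyOperation(input)
                .compose(new ErrorSuppressingTransformer<Integer>(handler, Schedulers.immediate())))
            .toList()
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        System.out.println(run(errors)); // prints [20, 40]
        System.out.println(errors);      // prints [Odd input: 1, Odd input: 3]
    }
}
```

Injecting the scheduler is a design choice made for this sketch; it keeps the error-handling logic testable on a plain JVM, while the activity code can keep passing the main thread scheduler.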

The full version of this code can be found here.
Download the APK to try out the sample; click this link on a device to open it.