In this article I want to show some examples how to implement lightweight event bus using RxJava. Examples are implemented for Android, but same event bus can be used on other platforms.

Table of contents:

  1. Simple single event type bus
    1. Creation
    2. Posting events
    3. Listening for events
    4. Switching threads
  2. Generic event bus
    1. Creation
    2. Replaying events
    3. Observing multiple events at once
  3. Generic bus usage sample
  4. Related articles

Simple single event type bus

To create simple event bus we can use subjects provided by RxJava. For example PublishSubject.

Creation

PublishSubject<String> stringBus = PublishSubject.create();

Posting events

stringBus.onNext("event1");
stringBus.onNext("event2");

Listening for events

stringBus.subscribe(new Action1<String>() {
  @Override
  public void call(String event) {
    Log.d("EventBusSample", "Event happened: " + event);
  }
});

Switching threads

With RxJava we can easily send events from one thread and listen for them on another:

stringBus.observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<String>() {
    @Override
    public void call(String event) {
      Toast.makeText(EventBusSampleActivity.this, "Simple event bus: " + event, Toast.LENGTH_SHORT).show();
    }
  });

But:

If events on this bus are posted from different threads, to preserve their order and Observable contract, SerializedSubject should be used.

Generic event bus

Creation

To be able to use bus to send multiple event types we should create generic subject and then filter events, leaving those we are interested in. It can be done using operator ofType and specifying class of event.

public static class EventBus<T> {
  private final Subject<T, T> subject;

  public EventBus() {
    this(PublishSubject.<T>create());
  }

  public EventBus(Subject<T, T> subject) {
    this.subject = subject;
  }

  public <E extends T> void post(E event) {
    subject.onNext(event);
  }

  public Observable<T> observe() {
    return subject;
  }

  public <E extends T> Observable<E> observeEvents(Class<E> eventClass) {
    return subject.ofType(eventClass);//pass only events of specified type, filter all other
  }
}

Events of type T and it's subclasses can be posted to this bus.

Replaying events

Sometimes it is needed to replay events occured, while listener wasn't listening for them. This can be easily done by changing subject from PublishSubject to BehaviorSubject, which repeats last emitted item or ReplaySubject which can repeat arbitrary number of items.

public static <T> EventBus<T> createSimple() {
  return new EventBus<>();//PublishSubject is created in constructor
}

public static <T> EventBus<T> createRepeating(int numberOfEventsToRepeat) {
  return new EventBus<>(ReplaySubject.<T>createWithSize(numberOfEventsToRepeat));
}

public static <T> EventBus<T> createWithLatest() {
  return new EventBus<>(BehaviorSubject.<T>create());
}

NOTE:

This works perfectly when you have one event class or you don't care about replaying events separately per class. In that case you will need separate subject for each class of event, this can be done, for example, by having HashMap<Class<?>, Subject<T, T>>.

Observing multiple events at once

Let's say that we have two buses with different events and we want to listen all of them in a single observable. To achieve this we can use merge operator which merges items from different observables emitting them all in a single sequence.

This is one of the advantages over classic event buses - you can apply operators and you can combine bus and other observables together.

public static <E1, E2> Observable<Object> observeEvents(EventBus<Object> bus, Class<E1> class1, Class<E2> class2) {
  return Observable.merge(bus.observeEvents(class1), bus.observeEvents(class2));
}

If we know that these event classes have same parent, then this observable can be transformed using cast operator.

Operator merge has many overloads, also you can use different combining and other available operators.

Generic bus usage sample

Source code of the sample can be found on GitHub, you can install and run sample on Android device.

Lets say we have a location service which runs in a background and posts events with location changes. We want to listen these events and show last known location.

We define our service as singleton and create bus for any class of event:

private static final EventBus<Object> sAppBus = EventBus.createWithLatest();//singleton bus
private static final MyLocationService sLocationService = new MyLocationService();//singleton background service

Service has main method run:

public void run() {
  //for simplicity I simulated background location service using observables
  if (mSubscription == null) {
    mSubscription = observeLocationChanges().subscribe(new Action1<LocationChangedEvent>() {
      @Override
      public void call(LocationChangedEvent locationChangedEvent) {
        sAppBus.post(locationChangedEvent);
      }
    });
  }
}

Events are generated each second:

private static Observable<LocationChangedEvent> observeLocationChanges() {
  final Location[] locations = new Location[]{
    createLocation(59.9500, 30.3000),//Saint-Petersburg
    createLocation(55.7500, 37.6167),//Moscow
    createLocation(52.5167, 13.3833),//Berlin
    createLocation(48.8567, 2.3508),//Paris
    createLocation(51.5072, 0.1275),//London
    createLocation(40.7127, -74.0059)//New York
  };
  //generate new location every second, but first is generated after 3
  return Observable.timer(3L, 1L, TimeUnit.SECONDS).map(new Func1<Long, LocationChangedEvent>() {
    @Override
    public LocationChangedEvent call(Long counter) {
      Location location = new Location(locations[((int) (counter % locations.length))]);
      location.setTime(System.currentTimeMillis());//set time when location has "changed"
      return new LocationChangedEvent(location);
    }
  });
}

These events will be sent from background thread, because operator timer uses computation scheduler.

And listening for events:

private void listenLocationChanges(final TextView lastKnownLocation) {
  showToast("Listening for location changes");
  mLocationChangesSubscription = sAppBus.observeEvents(LocationChangedEvent.class)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<LocationChangedEvent>() {
        @Override
        public void call(LocationChangedEvent event) {
          lastKnownLocation.setVisibility(View.VISIBLE);
          Location location = event.getLocation();
          lastKnownLocation.setText(getString(R.string.last_known_location, location.toString()
            + "\nupdated: " + DateFormat.getTimeInstance().format(location.getTime())));
        }
    });
    mSubscriptions.add(mLocationChangesSubscription);
}

If you run sample, you will notice, that last event is saved. After rotating device and clicking 'Listen..' again, you will see that saved event is immediately used to show last known location. This happens because I used BehaviorSubject and because bus is retained upon rotation change.

If service is running and new listener of events is registered it immediately receives last event (if such is available), even though event occured while no one was listening for it.


Comments

comments powered by Disqus