BitDroid

Converting callbacks to RxJava Observables

Philipp Eichhorn • coding and android

While working on our latest FauDroids app, I ran into a situation where I wanted to convert an API that uses callbacks into one that uses rx.Observable. The two seemed so closely related that I figured the web must be full of tutorials explaining how to do just that. Well, turns out it isn’t. Here and there you’ll find small examples for other languages, but nothing that really helped me with Java / Android.

So, after some digging through the RxJava API, I found the following method for creating Observable objects:

public static final <T> Observable<T> from(java.util.concurrent.Future<? extends T> future)

While this doesn’t necessarily scream “convert callback to Observable”, it was enough to make me come up with an adapter class which actually does convert callbacks.
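What makes a Future usable as a bridge is its contract: get() blocks the caller until some other piece of code has supplied the value. Here is a minimal sketch of that contract using only the JDK's FutureTask, without RxJava. The class name FutureContractDemo and the 50 ms delay are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureContractDemo {

  // Starts a background computation and blocks until its result is ready.
  static String blockUntilReady() throws Exception {
    // A FutureTask is a Future whose value is produced by running the task.
    FutureTask<String> future = new FutureTask<>(new Callable<String>() {
      @Override
      public String call() throws InterruptedException {
        Thread.sleep(50); // simulate slow work on another thread
        return "hello world";
      }
    });
    new Thread(future).start();

    // get() waits here until call() has returned on the worker thread.
    return future.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(blockUntilReady());
  }
}
```

A callback can play the role of the worker thread here: whoever invokes the callback is the one supplying the value that get() is waiting for.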

Assuming you have the following callback defined somewhere …

public interface MyCallback {
  void onNewData(String data);
}

… and a method which uses this callback …

public void readData(MyCallback callback) {
  callback.onNewData("hello world");
}

… the class below will convert that callback to an Observable:

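import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Observable;

public class MyCallbackAdapter implements MyCallback {

  private final FutureAdapter futureAdapter = new FutureAdapter();

  @Override
  public void onNewData(String data) {
    futureAdapter.setValue(data);
  }

  public Observable<String> toObservable() {
    // from(Future) blocks in get() until the callback has fired.
    return Observable.from(futureAdapter);
  }

  public static class FutureAdapter implements Future<String> {

    private boolean isDone;
    private String value;

    public synchronized void setValue(String value) {
      this.value = value;
      this.isDone = true;
      // Wake up every thread blocked in get(), not just one.
      notifyAll();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return false; // cancellation is not supported
    }

    @Override
    public boolean isCancelled() {
      return false;
    }

    @Override
    public synchronized boolean isDone() {
      return isDone;
    }

    @Override
    public synchronized String get() throws InterruptedException {
      // Wait on isDone rather than value, so a null result cannot block forever.
      while (!isDone) {
        wait();
      }
      return value;
    }

    @Override
    public synchronized String get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
      long remainingNanos = unit.toNanos(timeout);
      long deadline = System.nanoTime() + remainingNanos;
      while (!isDone) {
        if (remainingNanos <= 0) {
          throw new TimeoutException();
        }
        TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        remainingNanos = deadline - System.nanoTime();
      }
      return value;
    }
  }
}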

From there, using the adapter class is pretty straightforward:

MyCallbackAdapter adapter = new MyCallbackAdapter();
readData(adapter);

adapter.toObservable().subscribe(new Action1<String>() {
  @Override
  public void call(String data) {
    // something cool here
  }
});

That’s it!

Unfortunately, this method of converting callbacks is a little verbose. Generics help (e.g. if the callback has a generic method parameter, one adapter can serve every type), but otherwise you’re stuck with writing out the whole Future object. Note also that a Future holds a single value, so the resulting Observable emits only one item no matter how often the callback fires. The approach feels slightly hackish, but maybe that is just fine in this case.
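One possible way to cut the boilerplate, sketched here under the assumption that Java 8's CompletableFuture is available (on Android that means API level 24 or newer), is to let it act as the Future and make the adapter generic. DataCallback<T>, CallbackAdapter<T> and the threaded readData below are hypothetical stand-ins, not part of the original code, and the sketch deliberately stops at the Future so it runs without RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GenericCallbackDemo {

  // Hypothetical generic variant of MyCallback.
  public interface DataCallback<T> {
    void onNewData(T data);
  }

  // One adapter class serves every DataCallback<T>.
  public static class CallbackAdapter<T> implements DataCallback<T> {

    private final CompletableFuture<T> future = new CompletableFuture<>();

    @Override
    public void onNewData(T data) {
      // Only the first call sets the value; later calls are ignored.
      future.complete(data);
    }

    public Future<T> toFuture() {
      return future;
    }
  }

  // Stand-in for readData that fires the callback on a background thread.
  static void readData(final DataCallback<String> callback) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        callback.onNewData("hello world");
      }
    }).start();
  }

  public static void main(String[] args) throws Exception {
    CallbackAdapter<String> adapter = new CallbackAdapter<>();
    readData(adapter);
    // Blocks until the background thread has delivered the data.
    System.out.println(adapter.toFuture().get(2, TimeUnit.SECONDS));
  }
}
```

The Future returned by toFuture() could then be handed to Observable.from in the same way as the hand-written FutureAdapter.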
