鍍金池/ 教程/ Android/ Incrementally Agerifying legacy code
Custom observables
Compiled functions
Reactive programming
Reservoirs and parallelism
Incrementally Agerifying legacy code
Observables and updatables
Compiled repositories
Repositories

Incrementally Agerifying legacy code

Agera引入的代碼風(fēng)格也許適合從零開(kāi)始的新建app項(xiàng)目。這篇包括一些提示,來(lái)幫助想在遺留代碼中使用Agera的開(kāi)發(fā)者,如此往下做。

Upgrading legacy observer pattern

--升級(jí)原有觀察者模式

觀察者模式有很多種實(shí)現(xiàn)方式,但不是所有的都可以通過(guò)簡(jiǎn)單的放入遷入到Agera中。下面是一個(gè)例子:演示將一個(gè)監(jiān)聽(tīng)(listenable)類添加到Observable接口的一種方法。

MyListenable類可以增加(addListener)和刪除(removeListener)Listener,作為額外完整的演示,它繼承了SomeBaseClass

該實(shí)例使用UpdateDispatcher來(lái)解決Java的單繼承約束,使用一個(gè)內(nèi)部類(Bridge)來(lái)做橋接, 保持其完整的原始API,同時(shí)也使Agera可見(jiàn)。

public final class MyListenable extends SomeBaseClass implements Observable {

  private final UpdateDispatcher updateDispatcher;

  public MyListenable() {
    // Original constructor code here...
    updateDispatcher = Observables.updateDispatcher(new Bridge());
  }

  // Original class body here... including:
  public void addListener(Listener listener) { … }
  public void removeListener(Listener listener) { … }

  @Override
  public void addUpdatable(Updatable updatable) {
    updateDispatcher.addUpdatable(updatable);
  }

  @Override
  public void removeUpdatable(Updatable updatable) {
    updateDispatcher.removeUpdatable(updatable);
  }

  private final class Bridge implements ActivationHandler, Listener {
    @Override
    public void observableActivated(UpdateDispatcher caller) {
      addListener(this);
    }

    @Override
    public void observableDeactivated(UpdateDispatcher caller) {
      removeListener(this);
    }

    @Override
    public void onEvent() { // Listener implementation
      updateDispatcher.update();
    }
  }
}

Exposing synchronous operations as repositories

--揭秘repository的同步操作

Java本質(zhì)是一種同步語(yǔ)言,如:在Java中最低級(jí)別的操作都是同步方法。 當(dāng)操作可能會(huì)花一些時(shí)間才能返回(耗時(shí)操作),這種方法通常稱為阻塞方法,而且禁止開(kāi)發(fā)者在主線程(UI Thread)調(diào)用。

假設(shè)app的UI需要從阻塞的方法獲得數(shù)據(jù)。Agera可以很容易的通過(guò)后臺(tái)線程完成調(diào)用,然后UI可以在主線程中接收數(shù)據(jù)。首先,這個(gè)阻塞操作需要封裝成一個(gè)Agera操作,像這樣:

public class NetworkCallingSupplier implements Supplier<Result<ResponseBlob>> {
  private final RequestBlob request = …;

  @Override
  public Result<ResponseBlob> get() {
    try {
       ResponseBlob blob = networkStack.execute(request); // blocking call
       return Result.success(blob);
    } catch (Throwable e) {
       return Result.failure(e);
    }
  }
}

Supplier<Result<ResponseBlob>> networkCall = new NetworkCallingSupplier();

Repository<Result<ResponseBlob>> responseRepository =
    Repositories.repositoryWithInitialValue(Result.<ResponseBlob>absent())
        .observe() // no event source; works on activation
        .onUpdatesPerLoop() // but this line is still needed to compile
        .goTo(networkingExecutor)
        .thenGetFrom(networkCall)
        .compile();

上面的代碼段假定了,在Repository.compile()之前這個(gè)請(qǐng)求是已知且永遠(yuǎn)不變的。

這個(gè)很容易升級(jí)成為一個(gè)動(dòng)態(tài)請(qǐng)求,甚至在repository同樣的激活周期期間。

要可以修改請(qǐng)求,簡(jiǎn)單的方式是使用MutableRepository。 此外,為了在第一次請(qǐng)求為完成之前就可以提供數(shù)據(jù),可以在Result中一個(gè)提供初始值:absent()

MutableRepository這種用法類似于是一個(gè)可變的變量(可為null),故命名為requestVariable。

// MutableRepository<RequestBlob> requestVariable =
//     mutableRepository(firstRequest);
// OR:
MutableRepository<Result<RequestBlob>> requestVariable =
    mutableRepository(Result.<RequestBlob>absent());

然后, 不是在supplier中封裝阻塞方法,使用function實(shí)現(xiàn)動(dòng)態(tài)請(qǐng)求:

public class NetworkCallingFunction
    implements Function<RequestBlob, Result<ResponseBlob>> {
  @Override
  public Result<ResponseBlob> apply(RequestBlob request) {
    try {
       ResponseBlob blob = networkStack.execute(request);
       return Result.success(blob);
    } catch (Throwable e) {
       return Result.failure(e);
    }
  }
}

Function<RequestBlob, Result<ResponseBlob>> networkCallingFunction =
    new NetworkCallingFunction();

升級(jí)后的repository可以像這樣compiled:

Result<ResponseBlob> noResponse = Result.absent();
Function<Throwable, Result<ResponseBlob>> withNoResponse =
    Functions.staticFunction(noResponse);
Repository<Result<ResponseBlob>> responseRepository =
    Repositories.repositoryWithInitialValue(noResponse)
        .observe(requestVariable)
        .onUpdatesPerLoop()
        // .getFrom(requestVariable) if it does not supply Result, OR:
        .attemptGetFrom(requestVariable).orEnd(withNoResponse)
        .goTo(networkingExecutor)
        .thenTransform(networkCallingFunction)
        .compile();

這部分代碼段還演示了一點(diǎn):通過(guò)給操作一個(gè)特殊的名字,讓repository的編譯表達(dá)式更易讀。

Wrapping asynchronous calls in repositories

--repository的異步調(diào)用封裝

現(xiàn)在的很多Library都有異步API和內(nèi)置的線程,但是客戶端不能控制或禁用。

app中有這樣的Library的話,引入Agera將是一個(gè)具有挑戰(zhàn)性的工作。 一個(gè)直接的辦法就是找到Library中同步選擇的點(diǎn),采用[[如上段所述|Incrementally-Agerifying-legacy-code#exposing-synchronous-operations-as-repositories]]方法。

另一個(gè)方式(反模式):切換后臺(tái)線程執(zhí)行異步調(diào)用并等待結(jié)果,然后同步拿結(jié)果。上面方式不可行時(shí),這一節(jié)討論一個(gè)合適的解決方法。

異步調(diào)用的一個(gè)循環(huán)模式是請(qǐng)求-響應(yīng) 結(jié)構(gòu)。下面的示例假定這樣結(jié)構(gòu):未完成的工作可以被取消,但是不指定回調(diào)的線程。

interface AsyncOperator<P, R> {
  Cancellable request(P param, Callback<R> callback);
}

interface Callback<R> {
  void onResponse(R response); // Can be called from any thread
}

interface Cancellable {
  void cancel();
}

下面repository例子,使用AsyncOperator提供數(shù)據(jù), 完成響應(yīng)式請(qǐng)求(一個(gè)抽象的supplier類)。

這段代碼假定AsyncOperator已經(jīng)有足夠的緩存,因此重復(fù)的請(qǐng)求不會(huì)影響性能。

public class AsyncOperatorRepository<P, R> extends BaseObservable
    implements Repository<Result<R>>, Callback<R> {

  private final AsyncOperator<P, R> asyncOperator;
  private final Supplier<P> paramSupplier;

  private Result<R> result;
  private Cancellable cancellable;

  public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
      Supplier<P> paramSupplier) {
    this.asyncOperator = asyncOperator;
    this.paramSupplier = paramSupplier;
    this.result = Result.absent();
  }

  @Override
  protected synchronized void observableActivated() {
    cancellable = asyncOperator.request(paramSupplier.get(), this);
  }

  @Override
  protected synchronized void observableDeactivated() {
    if (cancellable != null) {
      cancellable.cancel();
      cancellable = null;
    }
  }

  @Override
  public synchronized void onResponse(R response) {
    cancellable = null;
    result = Result.absentIfNull(response);
    dispatchUpdate();
  }

  @Override
  public synchronized Result<R> get() {
    return result;
  }
}

這個(gè)類可以很容易地升級(jí)到可以修改請(qǐng)求參數(shù),而這個(gè)過(guò)程就類似于前面的討論:讓repository提供請(qǐng)求參數(shù),并讓AsyncOperatorRepository觀察請(qǐng)求參數(shù)變化。

在激活期間,觀察請(qǐng)求參數(shù)的變化,取消任何正在進(jìn)行的請(qǐng)求,并發(fā)出新的請(qǐng)求,如下所示:

public class AsyncOperatorRepository<P, R> extends BaseObservable
    implements Repository<Result<R>>, Callback<R>, Updatable {

  private final AsyncOperator<P, R> asyncOperator;
  private final Repository<P> paramRepository;

  private Result<R> result;
  private Cancellable cancellable;

  public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
      Repository<P> paramRepository) {
    this.asyncOperator = asyncOperator;
    this.paramRepository = paramRepository;
    this.result = Result.absent();
  }

  @Override
  protected void observableActivated() {
    paramRepository.addUpdatable(this);
    update();
  }

  @Override
  protected synchronized void observableDeactivated() {
    paramRepository.removeUpdatable(this);
    cancelOngoingRequestLocked();
  }

  @Override
  public synchronized void update() {
    cancelOngoingRequestLocked();
    // Adapt accordingly if paramRepository supplies a Result.
    cancellable = asyncOperator.request(paramRepository.get(), this);
  }

  private void cancelOngoingRequestLocked() {
    if (cancellable != null) {
      cancellable.cancel();
      cancellable = null;
    }
  }

  @Override
  public synchronized void onResponse(R response) {
    cancellable = null;
    result = Result.absentIfNull(response);
    dispatchUpdate();
  }

  // Similar process for fallible requests (typically with an
  // onError(Throwable) callback): wrap the failure in a Result and
  // dispatchUpdate().

  @Override
  public synchronized Result<R> get() {
    return result;
  }
}