Reactive Programming with RxJava - Chapter 5: Reactive from Top to Bottom (2)

    xiaoxiao  2021-03-25

    Relational Database Access

    There are reasons for JDBC to remain blocking. A database performs several distinct steps for each query:

    1. Query parsing (CPU-bound) translates a String containing a query into a parse tree.
    2. The query optimizer (CPU-bound) evaluates the query against various rules and statistics, trying to build an execution plan.
    3. The query executor (I/O-bound) traverses database storage and finds the appropriate tuples to return.
    4. The result set (network-bound) is serialized and pushed back to the client.

    The advice for interacting with relational databases is to have a dedicated, well-tuned thread pool and isolate the blocking code there.
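    As a minimal sketch of that advice, the following wraps a blocking call in a future that runs on its own fixed-size pool. It uses only the JDK; the pool size, the thread names, and blockingFindName() (a stand-in for a real JDBC query) are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingJdbcIsolation {

    // Hypothetical size; in practice it is tuned together with the connection pool.
    static final int POOL_SIZE = 4;

    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

    // Dedicated pool: only blocking database calls are submitted here.
    static final ExecutorService JDBC_POOL = Executors.newFixedThreadPool(POOL_SIZE, runnable -> {
        Thread t = new Thread(runnable, "jdbc-" + THREAD_COUNTER.incrementAndGet());
        t.setDaemon(true); // do not keep the JVM alive in this sketch
        return t;
    });

    // Stand-in for a blocking JDBC query (assumption: no real database involved).
    static String blockingFindName(long id) {
        try {
            Thread.sleep(50); // simulates waiting for the database
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "user-" + id + "@" + Thread.currentThread().getName();
    }

    // The caller receives a future immediately; a pool thread does the blocking.
    static CompletableFuture<String> findNameAsync(long id) {
        return CompletableFuture.supplyAsync(() -> blockingFindName(id), JDBC_POOL);
    }

    public static void main(String[] args) {
        System.out.println(findNameAsync(42).join());
    }
}
```

    The returned string includes the thread name, so it is easy to confirm that the blocking work ran on one of the "jdbc-" threads rather than on the caller's thread.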

    CompletableFuture and Streams

    The CompletableFuture class, added in Java 8, significantly improves on the Future interface known since Java 5.

    import static java.util.function.Function.identity;

    List<TravelAgency> agencies = //...
    CompletableFuture<User> user = findByIdAsync(id);
    CompletableFuture<GeoLocation> location = locateAsync();

    CompletableFuture<Ticket> ticketFuture = user
        // wait for both the user and the location
        .thenCombine(location, (User us, GeoLocation loc) -> agencies
            .stream()
            .map(agency -> agency.searchAsync(us, loc))
            // keep whichever agency responds first
            .reduce((f1, f2) -> f1.applyToEither(f2, identity()))
            .get()
        )
        // flatten CompletableFuture<CompletableFuture<Flight>>
        .thenCompose(identity())
        .thenCompose(this::bookAsync);

    CompletableFuture shares a lot of similarities with Observable.

    CompletableFuture            Observable
    thenApply()                  map()
    thenCombine()                zip()
    anyOf(), applyToEither()     amb()
    thenCompose()                flatMap()
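    To make the CompletableFuture side of this mapping concrete, here is a small JDK-only sketch that exercises each operator once. The price() and tax() methods and their values are made up for illustration; the comments name the Observable operator each call corresponds to.

```java
import java.util.concurrent.CompletableFuture;

class FutureOperators {

    // Hypothetical asynchronous sources.
    static CompletableFuture<Integer> price() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<Integer> tax() {
        return CompletableFuture.supplyAsync(() -> 23);
    }

    // thenApply() ~ map(): transform the value once it arrives.
    static CompletableFuture<String> formatted() {
        return price().thenApply(p -> "price=" + p);
    }

    // thenCombine() ~ zip(): wait for both futures, then merge the pair.
    static CompletableFuture<Integer> total() {
        return price().thenCombine(tax(), Integer::sum);
    }

    // applyToEither() ~ amb(): continue with whichever future completes first.
    static CompletableFuture<String> fastest() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("cache");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        return done.applyToEither(pending, x -> x);
    }

    // thenCompose() ~ flatMap(): chain a function that itself returns a future.
    static CompletableFuture<Integer> doubledAsync() {
        return price().thenCompose(p -> CompletableFuture.supplyAsync(() -> p * 2));
    }

    public static void main(String[] args) {
        System.out.println(formatted().join());    // price=100
        System.out.println(total().join());        // 123
        System.out.println(fastest().join());      // cache
        System.out.println(doubledAsync().join()); // 200
    }
}
```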

    Interoperability with CompletableFuture

    Semantically, you can treat CompletableFuture like an Observable that has the following characteristics:

    - It is hot. The computation behind CompletableFuture starts eagerly, regardless of whether anyone has registered callbacks such as thenApply().
    - It is cached. The background computation behind CompletableFuture is triggered once, eagerly, and the result is forwarded to all registered callbacks. Moreover, if a callback is registered after completion, it is immediately invoked with the completed value (or exception).
    - It emits exactly one element or exception. In principle, a Future completes exactly once (or never) with a value of type T or an exception. This matches the contract of Observable.
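    The first two characteristics can be observed with the JDK alone. In the sketch below (the class and method names are made up for illustration) the computation runs before any callback exists, and two callbacks registered after completion both receive the stored value without triggering a second computation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class HotAndCached {

    // Appends every value seen by a late callback to 'seen' and
    // returns how many times the underlying computation actually ran.
    static int computationsSeenByLateCallbacks(StringBuilder seen) {
        AtomicInteger computations = new AtomicInteger();

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            computations.incrementAndGet();
            return "value";
        });

        // Hot: the computation completes although no callback was registered.
        future.join();

        // Cached: callbacks registered after completion run right away,
        // on the calling thread, with the already stored result.
        future.thenAccept(v -> seen.append(v));
        future.thenAccept(v -> seen.append(v));

        return computations.get();
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        int runs = computationsSeenByLateCallbacks(seen);
        System.out.println("computations: " + runs); // 1
        System.out.println("callbacks saw: " + seen); // valuevalue
    }
}
```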

    Turning CompletableFuture into Observable with Single item

    class Util {

        static <T> Observable<T> observe(CompletableFuture<T> future) {
            return Observable.create(subscriber -> {
                future.whenComplete((value, exception) -> {
                    if (exception != null) {
                        subscriber.onError(exception);
                    } else {
                        subscriber.onNext(value);
                        subscriber.onCompleted();
                    }
                });
            });
        }
    }

    Remember that CompletableFuture is hot and cached, in Rx terminology. It begins computation immediately, whereas an Observable will not start computation until someone actually subscribes.
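    One way to recover laziness is to postpone creating the future itself. In RxJava that is typically done with Observable.defer(); the JDK-only sketch below shows the same idea with a plain Supplier. The findByIdAsync() here is a hypothetical stand-in that counts how often it is invoked, so the difference between eager and deferred creation is visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyFuture {

    // Counts how many asynchronous calls have been started.
    static final AtomicInteger started = new AtomicInteger();

    // Hypothetical async call: work starts the moment the method is invoked.
    static CompletableFuture<String> findByIdAsync(long id) {
        started.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Deferred: nothing starts until get() is called, similar to subscribing.
    static Supplier<CompletableFuture<String>> deferred(long id) {
        return () -> findByIdAsync(id);
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> lazy = deferred(7);
        System.out.println("started before get(): " + started.get());
        System.out.println(lazy.get().join());
        System.out.println("started after get(): " + started.get());
    }
}
```

    Note that every call to get() starts a fresh computation, which is also how a deferred, cold Observable behaves for each new subscriber.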

    Observable<User> rxFindById(long id) {
        return Util.observe(findByIdAsync(id));
    }

    Observable<GeoLocation> rxLocate() {
        return Util.observe(locateAsync());
    }

    Observable<Ticket> rxBook(Flight flight) {
        return Util.observe(bookAsync(flight));
    }

    Observable<TravelAgency> agencies = agencies();
    Observable<User> user = rxFindById(id);
    Observable<GeoLocation> location = rxLocate();

    Observable<Ticket> ticket = user
        .zipWith(location, (us, loc) ->
            agencies
                .flatMap(agency -> agency.rxSearch(us, loc))
                // take the first flight found by any agency
                .first()
        )
        // flatten Observable<Observable<Flight>>
        .flatMap(x -> x)
        .flatMap(this::rxBook);

    From Observable to CompletableFuture

    static <T> CompletableFuture<T> toFuture(Observable<T> observable) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        observable
            // fail unless the Observable emits exactly one item
            .single()
            .subscribe(
                promise::complete,
                promise::completeExceptionally
            );
        return promise;
    }

    static <T> CompletableFuture<List<T>> toFutureList(Observable<T> observable) {
        return toFuture(observable.toList());
    }
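    The promise pattern used above does not depend on RxJava. As a rough JDK-only illustration, the sketch below applies the same two method references to a hypothetical CallbackSource interface, which stands in for Observable.subscribe(onNext, onError); just() and failing() are likewise invented helpers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class PromiseBridge {

    // Hypothetical callback-style source, standing in for subscribe(onNext, onError).
    interface CallbackSource<T> {
        void subscribe(Consumer<T> onValue, Consumer<Throwable> onError);
    }

    // Same shape as toFuture(): an empty promise completed from the callbacks.
    static <T> CompletableFuture<T> toFuture(CallbackSource<T> source) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        source.subscribe(promise::complete, promise::completeExceptionally);
        return promise;
    }

    // Source that immediately delivers one value.
    static <T> CallbackSource<T> just(T value) {
        return (onValue, onError) -> onValue.accept(value);
    }

    // Source that immediately signals an error.
    static <T> CallbackSource<T> failing(Throwable error) {
        return (onValue, onError) -> onError.accept(error);
    }

    public static void main(String[] args) {
        System.out.println(toFuture(just("ticket")).join());

        CompletableFuture<String> failed =
            toFuture(PromiseBridge.<String>failing(new IllegalStateException("boom")));
        System.out.println("failed: " + failed.isCompletedExceptionally());
    }
}
```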

    Observable versus Single

    Creating and Consuming Single

    AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

    Single<Response> fetch(String address) {
        return Single.create(subscriber ->
            asyncHttpClient
                .prepareGet(address)
                .execute(handler(subscriber)));
    }

    AsyncCompletionHandler<Response> handler(SingleSubscriber<? super Response> subscriber) {
        return new AsyncCompletionHandler<Response>() {
            // called once the whole HTTP response has arrived
            @Override
            public Response onCompleted(Response response) {
                subscriber.onSuccess(response);
                return response;
            }

            @Override
            public void onThrowable(Throwable t) {
                subscriber.onError(t);
            }
        };
    }

    Combining Responses Using zip, merge, and concat

    Interoperability with Observable and CompletableFuture

    However, there are two situations in which conversion between Observable and Single makes sense:
    - When we use Single as an Observable that emits one value and a completion notification (or an error notification).
    - When Single is missing certain operators available in Observable.

    When to Use Single?

    You should use Single in the following scenarios:
    - An operation must complete with some particular value or an exception.
    - There is no such thing as a stream in your problem domain; using Observable would be misleading and overkill.
    - Observable is too heavyweight and you have measured that Single is faster in your particular problem.

    You should prefer Observable in these circumstances:
    - You model some sort of events (messages, GUI events) that by definition occur several times, possibly infinitely.
    - Or, entirely the opposite, you expect that the value may or may not occur before completion.

