Class SchedulerWhen
java.lang.Object
    rx.Scheduler
        rx.internal.schedulers.SchedulerWhen

All Implemented Interfaces:
Subscription
@Experimental public class SchedulerWhen extends Scheduler implements Subscription
Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens an Observable of Observable of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Observable; otherwise any work scheduled on the returned Scheduler will not be executed.

When Scheduler.createWorker() is invoked, an Observable of Completables is onNext'd to the combinator to be flattened. If the inner Observable is not immediately subscribed to, calls to Scheduler.Worker.schedule(rx.functions.Action0) are buffered. Once the Observable is subscribed to, actions are then onNext'd as Completables.

Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.

When the Scheduler.Worker is unsubscribed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Observable and all Completables given to the flattening function never onError.

Limit the amount of concurrency to two at a time without creating a new fixed-size thread pool:

    Scheduler limitSched = Schedulers.computation().when(workers -> {
        // use merge max concurrent to limit the number of concurrent
        // callbacks to two at a time
        return Completable.merge(Observable.merge(workers), 2);
    });

This is a slightly different way to limit the concurrency, with some interesting benefits and drawbacks compared with the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally each Observable uses its own Scheduler.Worker, so this essentially limits the number of concurrent subscribes. The danger comes from using operators like Observable.zip(Observable, Observable, rx.functions.Func2), where subscribing to the first Observable could deadlock the subscription to the second.

    Scheduler limitSched = Schedulers.computation().when(workers -> {
        // use merge max concurrent to limit the number of concurrent
        // Observables to two at a time
        return Completable.merge(Observable.merge(workers, 2));
    });

Slow down the rate to no more than one per second. This suffers from the same problem as the example above. I could not find an Observable operator that limits the rate without dropping values (the leaky bucket algorithm).

    Scheduler slowSched = Schedulers.computation().when(workers -> {
        // use concatenate to make each worker happen one at a time.
        return Completable.concat(workers.map(actions -> {
            // delay the starting of the next worker by 1 second.
            return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
        }));
    });
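
The concurrency limit in the first example can also be pictured without RxJava. The following is a minimal sketch in plain java.util.concurrent, not the SchedulerWhen implementation: a hypothetical LimitedExecutor forwards tasks to a shared executor (a small pool standing in for Schedulers.computation()) but lets at most maxConcurrent of them run at once and queues the rest, roughly what the max-concurrent argument of merge does for Completables. The names LimitedExecutor and observedPeak are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of RxJava.
final class LimitedExecutor implements Executor {
    private final Executor delegate;
    private final int maxConcurrent;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int running; // guarded by "this"

    LimitedExecutor(Executor delegate, int maxConcurrent) {
        this.delegate = delegate;
        this.maxConcurrent = maxConcurrent;
    }

    @Override
    public void execute(Runnable task) {
        synchronized (this) {
            if (running >= maxConcurrent) {
                pending.add(task); // no free slot: hold the task back
                return;
            }
            running++;
        }
        dispatch(task);
    }

    // Runs the task on the shared executor, then hands its slot to the next pending task.
    private void dispatch(Runnable task) {
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                Runnable next;
                synchronized (this) {
                    next = pending.poll();
                    if (next == null) {
                        running--; // nothing waiting: release the slot
                    }
                }
                if (next != null) {
                    dispatch(next);
                }
            }
        });
    }

    // Pushes `tasks` short jobs through a limiter and reports the highest concurrency seen.
    static int observedPeak(int tasks, int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // the shared "parent" pool
        Executor limited = new LimitedExecutor(pool, limit);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            limited.execute(() -> {
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + observedPeak(8, 2));
    }
}
```

The shared pool has four threads, yet the observed peak never exceeds the limit passed to the wrapper, which is the sense in which no new fixed-size pool is needed.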
-
-
Nested Class Summary
Nested Classes
Modifier and Type       Class                              Description
private static class    SchedulerWhen.DelayedAction
private static class    SchedulerWhen.ImmediateAction
private static class    SchedulerWhen.ScheduledAction

Nested classes/interfaces inherited from class rx.Scheduler
Scheduler.Worker
-
-
Field Summary
Fields
Modifier and Type                            Field              Description
private Scheduler                            actualScheduler
private static Subscription                  SUBSCRIBED
private Subscription                         subscription
private static Subscription                  UNSUBSCRIBED
private Observer<Observable<Completable>>    workerObserver
-
Constructor Summary
Constructors
Constructor                                                                                             Description
SchedulerWhen(Func1<Observable<Observable<Completable>>,Completable> combine, Scheduler actualScheduler)
-
Method Summary
Modifier and Type    Method              Description
Scheduler.Worker     createWorker()      Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.
boolean              isUnsubscribed()    Indicates whether this Subscription is currently unsubscribed.
void                 unsubscribe()       Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
-
-
-
Field Detail
-
actualScheduler
private final Scheduler actualScheduler
-
workerObserver
private final Observer<Observable<Completable>> workerObserver
-
subscription
private final Subscription subscription
-
SUBSCRIBED
private static final Subscription SUBSCRIBED
-
UNSUBSCRIBED
private static final Subscription UNSUBSCRIBED
-
-
Constructor Detail
-
SchedulerWhen
public SchedulerWhen(Func1<Observable<Observable<Completable>>,Completable> combine, Scheduler actualScheduler)
-
-
Method Detail
-
unsubscribe
public void unsubscribe()
Description copied from interface: Subscription
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.

This allows unregistering a Subscriber before it has finished receiving all events (i.e. before onCompleted is called).

Specified by:
unsubscribe in interface Subscription
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface: Subscription
Indicates whether this Subscription is currently unsubscribed.

Specified by:
isUnsubscribed in interface Subscription

Returns:
true if this Subscription is currently unsubscribed, false otherwise
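
The unsubscribe() and isUnsubscribed() pair can be pictured with a small stand-alone class. This is a minimal sketch in plain Java, not the code used by SchedulerWhen; it assumes the common convention that calling unsubscribe more than once is harmless, and the name OneShotSubscription and its cleanup callback are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; not part of RxJava.
final class OneShotSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean();
    private final Runnable onUnsubscribe;

    OneShotSubscription(Runnable onUnsubscribe) {
        this.onUnsubscribe = onUnsubscribe;
    }

    // Runs the cleanup action on the first call only; later calls do nothing.
    void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            onUnsubscribe.run();
        }
    }

    // Reports whether unsubscribe() has been called at least once.
    boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    public static void main(String[] args) {
        OneShotSubscription s = new OneShotSubscription(() -> System.out.println("released"));
        s.unsubscribe();
        s.unsubscribe(); // no second "released" line
        System.out.println(s.isUnsubscribed());
    }
}
```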
-
createWorker
public Scheduler.Worker createWorker()
Description copied from class: Scheduler
Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.

When work is completed it should be unsubscribed using Subscription.unsubscribe().

Work on a Scheduler.Worker is guaranteed to be sequential.

Specified by:
createWorker in class Scheduler

Returns:
a Worker representing a serial queue of actions to be executed
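
For SchedulerWhen, the class description adds that actions scheduled on a worker are buffered until the worker's inner Observable is subscribed to. A minimal sketch of that buffer-then-replay idea in plain Java follows. It is an illustration under assumptions, not the RxJava implementation, and the names BufferingWorker, schedule and subscribe used here are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of RxJava.
final class BufferingWorker {
    private final Queue<Runnable> buffered = new ArrayDeque<>();
    private Consumer<Runnable> downstream; // null until someone subscribes

    // Queues the action while nobody is subscribed; otherwise passes it straight on.
    synchronized void schedule(Runnable action) {
        if (downstream == null) {
            buffered.add(action);
        } else {
            downstream.accept(action);
        }
    }

    // Attaches the consumer and replays everything buffered so far, in order.
    // The lock is held during replay to keep ordering simple in this sketch.
    synchronized void subscribe(Consumer<Runnable> consumer) {
        downstream = consumer;
        Runnable action;
        while ((action = buffered.poll()) != null) {
            consumer.accept(action);
        }
    }

    public static void main(String[] args) {
        BufferingWorker worker = new BufferingWorker();
        worker.schedule(() -> System.out.println("a")); // buffered, nothing printed yet
        worker.schedule(() -> System.out.println("b")); // buffered
        worker.subscribe(Runnable::run);                // replays a, then b
        worker.schedule(() -> System.out.println("c")); // runs immediately
    }
}
```

If subscribe is never called, nothing scheduled on the sketch ever runs, which mirrors the warning that work is not executed unless the returned value is connected to the source Observable.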
-
-