Package rx.observables
Class SyncOnSubscribe<S,T>
- java.lang.Object
-
- rx.observables.SyncOnSubscribe<S,T>
-
- Type Parameters:
S - the type of the user-defined state used in generateState(), next(S, Observer), and onUnsubscribe(S).
T - the type of Subscribers that will be compatible with this.
- All Implemented Interfaces:
Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>
- Direct Known Subclasses:
SyncOnSubscribe.SyncOnSubscribeImpl
@Beta public abstract class SyncOnSubscribe<S,T> extends java.lang.Object implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe), which does not provide any means of managing back pressure requests out-of-the-box.
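To make "responding correctly to back pressure requests" concrete, here is a minimal plain-Java sketch of a producer that emits only as many items as the consumer has asked for. It deliberately does not use RxJava: `RequestBoundedCounter`, `request`, and `demo` are hypothetical names chosen for illustration, and the real producer loop inside SyncOnSubscribe may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a request-driven producer; not an RxJava class.
final class RequestBoundedCounter {
    private int nextValue = 0;                    // state kept between requests
    private final Consumer<Integer> downstream;   // receives the emitted items

    RequestBoundedCounter(Consumer<Integer> downstream) {
        this.downstream = downstream;
    }

    // Emits exactly n items, then stops until the consumer asks again.
    void request(long n) {
        for (long i = 0; i < n; i++) {
            downstream.accept(nextValue++);
        }
    }

    // Demo: the consumer asks for 2 items, then for 3 more.
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        RequestBoundedCounter producer = new RequestBoundedCounter(received::add);
        producer.request(2);
        producer.request(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [0, 1, 2, 3, 4]
    }
}
```

The point of the sketch is that nothing is produced between the two `request` calls: the consumer, not the producer, sets the pace.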
-
-
Nested Class Summary
Nested Classes
(package private) static class SyncOnSubscribe.SubscriptionProducer<S,T> - Contains the producer loop that reacts to downstream requests of work.
(package private) static class SyncOnSubscribe.SyncOnSubscribeImpl<S,T> - An implementation of SyncOnSubscribe that delegates SyncOnSubscribe#next(Object, Observer), generateState(), and onUnsubscribe(Object) to provided functions/closures.
-
Constructor Summary
Constructors
SyncOnSubscribe()
-
Method Summary
Methods
void call(Subscriber<? super T> subscriber)
static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next, Action1<? super S> onUnsubscribe) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next, Action1<? super S> onUnsubscribe) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe) - Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
protected abstract S generateState() - Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value.
protected abstract S next(S state, Observer<? super T> observer) - Called to produce data to the downstream subscribers.
protected void onUnsubscribe(S state) - Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
-
-
-
Method Detail
-
call
public final void call(Subscriber<? super T> subscriber)
-
generateState
protected abstract S generateState()
Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. This value is passed into next(S state, Observer observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
- Returns:
- the initial state value
-
next
protected abstract S next(S state, Observer<? super T> observer)
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow these rules:
- Must not call observer.onNext(t) more than 1 time per invocation.
- Must not call observer.onNext(t) concurrently.
The value returned by this method is passed as the state argument of the next invocation of this method.
- Parameters:
state - the state value (from generateState() on the first invocation, or from the previous invocation of this method)
observer - the observer of data emitted by
- Returns:
- the next iteration's state value
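The contract between generateState() and next(S, Observer) can be modelled without RxJava. The sketch below is a simplified, single-threaded stand-in: `StateThreadingSketch`, `run`, and `fibonacci` are hypothetical names, `Supplier` and `BiFunction` play the roles of the state generator and the next function, and completion, errors, and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of the generateState()/next() contract; not RxJava code.
final class StateThreadingSketch {

    // Calls `next` once per requested item and feeds each returned state
    // into the following call.
    static <S, T> List<T> run(Supplier<S> generateState,
                              BiFunction<S, Consumer<T>, S> next,
                              int requested) {
        List<T> out = new ArrayList<>();
        S state = generateState.get();             // executed once
        for (int i = 0; i < requested; i++) {
            state = next.apply(state, out::add);   // returned state feeds the next call
        }
        return out;
    }

    // Fibonacci numbers: the state is the pair {current, following}.
    static List<Long> fibonacci(int count) {
        return StateThreadingSketch.<long[], Long>run(
                () -> new long[] {0L, 1L},
                (s, emit) -> {
                    emit.accept(s[0]);                      // the single emission of this call
                    return new long[] {s[1], s[0] + s[1]};  // state for the next invocation
                },
                count);
    }

    public static void main(String[] args) {
        System.out.println(fibonacci(8));   // prints [0, 1, 1, 2, 3, 5, 8, 13]
    }
}
```

Because each call emits at most one value and returns a fresh state, the loop can stop after any iteration and resume later from exactly where it left off.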
-
onUnsubscribe
protected void onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.
- Parameters:
state - the last state value from generateState() or next(S, Observer<T>) before unsubscribe.
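One common way to obtain an "invoked exactly once" guarantee is a compare-and-set guard. This page does not say how SyncOnSubscribe enforces it, so the following plain-Java sketch is only an assumption about the general technique; `OnceOnlyCleanup`, `unsubscribe`, and `demo` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical exactly-once cleanup guard; not RxJava code.
final class OnceOnlyCleanup<S> {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Consumer<S> cleanup;

    OnceOnlyCleanup(Consumer<S> cleanup) {
        this.cleanup = cleanup;
    }

    // Runs the cleanup with the last known state; repeated calls are ignored.
    void unsubscribe(S lastState) {
        if (done.compareAndSet(false, true)) {
            cleanup.accept(lastState);
        }
    }

    // Demo: counts how many times the cleanup actually runs.
    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        OnceOnlyCleanup<String> guard =
                new OnceOnlyCleanup<>(state -> calls.incrementAndGet());
        guard.unsubscribe("last-state");
        guard.unsubscribe("last-state");   // second call is a no-op
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints 1
    }
}
```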
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T - the type of the generated values
S - the type of the state associated with each Subscriber
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, Observer))
- Returns:
- a SyncOnSubscribe that emits data in a protocol compatible with back-pressure.
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next, Action1<? super S> onUnsubscribe)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload takes an explicit clean up step, onUnsubscribe.
- Type Parameters:
T - the type of the generated values
S - the type of the state associated with each Subscriber
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
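In createSingleState the next argument is an Action2, which returns nothing, so the state object produced by the generator is the one every iteration sees; any progress has to be recorded by mutating it. The plain-Java sketch below models that reading of the signatures with an Iterator as the single state. It does not use RxJava, and `SingleStateSketch`, `run`, and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a "single state" generator; not RxJava code.
final class SingleStateSketch {

    // The state is created once and the same instance is handed to every call.
    static <S, T> List<T> run(Supplier<S> generator,
                              BiConsumer<S, Consumer<T>> next,
                              int requested) {
        List<T> out = new ArrayList<>();
        S state = generator.get();
        for (int i = 0; i < requested; i++) {
            next.accept(state, out::add);
        }
        return out;
    }

    // Demo: an Iterator is a natural single state because it mutates itself.
    static List<String> demo() {
        List<String> source = Arrays.asList("a", "b", "c");
        return SingleStateSketch.<Iterator<String>, String>run(
                source::iterator,
                (it, emit) -> {
                    if (it.hasNext()) {
                        emit.accept(it.next());   // at most one emission per call
                    }
                },
                2);
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [a, b]
    }
}
```

When the state is immutable (a counter value, a tuple), the createStateful variants are the better fit, since their next function returns the replacement state.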
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next, Action1<? super S> onUnsubscribe)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T - the type of the generated values
S - the type of the state associated with each Subscriber
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T - the type of the generated values
S - the type of the state associated with each Subscriber
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, Observer))
- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T - the type of the generated values
- Parameters:
next - produces data to the downstream subscriber (see next(S, Observer))
- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe)
Generates a synchronous SyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T - the type of the generated values
- Parameters:
next - produces data to the downstream subscriber (see next(S, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
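"Closes over its state" means the next function captures whatever it needs from the enclosing scope instead of receiving a state argument. A minimal plain-Java sketch of that shape follows; it does not use RxJava, and `StatelessSketch`, `run`, and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a "state-less" generator; not RxJava code.
final class StatelessSketch {

    // No state is passed around: `next` only receives the emitter.
    static <T> List<T> run(Consumer<Consumer<T>> next, int requested) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
            next.accept(out::add);
        }
        return out;
    }

    // Demo: the lambda captures `counter`, which lives outside the generator.
    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger(10);
        return StatelessSketch.<Integer>run(
                emit -> emit.accept(counter.getAndIncrement()),
                3);
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [10, 11, 12]
    }
}
```

One consequence of this shape is that the captured state is shared by every run of the same function, whereas a generator-supplied state is created separately for each subscriber.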
-
-