Package rx.internal.operators
Class OperatorMerge.MergeSubscriber<T>
- java.lang.Object
-
- rx.Subscriber<Observable<? extends T>>
-
- rx.internal.operators.OperatorMerge.MergeSubscriber<T>
-
- Type Parameters:
T - the value type
- All Implemented Interfaces:
Observer<Observable<? extends T>>, Subscription
- Enclosing class:
- OperatorMerge<T>
static final class OperatorMerge.MergeSubscriber<T> extends Subscriber<Observable<? extends T>>
The subscriber that observes Observables.
-
-
Field Summary
Fields
Modifier and Type, Field, Description
(package private) Subscriber<? super T> child
(package private) boolean delayErrors
(package private) boolean done
(package private) boolean emitting - Guarded by this.
(package private) static OperatorMerge.InnerSubscriber<?>[] EMPTY - An empty array to avoid creating new empty arrays in removeInner.
(package private) java.util.concurrent.ConcurrentLinkedQueue<java.lang.Throwable> errors - Due to the emission loop, we need to store errors somewhere if !delayErrors.
(package private) java.lang.Object innerGuard
(package private) OperatorMerge.InnerSubscriber<?>[] innerSubscribers - Copy-on-write array, guarded by innerGuard.
(package private) long lastId - Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
(package private) int lastIndex - What was its index in the innerSubscribers array? Accessed if emitting == true.
(package private) int maxConcurrent
(package private) boolean missed - Guarded by this.
(package private) NotificationLite<T> nl
(package private) OperatorMerge.MergeProducer<T> producer
(package private) java.util.Queue<java.lang.Object> queue
(package private) int scalarEmissionCount
(package private) int scalarEmissionLimit
(package private) CompositeSubscription subscriptions - Tracks the active subscriptions to sources.
(package private) long uniqueId - Used to generate unique InnerSubscriber IDs.
-
Constructor Summary
Constructors
Constructor, Description
MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent)
-
Method Summary
All Methods, Instance Methods, Concrete Methods
Modifier and Type, Method, Description
(package private) void addInner(OperatorMerge.InnerSubscriber<T> inner)
(package private) boolean checkTerminate() - Check if the operator reached some terminal state: child unsubscribed, an error was reported and we don't delay errors.
(package private) void emit()
(package private) void emitEmpty()
(package private) void emitLoop() - The standard emission loop serializing events and requests.
protected void emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r)
protected void emitScalar(T value, long r)
(package private) CompositeSubscription getOrCreateComposite()
(package private) java.util.Queue<java.lang.Throwable> getOrCreateErrorQueue()
void onCompleted() - Notifies the Observer that the Observable has finished sending push-based notifications.
void onError(java.lang.Throwable e) - Notifies the Observer that the Observable has experienced an error condition.
void onNext(Observable<? extends T> t) - Provides the Observer with a new item to observe.
protected void queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value)
protected void queueScalar(T value)
(package private) void removeInner(OperatorMerge.InnerSubscriber<T> inner)
private void reportError()
void requestMore(long n)
(package private) void tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
(package private) void tryEmit(T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
-
Methods inherited from class rx.Subscriber
add, isUnsubscribed, onStart, request, setProducer, unsubscribe
-
-
-
-
Field Detail
-
child
final Subscriber<? super T> child
-
delayErrors
final boolean delayErrors
-
maxConcurrent
final int maxConcurrent
-
producer
OperatorMerge.MergeProducer<T> producer
-
queue
volatile java.util.Queue<java.lang.Object> queue
-
subscriptions
volatile CompositeSubscription subscriptions
Tracks the active subscriptions to sources.
-
errors
volatile java.util.concurrent.ConcurrentLinkedQueue<java.lang.Throwable> errors
Due to the emission loop, we need to store errors somewhere if !delayErrors.
-
nl
final NotificationLite<T> nl
-
done
volatile boolean done
-
emitting
boolean emitting
Guarded by this.
-
missed
boolean missed
Guarded by this.
-
innerGuard
final java.lang.Object innerGuard
-
innerSubscribers
volatile OperatorMerge.InnerSubscriber<?>[] innerSubscribers
Copy-on-write array, guarded by innerGuard.
-
uniqueId
long uniqueId
Used to generate unique InnerSubscriber IDs. Modified from onNext only.
-
lastId
long lastId
Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
-
lastIndex
int lastIndex
What was its index in the innerSubscribers array? Accessed if emitting == true.
-
EMPTY
static final OperatorMerge.InnerSubscriber<?>[] EMPTY
An empty array to avoid creating new empty arrays in removeInner.
-
scalarEmissionLimit
final int scalarEmissionLimit
-
scalarEmissionCount
int scalarEmissionCount
-
-
Constructor Detail
-
MergeSubscriber
public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent)
-
-
Method Detail
-
getOrCreateErrorQueue
java.util.Queue<java.lang.Throwable> getOrCreateErrorQueue()
-
getOrCreateComposite
CompositeSubscription getOrCreateComposite()
-
onNext
public void onNext(Observable<? extends T> t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
- Parameters:
t - the item emitted by the Observable
-
emitEmpty
void emitEmpty()
-
reportError
private void reportError()
-
onError
public void onError(java.lang.Throwable e)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
- Parameters:
e - the exception encountered by the Observable
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
-
addInner
void addInner(OperatorMerge.InnerSubscriber<T> inner)
-
removeInner
void removeInner(OperatorMerge.InnerSubscriber<T> inner)
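The innerSubscribers field is described as a copy-on-write array guarded by innerGuard, with EMPTY reused when the last entry is removed. The following is a minimal sketch of that general technique, not the RxJava source: the class CowRegistry and its method names are hypothetical. Writers copy the array under a lock and publish the new reference through a volatile field, so readers can iterate a stable snapshot without locking.

```java
import java.util.Arrays;

/** Hypothetical stand-in illustrating a copy-on-write array; not the RxJava implementation. */
final class CowRegistry<E> {
    /** Shared empty array, so removing the last element allocates nothing. */
    private static final Object[] EMPTY = new Object[0];

    /** Lock taken by writers only. */
    private final Object guard = new Object();

    /** Readers load this reference once and iterate the snapshot without locking. */
    private volatile Object[] items = EMPTY;

    void add(E item) {
        synchronized (guard) {
            Object[] current = items;
            Object[] next = Arrays.copyOf(current, current.length + 1);
            next[current.length] = item;
            items = next; // publish the new array
        }
    }

    void remove(E item) {
        synchronized (guard) {
            Object[] current = items;
            int index = -1;
            for (int i = 0; i < current.length; i++) {
                if (current[i] == item) { // identity comparison
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return; // not registered
            }
            if (current.length == 1) {
                items = EMPTY; // reuse the shared empty array
                return;
            }
            Object[] next = new Object[current.length - 1];
            System.arraycopy(current, 0, next, 0, index);
            System.arraycopy(current, index + 1, next, index, current.length - index - 1);
            items = next;
        }
    }

    /** Returns the current array; callers must treat it as read-only. */
    Object[] snapshot() {
        return items;
    }
}
```

A snapshot taken before a removal keeps its old contents, which is what lets an emission loop iterate safely while sources come and go.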
-
tryEmit
void tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value)
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
subscriber -
value -
-
queueScalar
protected void queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value)
-
emitScalar
protected void emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r)
-
requestMore
public void requestMore(long n)
-
tryEmit
void tryEmit(T value)
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
value -
-
queueScalar
protected void queueScalar(T value)
-
emitScalar
protected void emitScalar(T value, long r)
-
emit
void emit()
-
emitLoop
void emitLoop()
The standard emission loop serializing events and requests.
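The emitting and missed fields, both guarded by this, suggest the common emitter-loop pattern for serializing delivery. The sketch below illustrates that pattern under stated assumptions and is not the RxJava source: the class EmitterLoopSketch, the offer method, and the delivered list (standing in for the child) are hypothetical. One thread at a time wins the right to drain the queue; any other thread only flags that it missed, and the draining thread repeats the loop before giving up that right.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration of an emitter loop; not the RxJava implementation. */
final class EmitterLoopSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    /** Stands in for the child; only touched by the thread that currently owns the loop. */
    private final List<T> delivered = new ArrayList<>();

    private boolean emitting; // guarded by this
    private boolean missed;   // guarded by this

    /** Enqueues a value and tries to drain. */
    void offer(T value) {
        queue.offer(value);
        emit();
    }

    void emit() {
        synchronized (this) {
            if (emitting) {
                missed = true; // ask the current emitter to loop again
                return;
            }
            emitting = true;   // this thread now owns the loop
        }
        emitLoop();
    }

    private void emitLoop() {
        for (;;) {
            T value;
            while ((value = queue.poll()) != null) {
                delivered.add(value); // serialized delivery
            }
            synchronized (this) {
                if (!missed) {
                    emitting = false; // nobody arrived meanwhile; release ownership
                    return;
                }
                missed = false;       // someone arrived; drain once more
            }
        }
    }

    List<T> delivered() {
        return delivered;
    }
}
```

Checking missed and clearing emitting inside the same synchronized block is what prevents a value from being stranded in the queue when a producer arrives just as the loop is about to exit.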
-
checkTerminate
boolean checkTerminate()
Checks whether the operator has reached a terminal state: either the child unsubscribed, or an error was reported and errors are not delayed.
- Returns:
- true if the child unsubscribed, or if errors are available and merge does not delay errors.
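The return condition can be sketched in isolation. This is only an illustration of the documented condition, assuming hypothetical names (TerminateCheckSketch, recordError, unsubscribeChild); it leaves out whatever side effects the real method may have, such as reporting the stored errors to the child.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration of the terminal-state check; not the RxJava implementation. */
final class TerminateCheckSketch {
    private final boolean delayErrors;
    private final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
    private volatile boolean childUnsubscribed;

    TerminateCheckSketch(boolean delayErrors) {
        this.delayErrors = delayErrors;
    }

    void recordError(Throwable e) {
        errors.offer(e);
    }

    void unsubscribeChild() {
        childUnsubscribed = true;
    }

    /** True if the child unsubscribed, or errors exist and they are not delayed. */
    boolean checkTerminate() {
        if (childUnsubscribed) {
            return true;
        }
        return !delayErrors && !errors.isEmpty();
    }
}
```

With delayErrors set, a recorded error alone does not terminate the check, so the remaining sources can keep emitting.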
-
-