Class OnSubscribePublishMulticast<T>
- java.lang.Object
-
- java.lang.Number
-
- java.util.concurrent.atomic.AtomicInteger
-
- rx.internal.operators.OnSubscribePublishMulticast<T>
-
- Type Parameters:
T - the input and output type
- All Implemented Interfaces:
java.io.Serializable, Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>, Observer<T>, Subscription
public final class OnSubscribePublishMulticast<T> extends java.util.concurrent.atomic.AtomicInteger implements Observable.OnSubscribe<T>, Observer<T>, Subscription
Multicasts notifications coming through its input Subscriber view to its client Subscribers via lockstep backpressure mode. Unlike OperatorPublish, this class doesn't consume the upstream while there are no child subscribers but waits for them to show up. In addition, if the upstream source terminates, late subscribers are immediately terminated with the same terminal event, whereas OperatorPublish just waits for the next connection.
The class extends AtomicInteger, which serves as the work-in-progress gate for the drain loop that serializes subscriptions and child request changes.
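The work-in-progress gate mentioned above can be illustrated with a minimal, JDK-only sketch. This is not the source of OnSubscribePublishMulticast; the class name WipGateSketch, the offer method and the sum field are hypothetical and exist only to show how an AtomicInteger counter lets exactly one thread at a time run a drain loop while the other callers merely record that more work has arrived.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a work-in-progress gate; not the RxJava source. */
final class WipGateSketch extends AtomicInteger {
    private static final long serialVersionUID = 1L;

    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    long sum; // only ever touched by the thread currently inside the drain loop

    void offer(int value) {
        queue.offer(value);
        drain();
    }

    void drain() {
        // Only the caller that moves the counter from 0 to 1 enters the loop;
        // every other caller just signals that there is more work and returns.
        if (getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                sum += v;
            }
            // Subtract the signals already handled; leave only if no new ones arrived.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WipGateSketch gate = new WipGateSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    gate.offer(j);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(gate.sum);
    }
}
```

Because the plain field sum is only modified between the successful getAndIncrement and the final addAndGet, no additional locking is needed for it in this sketch.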
- See Also:
- Serialized Form
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
(package private) static class | OnSubscribePublishMulticast.ParentSubscriber<T> | The subscriber that must be used for subscribing to the upstream source.
(package private) static class | OnSubscribePublishMulticast.PublishProducer<T> | A Producer and Subscription that wraps a child Subscriber and manages its backpressure requests along with its unsubscription from the parent class.
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) boolean | delayError | Delays the error delivery to happen only after all values have been consumed.
(package private) boolean | done | Indicates the upstream has completed.
(package private) static OnSubscribePublishMulticast.PublishProducer<?>[] | EMPTY | Represents an empty array of subscriber wrappers; helps avoid allocating an empty array all the time.
(package private) java.lang.Throwable | error | Holds onto the upstream's exception if done is true and this field is non-null.
(package private) OnSubscribePublishMulticast.ParentSubscriber<T> | parent | The subscriber that can be 'connected' to the upstream source.
(package private) int | prefetch | The number of items to prefetch from the upstream source.
(package private) Producer | producer | Holds the upstream producer if any, set through the parent subscriber.
(package private) java.util.Queue<T> | queue | The prefetch queue holding onto a fixed amount of items until all child subscribers have requested something.
private static long | serialVersionUID |
(package private) OnSubscribePublishMulticast.PublishProducer<T>[] | subscribers | A copy-on-write array of currently subscribed child subscribers' wrapper structure.
(package private) static OnSubscribePublishMulticast.PublishProducer<?>[] | TERMINATED | Represents a final state for this class that prevents new subscribers from subscribing to it.
-
Constructor Summary
Constructors
Constructor | Description
OnSubscribePublishMulticast(int prefetch, boolean delayError) | Constructor, initializes the fields.
-
Method Summary
All Methods | Instance Methods | Concrete Methods
Modifier and Type | Method | Description
(package private) boolean | add(OnSubscribePublishMulticast.PublishProducer<T> inner) | Atomically adds the given wrapper of a child Subscriber to the subscribers array.
void | call(Subscriber<? super T> t) |
(package private) boolean | checkTerminated(boolean d, boolean empty) | Given the current source state, terminates all child subscribers.
(package private) void | drain() | The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue as are available.
boolean | isUnsubscribed() | Indicates whether this Subscription is currently unsubscribed.
void | onCompleted() | Notifies the Observer that the Observable has finished sending push-based notifications.
void | onError(java.lang.Throwable e) | Notifies the Observer that the Observable has experienced an error condition.
void | onNext(T t) | Provides the Observer with a new item to observe.
(package private) void | remove(OnSubscribePublishMulticast.PublishProducer<T> inner) | Atomically removes the given wrapper, if present, from the subscribers array.
(package private) void | setProducer(Producer p) | Sets the main producer and issues the prefetch amount.
Subscriber<T> | subscriber() | Returns the input subscriber of this class that must be subscribed to the upstream source.
(package private) OnSubscribePublishMulticast.PublishProducer<T>[] | terminate() | Atomically swaps in the terminated state.
void | unsubscribe() | Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
-
Methods inherited from class java.util.concurrent.atomic.AtomicInteger
accumulateAndGet, addAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, decrementAndGet, doubleValue, floatValue, get, getAcquire, getAndAccumulate, getAndAdd, getAndDecrement, getAndIncrement, getAndSet, getAndUpdate, getOpaque, getPlain, incrementAndGet, intValue, lazySet, longValue, set, setOpaque, setPlain, setRelease, toString, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
-
-
-
-
Field Detail
-
serialVersionUID
private static final long serialVersionUID
- See Also:
- Constant Field Values
-
queue
final java.util.Queue<T> queue
The prefetch queue holding onto a fixed amount of items until all child subscribers have requested something.
-
prefetch
final int prefetch
The number of items to prefetch from the upstream source.
-
delayError
final boolean delayError
Delays the error delivery to happen only after all values have been consumed.
-
parent
final OnSubscribePublishMulticast.ParentSubscriber<T> parent
The subscriber that can be 'connected' to the upstream source.
-
done
volatile boolean done
Indicates the upstream has completed.
-
error
java.lang.Throwable error
Holds onto the upstream's exception if done is true and this field is non-null. This field must be read after reading done, or after observing subscribers == TERMINATED, to establish a proper happens-before relationship.
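The read-ordering requirement on error can be sketched as follows. This is a simplified illustration, not the class's actual code: TerminalStateSketch and errorIfDone are hypothetical names, and only the relationship between a plain error field and a volatile done flag is modelled.

```java
/** Hypothetical illustration of publishing a plain field through a volatile flag. */
final class TerminalStateSketch {
    Throwable error;       // plain field, published through 'done'
    volatile boolean done; // volatile flag that carries the happens-before edge

    void onError(Throwable e) {
        error = e;   // 1. plain write first
        done = true; // 2. volatile write publishes the plain write above
    }

    /** Returns the stored error if the terminal state is visible, otherwise null. */
    Throwable errorIfDone() {
        if (done) {       // volatile read first ...
            return error; // ... so this plain read observes the write made in onError
        }
        return null;
    }
}
```

Reading error before done would remove the guarantee: the reader could see a stale null even though done is already true.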
-
producer
volatile Producer producer
Holds the upstream producer if any, set through the parent subscriber.
-
subscribers
volatile OnSubscribePublishMulticast.PublishProducer<T>[] subscribers
A copy-on-write array of currently subscribed child subscribers' wrapper structure.
-
EMPTY
static final OnSubscribePublishMulticast.PublishProducer<?>[] EMPTY
Represents an empty array of subscriber wrappers; helps avoid allocating an empty array all the time.
-
TERMINATED
static final OnSubscribePublishMulticast.PublishProducer<?>[] TERMINATED
Represents a final state for this class that prevents new subscribers from subscribing to it.
-
-
Constructor Detail
-
OnSubscribePublishMulticast
public OnSubscribePublishMulticast(int prefetch, boolean delayError)
Constructor, initializes the fields.
- Parameters:
prefetch - the prefetch amount, > 0 required
delayError - delay the error delivery after the normal items?
- Throws:
java.lang.IllegalArgumentException - if prefetch <= 0
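The constructor contract above (prefetch must be positive, otherwise IllegalArgumentException) might be sketched like this. PrefetchConfigSketch is a hypothetical class, and the ArrayBlockingQueue is only a stand-in for whatever bounded queue the real class allocates, which this page does not specify.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical illustration of the documented constructor contract. */
final class PrefetchConfigSketch<T> {
    final Queue<T> queue;
    final int prefetch;
    final boolean delayError;

    PrefetchConfigSketch(int prefetch, boolean delayError) {
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
        }
        this.prefetch = prefetch;
        this.delayError = delayError;
        // Assumption: a bounded queue sized to the prefetch amount (stand-in type).
        this.queue = new ArrayBlockingQueue<>(prefetch);
    }
}
```

With a prefetch of 2, the stand-in queue accepts two items and rejects a third until one has been consumed.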
-
-
Method Detail
-
call
public void call(Subscriber<? super T> t)
-
onNext
public void onNext(T t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
-
onError
public void onError(java.lang.Throwable e)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
- Specified by:
onCompleted in interface Observer<T>
-
setProducer
void setProducer(Producer p)
Sets the main producer and issues the prefetch amount.
- Parameters:
p - the producer to set
-
drain
void drain()
The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue as are available. The execution of the drain loop is guaranteed to be thread-safe.
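The "minimum request of all subscribers" step can be sketched in isolation. The code below is a single-threaded illustration with hypothetical names (LockstepSketch, Child, drainOnce); it leaves out the serialization gate and termination handling and only shows why the slowest child bounds how much is emitted to everyone.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration of lockstep emission bounded by the minimum request. */
final class LockstepSketch {
    /** Hypothetical child: outstanding request count plus the items it has received. */
    static final class Child {
        long requested;
        final List<Integer> received = new ArrayList<>();

        Child(long requested) {
            this.requested = requested;
        }
    }

    /** Emits as many queued items as the slowest child allows; returns the count emitted. */
    static long drainOnce(Queue<Integer> queue, List<Child> children) {
        if (children.isEmpty()) {
            return 0; // nobody to deliver to: leave the queue untouched
        }
        long min = Long.MAX_VALUE;
        for (Child c : children) {
            min = Math.min(min, c.requested);
        }
        long emitted = 0;
        while (emitted != min) {
            Integer v = queue.poll();
            if (v == null) {
                break; // queue ran dry before the minimum request was satisfied
            }
            for (Child c : children) {
                c.received.add(v);
            }
            emitted++;
        }
        for (Child c : children) {
            if (c.requested != Long.MAX_VALUE) { // treat MAX_VALUE as "unbounded"
                c.requested -= emitted;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        List<Child> children = Arrays.asList(new Child(3), new Child(1));
        System.out.println(drainOnce(queue, children)); // the child that requested 1 is the limit
    }
}
```

In this sketch a child that has requested nothing stalls all the others, which is the lockstep behavior the class description refers to.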
-
checkTerminated
boolean checkTerminated(boolean d, boolean empty)
Given the current source state, terminates all child subscribers.
- Parameters:
d - the source-done indicator
empty - the queue-emptiness indicator
- Returns:
true if the class reached its terminal state
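How the two indicators might combine with the delayError field can be sketched as a pure decision function. This is an assumption drawn from the field descriptions on this page (delayError postpones the error until all values have been consumed), not a copy of the method body; TerminationSketch, Outcome and decide are hypothetical names.

```java
/** Hypothetical illustration of a termination decision; not the RxJava source. */
final class TerminationSketch {
    enum Outcome { CONTINUE, COMPLETE, ERROR }

    static Outcome decide(boolean done, boolean empty, Throwable error, boolean delayError) {
        if (!done) {
            return Outcome.CONTINUE; // upstream is still active
        }
        if (delayError) {
            if (!empty) {
                return Outcome.CONTINUE; // deliver the queued values first
            }
            return error != null ? Outcome.ERROR : Outcome.COMPLETE;
        }
        if (error != null) {
            return Outcome.ERROR; // assumed fail-fast: the error overtakes queued values
        }
        return empty ? Outcome.COMPLETE : Outcome.CONTINUE;
    }
}
```

In the real method an ERROR or COMPLETE outcome would correspond to returning true after notifying the child subscribers; the sketch stops at the decision itself.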
-
terminate
OnSubscribePublishMulticast.PublishProducer<T>[] terminate()
Atomically swaps in the terminated state.
- Returns:
the last set of subscribers before the state change or an empty array
-
add
boolean add(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically adds the given wrapper of a child Subscriber to the subscribers array.
- Parameters:
inner - the wrapper
- Returns:
true if successful, false if the terminal state has been reached in the meantime
-
remove
void remove(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically removes the given wrapper, if present, from the subscribers array.
- Parameters:
inner - the wrapper to remove
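The add, remove and terminate operations, together with the EMPTY and TERMINATED sentinels, follow a copy-on-write array pattern that can be sketched with the JDK alone. Note the swapped-in technique: this sketch uses a compare-and-set loop on an AtomicReference, whereas the real class declares a volatile array field and may guard updates differently. CowSubscribersSketch is a hypothetical name and Object stands in for the PublishProducer wrapper.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of a copy-on-write subscriber array with sentinels. */
final class CowSubscribersSketch {
    static final Object[] EMPTY = new Object[0];
    static final Object[] TERMINATED = new Object[0]; // distinct instance, compared by identity

    final AtomicReference<Object[]> subscribers = new AtomicReference<>(EMPTY);

    /** Returns false if the terminal state was reached before the wrapper could be added. */
    boolean add(Object inner) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            Object[] b = new Object[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = inner;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    /** Removes the wrapper if present; does nothing otherwise. */
    void remove(Object inner) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == EMPTY || a == TERMINATED) {
                return;
            }
            int n = a.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == inner) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            Object[] b;
            if (n == 1) {
                b = EMPTY; // reuse the shared empty array instead of allocating
            } else {
                b = new Object[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    /** Swaps in the terminal state and returns the array that was current before it. */
    Object[] terminate() {
        return subscribers.getAndSet(TERMINATED);
    }
}
```

Because every mutation installs a fresh array, a reader that has loaded the array reference can iterate over it without locking, and the identity check against TERMINATED is what lets add report that a late subscriber arrived after termination.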
-
subscriber
public Subscriber<T> subscriber()
Returns the input subscriber of this class that must be subscribed to the upstream source.
- Returns:
the subscriber instance
-
unsubscribe
public void unsubscribe()
Description copied from interface: Subscription
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
This allows unregistering a Subscriber before it has finished receiving all events (i.e. before onCompleted is called).
- Specified by:
unsubscribe in interface Subscription
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface: Subscription
Indicates whether this Subscription is currently unsubscribed.
- Specified by:
isUnsubscribed in interface Subscription
- Returns:
true if this Subscription is currently unsubscribed, false otherwise
-
-