Package rx.internal.util
Class RxRingBuffer
- java.lang.Object
-
- rx.internal.util.RxRingBuffer
-
- All Implemented Interfaces:
Subscription
public class RxRingBuffer extends java.lang.Object implements Subscription
This assumes Spsc (single-producer, single-consumer) or Spmc (single-producer, multi-consumer) usage, meaning that only a single producer calls the on* methods. This is the Rx contract of an Observer (see http://reactivex.io/documentation/contract.html). Concurrent invocations of the on* methods are not thread-safe.
-
-
Field Summary
Fields
Modifier and Type    Field    Description
(package private) static int    defaultSize    128 was chosen as the default based on the numbers below.
private static NotificationLite<java.lang.Object>    ON    Queue implementation testing that led to current choices of data structures: With synchronized LinkedList
private ObjectPool<java.util.Queue<java.lang.Object>>    pool
private java.util.Queue<java.lang.Object>    queue
private int    size
static int    SIZE
static ObjectPool<java.util.Queue<java.lang.Object>>    SPMC_POOL
static ObjectPool<java.util.Queue<java.lang.Object>>    SPSC_POOL
java.lang.Object    terminalState    We store the terminal state separately so it doesn't count against the size.
-
Constructor Summary
Constructors
Modifier    Constructor    Description
(package private)    RxRingBuffer()
private    RxRingBuffer(java.util.Queue<java.lang.Object> queue, int size)
private    RxRingBuffer(ObjectPool<java.util.Queue<java.lang.Object>> pool, int size)
-
Method Summary
Modifier and Type    Method    Description
boolean    accept(java.lang.Object o, Observer child)
java.lang.Throwable    asError(java.lang.Object o)
int    available()
int    capacity()
int    count()
static RxRingBuffer    getSpmcInstance()
static RxRingBuffer    getSpscInstance()
java.lang.Object    getValue(java.lang.Object o)
boolean    isCompleted(java.lang.Object o)
boolean    isEmpty()
boolean    isError(java.lang.Object o)
boolean    isUnsubscribed()    Indicates whether this Subscription is currently unsubscribed.
void    onCompleted()
void    onError(java.lang.Throwable t)
void    onNext(java.lang.Object o)
java.lang.Object    peek()
java.lang.Object    poll()
void    release()
void    unsubscribe()    Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
-
-
-
Field Detail
-
ON
private static final NotificationLite<java.lang.Object> ON
Queue implementation testing that led to current choices of data structures:

With synchronized LinkedList

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        19118392.046  1002814.238  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        17891.641     252.747      ops/s

With MpscPaddedQueue (single consumer, so failing 1 unit test)

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        22164483.238  3035027.348  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        23154.303     602.548      ops/s

With ConcurrentLinkedQueue (tracking count separately)

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        17353906.092  378756.411   ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        19224.411     1010.610     ops/s

With ConcurrentLinkedQueue (using queue.size() method for count)

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        23951121.098  1982380.330  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        1142.351      33.592       ops/s

With SynchronizedQueue (synchronized LinkedList ... no object pooling)

r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        33231667.136  685757.510   ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        74623.614     5493.766     ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        22907359.257  707026.632   ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        22222.410     320.829      ops/s

With ArrayBlockingQueue

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        2389804.664   68990.804    ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        27384.274     1411.789     ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        26497037.559  91176.247    ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        17985.144     237.771      ops/s

With ArrayBlockingQueue and Object Pool

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        12465685.522  399070.770   ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        27701.294     395.217      ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        26399625.086  695639.436   ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        17985.427     253.190      ops/s

With SpscArrayQueue (single consumer, so failing 1 unit test) - requires access to Unsafe

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        1922996.035   49183.766    ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        70890.186     1382.550     ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        80637811.605  3509706.954  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        71822.453     4127.660     ops/s

With SpscArrayQueue and Object Pool (object pool improves createUseAndDestroy1 by 10x)

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        25220069.264  1329078.785  ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        72313.457     3535.447     ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1         thrpt  5        81863840.884  2191416.069  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        73140.822     1528.764     ops/s

With SpmcArrayQueue - requires access to Unsafe

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1     thrpt  5        27630345.474  769219.142   ops/s
r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000  thrpt  5        80052.046     4059.541     ops/s
r.i.RxRingBufferPerf.spmcRingBufferAddRemove1     thrpt  5        44449524.222  563068.793   ops/s
r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000  thrpt  5        65231.253     1805.732     ops/s

With SpmcArrayQueue and ObjectPool (object pool improves createUseAndDestroy1 by 10x)

Benchmark                                         Mode   Samples  Score         Score error  Units
r.i.RxRingBufferPerf.createUseAndDestroy1         thrpt  5        18489343.061  1011872.825  ops/s
r.i.RxRingBufferPerf.createUseAndDestroy1000      thrpt  5        46416.434     1439.144     ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove          thrpt  5        38280945.847  1071801.279  ops/s
r.i.RxRingBufferPerf.ringBufferAddRemove1000      thrpt  5        42337.663     1052.231     ops/s

--------------

When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations.
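The createUseAndDestroy figures above compare allocating a fresh queue per use against borrowing one from an object pool. As a rough illustration of that idea only, here is a minimal sketch of a queue pool. The class QueuePoolSketch and its method names are hypothetical and chosen for this example; this is not the rx.internal.util.ObjectPool implementation, and it uses a plain ArrayDeque rather than the Spsc/Spmc array queues.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

/** Hypothetical minimal pool for illustration; not the ObjectPool used by RxRingBuffer. */
final class QueuePoolSketch {
    /** Queues that have been returned and are waiting to be reused. */
    private final Queue<Queue<Object>> idle = new ConcurrentLinkedQueue<>();
    /** Creates a new queue when no idle one is available. */
    private final Supplier<Queue<Object>> factory;

    QueuePoolSketch(Supplier<Queue<Object>> factory) {
        this.factory = factory;
    }

    /** Reuses an idle queue when one exists, otherwise allocates a new one. */
    Queue<Object> borrowObject() {
        Queue<Object> q = idle.poll();
        return q != null ? q : factory.get();
    }

    /** Empties the queue and makes it available to the next borrower. */
    void returnObject(Queue<Object> q) {
        q.clear();
        idle.offer(q);
    }

    public static void main(String[] args) {
        QueuePoolSketch pool = new QueuePoolSketch(() -> new ArrayDeque<Object>(128));
        Queue<Object> first = pool.borrowObject();
        first.offer("value");
        pool.returnObject(first);
        // The second borrow hands back the same, now empty, instance.
        Queue<Object> second = pool.borrowObject();
        System.out.println(second == first && second.isEmpty());
    }
}
```

The saving in this sketch comes from skipping the allocation of the backing array on every use, which matters most when each buffer carries only a few items.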
-
queue
private java.util.Queue<java.lang.Object> queue
-
size
private final int size
-
pool
private final ObjectPool<java.util.Queue<java.lang.Object>> pool
-
terminalState
public volatile java.lang.Object terminalState
We store the terminal state separately so it doesn't count against the size. We don't simply add 1 to the size because some of the queues require sizes that are a power of 2. This is a subjective choice: we want the size (e.g. 1024) to be the actual number of onNext notifications that can be sent, rather than something like 1023 onNext + 1 terminal event. It also simplifies checking that we have received only 1 terminal event, as we don't need to peek at the last item or retain a boolean flag.
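To make the design choice concrete, here is a minimal single-threaded sketch of a bounded buffer that keeps its terminal event in a separate field. The class TerminalStateSketch and its COMPLETED sentinel are hypothetical and exist only for this example; the real class encodes notifications through NotificationLite and declares terminalState volatile.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, single-threaded stand-in for illustration; not the RxRingBuffer source. */
final class TerminalStateSketch {
    /** Sentinel marking completion (an assumption made for this sketch). */
    private static final Object COMPLETED = new Object();

    private final Queue<Object> queue = new ArrayDeque<>();
    private final int size;
    /** Kept outside the queue, so it never consumes one of the size slots. */
    private Object terminalState;

    TerminalStateSketch(int size) {
        this.size = size;
    }

    /** Returns false when all size slots are already taken by onNext values. */
    boolean onNext(Object value) {
        if (queue.size() >= size) {
            return false;
        }
        return queue.offer(value);
    }

    /** Records completion; only the first terminal event is retained. */
    void onCompleted() {
        if (terminalState == null) {
            terminalState = COMPLETED;
        }
    }

    /** Hands out queued values first, then the terminal state exactly once. */
    Object poll() {
        Object next = queue.poll();
        if (next == null && terminalState != null) {
            next = terminalState;
            terminalState = null;
        }
        return next;
    }

    boolean isCompleted(Object o) {
        return o == COMPLETED;
    }

    public static void main(String[] args) {
        TerminalStateSketch buffer = new TerminalStateSketch(2);
        buffer.onNext("a");
        buffer.onNext("b");   // both slots are now used by values
        buffer.onCompleted(); // still accepted: stored beside the queue
        System.out.println(buffer.poll());
        System.out.println(buffer.poll());
        System.out.println(buffer.isCompleted(buffer.poll()));
    }
}
```

With a size of 2, the sketch holds two onNext values and the completion signal at the same time, which is the behavior the paragraph above describes for a size such as 1024.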
-
defaultSize
static int defaultSize
128 was chosen as the default based on the numbers below. A stream processing system may benefit from increasing to 512+.

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'

1024

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        100642.874    24676.478    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        4095.901      90.730       ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        9.797         4.982        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        15536155.489  758579.454   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        156257.341    6324.176     ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        157.099       7.143        ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        16864.641     1826.877     ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        4269.317      169.480      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        13.393        1.047        ops/s

512

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        98945.980     48050.282    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        4111.149      95.987       ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        12.483        3.067        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16032469.143  620157.818   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        157997.290    5097.718     ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        156.462       7.728        ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        15813.984     8260.170     ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        4358.334      251.609      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        13.647        0.613        ops/s

256

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        108489.834    2688.489     ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        4526.674      728.019      ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        13.372        0.457        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16435113.709  311602.627   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        157611.204    13146.108    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        158.346       2.500        ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        16976.775     968.191      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        6238.210      2060.387     ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        13.465        0.566        ops/s

128

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        106887.027    29307.913    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        6713.891      202.989      ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        11.929        0.187        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16055774.724  350633.068   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        153403.821    17976.156    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        153.559       20.178       ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        17172.274     236.816      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        7073.555      595.990      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        11.855        1.093        ops/s

32

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        106128.589    20986.201    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        6396.607      73.627       ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        7.643         0.668        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16012419.447  409004.521   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        157907.001    5772.849     ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        155.308       23.853       ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        16927.513     606.692      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        5191.084      244.876      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        8.288         0.217        ops/s

16

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        109974.741    839.064      ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        4538.912      173.561      ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        5.420         0.111        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16017466.785  768748.695   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        157934.065    13479.575    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        155.922       17.781       ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        14903.686     3325.205     ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        3784.776      1054.131     ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        5.624         0.130        ops/s

2

Benchmark                                       (size)   Mode   Samples  Score         Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation  1        thrpt  5        112663.216    899.005      ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000     thrpt  5        899.737       9.460        ops/s
r.o.OperatorObserveOnPerf.observeOnComputation  1000000  thrpt  5        0.999         0.100        ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1        thrpt  5        16087325.336  783206.227   ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000     thrpt  5        156747.025    4880.489     ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate    1000000  thrpt  5        156.645       3.810        ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1        thrpt  5        15958.711     673.895      ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000     thrpt  5        884.624       47.692       ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread    1000000  thrpt  5        1.173         0.100        ops/s
-
SIZE
public static final int SIZE
-
SPSC_POOL
public static final ObjectPool<java.util.Queue<java.lang.Object>> SPSC_POOL
-
SPMC_POOL
public static final ObjectPool<java.util.Queue<java.lang.Object>> SPMC_POOL
-
-
Constructor Detail
-
RxRingBuffer
private RxRingBuffer(java.util.Queue<java.lang.Object> queue, int size)
-
RxRingBuffer
private RxRingBuffer(ObjectPool<java.util.Queue<java.lang.Object>> pool, int size)
-
RxRingBuffer
RxRingBuffer()
-
-
Method Detail
-
getSpscInstance
public static RxRingBuffer getSpscInstance()
-
getSpmcInstance
public static RxRingBuffer getSpmcInstance()
-
release
public void release()
-
unsubscribe
public void unsubscribe()
Description copied from interface: Subscription
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received. This allows unregistering a Subscriber before it has finished receiving all events (i.e. before onCompleted is called).
Specified by:
unsubscribe in interface Subscription
-
onNext
public void onNext(java.lang.Object o) throws MissingBackpressureException
Parameters:
o -
Throws:
MissingBackpressureException - if more onNext are sent than have been requested
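The overflow condition can be pictured as a bounded queue whose offer fails when the producer has outrun the consumer. The sketch below is an illustration under assumptions, not the actual method body: BoundedOnNextSketch and OverflowSketchException are hypothetical names, the exception stands in for MissingBackpressureException, and ArrayBlockingQueue is used only because its offer method returns false when the queue is full.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical stand-in for MissingBackpressureException, used only in this sketch. */
class OverflowSketchException extends Exception {
    OverflowSketchException(String message) {
        super(message);
    }
}

/** Hypothetical bounded buffer; a sketch of the overflow check only. */
final class BoundedOnNextSketch {
    private final Queue<Object> queue;

    BoundedOnNextSketch(int capacity) {
        // ArrayBlockingQueue.offer returns false instead of blocking when the queue is full.
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Enqueues the value, or signals that more items arrived than there was room for. */
    void onNext(Object value) throws OverflowSketchException {
        if (!queue.offer(value)) {
            throw new OverflowSketchException("more onNext calls than free slots");
        }
    }

    /** Removes and returns the oldest value, or null when the buffer is empty. */
    Object poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BoundedOnNextSketch buffer = new BoundedOnNextSketch(2);
        try {
            buffer.onNext(1);
            buffer.onNext(2);
            buffer.onNext(3); // no free slot: the consumer has not polled yet
        } catch (OverflowSketchException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

A caller that honors backpressure would request no more items than the buffer has free slots, so the exception path is reached only when the upstream source ignores those requests.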
-
onCompleted
public void onCompleted()
-
onError
public void onError(java.lang.Throwable t)
-
available
public int available()
-
capacity
public int capacity()
-
count
public int count()
-
isEmpty
public boolean isEmpty()
-
poll
public java.lang.Object poll()
-
peek
public java.lang.Object peek()
-
isCompleted
public boolean isCompleted(java.lang.Object o)
-
isError
public boolean isError(java.lang.Object o)
-
getValue
public java.lang.Object getValue(java.lang.Object o)
-
accept
public boolean accept(java.lang.Object o, Observer child)
-
asError
public java.lang.Throwable asError(java.lang.Object o)
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface: Subscription
Indicates whether this Subscription is currently unsubscribed.
Specified by:
isUnsubscribed in interface Subscription
Returns:
true if this Subscription is currently unsubscribed, false otherwise
-
-