Package rx.internal.util
Class IndexedRingBuffer<E>
- java.lang.Object
-
- rx.internal.util.IndexedRingBuffer<E>
-
- Type Parameters:
E-
- All Implemented Interfaces:
Subscription
public final class IndexedRingBuffer<E> extends java.lang.Object implements Subscription
Add/Remove without object allocation (after initial construction).This is meant for hundreds or single-digit thousands of elements that need to be rapidly added and randomly or sequentially removed while avoiding object allocation.
On Intel Core i7, 2.3Mhz, Mac Java 8:
- adds per second single-threaded => ~32,598,500 for 100 - adds per second single-threaded => ~23,200,000 for 10,000 - adds + removes per second single-threaded => 15,562,100 for 100 - adds + removes per second single-threaded => 8,760,000 for 10,000
Benchmark (size) Mode Samples Score Score error Units r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 263571.721 9856.994 ops/s r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1763.417 211.998 ops/s r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 139850.115 17143.705 ops/s r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 809.982 72.931 ops/s
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static classIndexedRingBuffer.ElementSection<E>(package private) static classIndexedRingBuffer.IndexSection
-
Field Summary
Fields Modifier and Type Field Description (package private) static intdefaultSizeprivate IndexedRingBuffer.ElementSection<E>elements(package private) java.util.concurrent.atomic.AtomicIntegerindexprivate static ObjectPool<IndexedRingBuffer<?>>POOLprivate IndexedRingBuffer.IndexSectionremoved(package private) java.util.concurrent.atomic.AtomicIntegerremovedIndex(package private) static intSIZE
-
Constructor Summary
Constructors Constructor Description IndexedRingBuffer()
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description intadd(E e)Add an element and return the index where it was added to allow removal.intforEach(Func1<? super E,java.lang.Boolean> action)intforEach(Func1<? super E,java.lang.Boolean> action, int startIndex)Loop through each element in the buffer and call a specific function.private intforEach(Func1<? super E,java.lang.Boolean> action, int startIndex, int endIndex)private IndexedRingBuffer.ElementSection<E>getElementSection(int index)private intgetIndexForAdd()private intgetIndexFromPreviouslyRemoved()Returns -1 if nothing, 0 or greater if the index should be usedprivate IndexedRingBuffer.IndexSectiongetIndexSection(int index)static <T> IndexedRingBuffer<T>getInstance()booleanisUnsubscribed()Indicates whether thisSubscriptionis currently unsubscribed.private voidpushRemovedIndex(int elementIndex)voidreleaseToPool()This resets the arrays, nulls out references and returns it to the pool.Eremove(int index)voidunsubscribe()Stops the receipt of notifications on theSubscriberthat was registered when this Subscription was received.
-
-
-
Field Detail
-
elements
private final IndexedRingBuffer.ElementSection<E> elements
-
removed
private final IndexedRingBuffer.IndexSection removed
-
index
final java.util.concurrent.atomic.AtomicInteger index
-
removedIndex
final java.util.concurrent.atomic.AtomicInteger removedIndex
-
POOL
private static final ObjectPool<IndexedRingBuffer<?>> POOL
-
defaultSize
static int defaultSize
-
SIZE
static final int SIZE
-
-
Method Detail
-
getInstance
public static <T> IndexedRingBuffer<T> getInstance()
-
releaseToPool
public void releaseToPool()
This resets the arrays, nulls out references and returns it to the pool. This extra CPU cost is far smaller than the object allocation cost of not pooling.
-
unsubscribe
public void unsubscribe()
Description copied from interface:SubscriptionStops the receipt of notifications on theSubscriberthat was registered when this Subscription was received.This allows unregistering an
Subscriberbefore it has finished receiving all events (i.e. before onCompleted is called).- Specified by:
unsubscribein interfaceSubscription
-
add
public int add(E e)
Add an element and return the index where it was added to allow removal.- Parameters:
e- the element to add- Returns:
- the index where the element was added
-
remove
public E remove(int index)
-
getIndexSection
private IndexedRingBuffer.IndexSection getIndexSection(int index)
-
getElementSection
private IndexedRingBuffer.ElementSection<E> getElementSection(int index)
-
getIndexForAdd
private int getIndexForAdd()
-
getIndexFromPreviouslyRemoved
private int getIndexFromPreviouslyRemoved()
Returns -1 if nothing, 0 or greater if the index should be used- Returns:
-
pushRemovedIndex
private void pushRemovedIndex(int elementIndex)
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface:SubscriptionIndicates whether thisSubscriptionis currently unsubscribed.- Specified by:
isUnsubscribedin interfaceSubscription- Returns:
trueif thisSubscriptionis currently unsubscribed,falseotherwise
-
forEach
public int forEach(Func1<? super E,java.lang.Boolean> action, int startIndex)
Loop through each element in the buffer and call a specific function.- Parameters:
action- that processes each item and returns true if it wants to continue to the nextstartIndex- at which index the loop should start- Returns:
- int of next index to process, or last index seen if it exited early
-
-