Class ConcurrentMergeScheduler
java.lang.Object
  - org.apache.lucene.index.MergeScheduler
    - org.apache.lucene.index.ConcurrentMergeScheduler

All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable
public class ConcurrentMergeScheduler extends MergeScheduler
A MergeScheduler that runs each merge using a separate thread.

Specify the max number of threads that may run at once, and the maximum number of simultaneous merges, with setMaxMergesAndThreads(int, int).

If the number of merges exceeds the max number of threads then the largest merges are paused until one of the smaller merges completes.

If more than getMaxMergeCount() merges are requested then this class will forcefully throttle the incoming threads by pausing until one or more merges complete.

This class attempts to detect whether the index is on rotational storage (traditional hard drive) or not (e.g. solid-state disk) and changes the default max merge and thread count accordingly. This detection is currently Linux-only, and relies on the OS to put the right value into /sys/block/<dev>/queue/rotational. For all other operating systems it currently assumes a rotational disk for backwards compatibility. To enable default settings for spinning or solid-state disks on such operating systems, use setDefaultMaxMergesAndThreads(boolean).
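The Linux detection described above boils down to reading a one-character sysfs flag ("1" for rotational, "0" for non-rotational). The sketch below is a hypothetical helper, not Lucene's code: it assumes you already know the block device name (mapping an index directory to its device is the hard part and is not shown), and it falls back to "spins" whenever the flag cannot be read, matching the conservative default the text describes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not Lucene's API: reads the Linux sysfs flag for one block device.
final class RotationalProbe {
    private RotationalProbe() {}

    // Returns true ("spins") unless <sysBlockRoot>/<device>/queue/rotational reads "0".
    static boolean spins(Path sysBlockRoot, String device) {
        Path flag = sysBlockRoot.resolve(device).resolve("queue").resolve("rotational");
        try {
            String value = new String(Files.readAllBytes(flag), StandardCharsets.US_ASCII).trim();
            return !"0".equals(value);
        } catch (IOException | RuntimeException e) {
            // Missing or unreadable flag: assume a rotational disk, the conservative default.
            return true;
        }
    }

    public static void main(String[] args) {
        String device = args.length > 0 ? args[0] : "sda"; // hypothetical device name
        System.out.println(device + " spins: " + spins(Paths.get("/sys/block"), device));
    }
}
```

Taking the sysfs root as a parameter keeps the probe testable against a temporary directory instead of the real /sys tree.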
Nested Class Summary
- protected class ConcurrentMergeScheduler.MergeThread: Runs a merge thread to execute a single merge, then exits.
Nested classes/interfaces inherited from class org.apache.lucene.index.MergeScheduler
MergeScheduler.MergeSource
Field Summary
- static int AUTO_DETECT_MERGES_AND_THREADS: Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly.
- static java.lang.String DEFAULT_CPU_CORE_COUNT_PROPERTY: Used for testing.
- static java.lang.String DEFAULT_SPINS_PROPERTY: Used for testing.
- private boolean doAutoIOThrottle: true if we should rate-limit writes for each merge.
- private double forceMergeMBPerSec
- private static double MAX_MERGE_MB_PER_SEC: Ceiling for IO write rate limit (we will never go any higher than this).
- private int maxMergeCount
- private int maxThreadCount
- protected int mergeThreadCount: How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them).
- protected java.util.List<ConcurrentMergeScheduler.MergeThread> mergeThreads: List of currently active ConcurrentMergeScheduler.MergeThreads.
- private static double MIN_BIG_MERGE_MB: Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount).
- private static double MIN_MERGE_MB_PER_SEC: Floor for IO write rate limit (we will never go any lower than this).
- private static double START_MB_PER_SEC: Initial value for IO write rate limit when doAutoIOThrottle is true.
- private boolean suppressExceptions
- protected double targetMBPerSec: Current IO writes throttle rate.
Fields inherited from class org.apache.lucene.index.MergeScheduler
infoStream
Constructor Summary
- ConcurrentMergeScheduler(): Sole constructor, with all settings set to default values.
Method Summary
- private static double bytesToMB(long bytes)
- (package private) void clearSuppressExceptions(): Used for testing.
- void close(): Close this MergeScheduler.
- void disableAutoIOThrottle(): Turn off auto IO throttling.
- protected void doMerge(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge): Does the actual merge, by calling MergeScheduler.MergeSource.merge(org.apache.lucene.index.MergePolicy.OneMerge).
- protected void doStall(): Called from maybeStall(org.apache.lucene.index.MergeScheduler.MergeSource) to pause the calling thread for a bit.
- void enableAutoIOThrottle(): Turn on dynamic IO throttling, to adaptively rate-limit merge writes (in bytes/sec) to the minimal rate necessary so merges do not fall behind.
- boolean getAutoIOThrottle(): Returns true if auto IO throttling is currently enabled.
- double getForceMergeMBPerSec(): Get the per-merge IO throttle rate for forced merges.
- double getIORateLimitMBPerSec(): Returns the currently set per-merge IO writes rate limit, if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY.
- int getMaxMergeCount()
- int getMaxThreadCount(): Returns maxThreadCount.
- protected ConcurrentMergeScheduler.MergeThread getMergeThread(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge): Create and return a new MergeThread.
- protected void handleMergeException(java.lang.Throwable exc): Called when an exception is hit in a background merge thread.
- private void initDynamicDefaults(Directory directory)
- (package private) void initialize(InfoStream infoStream, Directory directory): IndexWriter calls this on init.
- private boolean isBacklog(long now, MergePolicy.OneMerge merge)
- protected boolean maybeStall(MergeScheduler.MergeSource mergeSource): This is invoked by merge(org.apache.lucene.index.MergeScheduler.MergeSource, org.apache.lucene.index.MergeTrigger) to possibly stall the incoming thread when there are too many merges running or pending.
- void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger): Run the merges provided by MergeScheduler.MergeSource.getNextMerge().
- int mergeThreadCount(): Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread.
- private static double nsToSec(long ns)
- private static java.lang.String rateToString(double mbPerSec)
- (package private) void removeMergeThread(): Removes the calling thread from the active merge threads.
- (package private) void runOnMergeFinished(MergeScheduler.MergeSource mergeSource)
- void setDefaultMaxMergesAndThreads(boolean spins): Sets max merges and threads to proper defaults for rotational or non-rotational storage.
- void setForceMergeMBPerSec(double v): Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY).
- void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount): Expert: directly set the maximum number of merge threads and simultaneous merges allowed.
- (package private) void setSuppressExceptions(): Used for testing.
- void sync(): Wait for any running merge threads to finish.
- protected void targetMBPerSecChanged(): Subclass can override to tweak targetMBPerSec.
- java.lang.String toString()
- private void updateIOThrottle(MergePolicy.OneMerge newMerge, MergeRateLimiter rateLimiter): Tunes IO throttle when a new merge starts.
- protected void updateMergeThreads(): Called whenever the running merges have changed, to set merge IO limits.
- Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in): Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput.
Methods inherited from class org.apache.lucene.index.MergeScheduler
message, verbose
Field Detail
-
AUTO_DETECT_MERGES_AND_THREADS
public static final int AUTO_DETECT_MERGES_AND_THREADS
Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly. If it's an SSD, maxThreadCount is set to max(1, min(4, cpuCoreCount/2)), otherwise 1. Note that detection only currently works on Linux; other platforms will assume the index is not on an SSD.
- See Also:
Constant Field Values
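To make the default thread-count rule concrete, the hypothetical helper below simply restates the formula given above as a pure function. It is not Lucene's implementation, and it models only maxThreadCount, since the text does not spell out the default for maxMergeCount.

```java
// Hypothetical helper that restates the documented default for maxThreadCount.
final class DefaultThreadCount {
    private DefaultThreadCount() {}

    // SSD: max(1, min(4, cpuCoreCount / 2)); rotational disk: 1.
    static int defaultMaxThreadCount(boolean spins, int cpuCoreCount) {
        if (spins) {
            return 1;
        }
        // cpuCoreCount / 2 uses Java integer division here.
        return Math.max(1, Math.min(4, cpuCoreCount / 2));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("SSD default: " + defaultMaxThreadCount(false, cores));
        System.out.println("Rotational default: " + defaultMaxThreadCount(true, cores));
    }
}
```

With 6 cores, for example, the SSD rule gives min(4, 3) = 3 threads. With a single core, 1/2 rounds down to 0 and the outer max keeps the count at 1.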
-
DEFAULT_CPU_CORE_COUNT_PROPERTY
public static final java.lang.String DEFAULT_CPU_CORE_COUNT_PROPERTY
Used for testing.
- See Also:
Constant Field Values
-
DEFAULT_SPINS_PROPERTY
public static final java.lang.String DEFAULT_SPINS_PROPERTY
Used for testing.
- See Also:
Constant Field Values
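The page says only that DEFAULT_CPU_CORE_COUNT_PROPERTY and DEFAULT_SPINS_PROPERTY are used for testing. Their names suggest keys of system properties that override the detected core count and the detected spins value. The sketch below assumes exactly that; the property keys in it are placeholders, not the constants' real values, and the class is hypothetical.

```java
// Hypothetical sketch: test-only overrides for detected hardware facts.
final class OverrideResolver {
    // Placeholder keys; the real values of the two constants are not shown on this page.
    static final String CORE_COUNT_PROPERTY = "example.override_core_count";
    static final String SPINS_PROPERTY = "example.override_spins";

    private OverrideResolver() {}

    // Uses the override when present, otherwise the JVM's processor count.
    static int resolveCoreCount() {
        String v = System.getProperty(CORE_COUNT_PROPERTY);
        return v != null ? Integer.parseInt(v.trim()) : Runtime.getRuntime().availableProcessors();
    }

    // Uses the override when present, otherwise the detected value.
    static boolean resolveSpins(boolean detected) {
        String v = System.getProperty(SPINS_PROPERTY);
        return v != null ? Boolean.parseBoolean(v.trim()) : detected;
    }
}
```

Routing detection through overridable properties like this lets a test pin "SSD with 3 cores" on any machine.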
-
mergeThreads
protected final java.util.List<ConcurrentMergeScheduler.MergeThread> mergeThreads
List of currently active ConcurrentMergeScheduler.MergeThreads.
-
maxThreadCount
private int maxThreadCount
-
maxMergeCount
private int maxMergeCount
-
mergeThreadCount
protected int mergeThreadCount
How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them).
-
MIN_MERGE_MB_PER_SEC
private static final double MIN_MERGE_MB_PER_SEC
Floor for IO write rate limit (we will never go any lower than this).
- See Also:
Constant Field Values
-
MAX_MERGE_MB_PER_SEC
private static final double MAX_MERGE_MB_PER_SEC
Ceiling for IO write rate limit (we will never go any higher than this).
- See Also:
Constant Field Values
-
START_MB_PER_SEC
private static final double START_MB_PER_SEC
Initial value for IO write rate limit when doAutoIOThrottle is true.
- See Also:
Constant Field Values
-
MIN_BIG_MERGE_MB
private static final double MIN_BIG_MERGE_MB
Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount).
- See Also:
Constant Field Values
-
targetMBPerSec
protected double targetMBPerSec
Current IO writes throttle rate
-
doAutoIOThrottle
private boolean doAutoIOThrottle
true if we should rate-limit writes for each merge
-
forceMergeMBPerSec
private double forceMergeMBPerSec
-
suppressExceptions
private boolean suppressExceptions
Method Detail
-
setMaxMergesAndThreads
public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
Expert: directly set the maximum number of merge threads and simultaneous merges allowed.
- Parameters:
maxMergeCount - the maximum number of simultaneous merges that are allowed. If a merge is necessary yet we already have this many threads running, the incoming thread (that is calling add/updateDocument) will block until a merge thread has completed. Note that we will only run the smallest maxThreadCount merges at a time.
maxThreadCount - the maximum number of simultaneous merge threads that should be running at once. This must be <= maxMergeCount.
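The two limits interact as described above: maxThreadCount may not exceed maxMergeCount, and an incoming indexing thread blocks once maxMergeCount merges are running. The hypothetical value class below captures just those documented rules. The "both must be at least 1" check is an added assumption, and the sketch does not model AUTO_DETECT_MERGES_AND_THREADS.

```java
// Hypothetical value class mirroring the constraints described for setMaxMergesAndThreads.
final class MergeLimits {
    final int maxMergeCount;
    final int maxThreadCount;

    MergeLimits(int maxMergeCount, int maxThreadCount) {
        // Assumed sanity check: both limits must be positive.
        if (maxMergeCount < 1 || maxThreadCount < 1) {
            throw new IllegalArgumentException("both limits must be >= 1");
        }
        // Documented constraint: maxThreadCount <= maxMergeCount.
        if (maxThreadCount > maxMergeCount) {
            throw new IllegalArgumentException(
                "maxThreadCount (" + maxThreadCount + ") must be <= maxMergeCount (" + maxMergeCount + ")");
        }
        this.maxMergeCount = maxMergeCount;
        this.maxThreadCount = maxThreadCount;
    }

    // Documented behavior: an incoming indexing thread blocks once maxMergeCount merges are running.
    boolean incomingThreadBlocks(int runningMerges) {
        return runningMerges >= maxMergeCount;
    }
}
```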
-
setDefaultMaxMergesAndThreads
public void setDefaultMaxMergesAndThreads(boolean spins)
Sets max merges and threads to proper defaults for rotational or non-rotational storage.
- Parameters:
spins - true to set defaults best for traditional rotational storage (spinning disks), else false (e.g. for solid-state disks)
-
setForceMergeMBPerSec
public void setForceMergeMBPerSec(double v)
Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY).
-
getForceMergeMBPerSec
public double getForceMergeMBPerSec()
Get the per-merge IO throttle rate for forced merges.
-
enableAutoIOThrottle
public void enableAutoIOThrottle()
Turn on dynamic IO throttling, to adaptively rate-limit merge writes (in bytes/sec) to the minimal rate necessary so merges do not fall behind. By default this is enabled.
-
disableAutoIOThrottle
public void disableAutoIOThrottle()
Turn off auto IO throttling.
- See Also:
enableAutoIOThrottle()
-
getAutoIOThrottle
public boolean getAutoIOThrottle()
Returns true if auto IO throttling is currently enabled.
-
getIORateLimitMBPerSec
public double getIORateLimitMBPerSec()
Returns the currently set per-merge IO writes rate limit, if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY.
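The enable/disable/get trio above amounts to a small piece of state: a flag that defaults to on, and a getter that reports the current target while the flag is on and infinity otherwise. The hypothetical class below sketches only those documented semantics. The starting rate of 20.0 MB/sec is an illustrative assumption, since the value of START_MB_PER_SEC is not shown here.

```java
// Hypothetical sketch of the documented on/off switch and rate getter; not Lucene's code.
final class ThrottleSwitch {
    private boolean doAutoIOThrottle = true; // documented default: enabled
    private double targetMBPerSec = 20.0;    // assumed starting rate, for illustration only

    synchronized void enableAutoIOThrottle() { doAutoIOThrottle = true; }

    synchronized void disableAutoIOThrottle() { doAutoIOThrottle = false; }

    synchronized boolean getAutoIOThrottle() { return doAutoIOThrottle; }

    // The current per-merge limit while auto throttling is on; unlimited otherwise.
    synchronized double getIORateLimitMBPerSec() {
        return doAutoIOThrottle ? targetMBPerSec : Double.POSITIVE_INFINITY;
    }
}
```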
-
getMaxThreadCount
public int getMaxThreadCount()
Returns maxThreadCount.
- See Also:
setMaxMergesAndThreads(int, int)
-
getMaxMergeCount
public int getMaxMergeCount()
-
removeMergeThread
void removeMergeThread()
Removes the calling thread from the active merge threads.
-
wrapForMerge
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in)
Description copied from class: MergeScheduler
Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput.
- Overrides:
wrapForMerge in class MergeScheduler
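RateLimitedIndexOutput is Lucene's own class and is not reproduced here. As a rough analogue in plain java.io, the hypothetical stream below paces writes so the average rate stays at or below a target MB/sec: after each write it sleeps if it is ahead of schedule. It treats 1 MB as 1024 × 1024 bytes and rejects a zero rate; both choices are assumptions of this sketch.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical analogue of merge write throttling; not Lucene's RateLimitedIndexOutput.
final class RateLimitedOutputStream extends FilterOutputStream {
    private final double bytesPerNano; // target rate, or +Infinity for "no limit"
    private final long startNanos = System.nanoTime();
    private long bytesWritten;

    RateLimitedOutputStream(OutputStream out, double mbPerSec) {
        super(out);
        if (!(mbPerSec > 0)) {
            throw new IllegalArgumentException("mbPerSec must be > 0");
        }
        this.bytesPerNano = mbPerSec * 1024 * 1024 / 1e9; // assumes 1 MB = 1024 * 1024 bytes
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        bytesWritten += len;
        pauseIfAhead();
    }

    long bytesWritten() {
        return bytesWritten;
    }

    // Sleeps until elapsed time catches up with how long these bytes should have taken.
    private void pauseIfAhead() throws IOException {
        if (Double.isInfinite(bytesPerNano)) {
            return; // unlimited rate
        }
        long expectedNanos = (long) (bytesWritten / bytesPerNano);
        long aheadNanos = expectedNanos - (System.nanoTime() - startNanos);
        if (aheadNanos > 0) {
            try {
                Thread.sleep(aheadNanos / 1_000_000L, (int) (aheadNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while throttling", e);
            }
        }
    }
}
```

Pacing against cumulative bytes since creation, rather than per write, keeps the average rate on target even when individual writes vary in size.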
-
updateMergeThreads
protected void updateMergeThreads()
Called whenever the running merges have changed, to set merge IO limits. This method sorts the merge threads by their merge size in descending order and then pauses/unpauses threads from first to last -- that way, smaller merges are guaranteed to run before larger ones.
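The pausing rule can be modeled without threads. In the hypothetical sketch below each merge is represented only by its size in MB. Merges at or above a "big merge" threshold (standing in for MIN_BIG_MERGE_MB, whose value is not shown here) count against maxThreadCount, and after sorting largest first, the surplus big merges at the front of the list are the ones paused. This is a model of the described behavior, not Lucene's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of which merges get paused; sizes are in MB.
final class MergePauser {
    private MergePauser() {}

    // Returns the sizes of the merges that would be paused, largest first.
    static List<Double> pausedMerges(List<Double> mergeSizesMB, int maxThreadCount, double minBigMergeMB) {
        List<Double> sorted = new ArrayList<>(mergeSizesMB);
        sorted.sort(Comparator.reverseOrder()); // largest merge first
        // Small merges are not counted against maxThreadCount.
        long bigMergeCount = sorted.stream().filter(mb -> mb >= minBigMergeMB).count();
        List<Double> paused = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Only big merges beyond maxThreadCount are paused; because the list is sorted
            // descending, those are the largest ones, so smaller merges keep running.
            if (i < bigMergeCount - maxThreadCount) {
                paused.add(sorted.get(i));
            }
        }
        return paused;
    }
}
```

For example, with sizes 500, 10, 120, 80 and 5 MB, one thread, and a 50 MB threshold, three merges count as big, so the two largest (500 and 120 MB) are paused while the 80 MB merge and both small merges run.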
-
initDynamicDefaults
private void initDynamicDefaults(Directory directory) throws java.io.IOException
- Throws:
java.io.IOException
-
rateToString
private static java.lang.String rateToString(double mbPerSec)
-
close
public void close()
Description copied from class: MergeScheduler
Close this MergeScheduler.
- Specified by:
close in interface java.lang.AutoCloseable
- Specified by:
close in interface java.io.Closeable
- Specified by:
close in class MergeScheduler
-
sync
public void sync()
Wait for any running merge threads to finish. This call is not interruptible, as it is used by close().
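A common way to make a wait "not interruptible" in Java is to keep joining after an InterruptedException, remember that the interrupt happened, and restore the thread's interrupt flag at the end, so callers further up still see it. The hypothetical helper below sketches that pattern for a list of plain threads. Assuming sync() works exactly this way is not confirmed by the text.

```java
import java.util.List;

// Hypothetical sketch: wait for threads without honoring interrupts, then restore the flag.
final class UninterruptibleSync {
    private UninterruptibleSync() {}

    static void syncAll(List<Thread> threads) {
        boolean interrupted = false;
        try {
            for (Thread t : threads) {
                while (true) {
                    try {
                        t.join();
                        break;
                    } catch (InterruptedException e) {
                        // Remember the interrupt but keep waiting for this thread.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // let callers observe the interrupt
            }
        }
    }
}
```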
-
mergeThreadCount
public int mergeThreadCount()
Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. Note that this number is ≤ mergeThreads.size().
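The counting rule above (alive threads only, skipping the caller) explains why the result can be smaller than the list size: finished or not-yet-started threads and the calling thread itself are excluded. A minimal, hypothetical sketch over plain threads:

```java
import java.util.List;

// Hypothetical sketch of the documented count; not Lucene's code.
final class AliveCounter {
    private AliveCounter() {}

    // Counts threads that are alive, skipping the calling thread.
    static int aliveExcludingCaller(List<Thread> threads) {
        Thread current = Thread.currentThread();
        int count = 0;
        for (Thread t : threads) {
            if (t != current && t.isAlive()) {
                count++;
            }
        }
        return count;
    }
}
```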
-
initialize
void initialize(InfoStream infoStream, Directory directory) throws java.io.IOException
Description copied from class: MergeScheduler
IndexWriter calls this on init.
- Overrides:
initialize in class MergeScheduler
- Throws:
java.io.IOException
-
merge
public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws java.io.IOException
Description copied from class: MergeScheduler
Run the merges provided by MergeScheduler.MergeSource.getNextMerge().
- Specified by:
merge in class MergeScheduler
- Parameters:
mergeSource - the IndexWriter to obtain the merges from.
trigger - the MergeTrigger that caused this merge to happen.
- Throws:
java.io.IOException
-
maybeStall
protected boolean maybeStall(MergeScheduler.MergeSource mergeSource)
This is invoked by merge(org.apache.lucene.index.MergeScheduler.MergeSource, org.apache.lucene.index.MergeTrigger) to possibly stall the incoming thread when there are too many merges running or pending. The default behavior is to force this thread, which is producing too many segments for merging to keep up, to wait until merges catch up. Applications that can take other, less drastic measures, such as limiting how many threads are allowed to index, can do nothing here and throttle elsewhere. If this method wants to stall but the calling thread is a merge thread, it should return false to tell the caller not to kick off any new merges.
-
doStall
protected void doStall()
Called from maybeStall(org.apache.lucene.index.MergeScheduler.MergeSource) to pause the calling thread for a bit.
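The maybeStall/doStall contract can be sketched as a monitor-based loop: while merges are pending and the running count is at or above maxMergeCount, a merge thread gets false back immediately, and any other thread waits briefly and re-checks. In the hypothetical class below the merge state is passed in as suppliers. The 250 ms wait bound and the unchecked exception on interrupt are assumptions, not documented values.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical stall policy modeled on the maybeStall/doStall description; not Lucene's code.
class StallPolicy {
    private final int maxMergeCount;

    StallPolicy(int maxMergeCount) {
        this.maxMergeCount = maxMergeCount;
    }

    // Returns false if a merge thread should not kick off new merges; otherwise
    // blocks until nothing is pending or running merges drop below maxMergeCount.
    synchronized boolean maybeStall(BooleanSupplier hasPendingMerges, IntSupplier runningMerges,
                                    boolean callerIsMergeThread) {
        while (hasPendingMerges.getAsBoolean() && runningMerges.getAsInt() >= maxMergeCount) {
            if (callerIsMergeThread) {
                return false;
            }
            doStall();
        }
        return true;
    }

    // Pauses "for a bit"; the 250 ms bound is an assumption. wait() releases the monitor.
    protected synchronized void doStall() {
        try {
            wait(250);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while stalling", e);
        }
    }

    // Call when a merge finishes so stalled threads re-check without waiting out the timeout.
    synchronized void mergeFinished() {
        notifyAll();
    }
}
```

The timed wait means a stalled thread re-checks the condition even if no one calls mergeFinished(), which keeps the sketch from hanging on a missed notification.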
-
doMerge
protected void doMerge(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge) throws java.io.IOException
Does the actual merge, by calling MergeScheduler.MergeSource.merge(org.apache.lucene.index.MergePolicy.OneMerge).
- Throws:
java.io.IOException
-
getMergeThread
protected ConcurrentMergeScheduler.MergeThread getMergeThread(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge) throws java.io.IOException
Create and return a new MergeThread.
- Throws:
java.io.IOException
-
runOnMergeFinished
void runOnMergeFinished(MergeScheduler.MergeSource mergeSource)
-
handleMergeException
protected void handleMergeException(java.lang.Throwable exc)
Called when an exception is hit in a background merge thread
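Taken together, MergeThread ("runs a single merge, then exits"), getMergeThread, the mergeThreadCount naming counter and handleMergeException suggest a thread-per-merge skeleton. The hypothetical class below sketches that shape with a Runnable standing in for a merge. The thread-name format and the record-the-failure behavior of the hook are illustrative assumptions; a real scheduler would typically surface the error rather than just collect it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical thread-per-merge skeleton; not Lucene's MergeThread.
class ThreadPerMergeScheduler {
    protected int mergeThreadCount; // threads created so far; used only for naming
    final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

    // Creates (but does not start) a thread that runs one merge and then exits.
    protected synchronized Thread getMergeThread(Runnable merge) {
        Thread t = new Thread(() -> {
            try {
                merge.run();
            } catch (Throwable exc) {
                handleMergeException(exc); // route background failures to an overridable hook
            }
        }, "Merge Thread #" + mergeThreadCount++); // hypothetical name format
        t.setDaemon(true);
        return t;
    }

    // Hook for failures in background merges; this sketch only records them.
    protected void handleMergeException(Throwable exc) {
        failures.add(exc);
    }
}
```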
-
setSuppressExceptions
void setSuppressExceptions()
Used for testing
-
clearSuppressExceptions
void clearSuppressExceptions()
Used for testing
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
isBacklog
private boolean isBacklog(long now, MergePolicy.OneMerge merge)
-
updateIOThrottle
private void updateIOThrottle(MergePolicy.OneMerge newMerge, MergeRateLimiter rateLimiter) throws java.io.IOException
Tunes IO throttle when a new merge starts.
- Throws:
java.io.IOException
-
targetMBPerSecChanged
protected void targetMBPerSecChanged()
Subclass can override to tweak targetMBPerSec.
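updateIOThrottle and targetMBPerSecChanged describe an adaptive loop: when a new merge starts, the target rate moves up if merges are falling behind and down otherwise, always staying between the floor and ceiling constants, and subclasses get a hook when it changes. The hypothetical sketch below shows that shape. The floor, ceiling and start values and the +20% / -10% step sizes are illustrative assumptions, and whether a merge "adds to the backlog" is passed in rather than computed (the page does not document isBacklog).

```java
// Hypothetical adaptive IO throttle; constants and step sizes are assumptions.
class AdaptiveThrottle {
    static final double MIN_MB_PER_SEC = 5.0;     // stand-in for the floor constant
    static final double MAX_MB_PER_SEC = 10240.0; // stand-in for the ceiling constant
    static final double START_MB_PER_SEC = 20.0;  // stand-in for the starting rate

    protected double targetMBPerSec = START_MB_PER_SEC;

    // Called when a new merge starts: speed up on backlog, otherwise slow down, within bounds.
    synchronized void onMergeStart(boolean newMergeAddsToBacklog) {
        double before = targetMBPerSec;
        if (newMergeAddsToBacklog) {
            targetMBPerSec = Math.min(MAX_MB_PER_SEC, targetMBPerSec * 1.20);
        } else {
            targetMBPerSec = Math.max(MIN_MB_PER_SEC, targetMBPerSec / 1.10);
        }
        if (targetMBPerSec != before) {
            targetMBPerSecChanged();
        }
    }

    // Subclass hook, analogous to targetMBPerSecChanged(); does nothing by default.
    protected void targetMBPerSecChanged() {}
}
```

Raising the rate faster than it decays lets the throttle react quickly when merges fall behind while easing back toward the floor gradually once they catch up.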
-
nsToSec
private static double nsToSec(long ns)
-
bytesToMB
private static double bytesToMB(long bytes)