Package org.jgroups.protocols
Class TransferQueueBundler2
- java.lang.Object
-
- org.jgroups.protocols.TransferQueueBundler2
-
- All Implemented Interfaces:
java.lang.Runnable, Bundler
public class TransferQueueBundler2 extends java.lang.Object implements Bundler, java.lang.Runnable
This bundler adds all (unicast or multicast) messages to a queue until the maximum size has been exceeded, but sends messages immediately when no other messages are available. See https://issues.redhat.com/browse/JGRP-1540
The difference to TransferQueueBundler is that a size is maintained per destination,
and a byte array of max_bundle_size is kept per destination, into which a message is marshalled directly when it is sent.
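The per-destination buffering described above can be illustrated with a minimal, self-contained sketch. It is not the JGroups implementation: the class PerDestinationBundlerSketch, its methods, the use of plain strings as destinations, and the "sent" list standing in for a network send are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not JGroups code): one byte buffer per destination,
// flushed before a payload would push that buffer over maxBundleSize.
class PerDestinationBundlerSketch {
    private final int maxBundleSize;
    // LinkedHashMap keeps destinations in first-seen order, so flushAll() is deterministic
    private final Map<String, ByteArrayOutputStream> buffers = new LinkedHashMap<>();
    // Records "destination:bytes" for every flush; stands in for a real network send
    final List<String> sent = new ArrayList<>();

    PerDestinationBundlerSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    // Writes the payload into the destination's buffer, flushing first if it would not fit
    void add(String dest, byte[] payload) {
        ByteArrayOutputStream buf = buffers.computeIfAbsent(dest, d -> new ByteArrayOutputStream());
        if (buf.size() > 0 && buf.size() + payload.length > maxBundleSize)
            flush(dest);
        buf.write(payload, 0, payload.length);
    }

    // "Sends" the bytes accumulated for one destination and empties its buffer
    void flush(String dest) {
        ByteArrayOutputStream buf = buffers.get(dest);
        if (buf == null || buf.size() == 0)
            return;
        sent.add(dest + ":" + buf.size());
        buf.reset();
    }

    // Flushes every destination that has pending bytes
    void flushAll() {
        for (String dest : buffers.keySet())
            flush(dest);
    }
}
```

Keeping the size per destination means a large burst to one member only forces a flush of that member's buffer; messages queued for other destinations keep accumulating.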
-
-
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
private static class TransferQueueBundler2.Buffer
-
Field Summary
Fields
Modifier and Type, Field, Description
protected AverageMinMax avg_fill_count
protected java.lang.Thread bundler_thread
protected int capacity
protected Log log
protected int max_size
    Maximum number of bytes for messages to be queued until they are sent.
protected java.util.Map<Address,TransferQueueBundler2.Buffer> messages
protected static NullAddress NIL
protected long num_sends_because_full_queue
protected long num_sends_because_no_msgs
protected long poll_timeout
protected java.util.concurrent.BlockingQueue<Message> queue
protected java.util.List<Message> remove_queue
protected boolean running
protected static java.lang.String THREAD_NAME
protected TP transport
-
Constructor Summary
Constructors
Modifier, Constructor, Description
TransferQueueBundler2()
TransferQueueBundler2(int capacity)
protected TransferQueueBundler2(java.util.concurrent.BlockingQueue<Message> queue)
-
Method Summary
Methods
Modifier and Type, Method, Description
protected void addAndSendIfSizeExceeded(Message msg)
protected static int assertPositive(int value, java.lang.String message)
protected void drain()
    Takes all messages from the queue, adds them to the hashmap and then sends all bundled messages.
java.lang.String dump()
int getCapacity()
    If the bundler implementation supports a capacity (e.g. RingBufferBundler), then return it, else return -1.
int getMaxSize()
    Maximum number of bytes for messages to be queued until they are sent.
int getQueueSize()
    If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
java.lang.Thread getThread()
protected boolean hasMessages()
void init(TP transport)
    Called after creation of the bundler.
int removeQueueSize()
TransferQueueBundler2 removeQueueSize(int size)
void resetStats()
void run()
void send(Message msg)
protected void sendBundledMessages()
Bundler setCapacity(int c)
Bundler setMaxSize(int s)
int size()
    The number of unsent messages in the bundler.
void start()
    Called after Bundler.init(TP).
void stop()
void viewChange(View view)
-
-
-
Field Detail
-
max_size
protected int max_size
Maximum number of bytes for messages to be queued until they are sent. When UDP is used, this value needs to be smaller than the largest datagram packet size.
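As a rough illustration of the UDP constraint, the following sketch checks a candidate value against the largest UDP payload over IPv4 (65,535 bytes minus a 20-byte IPv4 header and an 8-byte UDP header). The class MaxSizeCheck and the method fitsInDatagram are hypothetical names, not part of JGroups, and the sketch ignores any headers the transport itself adds.

```java
// Hypothetical helper (not part of JGroups): sanity check for a max_size candidate
class MaxSizeCheck {
    // Largest UDP payload over IPv4: 65535 - 20 (IPv4 header) - 8 (UDP header) = 65507
    static final int MAX_UDP_PAYLOAD_IPV4 = 65_535 - 20 - 8;

    // True if the value is positive and strictly smaller than the largest IPv4 UDP payload
    static boolean fitsInDatagram(int maxSize) {
        return maxSize > 0 && maxSize < MAX_UDP_PAYLOAD_IPV4;
    }
}
```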
-
capacity
protected int capacity
-
poll_timeout
protected long poll_timeout
-
transport
protected TP transport
-
log
protected Log log
-
queue
protected java.util.concurrent.BlockingQueue<Message> queue
-
remove_queue
protected java.util.List<Message> remove_queue
-
bundler_thread
protected volatile java.lang.Thread bundler_thread
-
running
protected volatile boolean running
-
num_sends_because_full_queue
protected long num_sends_because_full_queue
-
num_sends_because_no_msgs
protected long num_sends_because_no_msgs
-
avg_fill_count
protected final AverageMinMax avg_fill_count
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
messages
protected final java.util.Map<Address,TransferQueueBundler2.Buffer> messages
-
NIL
protected static final NullAddress NIL
-
-
Constructor Detail
-
TransferQueueBundler2
public TransferQueueBundler2()
-
TransferQueueBundler2
protected TransferQueueBundler2(java.util.concurrent.BlockingQueue<Message> queue)
-
TransferQueueBundler2
public TransferQueueBundler2(int capacity)
-
-
Method Detail
-
getThread
public java.lang.Thread getThread()
-
getCapacity
public int getCapacity()
Description copied from interface: Bundler
If the bundler implementation supports a capacity (e.g. RingBufferBundler), then return it, else return -1.
- Specified by:
getCapacity in interface Bundler
-
setCapacity
public Bundler setCapacity(int c)
-
getMaxSize
public int getMaxSize()
Description copied from interface: Bundler
Maximum number of bytes for messages to be queued until they are sent.
- Specified by:
getMaxSize in interface Bundler
-
setMaxSize
public Bundler setMaxSize(int s)
- Specified by:
setMaxSize in interface Bundler
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize in interface Bundler
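To show how a caller might honor the -1 convention, here is a simplified sketch of a queuing discipline that computes a drop probability from the reported queue size. It is a plain linear ramp between two thresholds, not the JGroups implementation of Random Early Detection; the class QueueDisciplineSketch, the method dropProbability, and the threshold parameters are assumptions for illustration.

```java
// Hypothetical sketch (not JGroups code): linear drop probability between two thresholds
class QueueDisciplineSketch {
    // queueSize is the value a bundler reports; a negative value (-1) means "not managed"
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0)
            return 0.0;                       // unmanaged queue: never drop
        if (queueSize <= minThreshold)
            return 0.0;                       // queue is short: never drop
        if (queueSize >= maxThreshold)
            return 1.0;                       // queue is full: always drop
        // In between, the probability grows linearly with the queue size
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }
}
```

Because a check like this may run for every message sent, the size lookup behind it has to be cheap, which is why the method is required to be fast.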
-
removeQueueSize
public int removeQueueSize()
-
removeQueueSize
public TransferQueueBundler2 removeQueueSize(int size)
-
dump
public java.lang.String dump()
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler.
-
resetStats
public void resetStats()
- Specified by:
resetStats in interface Bundler
-
viewChange
public void viewChange(View view)
- Specified by:
viewChange in interface Bundler
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP).
-
size
public int size()
Description copied from interface: Bundler
The number of unsent messages in the bundler.
-
send
public void send(Message msg) throws java.lang.Exception
-
run
public void run()
- Specified by:
run in interface java.lang.Runnable
-
hasMessages
protected boolean hasMessages()
-
addAndSendIfSizeExceeded
protected void addAndSendIfSizeExceeded(Message msg)
-
sendBundledMessages
protected void sendBundledMessages()
-
drain
protected void drain()
Takes all messages from the queue, adds them to the hashmap and then sends all bundled messages
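The three steps named above (empty the queue, group by destination in a map, send the bundles) can be sketched as follows. This is an illustrative stand-in, not the actual method body: the classes DrainSketch and Msg, the string payloads, and the "sent" list used in place of the transport are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch (not JGroups code) of a drain step: queue -> map -> send
class DrainSketch {
    // Minimal stand-in for a message: a destination and a payload
    static final class Msg {
        final String dest;
        final String payload;
        Msg(String dest, String payload) { this.dest = dest; this.payload = payload; }
    }

    final BlockingQueue<Msg> queue = new LinkedBlockingQueue<>();
    final Map<String, List<String>> bundles = new LinkedHashMap<>();
    // Records "destination=payloads" for every bundle; stands in for the transport
    final List<String> sent = new ArrayList<>();
    private final List<Msg> removeQueue = new ArrayList<>();

    void drain() {
        removeQueue.clear();
        queue.drainTo(removeQueue);           // non-blocking: takes whatever is queued right now
        for (Msg m : removeQueue)             // group messages by destination
            bundles.computeIfAbsent(m.dest, d -> new ArrayList<>()).add(m.payload);
        sendBundledMessages();
    }

    void sendBundledMessages() {
        for (Map.Entry<String, List<String>> e : bundles.entrySet())
            if (!e.getValue().isEmpty())
                sent.add(e.getKey() + "=" + String.join(",", e.getValue()));
        bundles.clear();
    }
}
```

BlockingQueue.drainTo moves all currently available elements in one call, which avoids repeated polling when many messages are waiting.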
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-