001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.extensions;
028
029
030
031 import java.util.ArrayList;
032 import java.util.Iterator;
033 import java.util.LinkedList;
034 import java.util.List;
035 import java.util.concurrent.LinkedBlockingQueue;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicLong;
038
039 import org.opends.messages.Message;
040 import org.opends.server.admin.server.ConfigurationChangeListener;
041 import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
042 import org.opends.server.api.WorkQueue;
043 import org.opends.server.config.ConfigException;
044 import org.opends.server.core.DirectoryServer;
045 import org.opends.server.loggers.debug.DebugTracer;
046 import org.opends.server.monitors.TraditionalWorkQueueMonitor;
047 import org.opends.server.types.AbstractOperation;
048 import org.opends.server.types.CancelRequest;
049 import org.opends.server.types.ConfigChangeResult;
050 import org.opends.server.types.DebugLogLevel;
051 import org.opends.server.types.DirectoryException;
052 import org.opends.server.types.DN;
053 import org.opends.server.types.InitializationException;
054 import org.opends.server.types.Operation;
055 import org.opends.server.types.ResultCode;
056
057 import static org.opends.messages.ConfigMessages.*;
058 import static org.opends.messages.CoreMessages.*;
059 import static org.opends.server.loggers.ErrorLogger.*;
060 import static org.opends.server.loggers.debug.DebugLogger.*;
061
062
063
064 /**
065 * This class defines a data structure for storing and interacting with the
066 * Directory Server work queue.
067 */
068 public class TraditionalWorkQueue
069 extends WorkQueue<TraditionalWorkQueueCfg>
070 implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
071 {
072 /**
073 * The tracer object for the debug logger.
074 */
075 private static final DebugTracer TRACER = getTracer();
076
077
078
079
080 /**
081 * The maximum number of times to retry getting the next operation from the
082 * queue if an unexpected failure occurs.
083 */
084 private static final int MAX_RETRY_COUNT = 5;
085
086
087
088 // The set of worker threads that will be used to process this work queue.
089 private ArrayList<TraditionalWorkerThread> workerThreads;
090
091 // The number of operations that have been submitted to the work queue for
092 // processing.
093 private AtomicLong opsSubmitted;
094
095 // The number of times that an attempt to submit a new request has been
096 // rejected because the work queue is already at its maximum capacity.
097 private AtomicLong queueFullRejects;
098
099 // Indicates whether one or more of the worker threads needs to be killed at
100 // the next convenient opportunity.
101 private boolean killThreads;
102
103 // Indicates whether the Directory Server is shutting down.
104 private boolean shutdownRequested;
105
106 // The DN of the configuration entry with information to use to configure the
107 // work queue.
108 private DN configEntryDN;
109
110 // The thread number used for the last worker thread that was created.
111 private int lastThreadNumber;
112
113 // The maximum number of pending requests that this work queue will allow
114 // before it will start rejecting them.
115 private int maxCapacity;
116
117 // The number of worker threads that should be active (or will be shortly if
118 // a configuration change has not been completely applied).
119 private int numWorkerThreads;
120
121 // The queue that will be used to actually hold the pending operations.
122 private LinkedBlockingQueue<AbstractOperation> opQueue;
123
124 // The lock used to provide threadsafe access for the queue.
125 private Object queueLock;
126
127
128
129 /**
130 * Creates a new instance of this work queue. All initialization should be
131 * performed in the <CODE>initializeWorkQueue</CODE> method.
132 */
133 public TraditionalWorkQueue()
134 {
135 // No implementation should be performed here.
136 }
137
138
139
140 /**
141 * {@inheritDoc}
142 */
143 @Override()
144 public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
145 throws ConfigException, InitializationException
146 {
147 shutdownRequested = false;
148 killThreads = false;
149 opsSubmitted = new AtomicLong(0);
150 queueFullRejects = new AtomicLong(0);
151 queueLock = new Object();
152
153
154 // Register to be notified of any configuration changes.
155 configuration.addTraditionalChangeListener(this);
156
157
158 // Get the necessary configuration from the provided entry.
159 configEntryDN = configuration.dn();
160 numWorkerThreads = configuration.getNumWorkerThreads();
161 maxCapacity = configuration.getMaxWorkQueueCapacity();
162
163
164 // Create the actual work queue.
165 if (maxCapacity > 0)
166 {
167 opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
168 }
169 else
170 {
171 opQueue = new LinkedBlockingQueue<AbstractOperation>();
172 }
173
174
175 // Create the set of worker threads that should be used to service the
176 // work queue.
177 workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
178 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
179 lastThreadNumber++)
180 {
181 TraditionalWorkerThread t =
182 new TraditionalWorkerThread(this, lastThreadNumber);
183 t.start();
184 workerThreads.add(t);
185 }
186
187
188 // Create and register a monitor provider for the work queue.
189 try
190 {
191 TraditionalWorkQueueMonitor monitor =
192 new TraditionalWorkQueueMonitor(this);
193 monitor.initializeMonitorProvider(null);
194 monitor.start();
195 DirectoryServer.registerMonitorProvider(monitor);
196 }
197 catch (Exception e)
198 {
199 if (debugEnabled())
200 {
201 TRACER.debugCaught(DebugLogLevel.ERROR, e);
202 }
203
204 Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
205 String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
206 logError(message);
207 }
208 }
209
210
211
212 /**
213 * {@inheritDoc}
214 */
215 @Override()
216 public void finalizeWorkQueue(Message reason)
217 {
218 shutdownRequested = true;
219
220
221 // Send responses to any operations in the pending queue to indicate that
222 // they won't be processed because the server is shutting down.
223 CancelRequest cancelRequest = new CancelRequest(true, reason);
224 ArrayList<Operation> pendingOperations = new ArrayList<Operation>();
225 opQueue.removeAll(pendingOperations);
226 for (Operation o : pendingOperations)
227 {
228 try
229 {
230 // The operation has no chance of responding to the cancel
231 // request so avoid waiting for a cancel response.
232 if (o.getCancelResult() == null) {
233 o.abort(cancelRequest);
234 }
235 }
236 catch (Exception e)
237 {
238 if (debugEnabled())
239 {
240 TRACER.debugCaught(DebugLogLevel.ERROR, e);
241 }
242
243 logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
244 String.valueOf(o), String.valueOf(e)));
245 }
246 }
247
248
249 // Notify all the worker threads of the shutdown.
250 for (TraditionalWorkerThread t : workerThreads)
251 {
252 try
253 {
254 t.shutDown();
255 }
256 catch (Exception e)
257 {
258 if (debugEnabled())
259 {
260 TRACER.debugCaught(DebugLogLevel.ERROR, e);
261 }
262
263 logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
264 t.getName(), String.valueOf(e)));
265 }
266 }
267 }
268
269
270
271 /**
272 * Indicates whether this work queue has received a request to shut down.
273 *
274 * @return <CODE>true</CODE> if the work queue has recieved a request to shut
275 * down, or <CODE>false</CODE> if not.
276 */
277 public boolean shutdownRequested()
278 {
279 return shutdownRequested;
280 }
281
282
283
284 /**
285 * Submits an operation to be processed by one of the worker threads
286 * associated with this work queue.
287 *
288 * @param operation The operation to be processed.
289 *
290 * @throws DirectoryException If the provided operation is not accepted for
291 * some reason (e.g., if the server is shutting
292 * down or the pending operation queue is already
293 * at its maximum capacity).
294 */
295 public void submitOperation(AbstractOperation operation)
296 throws DirectoryException
297 {
298 if (shutdownRequested)
299 {
300 Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
301 throw new DirectoryException(ResultCode.UNAVAILABLE, message);
302 }
303
304 if (! opQueue.offer(operation))
305 {
306 queueFullRejects.incrementAndGet();
307
308 Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
309 throw new DirectoryException(ResultCode.UNAVAILABLE, message);
310 }
311
312 opsSubmitted.incrementAndGet();
313 }
314
315
316
317 /**
318 * Retrieves the next operation that should be processed by one of the worker
319 * threads, blocking if necessary until a new request arrives. This method
320 * should only be called by a worker thread associated with this work queue.
321 *
322 * @param workerThread The worker thread that is requesting the operation.
323 *
324 * @return The next operation that should be processed, or <CODE>null</CODE>
325 * if the server is shutting down and no more operations will be
326 * processed.
327 */
328 public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
329 {
330 return retryNextOperation(workerThread, 0);
331 }
332
333
334
335 /**
336 * Retrieves the next operation that should be processed by one of the worker
337 * threads following a previous failure attempt. A maximum of five
338 * consecutive failures will be allowed before returning <CODE>null</CODE>,
339 * which will cause the associated thread to exit.
340 *
341 * @param workerThread The worker thread that is requesting the operation.
342 * @param numFailures The number of consecutive failures that the worker
343 * thread has experienced so far. If this gets too
344 * high, then this method will return <CODE>null</CODE>
345 * rather than retrying.
346 *
347 * @return The next operation that should be processed, or <CODE>null</CODE>
348 * if the server is shutting down and no more operations will be
349 * processed, or if there have been too many consecutive failures.
350 */
351 private AbstractOperation retryNextOperation(
352 TraditionalWorkerThread workerThread,
353 int numFailures)
354 {
355 // See if we should kill off this thread. This could be necessary if the
356 // number of worker threads has been decreased with the server online. If
357 // so, then return null and the thread will exit.
358 if (killThreads)
359 {
360 synchronized (queueLock)
361 {
362 try
363 {
364 int currentThreads = workerThreads.size();
365 if (currentThreads > numWorkerThreads)
366 {
367 if (workerThreads.remove(Thread.currentThread()))
368 {
369 currentThreads--;
370 }
371
372 if (currentThreads <= numWorkerThreads)
373 {
374 killThreads = false;
375 }
376
377 workerThread.setStoppedByReducedThreadNumber();
378 return null;
379 }
380 }
381 catch (Exception e)
382 {
383 if (debugEnabled())
384 {
385 TRACER.debugCaught(DebugLogLevel.ERROR, e);
386 }
387 }
388 }
389 }
390
391 if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
392 {
393 if (numFailures > MAX_RETRY_COUNT)
394 {
395 Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
396 Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
397 logError(message);
398 }
399
400 return null;
401 }
402
403 try
404 {
405 while (true)
406 {
407 AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
408 if (nextOperation == null)
409 {
410 // There was no work to do in the specified length of time. See if
411 // we should shutdown, and if not then just check again.
412 if (shutdownRequested)
413 {
414 return null;
415 }
416 else if (killThreads)
417 {
418 synchronized (queueLock)
419 {
420 try
421 {
422 int currentThreads = workerThreads.size();
423 if (currentThreads > numWorkerThreads)
424 {
425 if (workerThreads.remove(Thread.currentThread()))
426 {
427 currentThreads--;
428 }
429
430 if (currentThreads <= numWorkerThreads)
431 {
432 killThreads = false;
433 }
434
435 workerThread.setStoppedByReducedThreadNumber();
436 return null;
437 }
438 }
439 catch (Exception e)
440 {
441 if (debugEnabled())
442 {
443 TRACER.debugCaught(DebugLogLevel.ERROR, e);
444 }
445 }
446 }
447 }
448 }
449 else
450 {
451 return nextOperation;
452 }
453 }
454 }
455 catch (InterruptedException ie)
456 {
457 // This is somewhat expected so don't log.
458 // assert debugException(CLASS_NAME, "retryNextOperation", ie);
459
460 // If this occurs, then the worker thread must have been interrupted for
461 // some reason. This could be because the Directory Server is shutting
462 // down, in which case we should return null.
463 if (shutdownRequested)
464 {
465 return null;
466 }
467
468 // If we've gotten here, then the worker thread was interrupted for some
469 // other reason. This should not happen, and we need to log a message.
470 logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
471 Thread.currentThread().getName(), String.valueOf(ie)));
472 return retryNextOperation(workerThread, numFailures+1);
473 }
474 catch (Exception e)
475 {
476 if (debugEnabled())
477 {
478 TRACER.debugCaught(DebugLogLevel.ERROR, e);
479 }
480
481 // This should not happen. The only recourse we have is to log a message
482 // and try again.
483 logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
484 Thread.currentThread().getName(), String.valueOf(e)));
485 return retryNextOperation(workerThread, numFailures + 1);
486 }
487 }
488
489
490
491 /**
492 * Attempts to remove the specified operation from this queue if it has not
493 * yet been picked up for processing by one of the worker threads.
494 *
495 * @param operation The operation to remove from the queue.
496 *
497 * @return <CODE>true</CODE> if the provided request was present in the queue
498 * and was removed successfully, or <CODE>false</CODE> it not.
499 */
500 public boolean removeOperation(AbstractOperation operation)
501 {
502 return opQueue.remove(operation);
503 }
504
505
506
507 /**
508 * Retrieves the total number of operations that have been successfully
509 * submitted to this work queue for processing since server startup. This
510 * does not include operations that have been rejected for some reason like
511 * the queue already at its maximum capacity.
512 *
513 * @return The total number of operations that have been successfully
514 * submitted to this work queue since startup.
515 */
516 public long getOpsSubmitted()
517 {
518 return opsSubmitted.longValue();
519 }
520
521
522
523 /**
524 * Retrieves the total number of operations that have been rejected because
525 * the work queue was already at its maximum capacity.
526 *
527 * @return The total number of operations that have been rejected because the
528 * work queue was already at its maximum capacity.
529 */
530 public long getOpsRejectedDueToQueueFull()
531 {
532 return queueFullRejects.longValue();
533 }
534
535
536
537 /**
538 * Retrieves the number of pending operations in the queue that have not yet
539 * been picked up for processing. Note that this method is not a
540 * constant-time operation and can be relatively inefficient, so it should be
541 * used sparingly.
542 *
543 * @return The number of pending operations in the queue that have not yet
544 * been picked up for processing.
545 */
546 public int size()
547 {
548 return opQueue.size();
549 }
550
551
552
553 /**
554 * {@inheritDoc}
555 */
556 public boolean isConfigurationChangeAcceptable(
557 TraditionalWorkQueueCfg configuration,
558 List<Message> unacceptableReasons)
559 {
560 // The provided configuration will always be acceptable.
561 return true;
562 }
563
564
565
566 /**
567 * {@inheritDoc}
568 */
569 public ConfigChangeResult applyConfigurationChange(
570 TraditionalWorkQueueCfg configuration)
571 {
572 ArrayList<Message> resultMessages = new ArrayList<Message>();
573 int newNumThreads = configuration.getNumWorkerThreads();
574 int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
575
576
577 // Apply a change to the number of worker threads if appropriate.
578 int currentThreads = workerThreads.size();
579 if (newNumThreads != currentThreads)
580 {
581 synchronized (queueLock)
582 {
583 try
584 {
585 int threadsToAdd = newNumThreads - currentThreads;
586 if (threadsToAdd > 0)
587 {
588 for (int i=0; i < threadsToAdd; i++)
589 {
590 TraditionalWorkerThread t =
591 new TraditionalWorkerThread(this, lastThreadNumber++);
592 workerThreads.add(t);
593 t.start();
594 }
595
596 killThreads = false;
597 }
598 else
599 {
600 killThreads = true;
601 }
602
603 numWorkerThreads = newNumThreads;
604 }
605 catch (Exception e)
606 {
607 if (debugEnabled())
608 {
609 TRACER.debugCaught(DebugLogLevel.ERROR, e);
610 }
611 }
612 }
613 }
614
615
616 // Apply a change to the maximum capacity if appropriate. Since we can't
617 // change capacity on the fly, then we'll have to create a new queue and
618 // transfer any remaining items into it. Any thread that is waiting on the
619 // original queue will time out after at most a few seconds and further
620 // checks will be against the new queue.
621 if (newMaxCapacity != maxCapacity)
622 {
623 synchronized (queueLock)
624 {
625 try
626 {
627 LinkedBlockingQueue<AbstractOperation> newOpQueue;
628 if (newMaxCapacity > 0)
629 {
630 newOpQueue =
631 new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
632 }
633 else
634 {
635 newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
636 }
637
638 LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
639 opQueue = newOpQueue;
640
641 LinkedList<AbstractOperation> pendingOps =
642 new LinkedList<AbstractOperation>();
643 oldOpQueue.drainTo(pendingOps);
644
645
646 // We have to be careful when adding any existing pending operations
647 // because the new capacity could be less than what was already
648 // backlogged in the previous queue. If that happens, we may have to
649 // loop a few times to get everything in there.
650 while (! pendingOps.isEmpty())
651 {
652 Iterator<AbstractOperation> iterator = pendingOps.iterator();
653 while (iterator.hasNext())
654 {
655 AbstractOperation o = iterator.next();
656 try
657 {
658 if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
659 {
660 iterator.remove();
661 }
662 }
663 catch (InterruptedException ie)
664 {
665 if (debugEnabled())
666 {
667 TRACER.debugCaught(DebugLogLevel.ERROR, ie);
668 }
669 }
670 }
671 }
672
673 maxCapacity = newMaxCapacity;
674 }
675 catch (Exception e)
676 {
677 if (debugEnabled())
678 {
679 TRACER.debugCaught(DebugLogLevel.ERROR, e);
680 }
681 }
682 }
683 }
684
685
686 return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
687 }
688
689
690
691 /**
692 * {@inheritDoc}
693 */
694 @Override()
695 public boolean isIdle()
696 {
697 if (opQueue.size() > 0)
698 {
699 return false;
700 }
701
702 synchronized (queueLock)
703 {
704 for (TraditionalWorkerThread t : workerThreads)
705 {
706 if (t.isActive())
707 {
708 return false;
709 }
710 }
711
712 return true;
713 }
714 }
715 }
716