001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.backends.jeb;
028 import org.opends.messages.Message;
029
030 import org.opends.server.types.*;
031
032 import java.util.ArrayList;
033 import java.util.TimerTask;
034 import java.util.Timer;
035 import java.util.concurrent.CopyOnWriteArrayList;
036 import java.util.concurrent.locks.ReentrantLock;
037
038 import com.sleepycat.je.DatabaseException;
039 import com.sleepycat.je.StatsConfig;
040 import com.sleepycat.je.EnvironmentStats;
041
042 import static org.opends.server.loggers.ErrorLogger.logError;
043 import static org.opends.server.loggers.debug.DebugLogger.*;
044 import org.opends.server.loggers.debug.DebugTracer;
045 import org.opends.server.core.DirectoryServer;
046 import static org.opends.messages.JebMessages.
047 ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED;
048 import static org.opends.messages.JebMessages.
049 NOTE_JEB_REBUILD_PROGRESS_REPORT;
050 import static org.opends.messages.JebMessages.
051 NOTE_JEB_REBUILD_FINAL_STATUS;
052 import static org.opends.messages.JebMessages.
053 NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT;
054 import static org.opends.messages.JebMessages.
055 ERR_JEB_REBUILD_INDEX_CONFLICT;
056 import static org.opends.messages.JebMessages.
057 NOTE_JEB_REBUILD_START;
058 import static org.opends.messages.JebMessages.
059 ERR_JEB_VLV_INDEX_NOT_CONFIGURED;
060 /**
061 * Runs an index rebuild process on the backend. Each index selected for
062 * rebuild will be done from scratch by first clearing out the database for that
063 * index. Different threads will be used to rebuild each index.
064 * The rebuild process can run concurrently with the backend online and
065 * performing write and read operations. However, during the rebuild process,
066 * other reader and writer threads might notice inconsistencies in the index
067 * databases being rebuilt. They can safely ignore these inconsistencies as long
068 * as a rebuild is in progress.
069 */
070 public class RebuildJob
071 {
072 /**
073 * The tracer object for the debug logger.
074 */
075 private static final DebugTracer TRACER = getTracer();
076
077 /**
078 * The rebuild configuraiton.
079 */
080 private RebuildConfig rebuildConfig;
081
082 /**
083 * The root container used for the verify job.
084 */
085 private RootContainer rootContainer;
086
087 /**
088 * The number of milliseconds between job progress reports.
089 */
090 private long progressInterval = 10000;
091
092 /**
093 * The waiting rebuild threads created to process the rebuild.
094 */
095 private CopyOnWriteArrayList<IndexRebuildThread> waitingThreads =
096 new CopyOnWriteArrayList<IndexRebuildThread>();
097
098 /**
099 * The active rebuild threads created to process the rebuild.
100 */
101 private CopyOnWriteArrayList<IndexRebuildThread> activeThreads =
102 new CopyOnWriteArrayList<IndexRebuildThread>();
103
104 /**
105 * The completed rebuild threads used to process the rebuild.
106 */
107 private CopyOnWriteArrayList<IndexRebuildThread> completedThreads =
108 new CopyOnWriteArrayList<IndexRebuildThread>();
109
110 /**
111 * Rebuild jobs currently running.
112 */
113 private static CopyOnWriteArrayList<RebuildJob> rebuildJobs =
114 new CopyOnWriteArrayList<RebuildJob>();
115
116 /**
117 * A mutex that will be used to provide threadsafe access to methods changing
118 * the set of currently running rebuild jobs.
119 */
120 private static ReentrantLock jobsMutex = new ReentrantLock();
121
122 /**
123 * This class reports progress of the rebuild job at fixed intervals.
124 */
125 class ProgressTask extends TimerTask
126 {
127 /**
128 * The number of records that had been processed at the time of the
129 * previous progress report.
130 */
131 private long previousProcessed = 0;
132
133 /**
134 * The time in milliseconds of the previous progress report.
135 */
136 private long previousTime;
137
138 /**
139 * The environment statistics at the time of the previous report.
140 */
141 private EnvironmentStats prevEnvStats;
142
143 /**
144 * The number of bytes in a megabyte.
145 * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
146 */
147 private static final int bytesPerMegabyte = 1024*1024;
148 /**
149 * Create a new verify progress task.
150 * @throws DatabaseException An error occurred while accessing the JE
151 * database.
152 */
153 public ProgressTask() throws DatabaseException
154 {
155 previousTime = System.currentTimeMillis();
156 prevEnvStats =
157 rootContainer.getEnvironmentStats(new StatsConfig());
158 }
159
160 /**
161 * The action to be performed by this timer task.
162 */
163 public void run()
164 {
165 long latestTime = System.currentTimeMillis();
166 long deltaTime = latestTime - previousTime;
167
168 if (deltaTime == 0)
169 {
170 return;
171 }
172
173 long totalEntries = 0;
174 long latestProcessed = 0;
175
176 ArrayList<IndexRebuildThread> allThreads =
177 new ArrayList<IndexRebuildThread>(waitingThreads);
178 allThreads.addAll(activeThreads);
179 allThreads.addAll(completedThreads);
180
181 for(IndexRebuildThread thread : allThreads)
182 {
183 try
184 {
185 totalEntries += thread.getTotalEntries();
186 latestProcessed += thread.getProcessedEntries();
187
188 if(debugEnabled())
189 {
190 TRACER.debugVerbose("Rebuild thread %s stats: total %d " +
191 "processed %d rebuilt %d duplicated %d skipped %d",
192 thread.getTotalEntries(), thread.getProcessedEntries(),
193 thread.getRebuiltEntries(),
194 thread.getDuplicatedEntries(),
195 thread.getSkippedEntries());
196 }
197 }
198 catch(Exception e)
199 {
200 if(debugEnabled())
201 {
202 TRACER.debugCaught(DebugLogLevel.ERROR, e);
203 }
204 }
205 }
206
207 long deltaCount = (latestProcessed - previousProcessed);
208 float rate = 1000f*deltaCount / deltaTime;
209 float completed = 0;
210 if(totalEntries > 0)
211 {
212 completed = 100f*latestProcessed / totalEntries;
213 }
214
215 Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get(
216 completed, latestProcessed, totalEntries, rate);
217 logError(message);
218
219 try
220 {
221 Runtime runtime = Runtime.getRuntime();
222 long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
223
224 EnvironmentStats envStats =
225 rootContainer.getEnvironmentStats(new StatsConfig());
226 long nCacheMiss =
227 envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
228
229 float cacheMissRate = 0;
230 if (deltaCount > 0)
231 {
232 cacheMissRate = nCacheMiss/(float)deltaCount;
233 }
234
235 message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(
236 freeMemory, cacheMissRate);
237 logError(message);
238
239 prevEnvStats = envStats;
240 }
241 catch (DatabaseException e)
242 {
243 if (debugEnabled())
244 {
245 TRACER.debugCaught(DebugLogLevel.ERROR, e);
246 }
247 }
248
249
250 previousProcessed = latestProcessed;
251 previousTime = latestTime;
252 }
253 }
254
/**
 * Creates a rebuild job driven by the supplied configuration. Nothing is
 * started here; the configuration is only stored for later use.
 *
 * @param rebuildConfig The configuration to use for this rebuild job.
 */
public RebuildJob(RebuildConfig rebuildConfig)
{
  this.rebuildConfig = rebuildConfig;
}
264
265 private static void addJob(RebuildJob job)
266 throws DatabaseException, JebException
267 {
268 //Make sure there are no running rebuild jobs
269 jobsMutex.lock();
270
271 try
272 {
273 for(RebuildJob otherJob : rebuildJobs)
274 {
275 String conflictIndex =
276 job.rebuildConfig.checkConflicts(otherJob.rebuildConfig);
277 if(conflictIndex != null)
278 {
279 if(debugEnabled())
280 {
281 TRACER.debugError("Conflit detected. This job config: %s, " +
282 "That job config: %s.",
283 job.rebuildConfig, otherJob.rebuildConfig);
284 }
285
286 Message msg = ERR_JEB_REBUILD_INDEX_CONFLICT.get(conflictIndex);
287 throw new JebException(msg);
288 }
289 }
290
291 //No conflicts are found. Add the job to the list of currently running
292 // jobs.
293 rebuildJobs.add(job);
294 }
295 finally
296 {
297 jobsMutex.unlock();
298 }
299 }
300
301 private static void removeJob(RebuildJob job)
302 {
303 jobsMutex.lock();
304
305 rebuildJobs.remove(job);
306
307 jobsMutex.unlock();
308 }
309
310 /**
311 * Initiate the rebuild process on a backend.
312 *
313 * @param rootContainer The root container to rebuild in.
314 * @throws DirectoryException If an error occurs during the rebuild process.
315 * @throws DatabaseException If a JE database error occurs during the rebuild
316 * process.
317 * @throws JebException If a JE database error occurs during the rebuild
318 * process.
319 */
320 public void rebuildBackend(RootContainer rootContainer)
321 throws DirectoryException, DatabaseException, JebException
322 {
323 //TODO: Add check for only performing internal indexType rebuilds when
324 // backend is offline.
325
326 addJob(this);
327
328 try
329 {
330 this.rootContainer = rootContainer;
331 EntryContainer entryContainer =
332 rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
333
334 ArrayList<String> rebuildList = rebuildConfig.getRebuildList();
335
336 if(!rebuildList.isEmpty())
337 {
338
339 for (String index : rebuildList)
340 {
341 IndexRebuildThread rebuildThread;
342 String lowerName = index.toLowerCase();
343 if (lowerName.equals("dn2id"))
344 {
345 rebuildThread = new IndexRebuildThread(entryContainer,
346 IndexRebuildThread.IndexType.DN2ID);
347 }
348 else if (lowerName.equals("dn2uri"))
349 {
350 rebuildThread = new IndexRebuildThread(entryContainer,
351 IndexRebuildThread.IndexType.DN2URI);
352 }
353 else if (lowerName.equals("id2children"))
354 {
355 rebuildThread = new IndexRebuildThread(entryContainer,
356 IndexRebuildThread.IndexType.ID2CHILDREN);
357 }
358 else if (lowerName.equals("id2subtree"))
359 {
360 rebuildThread = new IndexRebuildThread(entryContainer,
361 IndexRebuildThread.IndexType.ID2SUBTREE);
362 }
363 else if (lowerName.startsWith("vlv."))
364 {
365 if(lowerName.length() < 5)
366 {
367 Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
368 throw new JebException(msg);
369 }
370
371 VLVIndex vlvIndex =
372 entryContainer.getVLVIndex(lowerName.substring(4));
373 if(vlvIndex == null)
374 {
375 Message msg =
376 ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName.substring(4));
377 throw new JebException(msg);
378 }
379
380 rebuildThread = new IndexRebuildThread(entryContainer, vlvIndex);
381 }
382 else
383 {
384 String[] attrIndexParts = lowerName.split("\\.");
385 if(attrIndexParts.length <= 0)
386 {
387 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
388 throw new JebException(msg);
389 }
390
391 AttributeType attrType =
392 DirectoryServer.getAttributeType(attrIndexParts[0]);
393
394 if (attrType == null)
395 {
396 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
397 throw new JebException(msg);
398 }
399 AttributeIndex attrIndex =
400 entryContainer.getAttributeIndex(attrType);
401 if (attrIndex == null)
402 {
403 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
404 throw new JebException(msg);
405 }
406
407 if(attrIndexParts.length > 1)
408 {
409 Index partialAttrIndex = null;
410 if(attrIndexParts[1].equals("presence"))
411 {
412 partialAttrIndex = attrIndex.presenceIndex;
413 }
414 else if(attrIndexParts[1].equals("equality"))
415 {
416 partialAttrIndex = attrIndex.equalityIndex;
417 }
418 else if(attrIndexParts[1].equals("substring"))
419 {
420 partialAttrIndex = attrIndex.substringIndex;
421 }
422 else if(attrIndexParts[1].equals("ordering"))
423 {
424 partialAttrIndex = attrIndex.orderingIndex;
425 }
426 else if(attrIndexParts[1].equals("approximate"))
427 {
428 partialAttrIndex = attrIndex.approximateIndex;
429 }
430
431 if(partialAttrIndex == null)
432 {
433 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
434 throw new JebException(msg);
435 }
436
437 rebuildThread =
438 new IndexRebuildThread(entryContainer, partialAttrIndex);
439 }
440 else
441 {
442 rebuildThread = new IndexRebuildThread(entryContainer,
443 attrIndex);
444 }
445 }
446
447 waitingThreads.add(rebuildThread);
448
449 if(debugEnabled())
450 {
451 TRACER.debugInfo("Created rebuild thread %s",
452 rebuildThread.getName());
453 }
454 }
455
456 //Log a start message.
457 long totalToProcess = 0;
458
459 for(IndexRebuildThread thread : waitingThreads)
460 {
461 totalToProcess += thread.getTotalEntries();
462 }
463
464 StringBuilder sb = new StringBuilder();
465 for(String index : rebuildList)
466 {
467 if(sb.length() > 0)
468 {
469 sb.append(", ");
470 }
471 sb.append(index);
472 }
473 Message message =
474 NOTE_JEB_REBUILD_START.get(sb.toString(), totalToProcess);
475 logError(message);
476
477 // Make a note of the time we started.
478 long startTime = System.currentTimeMillis();
479
480 // Start a timer for the progress report.
481 Timer timer = new Timer();
482 TimerTask progressTask = new ProgressTask();
483 timer.scheduleAtFixedRate(progressTask, progressInterval,
484 progressInterval);
485
486 entryContainer.exclusiveLock.lock();
487 try
488 {
489 for(IndexRebuildThread thread : waitingThreads)
490 {
491 thread.clearDatabase();
492 }
493 }
494 finally
495 {
496 if(!rebuildConfig.includesSystemIndex())
497 {
498 entryContainer.exclusiveLock.unlock();
499 }
500 }
501
502
503 if(!rebuildConfig.includesSystemIndex())
504 {
505 entryContainer.sharedLock.lock();
506 }
507 try
508 {
509 while(!waitingThreads.isEmpty())
510 {
511 dispatchThreads();
512 joinThreads();
513 }
514 }
515 finally
516 {
517 timer.cancel();
518 if(rebuildConfig.includesSystemIndex())
519 {
520 entryContainer.exclusiveLock.unlock();
521 }
522 else
523 {
524 entryContainer.sharedLock.unlock();
525 }
526 }
527
528 long totalProcessed = 0;
529 long totalRebuilt = 0;
530 long totalDuplicated = 0;
531 long totalSkipped = 0;
532
533 for(IndexRebuildThread thread : completedThreads)
534 {
535 totalProcessed += thread.getProcessedEntries();
536 totalRebuilt += thread.getRebuiltEntries();
537 totalDuplicated += thread.getDuplicatedEntries();
538 totalSkipped += thread.getSkippedEntries();
539 }
540
541 long finishTime = System.currentTimeMillis();
542 long totalTime = (finishTime - startTime);
543
544 float rate = 0;
545 if (totalTime > 0)
546 {
547 rate = 1000f*totalProcessed / totalTime;
548 }
549
550 message = NOTE_JEB_REBUILD_FINAL_STATUS.get(
551 totalProcessed, totalTime/1000, rate);
552 logError(message);
553
554 if(debugEnabled())
555 {
556 TRACER.debugInfo("Detailed overall rebuild job stats: rebuilt %d, " +
557 "duplicated %d, skipped %d",
558 totalRebuilt, totalDuplicated, totalSkipped);
559 }
560 }
561 }
562 finally
563 {
564 removeJob(this);
565 }
566
567 }
568
569 /**
570 * Dispatch a set of threads based on their dependency and ordering.
571 */
572 private void dispatchThreads() throws DatabaseException
573 {
574 for(IndexRebuildThread t : waitingThreads)
575 {
576 boolean start = true;
577
578 //Check to see if we have exceeded the max number of threads to use at
579 //one time.
580 if(rebuildConfig.getMaxRebuildThreads() > 0 &&
581 activeThreads.size() > rebuildConfig.getMaxRebuildThreads())
582 {
583 if(debugEnabled())
584 {
585 TRACER.debugInfo("Delaying the start of thread %s because " +
586 "the max number of rebuild threads has been reached.");
587 }
588 start = false;
589 }
590
591 /**
592 * We may need to start the threads in stages since the rebuild process
593 * of some index types (id2children, id2subtree) depends on another
594 * index being rebuilt to be completed first.
595 */
596 if(t.getIndexType() == IndexRebuildThread.IndexType.ID2CHILDREN ||
597 t.getIndexType() == IndexRebuildThread.IndexType.ID2SUBTREE)
598 {
599 //Check to see if we have any waiting threads that needs to go
600 //first
601 for(IndexRebuildThread t2 : waitingThreads)
602 {
603 if(t2.getIndexType() == IndexRebuildThread.IndexType.DN2ID ||
604 t2.getIndexType() == IndexRebuildThread.IndexType.DN2URI)
605 {
606 //We gotta wait for these to start before running the
607 //rebuild on ID2CHILDREN or ID2SUBTREE
608
609 if(debugEnabled())
610 {
611 TRACER.debugInfo("Delaying the start of thread %s because " +
612 "it depends on another index rebuilt to " +
613 "go first.", t.getName());
614 }
615 start = false;
616 break;
617 }
618 }
619
620 //Check to see if we have any active threads that needs to
621 //finish first
622 for(IndexRebuildThread t3 : activeThreads)
623 {
624 if(t3.getIndexType() == IndexRebuildThread.IndexType.DN2ID ||
625 t3.getIndexType() == IndexRebuildThread.IndexType.DN2URI)
626 {
627 //We gotta wait for these to start before running the
628 //rebuild on ID2CHILDREN or ID2SUBTREE
629
630 if(debugEnabled())
631 {
632 TRACER.debugInfo("Delaying the start of thread %s because " +
633 "it depends on another index being rebuilt to " +
634 "finish.", t.getName());
635 }
636 start = false;
637 break;
638 }
639 }
640 }
641
642 if(start)
643 {
644 if(debugEnabled())
645 {
646 TRACER.debugInfo("Starting rebuild thread %s.", t.getName());
647 }
648 waitingThreads.remove(t);
649 activeThreads.add(t);
650 t.start();
651 }
652 }
653 }
654
655 /**
656 * Wait for all worker activeThreads to exit.
657 */
658 private void joinThreads()
659 {
660 for (IndexRebuildThread t : activeThreads)
661 {
662 try
663 {
664 t.join();
665
666 if(debugEnabled())
667 {
668 TRACER.debugInfo("Rebuild thread %s finished.", t.getName());
669 }
670 activeThreads.remove(t);
671 completedThreads.add(t);
672 }
673 catch (InterruptedException ie)
674 {
675 // No action needed?
676 }
677 }
678 }
679 }