001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027
028 package org.opends.server.backends.jeb.importLDIF;
029
030 import org.opends.server.types.*;
031 import org.opends.server.loggers.debug.DebugTracer;
032 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034 import static org.opends.server.loggers.ErrorLogger.logError;
035 import org.opends.server.admin.std.server.LocalDBBackendCfg;
036 import org.opends.server.util.LDIFReader;
037 import org.opends.server.util.StaticUtils;
038 import org.opends.server.util.LDIFException;
039 import org.opends.server.util.RuntimeInformation;
040 import static org.opends.server.util.DynamicConstants.BUILD_ID;
041 import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
042 import org.opends.server.config.ConfigException;
043 import org.opends.server.core.DirectoryServer;
044 import org.opends.server.backends.jeb.*;
045 import org.opends.server.protocols.asn1.ASN1OctetString;
046 import org.opends.messages.Message;
047 import org.opends.messages.JebMessages;
048 import static org.opends.messages.JebMessages.*;
049 import java.util.concurrent.CopyOnWriteArrayList;
050 import java.util.concurrent.LinkedBlockingQueue;
051 import java.util.concurrent.TimeUnit;
052 import java.util.*;
053 import java.io.IOException;
054
055 import com.sleepycat.je.*;
056
057 /**
058 * Performs a LDIF import.
059 */
060
061 public class Importer implements Thread.UncaughtExceptionHandler {
062
063
064 /**
065 * The tracer object for the debug logger.
066 */
067 private static final DebugTracer TRACER = getTracer();
068
069 /**
070 * The JE backend configuration.
071 */
072 private LocalDBBackendCfg config;
073
074 /**
075 * The root container used for this import job.
076 */
077 private RootContainer rootContainer;
078
079 /**
080 * The LDIF import configuration.
081 */
082 private LDIFImportConfig ldifImportConfig;
083
084 /**
085 * The LDIF reader.
086 */
087 private LDIFReader reader;
088
089 /**
090 * Map of base DNs to their import context.
091 */
092 private LinkedHashMap<DN, DNContext> importMap =
093 new LinkedHashMap<DN, DNContext>();
094
095
096 /**
097 * The number of entries migrated.
098 */
099 private int migratedCount;
100
101 /**
102 * The number of entries imported.
103 */
104 private int importedCount;
105
106 /**
107 * The number of milliseconds between job progress reports.
108 */
109 private long progressInterval = 10000;
110
111 /**
112 * The progress report timer.
113 */
114 private Timer timer;
115
116 //Thread array.
117 private CopyOnWriteArrayList<WorkThread> threads;
118
119 //Progress task.
120 private ProgressTask pTask;
121
122 //Number of entries import before checking if cleaning is needed after
123 //eviction has been detected.
124 private static final int entryCleanInterval = 250000;
125
126 //Minimum buffer amount to give to a buffer manager.
127 private static final long minBuffer = 1024 * 1024;
128
129 //Total available memory for the buffer managers.
130 private long totalAvailBufferMemory = 0;
131
132 //Memory size to be used for the DB cache in string format.
133 private String dbCacheSizeStr;
134
135 //Used to do an initial clean after eviction has been detected.
136 private boolean firstClean=false;
137
138 //A thread threw an Runtime exception stop the import.
139 private boolean unCaughtExceptionThrown = false;
140
141 /**
142 * Create a new import job with the specified ldif import config.
143 *
144 * @param ldifImportConfig The LDIF import config.
145 */
146 public Importer(LDIFImportConfig ldifImportConfig)
147 {
148 this.ldifImportConfig = ldifImportConfig;
149 this.threads = new CopyOnWriteArrayList<WorkThread>();
150 calcMemoryLimits();
151 }
152
153 /**
154 * Start the worker threads.
155 *
156 * @throws DatabaseException If a DB problem occurs.
157 */
158 private void startWorkerThreads()
159 throws DatabaseException {
160
161 int importThreadCount = config.getImportThreadCount();
162 //Figure out how much buffer memory to give to each context.
163 int contextCount = importMap.size();
164 long memoryPerContext = totalAvailBufferMemory / contextCount;
165 //Below min, use the min value.
166 if(memoryPerContext < minBuffer) {
167 Message msg =
168 NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
169 minBuffer);
170 logError(msg);
171 memoryPerContext = minBuffer;
172 }
173 // Create one set of worker threads/buffer managers for each base DN.
174 for (DNContext context : importMap.values()) {
175 BufferManager bufferManager = new BufferManager(memoryPerContext,
176 importThreadCount);
177 context.setBufferManager(bufferManager);
178 for (int i = 0; i < importThreadCount; i++) {
179 WorkThread t = new WorkThread(context.getWorkQueue(), i,
180 bufferManager, rootContainer);
181 t.setUncaughtExceptionHandler(this);
182 threads.add(t);
183 t.start();
184 }
185 }
186 // Start a timer for the progress report.
187 timer = new Timer();
188 TimerTask progressTask = new ProgressTask();
189 //Used to get at extra functionality such as eviction detected.
190 pTask = (ProgressTask) progressTask;
191 timer.scheduleAtFixedRate(progressTask, progressInterval,
192 progressInterval);
193
194 }
195
196
197 /**
198 * Import a ldif using the specified root container.
199 *
200 * @param rootContainer The root container.
201 * @return A LDIF result.
202 * @throws DatabaseException If a DB error occurs.
203 * @throws IOException If a IO error occurs.
204 * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
205 * @throws DirectoryException If a directory error occurs.
206 * @throws ConfigException If a configuration has an error.
207 */
208 public LDIFImportResult processImport(RootContainer rootContainer)
209 throws DatabaseException, IOException, JebException, DirectoryException,
210 ConfigException {
211
212 // Create an LDIF reader. Throws an exception if the file does not exist.
213 reader = new LDIFReader(ldifImportConfig);
214 this.rootContainer = rootContainer;
215 this.config = rootContainer.getConfiguration();
216
217 Message message;
218 long startTime;
219 try {
220 int importThreadCount = config.getImportThreadCount();
221 message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
222 BUILD_ID, REVISION_NUMBER);
223 logError(message);
224 message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
225 logError(message);
226 RuntimeInformation.logInfo();
227 for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
228 DNContext DNContext = getImportContext(entryContainer);
229 if(DNContext != null) {
230 importMap.put(entryContainer.getBaseDN(), DNContext);
231 }
232 }
233 // Make a note of the time we started.
234 startTime = System.currentTimeMillis();
235 startWorkerThreads();
236 try {
237 importedCount = 0;
238 migratedCount = 0;
239 migrateExistingEntries();
240 processLDIF();
241 migrateExcludedEntries();
242 } finally {
243 if(!unCaughtExceptionThrown) {
244 cleanUp();
245 switchContainers();
246 }
247 }
248 }
249 finally {
250 reader.close();
251 }
252 importProlog(startTime);
253 return new LDIFImportResult(reader.getEntriesRead(),
254 reader.getEntriesRejected(),
255 reader.getEntriesIgnored());
256 }
257
258 /**
259 * Switch containers if the migrated entries were written to the temporary
260 * container.
261 *
262 * @throws DatabaseException If a DB problem occurs.
263 * @throws JebException If a JEB problem occurs.
264 */
265 private void switchContainers() throws DatabaseException, JebException {
266
267 for(DNContext importContext : importMap.values()) {
268 DN baseDN = importContext.getBaseDN();
269 EntryContainer srcEntryContainer =
270 importContext.getSrcEntryContainer();
271 if(srcEntryContainer != null) {
272 if (debugEnabled()) {
273 TRACER.debugInfo("Deleteing old entry container for base DN " +
274 "%s and renaming temp entry container", baseDN);
275 }
276 EntryContainer unregEC =
277 rootContainer.unregisterEntryContainer(baseDN);
278 //Make sure the unregistered EC for the base DN is the same as
279 //the one in the import context.
280 if(unregEC != srcEntryContainer) {
281 if(debugEnabled()) {
282 TRACER.debugInfo("Current entry container used for base DN " +
283 "%s is not the same as the source entry container used " +
284 "during the migration process.", baseDN);
285 }
286 rootContainer.registerEntryContainer(baseDN, unregEC);
287 continue;
288 }
289 srcEntryContainer.lock();
290 srcEntryContainer.delete();
291 srcEntryContainer.unlock();
292 EntryContainer newEC = importContext.getEntryContainer();
293 newEC.lock();
294 newEC.setDatabasePrefix(baseDN.toNormalizedString());
295 newEC.unlock();
296 rootContainer.registerEntryContainer(baseDN, newEC);
297 }
298 }
299 }
300
301 /**
302 * Create and log messages at the end of the successful import.
303 *
304 * @param startTime The time the import started.
305 */
306 private void importProlog(long startTime) {
307 Message message;
308 long finishTime = System.currentTimeMillis();
309 long importTime = (finishTime - startTime);
310
311 float rate = 0;
312 if (importTime > 0)
313 {
314 rate = 1000f*importedCount / importTime;
315 }
316
317 message = NOTE_JEB_IMPORT_FINAL_STATUS.
318 get(reader.getEntriesRead(), importedCount,
319 reader.getEntriesIgnored(), reader.getEntriesRejected(),
320 migratedCount, importTime/1000, rate);
321 logError(message);
322
323 message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
324 getEntryLimitExceededCount());
325 logError(message);
326
327 }
328
329
330 /**
331 * Run the cleaner if it is needed.
332 *
333 * @param entriesRead The number of entries read so far.
334 * @param evictEntryNumber The number of entries to run the cleaner after
335 * being read.
336 * @throws DatabaseException If a DB problem occurs.
337 */
338 private void
339 runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
340 throws DatabaseException {
341 if(!firstClean || (entriesRead % evictEntryNumber) == 0) {
342 //Make sure work queue is empty before starting.
343 drainWorkQueue();
344 Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
345 runCleaner(msg);
346 if(!firstClean) {
347 firstClean=true;
348 }
349 }
350 }
351
352 /**
353 * Run the cleaner, pausing the task thread output.
354 *
355 * @param header Message to be printed before cleaning.
356 * @throws DatabaseException If a DB problem occurs.
357 */
358 private void runCleaner(Message header) throws DatabaseException {
359 Message msg;
360 long startTime = System.currentTimeMillis();
361 //Need to force a checkpoint.
362 rootContainer.importForceCheckPoint();
363 logError(header);
364 pTask.setPause(true);
365 //Actually clean the files.
366 int cleaned = rootContainer.cleanedLogFiles();
367 //This checkpoint removes the files if any were cleaned.
368 if(cleaned > 0) {
369 msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
370 logError(msg);
371 rootContainer.importForceCheckPoint();
372 }
373 pTask.setPause(false);
374 long finishTime = System.currentTimeMillis();
375 long cleanTime = (finishTime - startTime) / 1000;
376 msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
377 logError(msg);
378 }
379
380 /**
381 * Process a LDIF reader.
382 *
383 * @throws JebException If a JEB problem occurs.
384 * @throws DatabaseException If a DB problem occurs.
385 * @throws IOException If an IO exception occurs.
386 */
387 private void
388 processLDIF() throws JebException, DatabaseException, IOException {
389 Message message = NOTE_JEB_IMPORT_LDIF_START.get();
390 logError(message);
391 do {
392 if (ldifImportConfig.isCancelled()) {
393 break;
394 }
395 if(threads.size() <= 0) {
396 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
397 throw new JebException(message);
398 }
399 if(unCaughtExceptionThrown) {
400 abortImport();
401 }
402 try {
403 // Read the next entry.
404 Entry entry = reader.readEntry();
405 // Check for end of file.
406 if (entry == null) {
407 message = NOTE_JEB_IMPORT_LDIF_END.get();
408 logError(message);
409
410 break;
411 }
412 // Route it according to base DN.
413 DNContext DNContext = getImportConfig(entry.getDN());
414 processEntry(DNContext, entry);
415 //If the progress task has noticed eviction proceeding, start running
416 //the cleaner.
417 if(pTask.isEvicting()) {
418 runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
419 }
420 } catch (LDIFException e) {
421 if (debugEnabled()) {
422 TRACER.debugCaught(DebugLogLevel.ERROR, e);
423 }
424 } catch (DirectoryException e) {
425 if (debugEnabled()) {
426 TRACER.debugCaught(DebugLogLevel.ERROR, e);
427 }
428 } catch (DatabaseException e) {
429 if (debugEnabled()) {
430 TRACER.debugCaught(DebugLogLevel.ERROR, e);
431 }
432 }
433 } while (true);
434 }
435
436 /**
437 * Process an entry using the specified import context.
438 *
439 * @param DNContext The import context.
440 * @param entry The entry to process.
441 */
442 private void processEntry(DNContext DNContext, Entry entry) {
443 //Add this DN to the pending map.
444 DNContext.addPending(entry.getDN());
445 addEntryQueue(DNContext, entry);
446 }
447
448 /**
449 * Add work item to specified import context's queue.
450 * @param context The import context.
451 * @param item The work item to add.
452 * @return <CODE>True</CODE> if the the work item was added to the queue.
453 */
454 private boolean
455 addQueue(DNContext context, WorkElement item) {
456 try {
457 while(!context.getWorkQueue().offer(item, 1000,
458 TimeUnit.MILLISECONDS)) {
459 if(threads.size() <= 0) {
460 // All worker threads died. We must stop now.
461 return false;
462 }
463 }
464 } catch (InterruptedException e) {
465 if (debugEnabled()) {
466 TRACER.debugCaught(DebugLogLevel.ERROR, e);
467 }
468 }
469 return true;
470 }
471
472
473 /**
474 * Wait until the work queue is empty.
475 */
476 private void drainWorkQueue() {
477 if(threads.size() > 0) {
478 for (DNContext context : importMap.values()) {
479 while (context.getWorkQueue().size() > 0) {
480 try {
481 Thread.sleep(100);
482 } catch (Exception e) {
483 // No action needed.
484 }
485 }
486 }
487 }
488 }
489
490 private void abortImport() throws JebException {
491 //Stop work threads telling them to skip substring flush.
492 stopWorkThreads(false);
493 timer.cancel();
494 Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
495 throw new JebException(message);
496 }
497
498 /**
499 * Stop work threads.
500 *
501 * @param abort <CODE>True</CODE> if stop work threads was called from an
502 * abort.
503 * @throws JebException if a Jeb error occurs.
504 */
505 private void
506 stopWorkThreads(boolean abort) throws JebException {
507 for (WorkThread t : threads) {
508 t.stopProcessing();
509 }
510 // Wait for each thread to stop.
511 for (WorkThread t : threads) {
512 try {
513 if(!abort && unCaughtExceptionThrown) {
514 timer.cancel();
515 Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
516 throw new JebException(message);
517 }
518 t.join();
519 importedCount += t.getImportedCount();
520 } catch (InterruptedException ie) {
521 // No action needed?
522 }
523 }
524 }
525
526 /**
527 * Clean up after a successful import.
528 *
529 * @throws DatabaseException If a DB error occurs.
530 * @throws JebException If a Jeb error occurs.
531 */
532 private void cleanUp() throws DatabaseException, JebException {
533 Message msg;
534 //Drain the work queue.
535 drainWorkQueue();
536 pTask.setPause(true);
537 long startTime = System.currentTimeMillis();
538 stopWorkThreads(true);
539 //Flush the buffer managers.
540 for(DNContext context : importMap.values()) {
541 context.getBufferManager().prepareFlush();
542 context.getBufferManager().flushAll();
543 }
544 long finishTime = System.currentTimeMillis();
545 long flushTime = (finishTime - startTime) / 1000;
546 msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
547 logError(msg);
548 timer.cancel();
549 for(DNContext context : importMap.values()) {
550 context.setIndexesTrusted();
551 }
552 msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
553 //Run the cleaner.
554 runCleaner(msg);
555 }
556
557 /**
558 * Uncaught exception handler.
559 *
560 * @param t The thread working when the exception was thrown.
561 * @param e The exception.
562 */
563 public void uncaughtException(Thread t, Throwable e) {
564 unCaughtExceptionThrown = true;
565 threads.remove(t);
566 Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
567 t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
568 logError(msg);
569 }
570
571 /**
572 * Get the entry limit exceeded counts from the indexes.
573 *
574 * @return Count of the index with entry limit exceeded values.
575 */
576 private int getEntryLimitExceededCount() {
577 int count = 0;
578 for (DNContext ic : importMap.values())
579 {
580 count += ic.getEntryContainer().getEntryLimitExceededCount();
581 }
582 return count;
583 }
584
585 /**
586 * Return an import context related to the specified DN.
587 * @param dn The dn.
588 * @return An import context.
589 * @throws DirectoryException If an directory error occurs.
590 */
591 private DNContext getImportConfig(DN dn) throws DirectoryException {
592 DNContext DNContext = null;
593 DN nodeDN = dn;
594
595 while (DNContext == null && nodeDN != null) {
596 DNContext = importMap.get(nodeDN);
597 if (DNContext == null)
598 {
599 nodeDN = nodeDN.getParentDNInSuffix();
600 }
601 }
602
603 if (nodeDN == null) {
604 // The entry should not have been given to this backend.
605 Message message =
606 JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
607 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
608 }
609
610 return DNContext;
611 }
612
613 /**
614 * Creates an import context for the specified entry container.
615 *
616 * @param entryContainer The entry container.
617 * @return Import context to use during import.
618 * @throws DatabaseException If a database error occurs.
619 * @throws JebException If a JEB error occurs.
620 * @throws ConfigException If a configuration contains error.
621 */
622 private DNContext getImportContext(EntryContainer entryContainer)
623 throws DatabaseException, JebException, ConfigException {
624 DN baseDN = entryContainer.getBaseDN();
625 EntryContainer srcEntryContainer = null;
626 List<DN> includeBranches = new ArrayList<DN>();
627 List<DN> excludeBranches = new ArrayList<DN>();
628
629 if(!ldifImportConfig.appendToExistingData() &&
630 !ldifImportConfig.clearBackend())
631 {
632 for(DN dn : ldifImportConfig.getExcludeBranches())
633 {
634 if(baseDN.equals(dn))
635 {
636 // This entire base DN was explicitly excluded. Skip.
637 return null;
638 }
639 if(baseDN.isAncestorOf(dn))
640 {
641 excludeBranches.add(dn);
642 }
643 }
644
645 if(!ldifImportConfig.getIncludeBranches().isEmpty())
646 {
647 for(DN dn : ldifImportConfig.getIncludeBranches())
648 {
649 if(baseDN.isAncestorOf(dn))
650 {
651 includeBranches.add(dn);
652 }
653 }
654
655 if(includeBranches.isEmpty())
656 {
657 // There are no branches in the explicitly defined include list under
658 // this base DN. Skip this base DN alltogether.
659
660 return null;
661 }
662
663 // Remove any overlapping include branches.
664 Iterator<DN> includeBranchIterator = includeBranches.iterator();
665 while(includeBranchIterator.hasNext())
666 {
667 DN includeDN = includeBranchIterator.next();
668 boolean keep = true;
669 for(DN dn : includeBranches)
670 {
671 if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
672 {
673 keep = false;
674 break;
675 }
676 }
677 if(!keep)
678 {
679 includeBranchIterator.remove();
680 }
681 }
682
683 // Remvoe any exclude branches that are not are not under a include
684 // branch since they will be migrated as part of the existing entries
685 // outside of the include branches anyways.
686 Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
687 while(excludeBranchIterator.hasNext())
688 {
689 DN excludeDN = excludeBranchIterator.next();
690 boolean keep = false;
691 for(DN includeDN : includeBranches)
692 {
693 if(includeDN.isAncestorOf(excludeDN))
694 {
695 keep = true;
696 break;
697 }
698 }
699 if(!keep)
700 {
701 excludeBranchIterator.remove();
702 }
703 }
704
705 if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
706 includeBranches.get(0).equals(baseDN))
707 {
708 // This entire base DN is explicitly included in the import with
709 // no exclude branches that we need to migrate. Just clear the entry
710 // container.
711 entryContainer.lock();
712 entryContainer.clear();
713 entryContainer.unlock();
714 }
715 else
716 {
717 // Create a temp entry container
718 srcEntryContainer = entryContainer;
719 entryContainer =
720 rootContainer.openEntryContainer(baseDN,
721 baseDN.toNormalizedString() +
722 "_importTmp");
723 }
724 }
725 }
726
727 // Create an import context.
728 DNContext DNContext = new DNContext();
729 DNContext.setConfig(config);
730 DNContext.setLDIFImportConfig(this.ldifImportConfig);
731 DNContext.setLDIFReader(reader);
732
733 DNContext.setBaseDN(baseDN);
734 DNContext.setEntryContainer(entryContainer);
735 DNContext.setSrcEntryContainer(srcEntryContainer);
736
737 //Create queue.
738 LinkedBlockingQueue<WorkElement> works =
739 new LinkedBlockingQueue<WorkElement>
740 (config.getImportQueueSize());
741 DNContext.setWorkQueue(works);
742
743 // Set the include and exclude branches
744 DNContext.setIncludeBranches(includeBranches);
745 DNContext.setExcludeBranches(excludeBranches);
746
747 return DNContext;
748 }
749
750 /**
751 * Add specified context and entry to the work queue.
752 *
753 * @param context The context related to the entry DN.
754 * @param entry The entry to work on.
755 * @return <CODE>True</CODE> if the element was added to the work queue.
756 */
757 private boolean
758 addEntryQueue(DNContext context, Entry entry) {
759 WorkElement element =
760 WorkElement.decode(entry, context);
761 return addQueue(context, element);
762 }
763
764 /**
765 * Calculate the memory usage for the substring buffer and the DB cache.
766 */
767 private void calcMemoryLimits() {
768 Message msg;
769 Runtime runtime = Runtime.getRuntime();
770 long freeMemory = runtime.freeMemory();
771 long maxMemory = runtime.maxMemory();
772 long totMemory = runtime.totalMemory();
773 long totFreeMemory = (freeMemory + (maxMemory - totMemory));
774 long dbCacheLimit = (totFreeMemory * 40) / 100;
775 dbCacheSizeStr = Long.toString(dbCacheLimit);
776 totalAvailBufferMemory = (totFreeMemory * 10) / 100;
777 if(totalAvailBufferMemory < (10 * minBuffer)) {
778 msg =
779 NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
780 (10 * minBuffer));
781 logError(msg);
782 totalAvailBufferMemory = (10 * minBuffer);
783 }
784 msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
785 totalAvailBufferMemory);
786 logError(msg);
787 }
788
789 /**
790 * Return the string representation of the DB cache size.
791 *
792 * @return DB cache size string.
793 */
794 public String getDBCacheSize() {
795 return dbCacheSizeStr;
796 }
797
798 /**
799 * Migrate any existing entries.
800 *
801 * @throws JebException If a JEB error occurs.
802 * @throws DatabaseException If a DB error occurs.
803 * @throws DirectoryException If a directory error occurs.
804 */
805 private void migrateExistingEntries()
806 throws JebException, DatabaseException, DirectoryException {
807 for(DNContext context : importMap.values()) {
808 EntryContainer srcEntryContainer = context.getSrcEntryContainer();
809 if(srcEntryContainer != null &&
810 !context.getIncludeBranches().isEmpty()) {
811 DatabaseEntry key = new DatabaseEntry();
812 DatabaseEntry data = new DatabaseEntry();
813 LockMode lockMode = LockMode.DEFAULT;
814 OperationStatus status;
815 Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
816 "existing", String.valueOf(context.getBaseDN()));
817 logError(message);
818 Cursor cursor =
819 srcEntryContainer.getDN2ID().openCursor(null,
820 CursorConfig.READ_COMMITTED);
821 try {
822 status = cursor.getFirst(key, data, lockMode);
823 while(status == OperationStatus.SUCCESS &&
824 !ldifImportConfig.isCancelled()) {
825 if(threads.size() <= 0) {
826 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
827 throw new JebException(message);
828 }
829 DN dn = DN.decode(new ASN1OctetString(key.getData()));
830 if(!context.getIncludeBranches().contains(dn)) {
831 EntryID id = new EntryID(data);
832 Entry entry =
833 srcEntryContainer.getID2Entry().get(null,
834 id, LockMode.DEFAULT);
835 processEntry(context, entry);
836 migratedCount++;
837 status = cursor.getNext(key, data, lockMode);
838 } else {
839 // This is the base entry for a branch that will be included
840 // in the import so we don't want to copy the branch to the new
841 // entry container.
842
843 /**
844 * Advance the cursor to next entry at the same level in the DIT
845 * skipping all the entries in this branch.
846 * Set the next starting value to a value of equal length but
847 * slightly greater than the previous DN. Since keys are compared
848 * in reverse order we must set the first byte (the comma).
849 * No possibility of overflow here.
850 */
851 byte[] begin =
852 StaticUtils.getBytes("," + dn.toNormalizedString());
853 begin[0] = (byte) (begin[0] + 1);
854 key.setData(begin);
855 status = cursor.getSearchKeyRange(key, data, lockMode);
856 }
857 }
858 } finally {
859 cursor.close();
860 }
861 }
862 }
863 }
864
865
866 /**
867 * Migrate excluded entries.
868 *
869 * @throws JebException If a JEB error occurs.
870 * @throws DatabaseException If a DB error occurs.
871 * @throws DirectoryException If a directory error occurs.
872 */
873 private void migrateExcludedEntries()
874 throws JebException, DatabaseException, DirectoryException {
875 for(DNContext importContext : importMap.values()) {
876 EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
877 if(srcEntryContainer != null &&
878 !importContext.getExcludeBranches().isEmpty()) {
879 DatabaseEntry key = new DatabaseEntry();
880 DatabaseEntry data = new DatabaseEntry();
881 LockMode lockMode = LockMode.DEFAULT;
882 OperationStatus status;
883 Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
884 "excluded", String.valueOf(importContext.getBaseDN()));
885 logError(message);
886 Cursor cursor =
887 srcEntryContainer.getDN2ID().openCursor(null,
888 CursorConfig.READ_COMMITTED);
889 Comparator<byte[]> dn2idComparator =
890 srcEntryContainer.getDN2ID().getComparator();
891 try {
892 for(DN excludedDN : importContext.getExcludeBranches()) {
893 byte[] suffix =
894 StaticUtils.getBytes(excludedDN.toNormalizedString());
895 key.setData(suffix);
896 status = cursor.getSearchKeyRange(key, data, lockMode);
897 if(status == OperationStatus.SUCCESS &&
898 Arrays.equals(key.getData(), suffix)) {
899 // This is the base entry for a branch that was excluded in the
900 // import so we must migrate all entries in this branch over to
901 // the new entry container.
902 byte[] end =
903 StaticUtils.getBytes("," + excludedDN.toNormalizedString());
904 end[0] = (byte) (end[0] + 1);
905
906 while(status == OperationStatus.SUCCESS &&
907 dn2idComparator.compare(key.getData(), end) < 0 &&
908 !ldifImportConfig.isCancelled()) {
909 if(threads.size() <= 0) {
910 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
911 throw new JebException(message);
912 }
913 EntryID id = new EntryID(data);
914 Entry entry = srcEntryContainer.getID2Entry().get(null,
915 id, LockMode.DEFAULT);
916 processEntry(importContext, entry);
917 migratedCount++;
918 status = cursor.getNext(key, data, lockMode);
919 }
920 }
921 }
922 }
923 finally
924 {
925 cursor.close();
926 }
927 }
928 }
929 }
930
931
932 /**
933 * This class reports progress of the import job at fixed intervals.
934 */
935 private final class ProgressTask extends TimerTask
936 {
937 /**
938 * The number of entries that had been read at the time of the
939 * previous progress report.
940 */
941 private long previousCount = 0;
942
943 /**
944 * The time in milliseconds of the previous progress report.
945 */
946 private long previousTime;
947
948 /**
949 * The environment statistics at the time of the previous report.
950 */
951 private EnvironmentStats prevEnvStats;
952
953 /**
954 * The number of bytes in a megabyte.
955 * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
956 */
957 public static final int bytesPerMegabyte = 1024*1024;
958
959 //Determines if the ldif is being read.
960 private boolean ldifRead = false;
961
962 //Determines if eviction has been detected.
963 private boolean evicting = false;
964
965 //Entry count when eviction was detected.
966 private long evictionEntryCount = 0;
967
968 //Suspend output.
969 private boolean pause = false;
970
971 /**
972 * Create a new import progress task.
973 * @throws DatabaseException If an error occurs in the JE database.
974 */
975 public ProgressTask() throws DatabaseException
976 {
977 previousTime = System.currentTimeMillis();
978 prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
979 }
980
981 /**
982 * Return if reading the LDIF file.
983 */
984 public void ldifRead() {
985 ldifRead=true;
986 }
987
988 /**
989 * Return value of evicting flag.
990 *
991 * @return <CODE>True</CODE> if eviction is detected.
992 */
993 public boolean isEvicting() {
994 return evicting;
995 }
996
997 /**
998 * Return count of entries when eviction was detected.
999 *
1000 * @return The entry count when eviction was detected.
1001 */
1002 public long getEvictionEntryCount() {
1003 return evictionEntryCount;
1004 }
1005
1006 /**
1007 * Suspend output if true.
1008 *
1009 * @param v The value to set the suspend value to.
1010 */
1011 public void setPause(boolean v) {
1012 pause=v;
1013 }
1014
1015 /**
1016 * The action to be performed by this timer task.
1017 */
1018 public void run() {
1019 long latestCount = reader.getEntriesRead() + 0;
1020 long deltaCount = (latestCount - previousCount);
1021 long latestTime = System.currentTimeMillis();
1022 long deltaTime = latestTime - previousTime;
1023 Message message;
1024 if (deltaTime == 0) {
1025 return;
1026 }
1027 if(pause) {
1028 return;
1029 }
1030 if(!ldifRead) {
1031 long numRead = reader.getEntriesRead();
1032 long numIgnored = reader.getEntriesIgnored();
1033 long numRejected = reader.getEntriesRejected();
1034 float rate = 1000f*deltaCount / deltaTime;
1035 message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(
1036 numRead, numIgnored, numRejected, 0, rate);
1037 logError(message);
1038 }
1039 try
1040 {
1041 Runtime runtime = Runtime.getRuntime();
1042 long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
1043 EnvironmentStats envStats =
1044 rootContainer.getEnvironmentStats(new StatsConfig());
1045 long nCacheMiss =
1046 envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
1047
1048 float cacheMissRate = 0;
1049 if (deltaCount > 0) {
1050 cacheMissRate = nCacheMiss/(float)deltaCount;
1051 }
1052 message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
1053 freeMemory, cacheMissRate);
1054 logError(message);
1055 long evictPasses = envStats.getNEvictPasses();
1056 long evictNodes = envStats.getNNodesExplicitlyEvicted();
1057 long evictBinsStrip = envStats.getNBINsStripped();
1058 int cleanerRuns = envStats.getNCleanerRuns();
1059 int cleanerDeletions = envStats.getNCleanerDeletions();
1060 int cleanerEntriesRead = envStats.getNCleanerEntriesRead();
1061 int cleanerINCleaned = envStats.getNINsCleaned();
1062 int checkPoints = envStats.getNCheckpoints();
1063 if(evictPasses != 0) {
1064 if(!evicting) {
1065 evicting=true;
1066 if(!ldifRead) {
1067 evictionEntryCount=reader.getEntriesRead();
1068 message =
1069 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
1070 logError(message);
1071 }
1072 }
1073 message =
1074 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
1075 evictNodes, evictBinsStrip);
1076 logError(message);
1077 }
1078 if(cleanerRuns != 0) {
1079 message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
1080 cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
1081 logError(message);
1082 }
1083 if(checkPoints > 1) {
1084 message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
1085 logError(message);
1086 }
1087 prevEnvStats = envStats;
1088 } catch (DatabaseException e) {
1089 // Unlikely to happen and not critical.
1090 }
1091 previousCount = latestCount;
1092 previousTime = latestTime;
1093 }
1094 }
1095 }
1096