001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027
028
029 package org.opends.server.backends.jeb.importLDIF;
030
031 import org.opends.server.types.Entry;
032 import org.opends.server.backends.jeb.Index;
033 import org.opends.server.backends.jeb.EntryID;
034 import com.sleepycat.je.Transaction;
035 import com.sleepycat.je.DatabaseException;
036 import com.sleepycat.je.DatabaseEntry;
037 import com.sleepycat.je.dbi.MemoryBudget;
038 import static org.opends.server.loggers.ErrorLogger.logError;
039 import org.opends.messages.Message;
040 import static org.opends.messages.JebMessages.*;
041 import java.util.*;
042 import java.util.concurrent.locks.ReentrantLock;
043
044
/**
 * Manages a shared cache among worker threads that caches substring
 * key/value pairs to avoid DB cache access. Once the cache is above its
 * memory usage limit, it will start slowly flushing keys (similar to the
 * JEB eviction process) until it is under the limit.
 */
051
052 public class BufferManager {
053
054 //Memory usage counter.
055 private long memoryUsage=0;
056
057 //Memory limit.
058 private long memoryLimit;
059
060 //Next element in the cache to start flushing at during next flushAll cycle.
061 private KeyHashElement nextElem;
062
063 //Extra bytes to flushAll.
064 private final int extraBytes = 1024 * 1024;
065
066 //Counters for statistics, total is number of accesses, hit is number of
067 //keys found in cache.
068 private long total=0, hit=0;
069
070 //Actual map used to buffer keys.
071 private TreeMap<KeyHashElement, KeyHashElement> elementMap =
072 new TreeMap<KeyHashElement, KeyHashElement>();
073
074 //The current backup map being used.
075 private int currentMap = 1;
076
077 //Reference to use when the maps are switched.
078 private TreeMap<KeyHashElement, KeyHashElement> backupMap;
079
080 //The two backup maps to insert into if the main element map is being used.
081 private TreeMap<KeyHashElement, KeyHashElement> backupMap2 =
082 new TreeMap<KeyHashElement, KeyHashElement>();
083 private TreeMap<KeyHashElement, KeyHashElement> backupMap1 =
084 new TreeMap<KeyHashElement, KeyHashElement>();
085
086 //Overhead values determined from using JHAT. They appear to be the same
087 //for both 32 and 64 bit machines. Close enough.
088 private final static int TREEMAP_ENTRY_OVERHEAD = 29;
089 private final static int KEY_ELEMENT_OVERHEAD = 32;
090
091 //Lock used to get main element map.
092 private ReentrantLock lock = new ReentrantLock();
093
094 //Object to synchronize on if backup maps are being written.
095 private Object backupSynchObj = new Object();
096
/**
 * Construct a buffer manager bounded by the specified memory limit. The
 * first backup map is selected as the initial target for inserts that
 * cannot obtain the main map lock.
 *
 * @param memoryLimit The memory limit.
 * @param importThreadCount The count of import worker threads. This value
 *                          is not referenced by the constructor.
 */
public BufferManager(long memoryLimit, int importThreadCount) {
  backupMap = backupMap1;
  //No flush cycle has run yet, so there is no resume position.
  nextElem = null;
  this.memoryLimit = memoryLimit;
}
108
109 /**
110 * Insert an entry ID into the buffer using the both the specified index and
111 * entry to build a key set. Will flush the buffer if over the memory limit.
112 *
113 * @param index The index to use.
114 * @param entry The entry used to build the key set.
115 * @param entryID The entry ID to insert into the key set.
116 * @param txn A transaction.
117 * @param keySet Keyset hash to store the keys in.
118 * @throws DatabaseException If a problem happened during a flushAll cycle.
119 */
120
121 void insert(Index index, Entry entry,
122 EntryID entryID, Transaction txn, Set<byte[]> keySet)
123 throws DatabaseException {
124
125 keySet.clear();
126 index.indexer.indexEntry(entry, keySet);
127 if(!lock.tryLock()) {
128 insertBackupMap(keySet, index, entryID);
129 return;
130 }
131 insertKeySet(keySet, index, entryID, elementMap, true);
132 if(!backupMap.isEmpty()) {
133 mergeMap();
134 }
135 //If over the memory limit, flush some keys from the cache to make room.
136 if(memoryUsage > memoryLimit) {
137 flushUntilUnderLimit();
138 }
139 lock.unlock();
140 }
141
142 /**
143 * Insert an entry ID into buffer using specified id2children and id2subtree
144 * indexes.
145 *
146 * @param id2children The id2children index to use.
147 * @param id2subtree The id2subtree index to use.
148 * @param entry The entry used to build the key set.
149 * @param entryID The entry ID to insert into the key set.
150 * @param txn A transaction.
151 * @param childKeySet id2children key set hash to use.
152 * @param subKeySet subtree key set hash to use.
153 * @throws DatabaseException If a problem occurs during processing.
154 */
155 void insert(Index id2children, Index id2subtree, Entry entry,
156 EntryID entryID, Transaction txn, Set<byte[]> childKeySet,
157 Set<byte[]> subKeySet) throws DatabaseException {
158 childKeySet.clear();
159 id2children.indexer.indexEntry(entry, childKeySet);
160 subKeySet.clear();
161 id2subtree.indexer.indexEntry(entry, subKeySet);
162 if(!lock.tryLock()) {
163 insertBackupMap(childKeySet, id2children, subKeySet, id2subtree, entryID);
164 return;
165 }
166 insertKeySet(childKeySet, id2children, entryID, elementMap, true);
167 insertKeySet(subKeySet, id2subtree, entryID, elementMap, true);
168 lock.unlock();
169 }
170
171 /**
172 * Insert into a backup tree if can't get a lock on the main table.
173 * @param childrenKeySet The id2children keyset to use.
174 * @param id2children The id2children index to use.
175 * @param subtreeKeySet The subtree keyset to use.
176 * @param id2subtree The id2subtree index to use.
177 * @param entryID The entry ID to insert into the key set.
178 */
179 void insertBackupMap(Set<byte[]> childrenKeySet, Index id2children,
180 Set<byte[]> subtreeKeySet,
181 Index id2subtree, EntryID entryID) {
182 synchronized(backupSynchObj) {
183 insertKeySet(childrenKeySet, id2children, entryID, backupMap, false);
184 insertKeySet(subtreeKeySet, id2subtree, entryID, backupMap, false);
185 }
186 }
187
188
189 /**
190 * Insert specified keyset, index and entry ID into the backup map.
191 *
192 * @param keySet The keyset to use.
193 * @param index The index to use.
194 * @param entryID The entry ID to use.
195 */
196 void insertBackupMap(Set<byte[]> keySet, Index index, EntryID entryID) {
197 synchronized(backupSynchObj) {
198 insertKeySet(keySet, index, entryID, backupMap, false);
199 }
200 }
201
202
203 /**
204 * Merge the backup map with the element map after switching the backup
205 * map reference to an empty map.
206 */
207 void mergeMap() {
208 TreeMap<KeyHashElement, KeyHashElement> tmpMap;
209 synchronized(backupSynchObj) {
210 tmpMap = backupMap;
211 if(currentMap == 1) {
212 backupMap = backupMap2;
213 tmpMap = backupMap1;
214 currentMap = 2;
215 } else {
216 backupMap = backupMap1;
217 tmpMap = backupMap2;
218 currentMap = 1;
219 }
220 }
221 TreeSet<KeyHashElement> tSet =
222 new TreeSet<KeyHashElement>(tmpMap.keySet());
223 for (KeyHashElement elem : tSet) {
224 total++;
225 if(!elementMap.containsKey(elem)) {
226 elementMap.put(elem, elem);
227 memoryUsage += TREEMAP_ENTRY_OVERHEAD + elem.getMemorySize();
228 } else {
229 KeyHashElement curElem = elementMap.get(elem);
230 if(curElem.isDefined() || curElem.getIndex().getMaintainCount()) {
231 int oldSize = curElem.getMemorySize();
232 curElem.merge(elem);
233 memoryUsage += (curElem.getMemorySize() - oldSize);
234 hit++;
235 }
236 }
237 }
238 tmpMap.clear();
239 }
240
241 /**
242 * Insert a keySet into the element map using the provided index and entry ID.
243 * @param keySet The key set to add to the map.
244 * @param index The index that eventually will contain the entry IDs.
245 * @param entryID The entry ID to add to the entry ID set.
246 * @param map The map to add the keys to
247 * @param trackStats <CODE>True</CODE> if memory and usage should be tracked.
248 */
249 private void insertKeySet(Set<byte[]> keySet, Index index, EntryID entryID,
250 TreeMap<KeyHashElement, KeyHashElement> map,
251 boolean trackStats) {
252 KeyHashElement elem = new KeyHashElement();
253 int entryLimit = index.getIndexEntryLimit();
254 for(byte[] key : keySet) {
255 elem.reset(key, index);
256 if(trackStats) {
257 total++;
258 }
259 if(!map.containsKey(elem)) {
260 KeyHashElement newElem = new KeyHashElement(key, index, entryID);
261 map.put(newElem, newElem);
262 if(trackStats) {
263 memoryUsage += TREEMAP_ENTRY_OVERHEAD + newElem.getMemorySize();
264 }
265 } else {
266 KeyHashElement curElem = map.get(elem);
267 if(curElem.isDefined() || index.getMaintainCount()) {
268 int oldSize = curElem.getMemorySize();
269 curElem.addEntryID(entryID, entryLimit);
270 if(trackStats) {
271 memoryUsage += (curElem.getMemorySize() - oldSize);
272 hit++;
273 }
274 }
275 }
276 }
277 }
278
279 /**
280 * Flush the buffer to DB until the buffer is under the memory limit.
281 *
282 * @throws DatabaseException If a problem happens during an index insert.
283 */
284 private void flushUntilUnderLimit() throws DatabaseException {
285 Iterator<KeyHashElement> iter;
286 if(nextElem == null) {
287 iter = elementMap.keySet().iterator();
288 } else {
289 iter = elementMap.tailMap(nextElem).keySet().iterator();
290 }
291 DatabaseEntry dbEntry = new DatabaseEntry();
292 DatabaseEntry entry = new DatabaseEntry();
293 while((memoryUsage + extraBytes) > memoryLimit) {
294 if(iter.hasNext()) {
295 KeyHashElement curElem = iter.next();
296 //Never flush undefined elements.
297 if(curElem.isDefined()) {
298 int oldSize = curElem.getMemorySize();
299 Index index = curElem.getIndex();
300 dbEntry.setData(curElem.getKey());
301 index.insert(null, dbEntry, curElem.getIDSet(), entry);
302 if(curElem.isDefined()) {
303 memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
304 iter.remove();
305 } else {
306 //Went undefined don't remove the element, just substract the
307 //memory size difference.
308 memoryUsage -= (oldSize - curElem.getMemorySize());
309 }
310 }
311 } else {
312 //Wrapped around, start at the first element.
313 nextElem = elementMap.firstKey();
314 iter = elementMap.keySet().iterator();
315 }
316 }
317 //Start at this element next flushAll cycle.
318 nextElem = iter.next();
319 }
320
321 /**
322 * Called from main thread to prepare for final buffer flush at end of
323 * ldif load.
324 */
325 void prepareFlush() {
326 Message msg =
327 NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
328 logError(msg);
329 }
330
331 /**
332 * Writes all of the buffer elements to DB. The specific id is used to
333 * share the buffer among the worker threads so this function can be
334 * multi-threaded.
335 *
336 * @throws DatabaseException If an error occurred during the insert.
337 */
338 void flushAll() throws DatabaseException {
339 mergeMap();
340 TreeSet<KeyHashElement> tSet =
341 new TreeSet<KeyHashElement>(elementMap.keySet());
342 DatabaseEntry dbEntry = new DatabaseEntry();
343 DatabaseEntry entry = new DatabaseEntry();
344 for (KeyHashElement curElem : tSet) {
345 Index index = curElem.getIndex();
346 dbEntry.setData(curElem.getKey());
347 index.insert(null, dbEntry, curElem.getIDSet(), entry);
348 }
349 }
350
351 /**
352 * Class used to represent an element in the buffer.
353 */
354 class KeyHashElement implements Comparable {
355
356 //Bytes representing the key.
357 private byte[] key;
358
359 //Hash code returned from the System.identityHashCode method on the index
360 //object.
361 private int indexHashCode;
362
363 //Index related to the element.
364 private Index index;
365
366 //The set of IDs related to the key.
367 private ImportIDSet importIDSet;
368
369 //Used to speed up lookup.
370 private int keyHashCode;
371
372 /**
373 * Empty constructor for use when the element is being reused.
374 */
375 public KeyHashElement() {}
376
377 /**
378 * Reset the element. Used when the element is being reused.
379 *
380 * @param key The new key to reset to.
381 * @param index The new index to reset to.
382 */
383 public void reset(byte[] key, Index index) {
384 this.key = key;
385 this.index = index;
386 this.indexHashCode = System.identityHashCode(index);
387 this.keyHashCode = Arrays.hashCode(key);
388 if(this.importIDSet != null) {
389 this.importIDSet.reset();
390 }
391 }
392
393 /**
394 * Create instance of an element for the specified key and index, the add
395 * the specified entry ID to the ID set.
396 *
397 * @param key The key.
398 * @param index The index.
399 * @param entryID The entry ID to start off with.
400 */
401 public KeyHashElement(byte[] key, Index index, EntryID entryID) {
402 this.key = key;
403 this.index = index;
404 //Use the integer set for right now. This is good up to 2G number of
405 //entries. There is also a LongImportSet, but it currently isn't used.
406 this.importIDSet = new IntegerImportIDSet(entryID);
407 //Used if there when there are conflicts if two or more indexes have
408 //the same key.
409 this.indexHashCode = System.identityHashCode(index);
410 this.keyHashCode = Arrays.hashCode(key);
411 }
412
413 /**
414 * Add an entry ID to the set.
415 *
416 * @param entryID The entry ID to add.
417 * @param entryLimit The entry limit
418 */
419 void addEntryID(EntryID entryID, int entryLimit) {
420 importIDSet.addEntryID(entryID, entryLimit, index.getMaintainCount());
421 }
422
423 /**
424 * Return the index.
425 *
426 * @return The index.
427 */
428 Index getIndex(){
429 return index;
430 }
431
432 /**
433 * Return the key.
434 *
435 * @return The key.
436 */
437 byte[] getKey() {
438 return key;
439 }
440
441 /**
442 * Return value of the key hash code.
443 *
444 * @return The key hash code value.
445 */
446 int getKeyHashCode() {
447 return keyHashCode;
448 }
449
450 /**
451 * Return the ID set.
452 * @return The import ID set.
453 */
454 ImportIDSet getIDSet() {
455 return importIDSet;
456 }
457
458 /**
459 * Return if the ID set is defined or not.
460 *
461 * @return <CODE>True</CODE> if the ID set is defined.
462 */
463 boolean isDefined() {
464 return importIDSet.isDefined();
465 }
466
467 /**
468 * Compare the bytes of two keys. The is slow, only use if the hashcode
469 * had a collision.
470 *
471 * @param a Key a.
472 * @param b Key b.
473 * @return 0 if the keys are equal, -1 if key a is less than key b, 1 if
474 * key a is greater than key b.
475 */
476 private int compare(byte[] a, byte[] b) {
477 int i;
478 for (i = 0; i < a.length && i < b.length; i++) {
479 if (a[i] > b[i]) {
480 return 1;
481 }
482 else if (a[i] < b[i]) {
483 return -1;
484 }
485 }
486 if (a.length == b.length) {
487 return 0;
488 }
489 if (a.length > b.length){
490 return 1;
491 }
492 else {
493 return -1;
494 }
495 }
496
497 /**
498 * Compare two element keys. First check the precomputed hashCode. If
499 * the hashCodes are equal, do a second byte per byte comparision in case
500 * there was a collision.
501 *
502 * @param elem The element to compare.
503 * @return 0 if the keys are equal, -1 if key a is less than key b, 1 if
504 * key a is greater than key b.
505 */
506 private int compare(KeyHashElement elem) {
507 if(keyHashCode == elem.getKeyHashCode()) {
508 return compare(key, elem.key);
509 } else {
510 if(keyHashCode < elem.getKeyHashCode()) {
511 return -1;
512 } else {
513 return 1;
514 }
515 }
516 }
517
518 /**
519 * Compare the specified object to the current object. If the keys are
520 * equal, then the indexHashCode value is used as a tie-breaker.
521 *
522 * @param o The object representing a KeyHashElement.
523 * @return 0 if the objects are equal, -1 if the current object is less
524 * than the specified object, 1 otherwise.
525 */
526 public int compareTo(Object o) {
527 if (o == null) {
528 throw new NullPointerException();
529 }
530 KeyHashElement inElem = (KeyHashElement) o;
531 int keyCompare = compare(inElem);
532 if(keyCompare == 0) {
533 if(indexHashCode == inElem.indexHashCode) {
534 return 0;
535 } else if(indexHashCode < inElem.indexHashCode) {
536 return -1;
537 } else {
538 return 1;
539 }
540 } else {
541 return keyCompare;
542 }
543 }
544
545 /**
546 * Return the current total memory size of the element.
547 * @return The memory size estimate of a KeyHashElement.
548 */
549 int getMemorySize() {
550 return KEY_ELEMENT_OVERHEAD +
551 MemoryBudget.byteArraySize(key.length) +
552 importIDSet.getMemorySize();
553 }
554
555 /**
556 * Merge the specified element with this element.
557 * @param e The element to merge.
558 */
559 public void merge(KeyHashElement e) {
560 importIDSet.merge(e.importIDSet, e.getIndex().getIndexEntryLimit(),
561 e.getIndex().getMaintainCount());
562 }
563 }
564 }