001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027
028 package org.opends.server.backends.jeb.importLDIF;
029
030 import static org.opends.server.loggers.debug.DebugLogger.*;
031 import org.opends.server.loggers.debug.DebugTracer;
032 import org.opends.server.types.*;
033 import org.opends.server.api.DirectoryThread;
034 import org.opends.server.backends.jeb.*;
035 import org.opends.messages.Message;
036 import static org.opends.messages.JebMessages.*;
037 import java.util.concurrent.BlockingQueue;
038 import java.util.concurrent.TimeUnit;
039 import java.util.*;
040 import com.sleepycat.je.DatabaseException;
041 import com.sleepycat.je.Transaction;
042 import com.sleepycat.je.LockMode;
043 import com.sleepycat.je.DatabaseEntry;
044
045 /**
046 * A thread to process import entries from a queue. Multiple instances of
047 * this class process entries from a single shared queue.
048 */
049 public class WorkThread extends DirectoryThread {
050
051 /**
052 * The tracer object for the debug logger.
053 */
054 private static final DebugTracer TRACER = getTracer();
055
056 /*
057 * Work queue of work items.
058 */
059 private BlockingQueue<WorkElement> workQueue;
060
061
062 /**
063 * The number of entries imported by this thread.
064 */
065 private int importedCount = 0;
066
067 //Root container.
068 private RootContainer rootContainer;
069
070 /**
071 * A flag that is set when the thread has been told to stop processing.
072 */
073 private boolean stopRequested = false;
074
075 //The thread number related to a thread.
076 private int threadNumber;
077
078 //The substring buffer manager to use.
079 private BufferManager bufferMgr;
080
081 //These are used to try and keep memory usage down.
082 private Set<byte[]> insertKeySet = new HashSet<byte[]>();
083 private Set<byte[]> childKeySet = new HashSet<byte[]>();
084 private Set<byte[]> subtreeKeySet = new HashSet<byte[]>();
085 private Set<byte[]> delKeySet = new HashSet<byte[]>();
086 private DatabaseEntry keyData = new DatabaseEntry();
087 private DatabaseEntry data = new DatabaseEntry();
088 ImportIDSet importIDSet = new IntegerImportIDSet();
089
090 /**
091 * Create a work thread instance using the specified parameters.
092 *
093 * @param workQueue The work queue to pull work off of.
094 * @param threadNumber The thread number.
095 * @param bufferMgr The buffer manager to use.
096 * @param rootContainer The root container.
097 */
098 public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber,
099 BufferManager bufferMgr,
100 RootContainer rootContainer) {
101 super("Import Worker Thread " + threadNumber);
102 this.threadNumber = threadNumber;
103 this.workQueue = workQueue;
104 this.bufferMgr = bufferMgr;
105 this.rootContainer = rootContainer;
106 }
107
108 /**
109 * Get the number of entries imported by this thread.
110 * @return The number of entries imported by this thread.
111 */
112 int getImportedCount() {
113 return importedCount;
114 }
115
116 /**
117 * Tells the thread to stop processing.
118 */
119 void stopProcessing() {
120 stopRequested = true;
121 }
122
123 /**
124 * Run the thread. Read from item from queue and give it to the
125 * buffer manage, unless told to stop. Once stopped, ask buffer manager
126 * to flush and exit.
127 *
128 */
129 public void run()
130 {
131 try {
132 do {
133 try {
134 WorkElement element = workQueue.poll(1000, TimeUnit.MILLISECONDS);
135 if(element != null) {
136 process(element);
137 }
138 }
139 catch (InterruptedException e) {
140 if (debugEnabled()) {
141 TRACER.debugCaught(DebugLogLevel.ERROR, e);
142 }
143 }
144 } while (!stopRequested);
145 } catch (Exception e) {
146 if (debugEnabled()) {
147 TRACER.debugCaught(DebugLogLevel.ERROR, e);
148 }
149 throw new RuntimeException(e);
150 }
151 }
152
153 /**
154 * Process a work element.
155 *
156 * @param element The work elemenet to process.
157 *
158 * @throws DatabaseException If a database error occurs.
159 * @throws DirectoryException If a directory error occurs.
160 * @throws JebException If a JEB error occurs.
161 */
162 private void process(WorkElement element)
163 throws DatabaseException, DirectoryException, JebException {
164 Transaction txn = null;
165 EntryID entryID;
166 if((entryID = processDN2ID(element, txn)) == null)
167 return;
168 if(!processID2Entry(element, entryID, txn))
169 return;
170 procesID2SCEntry(element, entryID, txn);
171 processIndexesEntry(element, entryID, txn);
172 }
173
174 /**
175 * Delete all indexes related to the specified entry ID using the specified
176 * entry to generate the keys.
177 *
178 * @param element The work element.
179 * @param existingEntry The existing entry to replace.
180 * @param entryID The entry ID to remove from the keys.
181 * @param txn A transaction.
182 * @throws DatabaseException If a database error occurs.
183 */
184 private void
185 processIndexesEntryDelete(WorkElement element, Entry existingEntry,
186 EntryID entryID, Transaction txn)
187 throws DatabaseException {
188 DNContext context = element.getContext();
189 Map<AttributeType, AttributeIndex> attrIndexMap =
190 context.getAttrIndexMap();
191 for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
192 attrIndexMap.entrySet()) {
193 AttributeType attrType = mapEntry.getKey();
194 if(existingEntry.hasAttribute(attrType)) {
195 AttributeIndex attributeIndex = mapEntry.getValue();
196 Index index;
197 if((index=attributeIndex.getEqualityIndex()) != null) {
198 delete(index, existingEntry, entryID, txn);
199 }
200 if((index=attributeIndex.getPresenceIndex()) != null) {
201 delete(index, existingEntry, entryID, txn);
202 }
203 if((index=attributeIndex.getSubstringIndex()) != null) {
204 delete(index, existingEntry, entryID, txn);
205 }
206 if((index=attributeIndex.getOrderingIndex()) != null) {
207 delete(index, existingEntry, entryID, txn);
208 }
209 if((index=attributeIndex.getApproximateIndex()) != null) {
210 delete(index, existingEntry, entryID, txn);
211 }
212 }
213 }
214 }
215
216 /**
217 * Process all indexes using the specified entry ID.
218 *
219 * @param element The work element.
220 * @param entryID The entry ID to process.
221 * @param txn A transaction.
222 * @throws DatabaseException If an database error occurs.
223 */
224 private void
225 processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn)
226 throws DatabaseException {
227 Entry entry = element.getEntry();
228 DNContext context = element.getContext();
229 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
230 if (ldifImportConfig.appendToExistingData() &&
231 ldifImportConfig.replaceExistingEntries()) {
232 Entry existingEntry = element.getExistingEntry();
233 if(existingEntry != null) {
234 processIndexesEntryDelete(element, existingEntry, entryID, txn);
235 }
236 }
237 Map<AttributeType, AttributeIndex> attrIndexMap =
238 context.getAttrIndexMap();
239 for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
240 attrIndexMap.entrySet()) {
241 AttributeType attrType = mapEntry.getKey();
242 if(entry.hasAttribute(attrType)) {
243 AttributeIndex attributeIndex = mapEntry.getValue();
244 Index index;
245 if((index=attributeIndex.getEqualityIndex()) != null) {
246 insert(index, entry, entryID, txn);
247 }
248 if((index=attributeIndex.getPresenceIndex()) != null) {
249 insert(index, entry, entryID, txn);
250 }
251 if((index=attributeIndex.getSubstringIndex()) != null) {
252 bufferMgr.insert(index,entry, entryID, txn, insertKeySet);
253 }
254 if((index=attributeIndex.getOrderingIndex()) != null) {
255 insert(index, entry, entryID, txn);
256 }
257 if((index=attributeIndex.getApproximateIndex()) != null) {
258 insert(index, entry, entryID, txn);
259 }
260 }
261 }
262 }
263
264 /**
265 * Process id2children/id2subtree indexes for the specified entry ID.
266 *
267 * @param element The work element.
268 * @param entryID The entry ID to process.
269 * @param txn A transaction.
270 * @throws DatabaseException If an database error occurs.
271 */
272 private void
273 procesID2SCEntry(WorkElement element, EntryID entryID,
274 Transaction txn) throws DatabaseException {
275 Entry entry = element.getEntry();
276 DNContext context = element.getContext();
277 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
278 if (ldifImportConfig.appendToExistingData() &&
279 ldifImportConfig.replaceExistingEntries()) {
280 return;
281 }
282 Index id2children = context.getEntryContainer().getID2Children();
283 Index id2subtree = context.getEntryContainer().getID2Subtree();
284 bufferMgr.insert(id2children, id2subtree, entry, entryID, txn,
285 childKeySet, subtreeKeySet);
286 }
287
288 /**
289 * Insert specified entry ID into the specified index using the entry to
290 * generate the keys.
291 *
292 * @param index The index to insert into.
293 * @param entry The entry to generate the keys from.
294 * @param entryID The entry ID to insert.
295 * @param txn A transaction.
296 * @return <CODE>True</CODE> if insert succeeded.
297 * @throws DatabaseException If a database error occurs.
298 */
299 private boolean
300 insert(Index index, Entry entry, EntryID entryID,
301 Transaction txn) throws DatabaseException {
302 insertKeySet.clear();
303 index.indexer.indexEntry(entry, insertKeySet);
304 importIDSet.setEntryID(entryID);
305 return index.insert(txn, importIDSet, insertKeySet, keyData, data);
306 }
307
308 /**
309 * Delete specified entry ID into the specified index using the entry to
310 * generate the keys.
311 *
312 * @param index The index to insert into.
313 * @param entry The entry to generate the keys from.
314 * @param entryID The entry ID to insert.
315 * @param txn A transaction.
316 * @throws DatabaseException If a database error occurs.
317 */
318 private void
319 delete(Index index, Entry entry, EntryID entryID,
320 Transaction txn) throws DatabaseException {
321 delKeySet.clear();
322 index.indexer.indexEntry(entry, delKeySet);
323 index.delete(txn, delKeySet, entryID);
324 }
325
326 /**
327 * Insert entry from work element into id2entry DB.
328 *
329 * @param element The work element containing the entry.
330 * @param entryID The entry ID to use as the key.
331 * @param txn A transaction.
332 * @return <CODE>True</CODE> If the insert succeeded.
333 * @throws DatabaseException If a database error occurs.
334 * @throws DirectoryException If a directory error occurs.
335 */
336 private boolean
337 processID2Entry(WorkElement element, EntryID entryID, Transaction txn)
338 throws DatabaseException, DirectoryException {
339 boolean ret;
340 Entry entry = element.getEntry();
341 DNContext context = element.getContext();
342 ID2Entry id2entry = context.getEntryContainer().getID2Entry();
343 DN2URI dn2uri = context.getEntryContainer().getDN2URI();
344 ret=id2entry.put(txn, entryID, entry);
345 if(ret) {
346 importedCount++;
347 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
348 if (ldifImportConfig.appendToExistingData() &&
349 ldifImportConfig.replaceExistingEntries()) {
350 Entry existingEntry = element.getExistingEntry();
351 if(existingEntry != null) {
352 dn2uri.replaceEntry(txn, existingEntry, entry);
353 }
354 } else {
355 ret= dn2uri.addEntry(txn, entry);
356 }
357 }
358 return ret;
359 }
360
361 /**
362 * Process entry from work element checking if it's parent exists.
363 *
364 * @param element The work element containing the entry.
365 * @param txn A transaction.
366 * @return <CODE>True</CODE> If the insert succeeded.
367 * @throws DatabaseException If a database error occurs.
368 */
369 private boolean
370 processParent(WorkElement element, Transaction txn)
371 throws DatabaseException {
372 Entry entry = element.getEntry();
373 DNContext context = element.getContext();
374 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
375 if (ldifImportConfig.appendToExistingData() &&
376 ldifImportConfig.replaceExistingEntries()) {
377 return true;
378 }
379 EntryID parentID = null;
380 DN entryDN = entry.getDN();
381 DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN);
382 DN2ID dn2id = context.getEntryContainer().getDN2ID();
383 if (parentDN != null) {
384 parentID = context.getParentID(parentDN, dn2id, txn);
385 if (parentID == null) {
386 dn2id.remove(txn, entryDN);
387 Message msg =
388 ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
389 context.getLDIFReader().rejectLastEntry(msg);
390 return false;
391 }
392 }
393 EntryID entryID = rootContainer.getNextEntryID();
394 ArrayList<EntryID> IDs;
395 if (parentDN != null && context.getParentDN() != null &&
396 parentDN.equals(context.getParentDN())) {
397 IDs = new ArrayList<EntryID>(context.getIDs());
398 IDs.set(0, entryID);
399 } else {
400 EntryID nodeID;
401 IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
402 IDs.add(entryID);
403 if (parentID != null)
404 {
405 IDs.add(parentID);
406 EntryContainer ec = context.getEntryContainer();
407 for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
408 dn = ec.getParentWithinBase(dn)) {
409 if((nodeID = getAncestorID(dn2id, dn, txn)) == null) {
410 return false;
411 } else {
412 IDs.add(nodeID);
413 }
414 }
415 }
416 }
417 context.setParentDN(parentDN);
418 context.setIDs(IDs);
419 entry.setAttachment(IDs);
420 return true;
421 }
422
423 private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
424 throws DatabaseException {
425 int i=0;
426 EntryID nodeID = dn2id.get(txn, dn, LockMode.DEFAULT);
427 if(nodeID == null) {
428 while((nodeID = dn2id.get(txn, dn, LockMode.DEFAULT)) == null) {
429 try {
430 Thread.sleep(50);
431 if(i == 3) {
432 return null;
433 }
434 i++;
435 } catch (Exception e) {
436 return null;
437 }
438 }
439 }
440 return nodeID;
441 }
442
443 /**
444 * Process the a entry from the work element into the dn2id DB.
445 *
446 * @param element The work element containing the entry.
447 * @param txn A transaction.
448 * @return An entry ID.
449 * @throws DatabaseException If a database error occurs.
450 * @throws JebException If a JEB error occurs.
451 */
452 private EntryID
453 processDN2ID(WorkElement element, Transaction txn)
454 throws DatabaseException, JebException {
455 Entry entry = element.getEntry();
456 DNContext context = element.getContext();
457 DN2ID dn2id = context.getEntryContainer().getDN2ID();
458 LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
459 DN entryDN = entry.getDN();
460 EntryID entryID = dn2id.get(txn, entryDN, LockMode.DEFAULT);
461 if (entryID != null) {
462 if (ldifImportConfig.appendToExistingData() &&
463 ldifImportConfig.replaceExistingEntries()) {
464 ID2Entry id2entry = context.getEntryContainer().getID2Entry();
465 Entry existingEntry = id2entry.get(txn, entryID, LockMode.DEFAULT);
466 element.setExistingEntry(existingEntry);
467 } else {
468 Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
469 context.getLDIFReader().rejectLastEntry(msg);
470 entryID = null;
471 }
472 } else {
473 if(!processParent(element, txn))
474 return null;
475 if (ldifImportConfig.appendToExistingData() &&
476 ldifImportConfig.replaceExistingEntries()) {
477 entryID = rootContainer.getNextEntryID();
478 } else {
479 ArrayList IDs = (ArrayList)entry.getAttachment();
480 entryID = (EntryID)IDs.get(0);
481 }
482 dn2id.insert(txn, entryDN, entryID);
483 }
484 context.removePending(entryDN);
485 return entryID;
486 }
487 }