001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.MessageBuilder;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.messages.ReplicationMessages.*;
032 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
033
034 import java.util.List;
035 import java.io.UnsupportedEncodingException;
036
037 import org.opends.server.types.DN;
038 import org.opends.server.replication.common.ChangeNumber;
039 import org.opends.server.replication.protocol.UpdateMessage;
040 import java.util.concurrent.locks.ReentrantReadWriteLock;
041
042 import com.sleepycat.je.Cursor;
043 import com.sleepycat.je.DatabaseEntry;
044 import com.sleepycat.je.DatabaseException;
045 import com.sleepycat.je.Database;
046 import com.sleepycat.je.DeadlockException;
047 import com.sleepycat.je.LockMode;
048 import com.sleepycat.je.OperationStatus;
049 import com.sleepycat.je.Transaction;
050
051 /**
052 * This class implements the interface between the underlying database
053 * and the dbHandler class.
054 * This is the only class that should have code using the BDB interfaces.
055 */
056 public class ReplicationDB
057 {
058 private Database db = null;
059 private ReplicationDbEnv dbenv = null;
060 private ReplicationServer replicationServer;
061 private Short serverId;
062 private DN baseDn;
063
064 // The maximum number of retries in case of DatabaseDeadlock Exception.
065 private static final int DEADLOCK_RETRIES = 10;
066
067 // The lock used to provide exclusive access to the thread that
068 // close the db (shutdown or clear).
069 private ReentrantReadWriteLock dbCloseLock;
070
071 /**
072 * Creates a new database or open existing database that will be used
073 * to store and retrieve changes from an LDAP server.
074 * @param serverId The identifier of the LDAP server.
075 * @param baseDn The baseDn of the replication domain.
076 * @param replicationServer The ReplicationServer that needs to be shutdown.
077 * @param dbenv The Db environment to use to create the db.
078 * @throws DatabaseException If a database problem happened.
079 */
080 public ReplicationDB(Short serverId, DN baseDn,
081 ReplicationServer replicationServer,
082 ReplicationDbEnv dbenv)
083 throws DatabaseException
084 {
085 this.serverId = serverId;
086 this.baseDn = baseDn;
087 this.dbenv = dbenv;
088 this.replicationServer = replicationServer;
089
090 // Get or create the associated ReplicationServerDomain and Db.
091 db = dbenv.getOrAddDb(serverId, baseDn,
092 replicationServer.getReplicationServerDomain(baseDn,
093 true).getGenerationId());
094
095 dbCloseLock = new ReentrantReadWriteLock(true);
096 }
097
098 /**
099 * add a list of changes to the underlying db.
100 *
101 * @param changes The list of changes to add to the underlying db.
102 */
103 public void addEntries(List<UpdateMessage> changes)
104 {
105 Transaction txn = null;
106
107 try
108 {
109 int tries = 0;
110 boolean done = false;
111
112 // The database can return a Deadlock Exception if several threads are
113 // accessing the database at the same time. This Exception is a
114 // transient state, when it happens the transaction is aborted and
115 // the operation is attempted again up to DEADLOCK_RETRIES times.
116 while ((tries++ < DEADLOCK_RETRIES) && (!done))
117 {
118 dbCloseLock.readLock().lock();
119 try
120 {
121 txn = dbenv.beginTransaction();
122
123 for (UpdateMessage change : changes)
124 {
125 DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
126 DatabaseEntry data = new ReplicationData(change);
127 db.put(txn, key, data);
128 }
129
130 txn.commitWriteNoSync();
131 txn = null;
132 done = true;
133 }
134 catch (DeadlockException e)
135 {
136 txn.abort();
137 txn = null;
138 }
139 finally
140 {
141 dbCloseLock.readLock().unlock();
142 }
143 }
144 if (!done)
145 {
146 // Could not write to the DB after DEADLOCK_RETRIES tries.
147 // This ReplicationServer is not reliable and will be shutdown.
148 MessageBuilder mb = new MessageBuilder();
149 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
150 logError(mb.toMessage());
151 if (txn != null)
152 {
153 txn.abort();
154 }
155 replicationServer.shutdown();
156 }
157 }
158 catch (DatabaseException e)
159 {
160 MessageBuilder mb = new MessageBuilder();
161 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
162 mb.append(stackTraceToSingleLineString(e));
163 logError(mb.toMessage());
164 if (txn != null)
165 {
166 try
167 {
168 txn.abort();
169 } catch (DatabaseException e1)
170 {
171 // can't do much more. The ReplicationServer is shuting down.
172 }
173 }
174 replicationServer.shutdown();
175 }
176 catch (UnsupportedEncodingException e)
177 {
178 MessageBuilder mb = new MessageBuilder();
179 mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
180 mb.append(stackTraceToSingleLineString(e));
181 logError(mb.toMessage());
182 replicationServer.shutdown();
183 if (txn != null)
184 {
185 try
186 {
187 txn.abort();
188 } catch (DatabaseException e1)
189 {
190 // can't do much more. The ReplicationServer is shuting down.
191 }
192 }
193 replicationServer.shutdown();
194 }
195 }
196
197
198 /**
199 * Shutdown the database.
200 */
201 public void shutdown()
202 {
203 try
204 {
205 dbCloseLock.writeLock().lock();
206 try
207 {
208 db.close();
209 }
210 finally
211 {
212 dbCloseLock.writeLock().unlock();
213 }
214 }
215 catch (DatabaseException e)
216 {
217 MessageBuilder mb = new MessageBuilder();
218 mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
219 mb.append(stackTraceToSingleLineString(e));
220 logError(mb.toMessage());
221 }
222 }
223
224 /**
225 * Create a cursor that can be used to search or iterate on this
226 * ReplicationServer DB.
227 *
228 * @param changeNumber The ChangeNumber from which the cursor must start.
229 * @throws DatabaseException If a database error prevented the cursor
230 * creation.
231 * @throws Exception if the ReplServerDBCursor creation failed.
232 * @return The ReplServerDBCursor.
233 */
234 public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
235 throws DatabaseException, Exception
236 {
237 return new ReplServerDBCursor(changeNumber);
238 }
239
240 /**
241 * Create a cursor that can be used to delete some record from this
242 * ReplicationServer database.
243 *
244 * @throws DatabaseException If a database error prevented the cursor
245 * creation.
246 * @throws Exception if the ReplServerDBCursor creation failed.
247 *
248 * @return The ReplServerDBCursor.
249 */
250 public ReplServerDBCursor openDeleteCursor()
251 throws DatabaseException, Exception
252 {
253 return new ReplServerDBCursor();
254 }
255
256 private void closeLockedCursor(Cursor cursor)
257 throws DatabaseException
258 {
259 try
260 {
261 if (cursor != null)
262 cursor.close();
263 }
264 finally
265 {
266 dbCloseLock.readLock().unlock();
267 }
268 }
269
270 /**
271 * Read the first Change from the database.
272 * @return the first ChangeNumber.
273 */
274 public ChangeNumber readFirstChange()
275 {
276 Cursor cursor = null;
277 String str = null;
278
279 try
280 {
281 dbCloseLock.readLock().lock();
282 cursor = db.openCursor(null, null);
283 }
284 catch (DatabaseException e1)
285 {
286 dbCloseLock.readLock().unlock();
287 return null;
288 }
289 try
290 {
291 try
292 {
293 DatabaseEntry key = new DatabaseEntry();
294 DatabaseEntry data = new DatabaseEntry();
295 OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
296 if (status != OperationStatus.SUCCESS)
297 {
298 /* database is empty */
299 return null;
300 }
301 try
302 {
303 str = new String(key.getData(), "UTF-8");
304 } catch (UnsupportedEncodingException e)
305 {
306 // never happens
307 }
308 return new ChangeNumber(str);
309 }
310 finally
311 {
312 closeLockedCursor(cursor);
313 }
314 }
315 catch (DatabaseException e)
316 {
317 /* database is faulty */
318 MessageBuilder mb = new MessageBuilder();
319 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
320 mb.append(stackTraceToSingleLineString(e));
321 logError(mb.toMessage());
322 replicationServer.shutdown();
323 return null;
324 }
325 }
326
327 /**
328 * Read the last Change from the database.
329 * @return the last ChangeNumber.
330 */
331 public ChangeNumber readLastChange()
332 {
333 Cursor cursor = null;
334 String str = null;
335
336 try
337 {
338 dbCloseLock.readLock().lock();
339 try
340 {
341 cursor = db.openCursor(null, null);
342 DatabaseEntry key = new DatabaseEntry();
343 DatabaseEntry data = new DatabaseEntry();
344 OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
345 if (status != OperationStatus.SUCCESS)
346 {
347 /* database is empty */
348 return null;
349 }
350 try
351 {
352 str = new String(key.getData(), "UTF-8");
353 }
354 catch (UnsupportedEncodingException e)
355 {
356 // never happens
357 }
358 return new ChangeNumber(str);
359 }
360 finally
361 {
362 closeLockedCursor(cursor);
363 }
364 }
365 catch (DatabaseException e)
366 {
367 MessageBuilder mb = new MessageBuilder();
368 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
369 mb.append(stackTraceToSingleLineString(e));
370 logError(mb.toMessage());
371 replicationServer.shutdown();
372 return null;
373 }
374 }
375
376 /**
377 * {@inheritDoc}
378 */
379 @Override
380 public String toString()
381 {
382 return serverId.toString() + baseDn.toString();
383 }
384
385 /**
386 * This Class implements a cursor that can be used to browse a
387 * replicationServer database.
388 */
389 public class ReplServerDBCursor
390 {
391 private Cursor cursor = null;
392
393 // The transaction that will protect the actions done with the cursor
394 // Will be let null for a read cursor
395 // Will be set non null for a write cursor
396 private Transaction txn = null;
397 DatabaseEntry key = new DatabaseEntry();
398 DatabaseEntry data = new DatabaseEntry();
399
400 /**
401 * Creates a ReplServerDBCursor that can be used for browsing a
402 * replicationServer db.
403 *
404 * @param startingChangeNumber The ChangeNumber from which the cursor must
405 * start.
406 * @throws Exception When the startingChangeNumber does not exist.
407 */
408 private ReplServerDBCursor(ChangeNumber startingChangeNumber)
409 throws Exception
410 {
411 try
412 {
413 // Take the lock. From now on, whatever error that happen in the life
414 // of this cursor should end by unlocking that lock. We must also
415 // unlock it when throwing an exception.
416 dbCloseLock.readLock().lock();
417
418 cursor = db.openCursor(txn, null);
419 if (startingChangeNumber != null)
420 {
421 key = new ReplicationKey(startingChangeNumber);
422 data = new DatabaseEntry();
423
424 if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
425 OperationStatus.SUCCESS)
426 {
427 // We could not move the cursor to the expected startingChangeNumber
428 if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
429 OperationStatus.SUCCESS)
430 {
431 // We could not even move the cursor closed to it => failure
432 throw new Exception("ChangeNumber not available");
433 }
434 else
435 {
436 // We can move close to the startingChangeNumber.
437 // Let's create a cursor from that point.
438 DatabaseEntry key = new DatabaseEntry();
439 DatabaseEntry data = new DatabaseEntry();
440 if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
441 OperationStatus.SUCCESS)
442 {
443 closeLockedCursor(cursor);
444 dbCloseLock.readLock().lock();
445 cursor = db.openCursor(txn, null);
446 }
447 }
448 }
449 }
450 }
451 catch (Exception e)
452 {
453 // Unlocking is required before throwing any exception
454 closeLockedCursor(cursor);
455 throw (e);
456 }
457 }
458
459 private ReplServerDBCursor() throws DatabaseException
460 {
461 try
462 {
463 // We'll go on only if no close or no clear is running
464 dbCloseLock.readLock().lock();
465
466 // Create the transaction that will protect whatever done with this
467 // write cursor.
468 txn = dbenv.beginTransaction();
469
470 cursor = db.openCursor(txn, null);
471 }
472 catch(DatabaseException e)
473 {
474 if (txn != null)
475 {
476 try
477 {
478 txn.abort();
479 }
480 catch (DatabaseException dbe)
481 {}
482 }
483 closeLockedCursor(cursor);
484 throw (e);
485 }
486 }
487
488 /**
489 * Close the ReplicationServer Cursor.
490 */
491 public void close()
492 {
493 try
494 {
495 closeLockedCursor(cursor);
496 cursor = null;
497 }
498 catch (DatabaseException e)
499 {
500 MessageBuilder mb = new MessageBuilder();
501 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
502 mb.append(stackTraceToSingleLineString(e));
503 logError(mb.toMessage());
504 replicationServer.shutdown();
505 }
506 if (txn != null)
507 {
508 try
509 {
510 txn.commit();
511 } catch (DatabaseException e)
512 {
513 MessageBuilder mb = new MessageBuilder();
514 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
515 mb.append(stackTraceToSingleLineString(e));
516 logError(mb.toMessage());
517 replicationServer.shutdown();
518 }
519 }
520 }
521
522 /**
523 * Abort the Cursor after a Deadlock Exception.
524 * This method catch and ignore the DeadlockException because
525 * this must be done when aborting a cursor after a DeadlockException
526 * (per the Cursor documentation).
527 * This should not be used in any other case.
528 */
529 public void abort()
530 {
531 if (cursor == null)
532 return;
533 try
534 {
535 closeLockedCursor(cursor);
536 cursor = null;
537 }
538 catch (DeadlockException e1)
539 {
540 // The DB documentation states that a DeadlockException
541 // on the close method of a cursor that is aborting should
542 // be ignored.
543 }
544 catch (DatabaseException e)
545 {
546 MessageBuilder mb = new MessageBuilder();
547 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
548 mb.append(stackTraceToSingleLineString(e));
549 logError(mb.toMessage());
550 replicationServer.shutdown();
551 }
552 if (txn != null)
553 {
554 try
555 {
556 txn.abort();
557 } catch (DatabaseException e)
558 {
559 MessageBuilder mb = new MessageBuilder();
560 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
561 mb.append(stackTraceToSingleLineString(e));
562 logError(mb.toMessage());
563 replicationServer.shutdown();
564 }
565 }
566 }
567
568 /**
569 * Get the next ChangeNumber in the database from this Cursor.
570 *
571 * @return The next ChangeNumber in the database from this cursor.
572 * @throws DatabaseException In case of underlying database problem.
573 */
574 public ChangeNumber nextChangeNumber() throws DatabaseException
575 {
576 OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
577
578 if (status != OperationStatus.SUCCESS)
579 {
580 return null;
581 }
582 try
583 {
584 String csnString = new String(key.getData(), "UTF-8");
585 return new ChangeNumber(csnString);
586 } catch (UnsupportedEncodingException e)
587 {
588 // can't happen
589 return null;
590 }
591 }
592
593 /**
594 * Get the next UpdateMessage from this cursor.
595 *
596 * @return the next UpdateMessage.
597 */
598 public UpdateMessage next()
599 {
600 UpdateMessage currentChange = null;
601 while (currentChange == null)
602 {
603 try
604 {
605 OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
606 if (status != OperationStatus.SUCCESS)
607 {
608 return null;
609 }
610 } catch (DatabaseException e)
611 {
612 return null;
613 }
614 try {
615 currentChange = ReplicationData.generateChange(data.getData());
616 } catch (Exception e) {
617 /*
618 * An error happening trying to convert the data from the
619 * replicationServer database to an Update Message.
620 * This can only happen if the database is corrupted.
621 * There is not much more that we can do at this point except trying
622 * to continue with the next record.
623 * In such case, it is therefore possible that we miss some changes.
624 * TODO. log an error message.
625 * TODO : REPAIR : Such problem should be handled by the
626 * repair functionality.
627 */
628 }
629 }
630 return currentChange;
631 }
632
633 /**
634 * Delete the record at the current cursor position.
635 *
636 * @throws DatabaseException In case of database problem.
637 */
638 public void delete() throws DatabaseException
639 {
640 cursor.delete();
641 }
642 } // ReplServerDBCursor
643
644 /**
645 * Clears this change DB from the changes it contains.
646 *
647 * @throws Exception Throws an exception it occurs.
648 * @throws DatabaseException Throws a DatabaseException when it occurs.
649 */
650 public void clear() throws Exception, DatabaseException
651 {
652 // The coming users will be blocked until the clear is done
653 dbCloseLock.writeLock().lock();
654 try
655 {
656 String dbName = db.getDatabaseName();
657
658 // Clears the reference to this serverID
659 dbenv.clearServerId(baseDn, serverId);
660
661 // Closing is requested by the Berkeley DB before truncate
662 db.close();
663
664 // Clears the changes
665 dbenv.clearDb(dbName);
666
667 db = null;
668
669 // RE-create the db
670 db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
671 }
672 catch(Exception e)
673 {
674 MessageBuilder mb = new MessageBuilder();
675 mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
676 e.getMessage() + " " +
677 stackTraceToSingleLineString(e)));
678 logError(mb.toMessage());
679 }
680 finally
681 {
682 // Relax the waiting users
683 dbCloseLock.writeLock().unlock();
684 }
685 }
686 }