001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.MessageBuilder;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.messages.ReplicationMessages.*;
032 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
033
034 import java.util.ArrayList;
035 import java.util.Date;
036 import java.util.List;
037 import java.util.LinkedList;
038 import java.util.NoSuchElementException;
039
040 import org.opends.server.admin.std.server.MonitorProviderCfg;
041 import org.opends.server.api.DirectoryThread;
042 import org.opends.server.api.MonitorProvider;
043 import org.opends.server.config.ConfigException;
044 import org.opends.server.types.Attribute;
045 import org.opends.server.types.DN;
046 import org.opends.server.types.InitializationException;
047 import org.opends.server.util.TimeThread;
048 import org.opends.server.core.DirectoryServer;
049 import org.opends.server.replication.common.ChangeNumber;
050 import org.opends.server.replication.protocol.UpdateMessage;
051 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
052
053 import com.sleepycat.je.DatabaseException;
054 import com.sleepycat.je.DeadlockException;
055
056 /**
057 * This class is used for managing the replicationServer database for each
058 * server in the topology.
059 * It is responsible for efficiently saving the updates that is received from
060 * each master server into stable storage.
061 * This class is also able to generate a ReplicationIterator that can be
062 * used to read all changes from a given ChangeNUmber.
063 *
064 * This class publish some monitoring information below cn=monitor.
065 *
066 */
067 public class DbHandler implements Runnable
068 {
069 // The msgQueue holds all the updates not yet saved to stable storage.
070 // This list is only used as a temporary placeholder so that the write
071 // in the stable storage can be grouped for efficiency reason.
072 // Adding an update synchronously add the update to this list.
073 // A dedicated thread loops on flush() and trim().
074 // flush() : get a number of changes from the in memory list by block
075 // and write them to the db.
076 // trim() : deletes from the DB a number of changes that are older than a
077 // certain date.
078 //
079 // Changes are not read back by replicationServer threads that are responsible
080 // for pushing the changes to other replication server or to LDAP server
081 //
082 private final LinkedList<UpdateMessage> msgQueue =
083 new LinkedList<UpdateMessage>();
084 private ReplicationDB db;
085 private ChangeNumber firstChange = null;
086 private ChangeNumber lastChange = null;
087 private short serverId;
088 private DN baseDn;
089 private DbMonitorProvider dbMonitor = new DbMonitorProvider();
090 private boolean shutdown = false;
091 private boolean done = false;
092 private DirectoryThread thread = null;
093 private final Object flushLock = new Object();
094 private ReplicationServer replicationServer;
095
096
097 // The High and low water mark for the max size of the msgQueue.
098 // the threads calling add() method will be blocked if the size of
099 // msgQueue becomes larger than the MSG_QUEUE_HIMARK and will resume
100 // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
101 final static int MSG_QUEUE_HIMARK = 5000;
102 final static int MSG_QUEUE_LOWMARK = 4000;
103
104 // The maximum number of retries in case of DatabaseDeadlock Exception.
105 private static final int DEADLOCK_RETRIES = 10;
106
107 /**
108 *
109 * The trim age in milliseconds. Changes record in the change DB that
110 * are older than this age are removed.
111 *
112 */
113 private long trimage;
114
115 /**
116 * Creates a new dbHandler associated to a given LDAP server.
117 *
118 * @param id Identifier of the DB.
119 * @param baseDn the baseDn for which this DB was created.
120 * @param replicationServer The ReplicationServer that creates this dbHandler.
121 * @param dbenv the Database Env to use to create the ReplicationServer DB.
122 * server for this domain.
123 * @throws DatabaseException If a database problem happened
124 */
125 public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
126 ReplicationDbEnv dbenv)
127 throws DatabaseException
128 {
129 this.replicationServer = replicationServer;
130 this.serverId = id;
131 this.baseDn = baseDn;
132 this.trimage = replicationServer.getTrimage();
133 db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
134 firstChange = db.readFirstChange();
135 lastChange = db.readLastChange();
136 thread = new DirectoryThread(this,
137 "Replication Server db " + id + " " + baseDn);
138 thread.start();
139
140 DirectoryServer.deregisterMonitorProvider(
141 dbMonitor.getMonitorInstanceName());
142 DirectoryServer.registerMonitorProvider(dbMonitor);
143 }
144
145 /**
146 * Add an update to the list of messages that must be saved to the db
147 * managed by this db handler.
148 * This method is blocking if the size of the list of message is larger
149 * than its maximum.
150 *
151 * @param update The update that must be saved to the db managed by this db
152 * handler.
153 */
154 public void add(UpdateMessage update)
155 {
156 synchronized (msgQueue)
157 {
158 int size = msgQueue.size();
159 while (size > MSG_QUEUE_HIMARK)
160 {
161 try
162 {
163 msgQueue.wait(500);
164 } catch (InterruptedException e)
165 {
166 // simply loop to try again.
167 }
168 size = msgQueue.size();
169 }
170
171 msgQueue.add(update);
172 if (lastChange == null || lastChange.older(update.getChangeNumber()))
173 {
174 lastChange = update.getChangeNumber();
175 }
176 if (firstChange == null)
177 firstChange = update.getChangeNumber();
178 }
179 }
180
181 /**
182 * Get some changes out of the message queue of the LDAP server.
183 *
184 * @param number the number of messages to extract.
185 * @return a List containing number changes extracted from the queue.
186 */
187 private List<UpdateMessage> getChanges(int number)
188 {
189 int current = 0;
190 LinkedList<UpdateMessage> changes = new LinkedList<UpdateMessage>();
191
192 synchronized (msgQueue)
193 {
194 int size = msgQueue.size();
195 while ((current < number) && (current < size))
196 {
197 UpdateMessage msg = msgQueue.get(current);
198 current++;
199 changes.add(msg);
200 }
201 }
202 return changes;
203 }
204
205 /**
206 * Get the firstChange.
207 * @return Returns the firstChange.
208 */
209 public ChangeNumber getFirstChange()
210 {
211 return firstChange;
212 }
213
214 /**
215 * Get the lastChange.
216 * @return Returns the lastChange.
217 */
218 public ChangeNumber getLastChange()
219 {
220 return lastChange;
221 }
222
223 /**
224 * Get the number of changes.
225 *
226 * @return Returns the number of changes.
227 */
228 public long getChangesCount()
229 {
230 try
231 {
232 return lastChange.getSeqnum() - firstChange.getSeqnum() + 1;
233 }
234 catch (Exception e)
235 {
236 return 0;
237 }
238 }
239
240 /**
241 * Generate a new ReplicationIterator that allows to browse the db
242 * managed by this dbHandler and starting at the position defined
243 * by a given changeNumber.
244 *
245 * @param changeNumber The position where the iterator must start.
246 *
247 * @return a new ReplicationIterator that allows to browse the db
248 * managed by this dbHandler and starting at the position defined
249 * by a given changeNumber.
250 *
251 * @throws DatabaseException if a database problem happened.
252 * @throws Exception If there is no other change to push after change
253 * with changeNumber number.
254 */
255 public ReplicationIterator generateIterator(ChangeNumber changeNumber)
256 throws DatabaseException, Exception
257 {
258 /*
259 * When we create an iterator we need to make sure that we
260 * don't miss some changes because the iterator is created
261 * close to the limit of the changed that have not yet been
262 * flushed to the database.
263 * We detect this by comparing the date of the changeNumber where
264 * we want to start with the date of the first ChangeNumber
265 * of the msgQueue.
266 * If this is the case we flush the queue to the database.
267 */
268 ChangeNumber recentChangeNumber = null;
269
270 if (changeNumber == null)
271 flush();
272
273 synchronized (msgQueue)
274 {
275 try
276 {
277 UpdateMessage msg = msgQueue.getFirst();
278 recentChangeNumber = msg.getChangeNumber();
279 }
280 catch (NoSuchElementException e)
281 {}
282 }
283
284 if ( (recentChangeNumber != null) && (changeNumber != null))
285 {
286 if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
287 ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
288 {
289 flush();
290 }
291 }
292
293 ReplicationIterator it =
294 new ReplicationIterator(serverId, db, changeNumber);
295
296 return it;
297 }
298
299 /**
300 * Removes message in a subList of the msgQueue from the msgQueue.
301 *
302 * @param number the number of changes to be removed.
303 */
304 private void clearQueue(int number)
305 {
306 synchronized (msgQueue)
307 {
308 int current = 0;
309 while ((current < number) && (!msgQueue.isEmpty()))
310 {
311 msgQueue.remove();
312 current++;
313 }
314 if (msgQueue.size() < MSG_QUEUE_LOWMARK)
315 msgQueue.notify();
316 }
317 }
318
319 /**
320 * Shutdown this dbHandler.
321 */
322 public void shutdown()
323 {
324 if (shutdown == true)
325 {
326 return;
327 }
328
329 shutdown = true;
330 synchronized (this)
331 {
332 this.notifyAll();
333 }
334
335 synchronized (this)
336 {
337 while (done == false)
338 {
339 try
340 {
341 this.wait();
342 } catch (Exception e)
343 {}
344 }
345 }
346
347 while (msgQueue.size() != 0)
348 flush();
349
350 db.shutdown();
351 DirectoryServer.deregisterMonitorProvider(
352 dbMonitor.getMonitorInstanceName());
353 }
354
355 /**
356 * Run method for this class.
357 * Periodically Flushes the ReplicationServerDomain cache from memory to the
358 * stable storage and trims the old updates.
359 */
360 public void run()
361 {
362 while (shutdown == false)
363 {
364 try {
365 flush();
366 trim();
367
368 synchronized (this)
369 {
370 try
371 {
372 this.wait(1000);
373 } catch (InterruptedException e)
374 { }
375 }
376 } catch (Exception end)
377 {
378 MessageBuilder mb = new MessageBuilder();
379 mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
380 mb.append(stackTraceToSingleLineString(end));
381 logError(mb.toMessage());
382 if (replicationServer != null)
383 replicationServer.shutdown();
384 break;
385 }
386 }
387 // call flush a last time before exiting to make sure that
388 // no change was forgotten in the msgQueue
389 flush();
390
391 synchronized (this)
392 {
393 done = true;
394 this.notifyAll();
395 }
396 }
397
398 /**
399 * Trim old changes from this replicationServer database.
400 * @throws DatabaseException In case of database problem.
401 */
402 private void trim() throws DatabaseException, Exception
403 {
404 if (trimage == 0)
405 return;
406 int size = 0;
407 boolean finished = false;
408 boolean done = false;
409 ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
410 (short) 0, (short)0);
411
412 // In case of deadlock detection by the Database, this thread can
413 // by aborted by a DeadlockException. This is a transient error and
414 // the transaction should be attempted again.
415 // We will try DEADLOCK_RETRIES times before failing.
416 int tries = 0;
417 while ((tries++ < DEADLOCK_RETRIES) && (!done))
418 {
419 /* the trim is done by group in order to save some CPU and IO bandwidth
420 * start the transaction then do a bunch of remove then commit
421 */
422 ReplServerDBCursor cursor;
423 cursor = db.openDeleteCursor();
424
425 try
426 {
427 while ((size < 5000 ) && (!finished))
428 {
429 ChangeNumber changeNumber = cursor.nextChangeNumber();
430 if (changeNumber != null)
431 {
432 if ((!changeNumber.equals(lastChange))
433 && (changeNumber.older(trimDate)))
434 {
435 size++;
436 cursor.delete();
437 }
438 else
439 {
440 firstChange = changeNumber;
441 finished = true;
442 }
443 }
444 else
445 finished = true;
446 }
447 cursor.close();
448 done = true;
449 }
450 catch (DeadlockException e)
451 {
452 cursor.abort();
453 if (tries == DEADLOCK_RETRIES)
454 {
455 // could not handle the Deadlock after DEADLOCK_RETRIES tries.
456 // shutdown the ReplicationServer.
457 shutdown = true;
458 throw (e);
459 }
460 }
461 catch (DatabaseException e)
462 {
463 // mark shutdown for this db so that we don't try again to
464 // stop it from cursor.close() or methods called by cursor.close()
465 shutdown = true;
466 cursor.abort();
467 throw (e);
468 }
469 }
470 }
471
472 /**
473 * Flush a number of updates from the memory list to the stable storage.
474 */
475 private void flush()
476 {
477 int size;
478
479 do
480 {
481 synchronized(flushLock)
482 {
483 // get N messages to save in the DB
484 List<UpdateMessage> changes = getChanges(500);
485
486 // if no more changes to save exit immediately.
487 if ((changes == null) || ((size = changes.size()) == 0))
488 return;
489
490 // save the change to the stable storage.
491 db.addEntries(changes);
492
493 // remove the changes from the list of changes to be saved.
494 clearQueue(changes.size());
495 }
496 } while (size >=500);
497 }
498
499 /**
500 * This internal class is used to implement the Monitoring capabilities
501 * of the dbHandler.
502 */
503 private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
504 {
505 private DbMonitorProvider()
506 {
507 super("ReplicationServer Database");
508 }
509
510 /**
511 * {@inheritDoc}
512 */
513 @Override
514 public ArrayList<Attribute> getMonitorData()
515 {
516 ArrayList<Attribute> attributes = new ArrayList<Attribute>();
517 attributes.add(new Attribute("replicationServer-database",
518 String.valueOf(serverId)));
519 attributes.add(new Attribute("base-dn", baseDn.toString()));
520 if (firstChange != null)
521 {
522 Date firstTime = new Date(firstChange.getTime());
523 attributes.add(new Attribute("first-change",
524 firstChange.toString() + " " + firstTime.toString()));
525 }
526 if (lastChange != null)
527 {
528 Date lastTime = new Date(lastChange.getTime());
529 attributes.add(new Attribute("last-change",
530 lastChange.toString() + " " + lastTime.toString()));
531 }
532
533 return attributes;
534 }
535
536 /**
537 * {@inheritDoc}
538 */
539 @Override
540 public String getMonitorInstanceName()
541 {
542 return "ReplicationServer database " + baseDn.toString() +
543 " " + String.valueOf(serverId);
544 }
545
546 /**
547 * {@inheritDoc}
548 */
549 @Override
550 public long getUpdateInterval()
551 {
552 /* we don't wont to do polling on this monitor */
553 return 0;
554 }
555
556 /**
557 * {@inheritDoc}
558 */
559 @Override
560 public void initializeMonitorProvider(MonitorProviderCfg configuration)
561 throws ConfigException,InitializationException
562 {
563 // Nothing to do for now
564 }
565
566 /**
567 * {@inheritDoc}
568 */
569 @Override
570 public void updateMonitorData()
571 {
572 // As long as getUpdateInterval() returns 0, this will never get called
573 }
574 }
575
576 /**
577 * {@inheritDoc}
578 */
579 @Override
580 public String toString()
581 {
582 return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
583 }
584
585 /**
586 * Set the Purge delay for this db Handler.
587 * @param delay The purge delay in Milliseconds.
588 */
589 public void setPurgeDelay(long delay)
590 {
591 trimage = delay;
592 }
593
594 /**
595 * Clear the changes from this DB (from both memory cache and DB storage).
596 * @throws DatabaseException When an exception occurs while removing the
597 * changes from the DB.
598 * @throws Exception When an exception occurs while accessing a resource
599 * from the DB.
600 *
601 */
602 public void clear() throws DatabaseException, Exception
603 {
604 synchronized(flushLock)
605 {
606 msgQueue.clear();
607 }
608 db.clear();
609 firstChange = db.readFirstChange();
610 lastChange = db.readLastChange();
611 }
612 }