001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.Message;
029 import org.opends.messages.MessageBuilder;
030
031 import static org.opends.server.loggers.debug.DebugLogger.*;
032
033 import org.opends.server.loggers.debug.DebugTracer;
034 import static org.opends.server.loggers.ErrorLogger.logError;
035 import static org.opends.messages.ReplicationMessages.*;
036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037
038 import java.io.IOException;
039 import java.util.ArrayList;
040 import java.util.LinkedHashSet;
041 import java.util.List;
042 import java.util.Map;
043 import java.util.Set;
044 import java.util.concurrent.ConcurrentHashMap;
045 import java.util.concurrent.Semaphore;
046 import java.util.concurrent.TimeUnit;
047 import java.util.Iterator;
048
049 import org.opends.server.replication.common.ChangeNumber;
050 import org.opends.server.replication.common.ServerState;
051 import org.opends.server.replication.protocol.AckMessage;
052 import org.opends.server.replication.protocol.ErrorMessage;
053 import org.opends.server.replication.protocol.RoutableMessage;
054 import org.opends.server.replication.protocol.UpdateMessage;
055 import org.opends.server.replication.protocol.ReplServerInfoMessage;
056 import org.opends.server.replication.protocol.MonitorMessage;
057 import org.opends.server.replication.protocol.MonitorRequestMessage;
058 import org.opends.server.replication.protocol.ResetGenerationId;
059 import org.opends.server.types.DN;
060 import org.opends.server.types.DirectoryException;
061 import org.opends.server.types.ResultCode;
062 import org.opends.server.util.TimeThread;
063 import com.sleepycat.je.DatabaseException;
064
/**
 * This class defines an in-memory cache that is used to store
 * the messages that have been received from an LDAP server or
 * from another replication server and that should be forwarded to
 * other servers.
 *
 * The size of the cache is set by configuration.
 * If the cache becomes bigger than the configured size, the older messages
 * are removed and, should they be needed again, must be read from the
 * backing file.
 *
 * It runs a thread that is responsible for saving the received messages
 * to disk and for trimming them.
 * The decision to trim can be based on disk space or on the age of the
 * messages.
 */
081 public class ReplicationServerDomain
082 {
083 private final Object flowControlLock = new Object();
084 private final DN baseDn;
085
086 /*
087 * The following map contains one balanced tree for each replica ID
088 * to which we are currently publishing
089 * the first update in the balanced tree is the next change that we
090 * must push to this particular server
091 *
092 * We add new TreeSet in the HashMap when a new server register
093 * to this replication server.
094 *
095 */
096 private final Map<Short, ServerHandler> connectedServers =
097 new ConcurrentHashMap<Short, ServerHandler>();
098
099 /*
100 * This map contains one ServerHandler for each replication servers
101 * with which we are connected (so normally all the replication servers)
102 * the first update in the balanced tree is the next change that we
103 * must push to this particular server
104 *
105 * We add new TreeSet in the HashMap when a new replication server register
106 * to this replication server.
107 */
108
109 private final Map<Short, ServerHandler> replicationServers =
110 new ConcurrentHashMap<Short, ServerHandler>();
111
112 /*
113 * This map contains the List of updates received from each
114 * LDAP server
115 */
116 private final Map<Short, DbHandler> sourceDbHandlers =
117 new ConcurrentHashMap<Short, DbHandler>();
118 private ReplicationServer replicationServer;
119
120 /* GenerationId management */
121 private long generationId = -1;
122 private boolean generationIdSavedStatus = false;
123
124 /**
125 * The tracer object for the debug logger.
126 */
127 private static final DebugTracer TRACER = getTracer();
128
129 /* Monitor data management */
130
131 // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
132 private long monitorDataLifeTime = 500;
133
134 /* Search op on monitor data is processed by a worker thread.
135 * Requests are sent to the other RS,and responses are received by the
136 * listener threads.
137 * The worker thread is awoke on this semaphore, or on timeout.
138 */
139 Semaphore remoteMonitorResponsesSemaphore;
140
141 /**
142 * The monitor data consolidated over the topology.
143 */
144 private MonitorData monitorData = new MonitorData();
145 private MonitorData wrkMonitorData;
146
/**
 * Builds the ReplicationServerDomain in charge of the provided base DN.
 *
 * @param baseDn            The base DN associated to this domain.
 * @param replicationServer The ReplicationServer that created this
 *                          domain.
 */
public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
{
  this.replicationServer = replicationServer;
  this.baseDn = baseDn;
}
159
160 /**
161 * Add an update that has been received to the list of
162 * updates that must be forwarded to all other servers.
163 *
164 * @param update The update that has been received.
165 * @param sourceHandler The ServerHandler for the server from which the
166 * update was received
167 * @throws IOException When an IO exception happens during the update
168 * processing.
169 */
170 public void put(UpdateMessage update, ServerHandler sourceHandler)
171 throws IOException
172 {
173 /*
174 * TODO : In case that the source server is a LDAP server this method
175 * should check that change did get pushed to at least one
176 * other replication server before pushing it to the LDAP servers
177 */
178
179 short id = update.getChangeNumber().getServerId();
180 sourceHandler.updateServerState(update);
181 sourceHandler.incrementInCount();
182
183 if (update.isAssured())
184 {
185 int count = this.NumServers();
186 if (count > 1)
187 {
188 if (sourceHandler.isReplicationServer())
189 ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
190 this, count - 1);
191 else
192 sourceHandler.addWaitingAck(update, count - 1);
193 }
194 else
195 {
196 sourceHandler.sendAck(update.getChangeNumber());
197 }
198 }
199
200 if (generationId < 0)
201 {
202 generationId = sourceHandler.getGenerationId();
203 }
204
205 // look for the dbHandler that is responsible for the LDAP server which
206 // generated the change.
207 DbHandler dbHandler = null;
208 synchronized (sourceDbHandlers)
209 {
210 dbHandler = sourceDbHandlers.get(id);
211 if (dbHandler == null)
212 {
213 try
214 {
215 dbHandler = replicationServer.newDbHandler(id, baseDn);
216 generationIdSavedStatus = true;
217 }
218 catch (DatabaseException e)
219 {
220 /*
221 * Because of database problem we can't save any more changes
222 * from at least one LDAP server.
223 * This replicationServer therefore can't do it's job properly anymore
224 * and needs to close all its connections and shutdown itself.
225 */
226 MessageBuilder mb = new MessageBuilder();
227 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
228 mb.append(stackTraceToSingleLineString(e));
229 logError(mb.toMessage());
230 replicationServer.shutdown();
231 return;
232 }
233 sourceDbHandlers.put(id, dbHandler);
234 }
235 }
236
237 // Publish the messages to the source handler
238 dbHandler.add(update);
239
240
241 /*
242 * Push the message to the replication servers
243 */
244 if (!sourceHandler.isReplicationServer())
245 {
246 for (ServerHandler handler : replicationServers.values())
247 {
248 handler.add(update, sourceHandler);
249 }
250 }
251
252 /*
253 * Push the message to the LDAP servers
254 */
255 for (ServerHandler handler : connectedServers.values())
256 {
257 // don't forward the change to the server that just sent it
258 if (handler == sourceHandler)
259 {
260 continue;
261 }
262
263 handler.add(update, sourceHandler);
264 }
265
266 }
267
268 /**
269 * Wait a short while for ServerId disconnection.
270 *
271 * @param serverId the serverId to be checked.
272 */
273 public void waitDisconnection(short serverId)
274 {
275 if (connectedServers.containsKey(serverId))
276 {
277 // try again
278 try
279 {
280 Thread.sleep(100);
281 } catch (InterruptedException e)
282 {
283 }
284 }
285 }
286
287 /**
288 * Create initialize context necessary for finding the changes
289 * that must be sent to a given LDAP or replication server.
290 *
291 * @param handler handler for the server that must be started
292 * @throws Exception when method has failed
293 * @return A boolean indicating if the start was successfull.
294 */
295 public boolean startServer(ServerHandler handler) throws Exception
296 {
297 /*
298 * create the balanced tree that will be used to forward changes
299 */
300 synchronized (connectedServers)
301 {
302 ServerHandler oldHandler = connectedServers.get(handler.getServerId());
303
304 if (connectedServers.containsKey(handler.getServerId()))
305 {
306 // looks like two LDAP servers have the same serverId
307 // log an error message and drop this connection.
308 Message message = ERR_DUPLICATE_SERVER_ID.get(
309 oldHandler.toString(), handler.toString(), handler.getServerId());
310 logError(message);
311 return false;
312 }
313 connectedServers.put(handler.getServerId(), handler);
314
315 // It can be that the server that connects here is the
316 // first server connected for a domain.
317 // In that case, we will establish the appriopriate connections
318 // to the other repl servers for this domain and receive
319 // their ReplServerInfo messages.
320 // FIXME: Is it necessary to end this above processing BEFORE listening
321 // to incoming messages for that domain ? But the replica
322 // would raise Read Timeout for replica that connects.
323
324 // Update the remote replication servers with our list
325 // of connected LDAP servers
326 sendReplServerInfo();
327
328 return true;
329 }
330 }
331
332 /**
333 * Stop operations with a given server.
334 *
335 * @param handler the server for which we want to stop operations
336 */
337 public void stopServer(ServerHandler handler)
338 {
339 if (debugEnabled())
340 TRACER.debugInfo(
341 "In RS " + this.replicationServer.getMonitorInstanceName() +
342 " for " + baseDn + " " +
343 " stopServer " + handler.getMonitorInstanceName());
344
345
346 if (handler.isReplicationServer())
347 {
348 if (replicationServers.containsValue(handler))
349 {
350 replicationServers.remove(handler.getServerId());
351 handler.stopHandler();
352
353 // Update the remote replication servers with our list
354 // of connected LDAP servers
355 sendReplServerInfo();
356 }
357 }
358 else
359 {
360 if (connectedServers.containsValue(handler))
361 {
362 connectedServers.remove(handler.getServerId());
363 handler.stopHandler();
364
365 // Update the remote replication servers with our list
366 // of connected LDAP servers
367 sendReplServerInfo();
368 }
369 }
370 }
371
372 /**
373 * Resets the generationId for this domain if there is no LDAP
374 * server currently connected and if the generationId has never
375 * been saved.
376 */
377 protected void mayResetGenerationId()
378 {
379 if (debugEnabled())
380 TRACER.debugInfo(
381 "In RS " + this.replicationServer.getMonitorInstanceName() +
382 " for " + baseDn + " " +
383 " mayResetGenerationId generationIdSavedStatus=" +
384 generationIdSavedStatus);
385
386 // If there is no more any LDAP server connected to this domain in the
387 // topology and the generationId has never been saved, then we can reset
388 // it and the next LDAP server to connect will become the new reference.
389 boolean lDAPServersConnectedInTheTopology = false;
390 if (connectedServers.isEmpty())
391 {
392 for (ServerHandler rsh : replicationServers.values())
393 {
394 if (generationId != rsh.getGenerationId())
395 {
396 if (debugEnabled())
397 TRACER.debugInfo(
398 "In RS " + this.replicationServer.getMonitorInstanceName() +
399 " for " + baseDn + " " +
400 " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
401 " thas different genId");
402 }
403 else
404 {
405 if (rsh.hasRemoteLDAPServers())
406 {
407 lDAPServersConnectedInTheTopology = true;
408
409 if (debugEnabled())
410 TRACER.debugInfo(
411 "In RS " + this.replicationServer.getMonitorInstanceName() +
412 " for " + baseDn + " " +
413 " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
414 " has servers connected to it - will not reset generationId");
415 }
416 }
417 }
418 }
419 else
420 {
421 lDAPServersConnectedInTheTopology = true;
422 if (debugEnabled())
423 TRACER.debugInfo(
424 "In RS " + this.replicationServer.getMonitorInstanceName() +
425 " for " + baseDn + " " +
426 " has servers connected to it - will not reset generationId");
427 }
428
429 if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus)
430 && (generationId != -1))
431 {
432 setGenerationId(-1, false);
433 }
434 }
435
436 /**
437 * Create initialize context necessary for finding the changes
438 * that must be sent to a given replication server.
439 *
440 * @param handler the server ID to which we want to forward changes
441 * @throws Exception in case of errors
442 * @return A boolean indicating if the start was successfull.
443 */
444 public boolean startReplicationServer(ServerHandler handler) throws Exception
445 {
446 /*
447 * create the balanced tree that will be used to forward changes
448 */
449 synchronized (replicationServers)
450 {
451 ServerHandler oldHandler = replicationServers.get(handler.getServerId());
452 if ((oldHandler != null))
453 {
454 if (oldHandler.getServerAddressURL().equals(
455 handler.getServerAddressURL()))
456 {
457 // this is the same server, this means that our ServerStart messages
458 // have been sent at about the same time and 2 connections
459 // have been established.
460 // Silently drop this connection.
461 }
462 else
463 {
464 // looks like two replication servers have the same serverId
465 // log an error message and drop this connection.
466 Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.
467 get(oldHandler.getServerAddressURL(),
468 handler.getServerAddressURL(), handler.getServerId());
469 logError(message);
470 }
471 return false;
472 }
473 replicationServers.put(handler.getServerId(), handler);
474
475 // Update this server with the list of LDAP servers
476 // already connected
477 handler.sendInfo(
478 new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
479
480 return true;
481 }
482 }
483
484 /**
485 * Get the next update that need to be sent to a given LDAP server.
486 * This call is blocking when no update is available or when dependencies
487 * do not allow to send the next available change
488 *
489 * @param handler The server handler for the target directory server.
490 *
491 * @return the update that must be forwarded
492 */
493 public UpdateMessage take(ServerHandler handler)
494 {
495 UpdateMessage msg;
496 /*
497 * Get the balanced tree that we use to sort the changes to be
498 * sent to the replica from the cookie
499 *
500 * The next change to send is always the first one in the tree
501 * So this methods simply need to check that dependencies are OK
502 * and update this replicaId RUV
503 *
504 * TODO : dependency :
505 * before forwarding change, we should check that the dependency
506 * that is indicated in this change is OK (change already in the RUV)
507 */
508 msg = handler.take();
509 synchronized (flowControlLock)
510 {
511 if (handler.restartAfterSaturation(null))
512 flowControlLock.notifyAll();
513 }
514 return msg;
515 }
516
517 /**
518 * Return a Set of String containing the lists of Replication servers
519 * connected to this server.
520 * @return the set of connected servers
521 */
522 public Set<String> getChangelogs()
523 {
524 LinkedHashSet<String> mySet = new LinkedHashSet<String>();
525
526 for (ServerHandler handler : replicationServers.values())
527 {
528 mySet.add(handler.getServerAddressURL());
529 }
530
531 return mySet;
532 }
533
534
535 /**
536 * Return a Set containing the servers known by this replicationServer.
537 * @return a set containing the servers known by this replicationServer.
538 */
539 public Set<Short> getServers()
540 {
541 return sourceDbHandlers.keySet();
542 }
543
544 /**
545 * Returns as a set of String the list of LDAP servers connected to us.
546 * Each string is the serverID of a connected LDAP server.
547 *
548 * @return The set of connected LDAP servers
549 */
550 public List<String> getConnectedLDAPservers()
551 {
552 List<String> mySet = new ArrayList<String>(0);
553
554 for (ServerHandler handler : connectedServers.values())
555 {
556 mySet.add(String.valueOf(handler.getServerId()));
557 }
558 return mySet;
559 }
560
561 /**
562 * Creates and returns an iterator.
563 * When the iterator is not used anymore, the caller MUST call the
564 * ReplicationIterator.releaseCursor() method to free the ressources
565 * and locks used by the ReplicationIterator.
566 *
567 * @param serverId Identifier of the server for which the iterator is created.
568 * @param changeNumber Starting point for the iterator.
569 * @return the created ReplicationIterator. Null when no DB is available
570 * for the provided server Id.
571 */
572 public ReplicationIterator getChangelogIterator(short serverId,
573 ChangeNumber changeNumber)
574 {
575 DbHandler handler = sourceDbHandlers.get(serverId);
576 if (handler == null)
577 return null;
578
579 try
580 {
581 return handler.generateIterator(changeNumber);
582 }
583 catch (Exception e)
584 {
585 return null;
586 }
587 }
588
589 /**
590 * Returns the change count for that ReplicationServerDomain.
591 *
592 * @return the change count.
593 */
594 public long getChangesCount()
595 {
596 long entryCount = 0;
597 for (DbHandler dbHandler : sourceDbHandlers.values())
598 {
599 entryCount += dbHandler.getChangesCount();
600 }
601 return entryCount;
602 }
603
604 /**
605 * Get the baseDn.
606 * @return Returns the baseDn.
607 */
608 public DN getBaseDn()
609 {
610 return baseDn;
611 }
612
613 /**
614 * Sets the provided DbHandler associated to the provided serverId.
615 *
616 * @param serverId the serverId for the server to which is
617 * associated the Dbhandler.
618 * @param dbHandler the dbHandler associated to the serverId.
619 *
620 * @throws DatabaseException If a database error happened.
621 */
622 public void setDbHandler(short serverId, DbHandler dbHandler)
623 throws DatabaseException
624 {
625 synchronized (sourceDbHandlers)
626 {
627 sourceDbHandlers.put(serverId , dbHandler);
628 }
629 }
630
631 /**
632 * Get the number of currently connected servers.
633 *
634 * @return the number of currently connected servers.
635 */
636 private int NumServers()
637 {
638 return replicationServers.size() + connectedServers.size();
639 }
640
641
642 /**
643 * Add an ack to the list of ack received for a given change.
644 *
645 * @param message The ack message received.
646 * @param fromServerId The identifier of the server that sent the ack.
647 */
648 public void ack(AckMessage message, short fromServerId)
649 {
650 /*
651 * there are 2 possible cases here :
652 * - the message that was acked comes from a server to which
653 * we are directly connected.
654 * In this case, we can find the handler from the connectedServers map
655 * - the message that was acked comes from a server to which we are not
656 * connected.
657 * In this case we need to find the replication server that forwarded
658 * the change and send back the ack to this server.
659 */
660 ServerHandler handler = connectedServers.get(
661 message.getChangeNumber().getServerId());
662 if (handler != null)
663 handler.ack(message, fromServerId);
664 else
665 {
666 ServerHandler.ackChangelog(message, fromServerId);
667 }
668 }
669
670 /**
671 * Retrieves the destination handlers for a routable message.
672 *
673 * @param msg The message to route.
674 * @param senderHandler The handler of the server that published this message.
675 * @return The list of destination handlers.
676 */
677 protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
678 ServerHandler senderHandler)
679 {
680 List<ServerHandler> servers =
681 new ArrayList<ServerHandler>();
682
683 if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
684 {
685 // TODO Import from the "closest server" to be implemented
686 }
687 else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
688 {
689 if (!senderHandler.isReplicationServer())
690 {
691 // Send to all replication servers with a least one remote
692 // server connected
693 for (ServerHandler rsh : replicationServers.values())
694 {
695 if (rsh.hasRemoteLDAPServers())
696 {
697 servers.add(rsh);
698 }
699 }
700 }
701
702 // Sends to all connected LDAP servers
703 for (ServerHandler destinationHandler : connectedServers.values())
704 {
705 // Don't loop on the sender
706 if (destinationHandler == senderHandler)
707 continue;
708 servers.add(destinationHandler);
709 }
710 }
711 else
712 {
713 // Destination is one server
714 ServerHandler destinationHandler =
715 connectedServers.get(msg.getDestination());
716 if (destinationHandler != null)
717 {
718 servers.add(destinationHandler);
719 }
720 else
721 {
722 // the targeted server is NOT connected
723 // Let's search for THE changelog server that MAY
724 // have the targeted server connected.
725 if (senderHandler.isLDAPserver())
726 {
727 for (ServerHandler h : replicationServers.values())
728 {
729 // Send to all replication servers with a least one remote
730 // server connected
731 if (h.isRemoteLDAPServer(msg.getDestination()))
732 {
733 servers.add(h);
734 }
735 }
736 }
737 }
738 }
739 return servers;
740 }
741
742 /**
743 * Processes a message coming from one server in the topology
744 * and potentially forwards it to one or all other servers.
745 *
746 * @param msg The message received and to be processed.
747 * @param senderHandler The server handler of the server that emitted
748 * the message.
749 */
750 public void process(RoutableMessage msg, ServerHandler senderHandler)
751 {
752
753 // Test the message for which a ReplicationServer is expected
754 // to be the destination
755 if (msg.getDestination() == this.replicationServer.getServerId())
756 {
757 if (msg instanceof ErrorMessage)
758 {
759 ErrorMessage errorMsg = (ErrorMessage)msg;
760 logError(ERR_ERROR_MSG_RECEIVED.get(
761 errorMsg.getDetails()));
762 }
763 else if (msg instanceof MonitorRequestMessage)
764 {
765 MonitorRequestMessage replServerMonitorRequestMsg =
766 (MonitorRequestMessage) msg;
767
768 MonitorMessage monitorMsg =
769 new MonitorMessage(
770 replServerMonitorRequestMsg.getDestination(),
771 replServerMonitorRequestMsg.getsenderID());
772
773 // Populate for each connected LDAP Server
774 // from the states stored in the serverHandler.
775 // - the server state
776 // - the older missing change
777 for (ServerHandler lsh : this.connectedServers.values())
778 {
779 monitorMsg.setServerState(
780 lsh.getServerId(),
781 lsh.getServerState(),
782 lsh.getApproxFirstMissingDate(),
783 true);
784 }
785
786 // Same for the connected RS
787 for (ServerHandler rsh : this.replicationServers.values())
788 {
789 monitorMsg.setServerState(
790 rsh.getServerId(),
791 rsh.getServerState(),
792 rsh.getApproxFirstMissingDate(),
793 false);
794 }
795
796 // Populate the RS state in the msg from the DbState
797 monitorMsg.setReplServerDbState(this.getDbServerState());
798
799
800 try
801 {
802 senderHandler.send(monitorMsg);
803 }
804 catch(Exception e)
805 {
806 // We log the error. The requestor will detect a timeout or
807 // any other failure on the connection.
808 logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
809 Short.toString((msg.getDestination()))));
810 }
811 }
812 else if (msg instanceof MonitorMessage)
813 {
814 MonitorMessage monitorMsg =
815 (MonitorMessage) msg;
816
817 receivesMonitorDataResponse(monitorMsg);
818 }
819 else
820 {
821 logError(NOTE_ERR_ROUTING_TO_SERVER.get(
822 msg.getClass().getCanonicalName()));
823 }
824 return;
825 }
826
827 List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
828
829 if (servers.isEmpty())
830 {
831 MessageBuilder mb = new MessageBuilder();
832 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
833 mb.append(" In Replication Server=" + this.replicationServer.
834 getMonitorInstanceName());
835 mb.append(" domain =" + this.baseDn);
836 mb.append(" unroutable message =" + msg.toString());
837 mb.append(" routing table is empty");
838 ErrorMessage errMsg = new ErrorMessage(
839 this.replicationServer.getServerId(),
840 msg.getsenderID(),
841 mb.toMessage());
842 logError(mb.toMessage());
843 try
844 {
845 senderHandler.send(errMsg);
846 }
847 catch(IOException ioe)
848 {
849 // TODO Handle error properly (sender timeout in addition)
850 /*
851 * An error happened trying to send an error msg to this server.
852 * Log an error and close the connection to this server.
853 */
854 MessageBuilder mb2 = new MessageBuilder();
855 mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
856 mb2.append(stackTraceToSingleLineString(ioe));
857 logError(mb2.toMessage());
858 senderHandler.shutdown();
859 }
860 }
861 else
862 {
863 for (ServerHandler targetHandler : servers)
864 {
865 try
866 {
867 targetHandler.send(msg);
868 }
869 catch(IOException ioe)
870 {
871 /*
872 * An error happened trying the send a routabled message
873 * to its destination server.
874 * Send back an error to the originator of the message.
875 */
876 MessageBuilder mb = new MessageBuilder();
877 mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
878 mb.append(stackTraceToSingleLineString(ioe));
879 mb.append(" ");
880 mb.append(msg.getClass().getCanonicalName());
881 logError(mb.toMessage());
882
883 MessageBuilder mb1 = new MessageBuilder();
884 mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
885 mb1.append("serverID:" + msg.getDestination());
886 ErrorMessage errMsg = new ErrorMessage(
887 msg.getsenderID(), mb1.toMessage());
888 try
889 {
890 senderHandler.send(errMsg);
891 }
892 catch(IOException ioe1)
893 {
894 // an error happened on the sender session trying to recover
895 // from an error on the receiver session.
896 // We don't have much solution left beside closing the sessions.
897 senderHandler.shutdown();
898 targetHandler.shutdown();
899 }
900 // TODO Handle error properly (sender timeout in addition)
901 }
902 }
903 }
904
905 }
906
907 /**
908 * Send back an ack to the server that sent the change.
909 *
910 * @param changeNumber The ChangeNumber of the change that must be acked.
911 * @param isLDAPserver This boolean indicates if the server that sent the
912 * change was an LDAP server or a ReplicationServer.
913 */
914 public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
915 {
916 short serverId = changeNumber.getServerId();
917 sendAck(changeNumber, isLDAPserver, serverId);
918 }
919
920 /**
921 *
922 * Send back an ack to a server that sent the change.
923 *
924 * @param changeNumber The ChangeNumber of the change that must be acked.
925 * @param isLDAPserver This boolean indicates if the server that sent the
926 * change was an LDAP server or a ReplicationServer.
927 * @param serverId The identifier of the server from which we
928 * received the change..
929 */
930 public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
931 short serverId)
932 {
933 ServerHandler handler;
934 if (isLDAPserver)
935 handler = connectedServers.get(serverId);
936 else
937 handler = replicationServers.get(serverId);
938
939 // TODO : check for null handler and log error
940 try
941 {
942 handler.sendAck(changeNumber);
943 } catch (IOException e)
944 {
945 /*
946 * An error happened trying the send back an ack to this server.
947 * Log an error and close the connection to this server.
948 */
949 MessageBuilder mb = new MessageBuilder();
950 mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
951 mb.append(stackTraceToSingleLineString(e));
952 logError(mb.toMessage());
953 handler.shutdown();
954 }
955 }
956
957 /**
958 * Shutdown this ReplicationServerDomain.
959 */
960 public void shutdown()
961 {
962 // Close session with other changelogs
963 for (ServerHandler serverHandler : replicationServers.values())
964 {
965 serverHandler.shutdown();
966 }
967
968 // Close session with other LDAP servers
969 for (ServerHandler serverHandler : connectedServers.values())
970 {
971 serverHandler.shutdown();
972 }
973
974 // Shutdown the dbHandlers
975 synchronized (sourceDbHandlers)
976 {
977 for (DbHandler dbHandler : sourceDbHandlers.values())
978 {
979 dbHandler.shutdown();
980 }
981 sourceDbHandlers.clear();
982 }
983 }
984
985 /**
986 * Returns the ServerState describing the last change from this replica.
987 *
988 * @return The ServerState describing the last change from this replica.
989 */
990 public ServerState getDbServerState()
991 {
992 ServerState serverState = new ServerState();
993 for (DbHandler db : sourceDbHandlers.values())
994 {
995 serverState.update(db.getLastChange());
996 }
997 return serverState;
998 }
999
1000 /**
1001 * {@inheritDoc}
1002 */
1003 @Override
1004 public String toString()
1005 {
1006 return "ReplicationServerDomain " + baseDn;
1007 }
1008
1009 /**
1010 * Check if some server Handler should be removed from flow control state.
1011 * @throws IOException If an error happened.
1012 */
1013 public void checkAllSaturation() throws IOException
1014 {
1015 for (ServerHandler handler : replicationServers.values())
1016 {
1017 handler.checkWindow();
1018 }
1019
1020 for (ServerHandler handler : connectedServers.values())
1021 {
1022 handler.checkWindow();
1023 }
1024 }
1025
1026 /**
1027 * Check if a server that was in flow control can now restart
1028 * sending updates.
1029 * @param sourceHandler The server that must be checked.
1030 * @return true if the server can restart sending changes.
1031 * false if the server can't restart sending changes.
1032 */
1033 public boolean restartAfterSaturation(ServerHandler sourceHandler)
1034 {
1035 for (ServerHandler handler : replicationServers.values())
1036 {
1037 if (!handler.restartAfterSaturation(sourceHandler))
1038 return false;
1039 }
1040
1041 for (ServerHandler handler : connectedServers.values())
1042 {
1043 if (!handler.restartAfterSaturation(sourceHandler))
1044 return false;
1045 }
1046 return true;
1047 }
1048
1049 /**
1050 * Send a ReplServerInfoMessage to all the connected replication servers
1051 * in order to let them know our connected LDAP servers.
1052 */
1053 private void sendReplServerInfo()
1054 {
1055 ReplServerInfoMessage info =
1056 new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
1057 for (ServerHandler handler : replicationServers.values())
1058 {
1059 try
1060 {
1061 handler.sendInfo(info);
1062 }
1063 catch (IOException e)
1064 {
1065 /*
1066 * An error happened trying the send back an ack to this server.
1067 * Log an error and close the connection to this server.
1068 */
1069 MessageBuilder mb = new MessageBuilder();
1070 mb.append(ERR_CHANGELOG_ERROR_SENDING_INFO.get(this.toString()));
1071 mb.append(stackTraceToSingleLineString(e));
1072 logError(mb.toMessage());
1073 handler.shutdown();
1074 }
1075 }
1076 }
1077
1078 /**
1079 * Get the generationId associated to this domain.
1080 *
1081 * @return The generationId
1082 */
1083 public long getGenerationId()
1084 {
1085 return generationId;
1086 }
1087
1088 /**
1089 * Get the generationId saved status.
1090 *
1091 * @return The generationId saved status.
1092 */
1093 public boolean getGenerationIdSavedStatus()
1094 {
1095 return generationIdSavedStatus;
1096 }
1097
1098 /**
1099 * Sets the provided value as the new in memory generationId.
1100 *
1101 * @param generationId The new value of generationId.
1102 * @param savedStatus The saved status of the generationId.
1103 */
1104 synchronized public void setGenerationId(long generationId,
1105 boolean savedStatus)
1106 {
1107 if (debugEnabled())
1108 TRACER.debugInfo(
1109 "In " + this.replicationServer.getMonitorInstanceName() +
1110 " baseDN=" + baseDn +
1111 " RCache.set GenerationId=" + generationId);
1112
1113 if (this.generationId != generationId)
1114 {
1115 // we are changing of genId
1116 clearDbs();
1117
1118 this.generationId = generationId;
1119 this.generationIdSavedStatus = savedStatus;
1120
1121 // they have a generationId different from the reference one
1122 for (ServerHandler handler : connectedServers.values())
1123 {
1124 if (generationId != handler.getGenerationId())
1125 {
1126 // Notify our remote LS that from now on have a different genID
1127 handler.warnBadGenerationId();
1128 }
1129 }
1130 }
1131 }
1132
1133 /**
1134 * Resets the generationID.
1135 *
1136 * @param senderHandler The handler associated to the server
1137 * that requested to reset the generationId.
1138 * @param genIdMsg The reset generation ID msg received.
1139 */
1140 public void resetGenerationId(ServerHandler senderHandler,
1141 ResetGenerationId genIdMsg)
1142 {
1143 long newGenId = genIdMsg.getGenerationId();
1144
1145 if (newGenId != this.generationId)
1146 {
1147 this.setGenerationId(newGenId, false);
1148 }
1149
1150 // If we are the first replication server warned,
1151 // then forwards the reset message to the remote replication servers
1152 for (ServerHandler rsHandler : replicationServers.values())
1153 {
1154 try
1155 {
1156 // After we'll have send the mssage , the remote RS will adopt
1157 // the new genId
1158 rsHandler.setGenerationId(newGenId);
1159 if (senderHandler.isLDAPserver())
1160 {
1161 rsHandler.forwardGenerationIdToRS(genIdMsg);
1162 }
1163 }
1164 catch (IOException e)
1165 {
1166 logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
1167 get(rsHandler.getMonitorInstanceName()));
1168 }
1169 }
1170 }
1171
1172 /**
1173 * Clears the Db associated with that cache.
1174 */
1175 public void clearDbs()
1176 {
1177 // Reset the localchange and state db for the current domain
1178 synchronized (sourceDbHandlers)
1179 {
1180 for (DbHandler dbHandler : sourceDbHandlers.values())
1181 {
1182 try
1183 {
1184 dbHandler.clear();
1185 }
1186 catch (Exception e)
1187 {
1188 // TODO: i18n
1189 MessageBuilder mb = new MessageBuilder();
1190 mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
1191 e.getMessage() + " " +
1192 stackTraceToSingleLineString(e)));
1193 logError(mb.toMessage());
1194 }
1195 }
1196 sourceDbHandlers.clear();
1197
1198 if (debugEnabled())
1199 TRACER.debugInfo(
1200 "In " + this.replicationServer.getMonitorInstanceName() +
1201 " baseDN=" + baseDn +
1202 " The source db handler has been cleared");
1203 }
1204 try
1205 {
1206 replicationServer.clearGenerationId(baseDn);
1207 }
1208 catch (Exception e)
1209 {
1210 // TODO: i18n
1211 logError(Message.raw(
1212 "Exception caught while clearing generationId:" +
1213 e.getLocalizedMessage()));
1214 }
1215 }
1216
1217 /**
1218 * Returns whether the provided server is in degraded
1219 * state due to the fact that the peer server has an invalid
1220 * generationId for this domain.
1221 *
1222 * @param serverId The serverId for which we want to know the
1223 * the state.
1224 * @return Whether it is degraded or not.
1225 */
1226
1227 public boolean isDegradedDueToGenerationId(short serverId)
1228 {
1229 if (debugEnabled())
1230 TRACER.debugInfo(
1231 "In " + this.replicationServer.getMonitorInstanceName() +
1232 " baseDN=" + baseDn +
1233 " isDegraded serverId=" + serverId +
1234 " given local generation Id=" + this.generationId);
1235
1236 ServerHandler handler = replicationServers.get(serverId);
1237 if (handler == null)
1238 {
1239 handler = connectedServers.get(serverId);
1240 if (handler == null)
1241 {
1242 return false;
1243 }
1244 }
1245
1246 if (debugEnabled())
1247 TRACER.debugInfo(
1248 "In " + this.replicationServer.getMonitorInstanceName() +
1249 " baseDN=" + baseDn +
1250 " Compute degradation of serverId=" + serverId +
1251 " LS server generation Id=" + handler.getGenerationId());
1252 return (handler.getGenerationId() != this.generationId);
1253 }
1254
1255 /**
1256 * Return the associated replication server.
1257 * @return The replication server.
1258 */
1259 public ReplicationServer getReplicationServer()
1260 {
1261 return replicationServer;
1262 }
1263
1264 /**
1265 * Process reception of a ReplServerInfoMessage.
1266 *
1267 * @param infoMsg The received message.
1268 * @param handler The handler that received the message.
1269 * @throws IOException when raised by the underlying session.
1270 */
1271 public void receiveReplServerInfo(
1272 ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
1273 {
1274 if (debugEnabled())
1275 {
1276 if (handler.isReplicationServer())
1277 TRACER.debugInfo(
1278 "In RS " + getReplicationServer().getServerId() +
1279 " Receiving replServerInfo from " + handler.getServerId() +
1280 " baseDn=" + baseDn +
1281 " genId=" + infoMsg.getGenerationId());
1282 }
1283
1284 mayResetGenerationId();
1285 if (generationId < 0)
1286 generationId = handler.getGenerationId();
1287 if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
1288 {
1289 Message message = NOTE_BAD_GENERATION_ID.get(
1290 baseDn.toNormalizedString(),
1291 Short.toString(handler.getServerId()),
1292 Long.toString(infoMsg.getGenerationId()),
1293 Long.toString(generationId));
1294
1295 ErrorMessage errorMsg = new ErrorMessage(
1296 getReplicationServer().getServerId(),
1297 handler.getServerId(),
1298 message);
1299 handler.sendError(errorMsg);
1300 }
1301 }
1302
1303 /* =======================
1304 * Monitor Data generation
1305 * =======================
1306 */
1307
1308 /**
1309 * Retrieves the global monitor data.
1310 * @return The monitor data.
1311 * @throws DirectoryException When an error occurs.
1312 */
1313 synchronized protected MonitorData getMonitorData()
1314 throws DirectoryException
1315 {
1316 if (monitorData.getBuildDate() + monitorDataLifeTime
1317 > TimeThread.getTime())
1318 {
1319 if (debugEnabled())
1320 TRACER.debugInfo(
1321 "In " + this.replicationServer.getMonitorInstanceName() +
1322 " baseDn=" + baseDn + " getRemoteMonitorData in cache");
1323 // The current data are still valid. No need to renew them.
1324 return monitorData;
1325 }
1326
1327 wrkMonitorData = new MonitorData();
1328 synchronized(wrkMonitorData)
1329 {
1330 if (debugEnabled())
1331 TRACER.debugInfo(
1332 "In " + this.replicationServer.getMonitorInstanceName() +
1333 " baseDn=" + baseDn + " Computing monitor data ");
1334
1335 // Let's process our directly connected LSes
1336 // - in the ServerHandler for a given LS1, the stored state contains :
1337 // - the max CN produced by LS1
1338 // - the last CN consumed by LS1 from LS2..n
1339 // - in the RSdomain/dbHandler, the built-in state contains :
1340 // - the max CN produced by each server
1341 // So for a given LS connected we can take the state and the max from
1342 // the LS/state.
1343
1344 for (ServerHandler directlsh : connectedServers.values())
1345 {
1346 short serverID = directlsh.getServerId();
1347
1348 // the state comes from the state stored in the SH
1349 ServerState directlshState = directlsh.getServerState().duplicate();
1350
1351 // the max CN sent by that LS also comes from the SH
1352 ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
1353 if (maxcn == null)
1354 {
1355 // This directly connected LS has never produced any change
1356 maxcn = new ChangeNumber(0, 0 , serverID);
1357 }
1358 wrkMonitorData.setMaxCN(serverID, maxcn);
1359 wrkMonitorData.setLDAPServerState(serverID, directlshState);
1360 wrkMonitorData.setFirstMissingDate(serverID, directlsh.
1361 getApproxFirstMissingDate());
1362 }
1363
1364 // Then initialize the max CN for the LS that produced something
1365 // - from our own local db state
1366 // - whatever they are directly or undirectly connected
1367 ServerState dbServerState = getDbServerState();
1368 Iterator<Short> it = dbServerState.iterator();
1369 while (it.hasNext())
1370 {
1371 short sid = it.next();
1372 ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
1373 wrkMonitorData.setMaxCN(sid, storedCN);
1374 }
1375
1376 // Now we have used all available local informations
1377 // and we need the remote ones.
1378 if (debugEnabled())
1379 TRACER.debugInfo(
1380 "In " + this.replicationServer.getMonitorInstanceName() +
1381 " baseDn=" + baseDn + " Local monitor data: " +
1382 wrkMonitorData.toString());
1383 }
1384
1385 // Send Request to the other Replication Servers
1386 if (remoteMonitorResponsesSemaphore == null)
1387 {
1388 remoteMonitorResponsesSemaphore = new Semaphore(0);
1389 short requestCnt = sendMonitorDataRequest();
1390 // Wait reponses from them or timeout
1391 waitMonitorDataResponses(requestCnt);
1392 }
1393 else
1394 {
1395 // The processing of renewing the monitor cache is already running
1396 // We'll make it sleeping until the end
1397 // TODO: unit test for this case.
1398 while (remoteMonitorResponsesSemaphore!=null)
1399 {
1400 waitMonitorDataResponses(1);
1401 }
1402 }
1403
1404 wrkMonitorData.completeComputing();
1405
1406 // Store the new computed data as the reference
1407 synchronized(monitorData)
1408 {
1409 // Now we have the expected answers or an error occured
1410 monitorData = wrkMonitorData;
1411 wrkMonitorData = null;
1412 if (debugEnabled())
1413 TRACER.debugInfo(
1414 "In " + this.replicationServer.getMonitorInstanceName() +
1415 " baseDn=" + baseDn + " *** Computed MonitorData: " +
1416 monitorData.toString());
1417 }
1418 return monitorData;
1419 }
1420
1421
1422 /**
1423 * Sends a MonitorRequest message to all connected RS.
1424 * @return the number of requests sent.
1425 * @throws DirectoryException when a problem occurs.
1426 */
1427 protected short sendMonitorDataRequest()
1428 throws DirectoryException
1429 {
1430 short sent=0;
1431 try
1432 {
1433 for (ServerHandler rs : replicationServers.values())
1434 {
1435 MonitorRequestMessage msg = new
1436 MonitorRequestMessage(this.replicationServer.getServerId(),
1437 rs.getServerId());
1438 rs.send(msg);
1439 sent++;
1440 }
1441 }
1442 catch(Exception e)
1443 {
1444 Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
1445 logError(message);
1446 throw new DirectoryException(ResultCode.OTHER,
1447 message, e);
1448 }
1449 return sent;
1450 }
1451
1452 /**
1453 * Wait for the expected count of received MonitorMessage.
1454 * @param expectedResponses The number of expected answers.
1455 * @throws DirectoryException When an error occurs.
1456 */
1457 protected void waitMonitorDataResponses(int expectedResponses)
1458 throws DirectoryException
1459 {
1460 try
1461 {
1462 if (debugEnabled())
1463 TRACER.debugInfo(
1464 "In " + this.replicationServer.getMonitorInstanceName() +
1465 " baseDn=" + baseDn +
1466 " waiting for " + expectedResponses
1467 + " expected monitor messages");
1468
1469 boolean allPermitsAcquired =
1470 remoteMonitorResponsesSemaphore.tryAcquire(
1471 expectedResponses,
1472 (long) 5000, TimeUnit.MILLISECONDS);
1473
1474 if (!allPermitsAcquired)
1475 {
1476 logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
1477 // let's go on in best effort even with limited data received.
1478 }
1479 else
1480 {
1481 if (debugEnabled())
1482 TRACER.debugInfo(
1483 "In " + this.replicationServer.getMonitorInstanceName() +
1484 " baseDn=" + baseDn +
1485 " Successfully received all " + expectedResponses
1486 + " expected monitor messages");
1487 }
1488 }
1489 catch(Exception e)
1490 {
1491 logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
1492 }
1493 finally
1494 {
1495 remoteMonitorResponsesSemaphore = null;
1496 }
1497 }
1498
1499 /**
1500 * Processes a Monitor message receives from a remote Replication Server
1501 * and stores the data received.
1502 *
1503 * @param msg The message to be processed.
1504 */
1505 public void receivesMonitorDataResponse(MonitorMessage msg)
1506 {
1507 if (debugEnabled())
1508 TRACER.debugInfo(
1509 "In " + this.replicationServer.getMonitorInstanceName() +
1510 "Receiving " + msg + " from " + msg.getsenderID() +
1511 remoteMonitorResponsesSemaphore);
1512
1513 if (remoteMonitorResponsesSemaphore == null)
1514 {
1515 // Let's ignore the remote monitor data just received
1516 // since the computing processing has been ended.
1517 // An error - probably a timemout - occured that was already logged
1518 logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
1519 Short.toString(msg.getsenderID())));
1520 return;
1521 }
1522
1523 try
1524 {
1525 synchronized(wrkMonitorData)
1526 {
1527 // Here is the RS state : list <serverID, lastChangeNumber>
1528 // For each LDAP Server, we keep the max CN accross the RSes
1529 ServerState replServerState = msg.getReplServerDbState();
1530 wrkMonitorData.setMaxCNs(replServerState);
1531
1532 // Store the remote LDAP servers states
1533 Iterator<Short> lsidIterator = msg.ldapIterator();
1534 while (lsidIterator.hasNext())
1535 {
1536 short sid = lsidIterator.next();
1537 wrkMonitorData.setLDAPServerState(sid,
1538 msg.getLDAPServerState(sid).duplicate());
1539 wrkMonitorData.setFirstMissingDate(sid,
1540 msg.getLDAPApproxFirstMissingDate(sid));
1541 }
1542
1543 // Process the latency reported by the remote RSi on its connections
1544 // to the other RSes
1545 Iterator<Short> rsidIterator = msg.rsIterator();
1546 while (rsidIterator.hasNext())
1547 {
1548 short rsid = rsidIterator.next();
1549 if (rsid == replicationServer.getServerId())
1550 {
1551 // this is the latency of the remote RSi regarding the current RS
1552 // let's update the fmd of my connected LS
1553 for (ServerHandler connectedlsh : connectedServers.values())
1554 {
1555 short connectedlsid = connectedlsh.getServerId();
1556 Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
1557 wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
1558 }
1559 }
1560 else
1561 {
1562 // this is the latency of the remote RSi regarding another RSj
1563 // let's update the latency of the LSes connected to RSj
1564 ServerHandler rsjHdr = replicationServers.get(rsid);
1565 if (rsjHdr != null)
1566 {
1567 for(short remotelsid : rsjHdr.getConnectedServerIds())
1568 {
1569 Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
1570 wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
1571 }
1572 }
1573 }
1574 }
1575 if (debugEnabled())
1576 {
1577 if (debugEnabled())
1578 TRACER.debugInfo(
1579 "In " + this.replicationServer.getMonitorInstanceName() +
1580 " baseDn=" + baseDn +
1581 " Processed msg from " + msg.getsenderID() +
1582 " New monitor data: " + wrkMonitorData.toString());
1583 }
1584 }
1585
1586 // Decreases the number of expected responses and potentially
1587 // wakes up the waiting requestor thread.
1588 remoteMonitorResponsesSemaphore.release();
1589
1590 }
1591 catch (Exception e)
1592 {
1593 logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
1594 stackTraceToSingleLineString(e)));
1595
1596 // If an exception occurs while processing one of the expected message,
1597 // the processing is aborted and the waiting thread is awoke.
1598 remoteMonitorResponsesSemaphore.notifyAll();
1599 }
1600 }
1601
1602 /**
1603 * Set the purge delay on all the db Handlers for this Domain
1604 * of Replicaiton.
1605 *
1606 * @param delay The new purge delay to use.
1607 */
1608 void setPurgeDelay(long delay)
1609 {
1610 for (DbHandler handler : sourceDbHandlers.values())
1611 {
1612 handler.setPurgeDelay(delay);
1613 }
1614 }
1615 }