001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028
029 import org.opends.messages.*;
030
031 import static org.opends.server.loggers.ErrorLogger.logError;
032 import static org.opends.server.loggers.debug.DebugLogger.*;
033 import org.opends.server.loggers.debug.DebugTracer;
034 import static org.opends.messages.ReplicationMessages.*;
035 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
036
037 import java.io.IOException;
038 import java.net.ConnectException;
039 import java.net.InetAddress;
040 import java.net.InetSocketAddress;
041 import java.net.Socket;
042 import java.net.SocketException;
043 import java.net.SocketTimeoutException;
044 import java.util.Collection;
045 import java.util.HashMap;
046 import java.util.Iterator;
047 import java.util.LinkedHashSet;
048 import java.util.TreeSet;
049 import java.util.concurrent.Semaphore;
050 import java.util.concurrent.TimeUnit;
051
052 import org.opends.server.protocols.asn1.ASN1OctetString;
053 import org.opends.server.protocols.internal.InternalClientConnection;
054 import org.opends.server.protocols.internal.InternalSearchListener;
055 import org.opends.server.protocols.internal.InternalSearchOperation;
056 import org.opends.server.protocols.ldap.LDAPFilter;
057 import org.opends.server.replication.common.ChangeNumber;
058 import org.opends.server.replication.common.ServerState;
059 import org.opends.server.replication.protocol.*;
060 import org.opends.server.types.DN;
061 import org.opends.server.types.DereferencePolicy;
062 import org.opends.server.types.ResultCode;
063 import org.opends.server.types.SearchResultEntry;
064 import org.opends.server.types.SearchResultReference;
065 import org.opends.server.types.SearchScope;
066
067 /**
068 * The broker for Multi-master Replication.
069 */
070 public class ReplicationBroker implements InternalSearchListener
071 {
072
073 /**
074 * The tracer object for the debug logger.
075 */
076 private static final DebugTracer TRACER = getTracer();
077 private boolean shutdown = false;
078 private Collection<String> servers;
079 private boolean connected = false;
080 private String replicationServer = "Not connected";
081 private TreeSet<FakeOperation> replayOperations;
082 private ProtocolSession session = null;
083 private final ServerState state;
084 private final DN baseDn;
085 private final short serverID;
086 private int maxSendDelay;
087 private int maxReceiveDelay;
088 private int maxSendQueue;
089 private int maxReceiveQueue;
090 private Semaphore sendWindow;
091 private int maxSendWindow;
092 private int rcvWindow;
093 private int halfRcvWindow;
094 private int maxRcvWindow;
095 private int timeout = 0;
096 private short protocolVersion;
097 private long generationId = -1;
098 private ReplSessionSecurity replSessionSecurity;
099
100 // Trick for avoiding a inner class for many parameters return for
101 // performHandshake method.
102 private String tmpReadableServerName = null;
103 /**
104 * The time in milliseconds between heartbeats from the replication
105 * server. Zero means heartbeats are off.
106 */
107 private long heartbeatInterval = 0;
108 /**
109 * A thread to monitor heartbeats on the session.
110 */
111 private HeartbeatMonitor heartbeatMonitor = null;
112 /**
113 * The number of times the connection was lost.
114 */
115 private int numLostConnections = 0;
116 /**
117 * When the broker cannot connect to any replication server
118 * it log an error and keeps continuing every second.
119 * This boolean is set when the first failure happens and is used
120 * to avoid repeating the error message for further failure to connect
121 * and to know that it is necessary to print a new message when the broker
122 * finally succeed to connect.
123 */
124 private boolean connectionError = false;
125 private final Object connectPhaseLock = new Object();
126
/**
 * Builds a broker that connects one replication domain, identified by its
 * base DN and server ID, to a replication server.
 *
 * @param state The ServerState that should be used by this broker
 *        when negotiating the session with the replicationServer.
 * @param baseDn The base DN that should be used by this broker
 *        when negotiating the session with the replicationServer.
 * @param serverID The server ID that should be used by this broker
 *        when negotiating the session with the replicationServer.
 * @param maxReceiveQueue The maximum size of the receive queue to use on
 *        the replicationServer.
 * @param maxReceiveDelay The maximum replication delay to use on the
 *        replicationServer.
 * @param maxSendQueue The maximum size of the send queue to use on
 *        the replicationServer.
 * @param maxSendDelay The maximum send delay to use on the replicationServer.
 * @param window The size of the send and receive window to use.
 * @param heartbeatInterval The interval between heartbeats requested of the
 *        replicationServer, or zero if no heartbeats are requested.
 * @param generationId The generationId for the server associated to the
 *        provided serverID and for the domain associated to the provided
 *        baseDN.
 * @param replSessionSecurity The session security configuration.
 */
public ReplicationBroker(ServerState state, DN baseDn, short serverID,
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval,
    long generationId, ReplSessionSecurity replSessionSecurity)
{
  // Identity of the domain this broker works for.
  this.state = state;
  this.baseDn = baseDn;
  this.serverID = serverID;
  this.generationId = generationId;

  // Limits announced to the replication server in the start message.
  this.maxReceiveQueue = maxReceiveQueue;
  this.maxReceiveDelay = maxReceiveDelay;
  this.maxSendQueue = maxSendQueue;
  this.maxSendDelay = maxSendDelay;

  // Changes to replay after a connection, ordered by the comparator.
  this.replayOperations =
    new TreeSet<FakeOperation>(new FakeOperationComparator());

  // Receive window: starts fully open.
  this.maxRcvWindow = window;
  this.rcvWindow = window;
  this.halfRcvWindow = window / 2;

  this.heartbeatInterval = heartbeatInterval;
  this.protocolVersion = ProtocolVersion.currentVersion();
  this.replSessionSecurity = replSessionSecurity;
}
173
174 /**
175 * Start the ReplicationBroker.
176 *
177 * @param servers list of servers used
178 */
179 public void start(Collection<String> servers)
180 {
181 /*
182 * Open Socket to the ReplicationServer
183 * Send the Start message
184 */
185 shutdown = false;
186 this.servers = servers;
187 if (servers.size() < 1)
188 {
189 Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
190 logError(message);
191 }
192
193 this.rcvWindow = this.maxRcvWindow;
194 this.connect();
195 }
196
197 /**
198 * Connect to a ReplicationServer.
199 *
200 * @throws NumberFormatException address was invalid
201 */
202 private void connect()
203 {
204 HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
205
206 // Stop any existing heartbeat monitor from a previous session.
207 stopHeartBeat();
208
209 synchronized (connectPhaseLock)
210 {
211 /*
212 * Connect to each replication server and get their ServerState then find
213 * out which one is the best to connect to.
214 */
215 for (String server : servers)
216 {
217 // Connect to server and get reply message
218 ReplServerStartMessage replServerStartMsg =
219 performHandshake(server, false);
220 tmpReadableServerName = null; // Not needed now
221
222 // Store reply message in list
223 if (replServerStartMsg != null)
224 {
225 ServerState rsState = replServerStartMsg.getServerState();
226 rsStates.put(server, rsState);
227 }
228 } // for servers
229
230 ReplServerStartMessage replServerStartMsg = null;
231
232 if (rsStates.size() > 0)
233 {
234
235 // At least one server answered, find the best one.
236 String bestServer = computeBestReplicationServer(state, rsStates,
237 serverID, baseDn);
238
239 // Best found, now connect to this one
240 replServerStartMsg = performHandshake(bestServer, true);
241
242 if (replServerStartMsg != null)
243 {
244 try
245 {
246 /*
247 * We must not publish changes to a replicationServer that has not
248 * seen all our previous changes because this could cause some
249 * other ldap servers to miss those changes.
250 * Check that the ReplicationServer has seen all our previous
251 * changes.
252 */
253 ChangeNumber replServerMaxChangeNumber =
254 replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
255
256 if (replServerMaxChangeNumber == null)
257 {
258 replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
259 }
260 ChangeNumber ourMaxChangeNumber =
261 state.getMaxChangeNumber(serverID);
262
263 if ((ourMaxChangeNumber != null) &&
264 (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
265 {
266
267 // Replication server is missing some of our changes: let's send
268 // them to him.
269 replayOperations.clear();
270
271 Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
272 logError(message);
273
274 /*
275 * Get all the changes that have not been seen by this
276 * replication server and populate the replayOperations
277 * list.
278 */
279 InternalSearchOperation op = searchForChangedEntries(
280 baseDn, replServerMaxChangeNumber, this);
281 if (op.getResultCode() != ResultCode.SUCCESS)
282 {
283 /*
284 * An error happened trying to search for the updates
285 * This server will start acepting again new updates but
286 * some inconsistencies will stay between servers.
287 * Log an error for the repair tool
288 * that will need to resynchronize the servers.
289 */
290 message = ERR_CANNOT_RECOVER_CHANGES.get(
291 baseDn.toNormalizedString());
292 logError(message);
293 } else
294 {
295 for (FakeOperation replayOp : replayOperations)
296 {
297 message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
298 toString());
299 logError(message);
300 session.publish(replayOp.generateMessage());
301 }
302 message = DEBUG_CHANGES_SENT.get();
303 logError(message);
304 }
305 }
306
307 replicationServer = tmpReadableServerName;
308 maxSendWindow = replServerStartMsg.getWindowSize();
309 connected = true;
310 startHeartBeat();
311 } catch (IOException e)
312 {
313 Message message = ERR_PUBLISHING_FAKE_OPS.get(
314 baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
315 stackTraceToSingleLineString(e));
316 logError(message);
317 } catch (Exception e)
318 {
319 Message message = ERR_COMPUTING_FAKE_OPS.get(
320 baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
321 stackTraceToSingleLineString(e));
322 logError(message);
323 } finally
324 {
325 if (connected == false)
326 {
327 if (session != null)
328 {
329 try
330 {
331 session.close();
332 } catch (IOException e)
333 {
334 // The session was already closed, just ignore.
335 }
336 session = null;
337 }
338 }
339 }
340 } // Could perform handshake with best
341 } // Reached some servers
342
343 if (connected)
344 {
345 // Log a message to let the administrator know that the failure was
346 // resolved.
347 // Wakeup all the thread that were waiting on the window
348 // on the previous connection.
349 connectionError = false;
350 if (sendWindow != null)
351 {
352 sendWindow.release(Integer.MAX_VALUE);
353 }
354 this.sendWindow = new Semaphore(maxSendWindow);
355 connectPhaseLock.notify();
356
357 if ((replServerStartMsg.getGenerationId() == this.generationId) ||
358 (replServerStartMsg.getGenerationId() == -1))
359 {
360 Message message =
361 NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
362 baseDn.toString(),
363 replicationServer,
364 Long.toString(this.generationId));
365 logError(message);
366 } else
367 {
368 Message message =
369 NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
370 baseDn.toString(),
371 replicationServer,
372 Long.toString(this.generationId),
373 Long.toString(replServerStartMsg.getGenerationId()));
374 logError(message);
375 }
376 } else
377 {
378 /*
379 * This server could not find any replicationServer. It's going to start
380 * in degraded mode. Log a message.
381 */
382 if (!connectionError)
383 {
384 connectionError = true;
385 connectPhaseLock.notify();
386 Message message =
387 NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
388 logError(message);
389 }
390 }
391 }
392 }
393
394 /**
395 * Connect to the provided server performing the handshake (start messages
396 * exchange) and return the reply message from the replication server.
397 *
398 * @param server Server to connect to.
399 * @param keepConnection Do we keep session opened or not after handshake.
400 * @return The ReplServerStartMessage the server replied. Null if could not
401 * get an answer.
402 */
403 public ReplServerStartMessage performHandshake(String server,
404 boolean keepConnection)
405 {
406 ReplServerStartMessage replServerStartMsg = null;
407
408 // Parse server string.
409 int separator = server.lastIndexOf(':');
410 String port = server.substring(separator + 1);
411 String hostname = server.substring(0, separator);
412
413 boolean error = false;
414 try
415 {
416 /*
417 * Open a socket connection to the next candidate.
418 */
419 int intPort = Integer.parseInt(port);
420 InetSocketAddress serverAddr = new InetSocketAddress(
421 InetAddress.getByName(hostname), intPort);
422 tmpReadableServerName = serverAddr.toString();
423 Socket socket = new Socket();
424 socket.setReceiveBufferSize(1000000);
425 socket.setTcpNoDelay(true);
426 socket.connect(serverAddr, 500);
427 session = replSessionSecurity.createClientSession(server, socket);
428 boolean isSslEncryption =
429 replSessionSecurity.isSslEncryption(server);
430 /*
431 * Send our ServerStartMessage.
432 */
433 ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
434 maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
435 halfRcvWindow * 2, heartbeatInterval, state,
436 protocolVersion, generationId, isSslEncryption, !keepConnection);
437 session.publish(msg);
438
439 /*
440 * Read the ReplServerStartMessage that should come back.
441 */
442 session.setSoTimeout(1000);
443 replServerStartMsg = (ReplServerStartMessage) session.receive();
444
445 /*
446 * We have sent our own protocol version to the replication server.
447 * The replication server will use the same one (or an older one
448 * if it is an old replication server).
449 */
450 protocolVersion = ProtocolVersion.minWithCurrent(
451 replServerStartMsg.getVersion());
452 session.setSoTimeout(timeout);
453
454 if (!isSslEncryption)
455 {
456 session.stopEncryption();
457 }
458 } catch (ConnectException e)
459 {
460 /*
461 * There was no server waiting on this host:port
462 * Log a notice and try the next replicationServer in the list
463 */
464 if (!connectionError)
465 {
466 // the error message is only logged once to avoid overflowing
467 // the error log
468 Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
469 logError(message);
470 }
471 error = true;
472 } catch (Exception e)
473 {
474 Message message = ERR_EXCEPTION_STARTING_SESSION.get(
475 baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
476 stackTraceToSingleLineString(e));
477 logError(message);
478 error = true;
479 }
480
481 // Close session if requested
482 if (!keepConnection || error)
483 {
484 if (session != null)
485 {
486 try
487 {
488 session.close();
489 } catch (IOException e)
490 {
491 // The session was already closed, just ignore.
492 }
493 session = null;
494 }
495 if (error)
496 {
497 replServerStartMsg = null;
498 } // Be sure to return null.
499 }
500
501 return replServerStartMsg;
502 }
503
504 /**
505 * Returns the replication server that best fits our need so that we can
506 * connect to it.
507 *
508 * Note: this method put as public static for unit testing purpose.
509 *
510 * @param myState The local server state.
511 * @param rsStates The list of available replication servers and their
512 * associated server state.
513 * @param serverId The server id for the suffix we are working for.
514 * @param baseDn The suffix for which we are working for.
515 * @return The computed best replication server.
516 */
517 public static String computeBestReplicationServer(ServerState myState,
518 HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
519 {
520
521 /*
522 * Find replication servers who are up to date (or more up to date than us,
523 * if for instance we failed and restarted, having sent some changes to the
524 * RS but without having time to store our own state) regarding our own
525 * server id. Then, among them, choose the server that is the most up to
526 * date regarding the whole topology.
527 *
528 * If no server is up to date regarding our own server id, find the one who
529 * is the most up to date regarding our server id.
530 */
531
532 // Should never happen (sanity check)
533 if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
534 (baseDn == null))
535 {
536 return null;
537 }
538
539 String bestServer = null;
540 // Servers up to dates with regard to our changes
541 HashMap<String, ServerState> upToDateServers =
542 new HashMap<String, ServerState>();
543 // Servers late with regard to our changes
544 HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
545
546 /*
547 * Start loop to differenciate up to date servers from late ones.
548 */
549 ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
550 if (myChangeNumber == null)
551 {
552 myChangeNumber = new ChangeNumber(0, 0, serverId);
553 }
554 for (String repServer : rsStates.keySet())
555 {
556
557 ServerState rsState = rsStates.get(repServer);
558 ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
559 if (rsChangeNumber == null)
560 {
561 rsChangeNumber = new ChangeNumber(0, 0, serverId);
562 }
563
564 // Store state in right list
565 if (myChangeNumber.olderOrEqual(rsChangeNumber))
566 {
567 upToDateServers.put(repServer, rsState);
568 } else
569 {
570 lateOnes.put(repServer, rsState);
571 }
572 }
573
574 if (upToDateServers.size() > 0)
575 {
576
577 /*
578 * Some up to date servers, among them, choose the one that has the
579 * maximum number of changes to send us. This is the most up to date one
580 * regarding the whole topology. This server is the one which has the less
581 * difference with the topology server state. For comparison, we need to
582 * compute the difference for each server id with the topology server
583 * state.
584 */
585
586 Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
587 upToDateServers.size(),
588 baseDn.toNormalizedString());
589 logError(message);
590
591 /*
592 * First of all, compute the virtual server state for the whole topology,
593 * which is composed of the most up to date change numbers for
594 * each server id in the topology.
595 */
596 ServerState topoState = new ServerState();
597 for (ServerState curState : upToDateServers.values())
598 {
599
600 Iterator<Short> it = curState.iterator();
601 while (it.hasNext())
602 {
603 Short sId = it.next();
604 ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
605 if (curSidCn == null)
606 {
607 curSidCn = new ChangeNumber(0, 0, sId);
608 }
609 // Update topology state
610 topoState.update(curSidCn);
611 }
612 } // For up to date servers
613
614 // Min of the max shifts
615 long minShift = -1L;
616 for (String upServer : upToDateServers.keySet())
617 {
618
619 /*
620 * Compute the maximum difference between the time of a server id's
621 * change number and the time of the matching server id's change
622 * number in the topology server state.
623 *
624 * Note: we could have used the sequence number here instead of the
625 * timestamp, but this would have caused a problem when the sequence
626 * number loops and comes back to 0 (computation would have becomen
627 * meaningless).
628 */
629 long shift = -1L;
630 ServerState curState = upToDateServers.get(upServer);
631 Iterator<Short> it = curState.iterator();
632 while (it.hasNext())
633 {
634 Short sId = it.next();
635 ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
636 if (curSidCn == null)
637 {
638 curSidCn = new ChangeNumber(0, 0, sId);
639 }
640 // Cannot be null as checked at construction time
641 ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
642 // Cannot be negative as topoState computed as being the max CN
643 // for each server id in the topology
644 long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
645 if (tmpShift > shift)
646 {
647 shift = tmpShift;
648 }
649 }
650
651 if ((minShift < 0) // First time in loop
652 || (shift < minShift))
653 {
654 // This sever is even closer to topo state
655 bestServer = upServer;
656 minShift = shift;
657 }
658 } // For up to date servers
659
660 } else
661 {
662 /*
663 * We could not find a replication server that has seen all the
664 * changes that this server has already processed,
665 */
666 // lateOnes cannot be empty
667 Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
668 baseDn.toNormalizedString(), lateOnes.size());
669 logError(message);
670
671 // Min of the shifts
672 long minShift = -1L;
673 for (String lateServer : lateOnes.keySet())
674 {
675
676 /*
677 * Choose the server who is the closest to us regarding our server id
678 * (this is the most up to date regarding our server id).
679 */
680 ServerState curState = lateOnes.get(lateServer);
681 ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
682 if (ourSidCn == null)
683 {
684 ourSidCn = new ChangeNumber(0, 0, serverId);
685 }
686 // Cannot be negative as our Cn for our server id is strictly
687 // greater than those of the servers in late server list
688 long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
689
690 if ((minShift < 0) // First time in loop
691 || (tmpShift < minShift))
692 {
693 // This sever is even closer to topo state
694 bestServer = lateServer;
695 minShift = tmpShift;
696 }
697 } // For late servers
698 }
699
700 return bestServer;
701 }
702
703 /**
704 * Search for the changes that happened since fromChangeNumber
705 * based on the historical attribute.
706 * @param baseDn the base DN
707 * @param fromChangeNumber The change number from which we want the changes
708 * @param resultListener that will process the entries returned.
709 * @return the internal search operation
710 * @throws Exception when raised.
711 */
712 public static InternalSearchOperation searchForChangedEntries(
713 DN baseDn,
714 ChangeNumber fromChangeNumber,
715 InternalSearchListener resultListener)
716 throws Exception
717 {
718 InternalClientConnection conn =
719 InternalClientConnection.getRootConnection();
720 LDAPFilter filter = LDAPFilter.decode(
721 "(" + Historical.HISTORICALATTRIBUTENAME +
722 ">=dummy:" + fromChangeNumber + ")");
723 LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
724 attrs.add(Historical.HISTORICALATTRIBUTENAME);
725 attrs.add(Historical.ENTRYUIDNAME);
726 return conn.processSearch(
727 new ASN1OctetString(baseDn.toString()),
728 SearchScope.WHOLE_SUBTREE,
729 DereferencePolicy.NEVER_DEREF_ALIASES,
730 0, 0, false, filter,
731 attrs,
732 resultListener);
733 }
734
735 /**
736 * Start the heartbeat monitor thread.
737 */
738 private void startHeartBeat()
739 {
740 // Start a heartbeat monitor thread.
741 if (heartbeatInterval > 0)
742 {
743 heartbeatMonitor =
744 new HeartbeatMonitor("Replication Heartbeat Monitor on " +
745 baseDn + " with " + getReplicationServer(),
746 session, heartbeatInterval);
747 heartbeatMonitor.start();
748 }
749 }
750
751 /**
752 * Stop the heartbeat monitor thread.
753 */
754 void stopHeartBeat()
755 {
756 if (heartbeatMonitor != null)
757 {
758 heartbeatMonitor.shutdown();
759 heartbeatMonitor = null;
760 }
761 }
762
763 /**
764 * restart the ReplicationBroker.
765 */
766 public void reStart()
767 {
768 reStart(this.session);
769 }
770
771 /**
772 * Restart the ReplicationServer broker after a failure.
773 *
774 * @param failingSession the socket which failed
775 */
776 public void reStart(ProtocolSession failingSession)
777 {
778 try
779 {
780 if (failingSession != null)
781 {
782 failingSession.close();
783 numLostConnections++;
784 }
785 } catch (IOException e1)
786 {
787 // ignore
788 }
789
790 if (failingSession == session)
791 {
792 this.connected = false;
793 }
794 while (!this.connected && (!this.shutdown))
795 {
796 try
797 {
798 this.connect();
799 } catch (Exception e)
800 {
801 MessageBuilder mb = new MessageBuilder();
802 mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
803 baseDn.toNormalizedString(), e.getLocalizedMessage()));
804 mb.append(stackTraceToSingleLineString(e));
805 logError(mb.toMessage());
806 }
807 if ((!connected) && (!shutdown))
808 {
809 try
810 {
811 Thread.sleep(500);
812 } catch (InterruptedException e)
813 {
814 // ignore
815 }
816 }
817 }
818 }
819
820 /**
821 * Publish a message to the other servers.
822 * @param msg the message to publish
823 */
824 public void publish(ReplicationMessage msg)
825 {
826 boolean done = false;
827
828 while (!done && !shutdown)
829 {
830 if (connectionError)
831 {
832 // It was not possible to connect to any replication server.
833 // Since the operation was already processed, we have no other
834 // choice than to return without sending the ReplicationMessage
835 // and relying on the resend procedure of the connect phase to
836 // fix the problem when we finally connect.
837
838 if (debugEnabled())
839 {
840 debugInfo("ReplicationBroker.publish() Publishing a " +
841 " message is not possible due to existing connection error.");
842 }
843
844 return;
845 }
846
847 try
848 {
849 boolean credit;
850 ProtocolSession current_session;
851 Semaphore currentWindowSemaphore;
852
853 // save the session at the time when we acquire the
854 // sendwindow credit so that we can make sure later
855 // that the session did not change in between.
856 // This is necessary to make sure that we don't publish a message
857 // on a session with a credit that was acquired from a previous
858 // session.
859 synchronized (connectPhaseLock)
860 {
861 current_session = session;
862 currentWindowSemaphore = sendWindow;
863 }
864
865 if (msg instanceof UpdateMessage)
866 {
867 // Acquiring the window credit must be done outside of the
868 // connectPhaseLock because it can be blocking and we don't
869 // want to hold off reconnection in case the connection dropped.
870 credit =
871 currentWindowSemaphore.tryAcquire(
872 (long) 500, TimeUnit.MILLISECONDS);
873 } else
874 {
875 credit = true;
876 }
877 if (credit)
878 {
879 synchronized (connectPhaseLock)
880 {
881 // check the session. If it has changed, some
882 // deconnection/reconnection happened and we need to restart from
883 // scratch.
884 if (session == current_session)
885 {
886 session.publish(msg);
887 done = true;
888 }
889 }
890 }
891 if (!credit)
892 {
893 // the window is still closed.
894 // Send a WindowProbe message to wakeup the receiver in case the
895 // window update message was lost somehow...
896 // then loop to check again if connection was closed.
897 session.publish(new WindowProbe());
898 }
899 } catch (IOException e)
900 {
901 // The receive threads should handle reconnection or
902 // mark this broker in error. Just retry.
903 synchronized (connectPhaseLock)
904 {
905 try
906 {
907 connectPhaseLock.wait(100);
908 } catch (InterruptedException e1)
909 {
910 // ignore
911 if (debugEnabled())
912 {
913 debugInfo("ReplicationBroker.publish() " +
914 "IO exception raised : " + e.getLocalizedMessage());
915 }
916 }
917 }
918 } catch (InterruptedException e)
919 {
920 // just loop.
921 if (debugEnabled())
922 {
923 debugInfo("ReplicationBroker.publish() " +
924 "Interrupted exception raised." + e.getLocalizedMessage());
925 }
926 }
927 }
928 }
929
930 /**
931 * Receive a message.
932 * This method is not multithread safe and should either always be
933 * called in a single thread or protected by a locking mechanism
934 * before being called.
935 *
936 * @return the received message
937 * @throws SocketTimeoutException if the timeout set by setSoTimeout
938 * has expired
939 */
940 public ReplicationMessage receive() throws SocketTimeoutException
941 {
942 while (shutdown == false)
943 {
944 if (!connected)
945 {
946 reStart(null);
947 }
948
949 ProtocolSession failingSession = session;
950 try
951 {
952 ReplicationMessage msg = session.receive();
953 if (msg instanceof WindowMessage)
954 {
955 WindowMessage windowMsg = (WindowMessage) msg;
956 sendWindow.release(windowMsg.getNumAck());
957 }
958 else
959 {
960 return msg;
961 }
962 } catch (SocketTimeoutException e)
963 {
964 throw e;
965 } catch (Exception e)
966 {
967 if (shutdown == false)
968 {
969 Message message =
970 NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
971 logError(message);
972
973 debugInfo("ReplicationBroker.receive() " + baseDn +
974 " Exception raised." + e + e.getLocalizedMessage());
975 this.reStart(failingSession);
976 }
977 }
978 }
979 return null;
980 }
981
982 /**
983 * This method allows to do the necessary computing for the window
984 * management after treatment by the worker threads.
985 *
986 * This should be called once the replay thread have done their job
987 * and the window can be open again.
988 */
989 public synchronized void updateWindowAfterReplay()
990 {
991 try
992 {
993 rcvWindow--;
994 if (rcvWindow < halfRcvWindow)
995 {
996 session.publish(new WindowMessage(halfRcvWindow));
997 rcvWindow += halfRcvWindow;
998 }
999 } catch (IOException e)
1000 {
1001 // Any error on the socket will be handled by the thread calling receive()
1002 // just ignore.
1003 }
1004 }
1005
1006 /**
1007 * stop the server.
1008 */
1009 public void stop()
1010 {
1011 replicationServer = "stopped";
1012 shutdown = true;
1013 connected = false;
1014 try
1015 {
1016 if (debugEnabled())
1017 {
1018 debugInfo("ReplicationBroker is stopping. and will" +
1019 " close the connection");
1020 }
1021
1022 if (session != null)
1023 {
1024 session.close();
1025 }
1026 } catch (IOException e)
1027 {
1028 }
1029 }
1030
1031 /**
1032 * Set a timeout value.
1033 * With this option set to a non-zero value, calls to the receive() method
1034 * block for only this amount of time after which a
1035 * java.net.SocketTimeoutException is raised.
1036 * The Broker is valid and useable even after such an Exception is raised.
1037 *
1038 * @param timeout the specified timeout, in milliseconds.
1039 * @throws SocketException if there is an error in the underlying protocol,
1040 * such as a TCP error.
1041 */
1042 public void setSoTimeout(int timeout) throws SocketException
1043 {
1044 this.timeout = timeout;
1045 if (session != null)
1046 {
1047 session.setSoTimeout(timeout);
1048 }
1049 }
1050
1051 /**
1052 * Set the value of the generationId for that broker. Normally the
1053 * generationId is set through the constructor but there are cases
1054 * where the value of the generationId must be changed while the broker
1055 * already exist for example after an on-line import.
1056 *
1057 * @param generationId The value of the generationId.
1058 *
1059 */
1060 public void setGenerationId(long generationId)
1061 {
1062 this.generationId = generationId;
1063 }
1064
1065 /**
1066 * Get the name of the replicationServer to which this broker is currently
1067 * connected.
1068 *
1069 * @return the name of the replicationServer to which this domain
1070 * is currently connected.
1071 */
1072 public String getReplicationServer()
1073 {
1074 return replicationServer;
1075 }
1076
1077 /**
1078 * {@inheritDoc}
1079 */
1080 public void handleInternalSearchEntry(
1081 InternalSearchOperation searchOperation,
1082 SearchResultEntry searchEntry)
1083 {
1084 /*
1085 * Only deal with modify operation so far
1086 * TODO : implement code for ADD, DEL, MODDN operation
1087 *
1088 * Parse all ds-sync-hist attribute values
1089 * - for each Changenumber > replication server MaxChangeNumber :
1090 * build an attribute mod
1091 *
1092 */
1093 Iterable<FakeOperation> updates =
1094 Historical.generateFakeOperations(searchEntry);
1095 for (FakeOperation op : updates)
1096 {
1097 replayOperations.add(op);
1098 }
1099 }
1100
1101 /**
1102 * {@inheritDoc}
1103 */
1104 public void handleInternalSearchReference(
1105 InternalSearchOperation searchOperation,
1106 SearchResultReference searchReference)
1107 {
1108 // TODO to be implemented
1109 }
1110
1111 /**
1112 * Get the maximum receive window size.
1113 *
1114 * @return The maximum receive window size.
1115 */
1116 public int getMaxRcvWindow()
1117 {
1118 return maxRcvWindow;
1119 }
1120
1121 /**
1122 * Get the current receive window size.
1123 *
1124 * @return The current receive window size.
1125 */
1126 public int getCurrentRcvWindow()
1127 {
1128 return rcvWindow;
1129 }
1130
1131 /**
1132 * Get the maximum send window size.
1133 *
1134 * @return The maximum send window size.
1135 */
1136 public int getMaxSendWindow()
1137 {
1138 return maxSendWindow;
1139 }
1140
1141 /**
1142 * Get the current send window size.
1143 *
1144 * @return The current send window size.
1145 */
1146 public int getCurrentSendWindow()
1147 {
1148 if (connected)
1149 {
1150 return sendWindow.availablePermits();
1151 } else
1152 {
1153 return 0;
1154 }
1155 }
1156
1157 /**
1158 * Get the number of times the connection was lost.
1159 * @return The number of times the connection was lost.
1160 */
1161 public int getNumLostConnections()
1162 {
1163 return numLostConnections;
1164 }
1165
1166 /**
1167 * Change some config parameters.
1168 *
1169 * @param replicationServers The new list of replication servers.
1170 * @param maxReceiveQueue The max size of receive queue.
1171 * @param maxReceiveDelay The max receive delay.
1172 * @param maxSendQueue The max send queue.
1173 * @param maxSendDelay The max Send Delay.
1174 * @param window The max window size.
1175 * @param heartbeatInterval The heartbeat interval.
1176 */
1177 public void changeConfig(Collection<String> replicationServers,
1178 int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
1179 int maxSendDelay, int window, long heartbeatInterval)
1180 {
1181 this.servers = replicationServers;
1182 this.maxRcvWindow = window;
1183 this.heartbeatInterval = heartbeatInterval;
1184 this.maxReceiveDelay = maxReceiveDelay;
1185 this.maxReceiveQueue = maxReceiveQueue;
1186 this.maxSendDelay = maxSendDelay;
1187 this.maxSendQueue = maxSendQueue;
1188 // TODO : Changing those parameters requires to either restart a new
1189 // session with the replicationServer or renegociate the parameters that
1190 // were sent in the ServerStart message
1191 }
1192
1193 /**
1194 * Get the version of the replication protocol.
1195 * @return The version of the replication protocol.
1196 */
1197 public short getProtocolVersion()
1198 {
1199 return protocolVersion;
1200 }
1201
1202 /**
1203 * Check if the broker is connected to a ReplicationServer and therefore
1204 * ready to received and send Replication Messages.
1205 *
1206 * @return true if the server is connected, false if not.
1207 */
1208 public boolean isConnected()
1209 {
1210 return !connectionError;
1211 }
1212
1213 private boolean debugEnabled()
1214 {
1215 return true;
1216 }
1217
1218 private static final void debugInfo(String s)
1219 {
1220 logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
1221 TRACER.debugInfo(s);
1222 }
1223
1224 /**
1225 * Determine whether the connection to the replication server is encrypted.
1226 * @return true if the connection is encrypted, false otherwise.
1227 */
1228 public boolean isSessionEncrypted()
1229 {
1230 boolean isEncrypted = false;
1231 if (session != null)
1232 {
1233 return session.isEncrypted();
1234 }
1235 return isEncrypted;
1236 }
1237 }