001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028
029 import org.opends.messages.*;
030
031 import static org.opends.server.loggers.ErrorLogger.logError;
032 import static org.opends.server.loggers.debug.DebugLogger.*;
033
034 import org.opends.server.loggers.debug.DebugTracer;
035 import static org.opends.messages.ReplicationMessages.*;
036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037
038 import java.io.IOException;
039 import java.util.Date;
040 import java.util.List;
041 import java.util.ArrayList;
042 import java.util.HashMap;
043 import java.util.LinkedHashSet;
044 import java.util.Map;
045 import java.util.Set;
046 import java.util.SortedSet;
047 import java.util.TreeSet;
048 import java.util.concurrent.ConcurrentHashMap;
049 import java.util.concurrent.Semaphore;
050 import java.util.concurrent.TimeUnit;
051
052 import org.opends.server.admin.std.server.MonitorProviderCfg;
053 import org.opends.server.api.MonitorProvider;
054 import org.opends.server.config.ConfigException;
055 import org.opends.server.core.DirectoryServer;
056 import org.opends.server.replication.common.ChangeNumber;
057 import org.opends.server.replication.common.ServerState;
058 import org.opends.server.replication.protocol.*;
059 import org.opends.server.types.Attribute;
060 import org.opends.server.types.AttributeType;
061 import org.opends.server.types.AttributeValue;
062 import org.opends.server.types.DN;
063 import org.opends.server.types.InitializationException;
064 import org.opends.server.util.TimeThread;
065
066 /**
067 * This class defines a server handler, which handles all interaction with a
068 * replication server.
069 */
070 public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
071 {
072 /**
073 * The tracer object for the debug logger.
074 */
075 private static final DebugTracer TRACER = getTracer();
076
077 /**
078 * Time during which the server will wait for existing thread to stop
079 * during the shutdown.
080 */
081 private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
082
083 private short serverId;
084 private ProtocolSession session;
085 private final MsgQueue msgQueue = new MsgQueue();
086 private MsgQueue lateQueue = new MsgQueue();
087 private final Map<ChangeNumber, AckMessageList> waitingAcks =
088 new HashMap<ChangeNumber, AckMessageList>();
089 private ReplicationServerDomain replicationServerDomain = null;
090 private String serverURL;
091 private int outCount = 0; // number of update sent to the server
092 private int inCount = 0; // number of updates received from the server
093 private int inAckCount = 0;
094 private int outAckCount = 0;
095 private int maxReceiveQueue = 0;
096 private int maxSendQueue = 0;
097 private int maxReceiveDelay = 0;
098 private int maxSendDelay = 0;
099 private int maxQueueSize = 10000;
100 private int restartReceiveQueue;
101 private int restartSendQueue;
102 private int restartReceiveDelay;
103 private int restartSendDelay;
104 private boolean serverIsLDAPserver;
105 private boolean following = false;
106 private ServerState serverState;
107 private boolean active = true;
108 private ServerWriter writer = null;
109 private DN baseDn = null;
110 private String serverAddressURL;
111 private int rcvWindow;
112 private int rcvWindowSizeHalf;
113 private int maxRcvWindow;
114 private ServerReader reader;
115 private Semaphore sendWindow;
116 private int sendWindowSize;
117 private boolean flowControl = false; // indicate that the server is
118 // flow controled and should
119 // be stopped from sending messsages.
120 private int saturationCount = 0;
121 private short replicationServerId;
122
123 private short protocolVersion;
124 private long generationId = -1;
125
126
127 /**
128 * When this Handler is related to a remote replication server
129 * this collection will contain as many elements as there are
130 * LDAP servers connected to the remote replication server.
131 */
132 private final Map<Short, LightweightServerHandler> connectedServers =
133 new ConcurrentHashMap<Short, LightweightServerHandler>();
134
135 /**
136 * The time in milliseconds between heartbeats from the replication
137 * server. Zero means heartbeats are off.
138 */
139 private long heartbeatInterval = 0;
140
141 /**
142 * The thread that will send heartbeats.
143 */
144 HeartbeatThread heartbeatThread = null;
145
146 /**
147 * Set when ServerHandler is stopping.
148 */
149 private boolean shutdown = false;
150
151 private static final Map<ChangeNumber, ReplServerAckMessageList>
152 changelogsWaitingAcks =
153 new HashMap<ChangeNumber, ReplServerAckMessageList>();
154
/**
 * Builds a server handler bound to the provided protocol session.
 * The handler starts with the current protocol version; the version is
 * negotiated later, during {@code start}.
 *
 * @param session   The ProtocolSession used by the ServerHandler to
 *                  communicate with the remote entity.
 * @param queueSize The maximum number of updates that will be kept
 *                  in memory by this ServerHandler.
 */
public ServerHandler(ProtocolSession session, int queueSize)
{
  super("Server Handler");
  this.protocolVersion = ProtocolVersion.currentVersion();
  this.maxQueueSize = queueSize;
  this.session = session;
}
170
171 /**
172 * Do the exchange of start messages to know if the remote
173 * server is an LDAP or replication server and to exchange serverID.
174 * Then create the reader and writer thread.
175 *
176 * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
177 * null if this is an incoming connection (listen).
178 * @param replicationServerId The identifier of the replicationServer that
179 * creates this server handler.
180 * @param replicationServerURL The URL of the replicationServer that creates
181 * this server handler.
182 * @param windowSize the window size that this server handler must use.
183 * @param sslEncryption For outgoing connections indicates whether encryption
184 * should be used after the exchange of start messages.
185 * Ignored for incoming connections.
186 * @param replicationServer the ReplicationServer that created this server
187 * handler.
188 */
189 public void start(DN baseDn, short replicationServerId,
190 String replicationServerURL,
191 int windowSize, boolean sslEncryption,
192 ReplicationServer replicationServer)
193 {
194 if (debugEnabled())
195 TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
196 " starts a new LS or RS " +
197 ((baseDn == null)?"incoming connection":"outgoing connection"));
198
199 this.replicationServerId = replicationServerId;
200 rcvWindowSizeHalf = windowSize/2;
201 maxRcvWindow = windowSize;
202 rcvWindow = windowSize;
203 long localGenerationId = -1;
204 boolean handshakeOnly = false;
205
206 try
207 {
208 if (baseDn != null)
209 {
210 // This is an outgoing connection. Publish our start message.
211 this.baseDn = baseDn;
212
213 // Get or create the ReplicationServerDomain
214 replicationServerDomain =
215 replicationServer.getReplicationServerDomain(baseDn, true);
216 localGenerationId = replicationServerDomain.getGenerationId();
217
218 ServerState localServerState =
219 replicationServerDomain.getDbServerState();
220 ReplServerStartMessage msg =
221 new ReplServerStartMessage(replicationServerId, replicationServerURL,
222 baseDn, windowSize, localServerState,
223 protocolVersion, localGenerationId,
224 sslEncryption);
225
226 session.publish(msg);
227 }
228
229 // Wait and process ServerStart or ReplServerStart
230 ReplicationMessage msg = session.receive();
231 if (msg instanceof ServerStartMessage)
232 {
233 // The remote server is an LDAP Server.
234 ServerStartMessage receivedMsg = (ServerStartMessage) msg;
235
236 generationId = receivedMsg.getGenerationId();
237 protocolVersion = ProtocolVersion.minWithCurrent(
238 receivedMsg.getVersion());
239 serverId = receivedMsg.getServerId();
240 serverURL = receivedMsg.getServerURL();
241 this.baseDn = receivedMsg.getBaseDn();
242 this.serverState = receivedMsg.getServerState();
243
244 maxReceiveDelay = receivedMsg.getMaxReceiveDelay();
245 maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
246 maxSendDelay = receivedMsg.getMaxSendDelay();
247 maxSendQueue = receivedMsg.getMaxSendQueue();
248 heartbeatInterval = receivedMsg.getHeartbeatInterval();
249
250 handshakeOnly = receivedMsg.isHandshakeOnly();
251
252 // The session initiator decides whether to use SSL.
253 sslEncryption = receivedMsg.getSSLEncryption();
254
255 if (maxReceiveQueue > 0)
256 restartReceiveQueue = (maxReceiveQueue > 1000 ?
257 maxReceiveQueue - 200 :
258 maxReceiveQueue*8/10);
259 else
260 restartReceiveQueue = 0;
261
262 if (maxSendQueue > 0)
263 restartSendQueue = (maxSendQueue > 1000 ? maxSendQueue - 200 :
264 maxSendQueue*8/10);
265 else
266 restartSendQueue = 0;
267
268 if (maxReceiveDelay > 0)
269 restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 :
270 maxReceiveDelay);
271 else
272 restartReceiveDelay = 0;
273
274 if (maxSendDelay > 0)
275 restartSendDelay = (maxSendDelay > 10 ?
276 maxSendDelay -1 :
277 maxSendDelay);
278 else
279 restartSendDelay = 0;
280
281 if (heartbeatInterval < 0)
282 {
283 heartbeatInterval = 0;
284 }
285
286 serverIsLDAPserver = true;
287
288 // Get or Create the ReplicationServerDomain
289 replicationServerDomain =
290 replicationServer.getReplicationServerDomain(this.baseDn, true);
291
292 replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
293 replicationServerDomain.mayResetGenerationId();
294
295 localGenerationId = replicationServerDomain.getGenerationId();
296
297 ServerState localServerState =
298 replicationServerDomain.getDbServerState();
299 // This an incoming connection. Publish our start message
300 ReplServerStartMessage myStartMsg =
301 new ReplServerStartMessage(replicationServerId, replicationServerURL,
302 this.baseDn, windowSize, localServerState,
303 protocolVersion, localGenerationId,
304 sslEncryption);
305 session.publish(myStartMsg);
306 sendWindowSize = receivedMsg.getWindowSize();
307
308 /* Until here session is encrypted then it depends on the negociation */
309 if (!sslEncryption)
310 {
311 session.stopEncryption();
312 }
313
314 if (debugEnabled())
315 {
316 Set<String> ss = this.serverState.toStringSet();
317 Set<String> lss =
318 replicationServerDomain.getDbServerState().toStringSet();
319 TRACER.debugInfo("In " + replicationServerDomain.
320 getReplicationServer().getMonitorInstanceName() +
321 ", SH received START from LS serverId=" + serverId +
322 " baseDN=" + this.baseDn +
323 " generationId=" + generationId +
324 " localGenerationId=" + localGenerationId +
325 " state=" + ss +
326 " and sent ReplServerStart with state=" + lss);
327 }
328
329 /*
330 * If we have already a generationID set for the domain
331 * then
332 * if the connecting replica has not the same
333 * then it is degraded locally and notified by an error message
334 * else
335 * we set the generationID from the one received
336 * (unsaved yet on disk . will be set with the 1rst change received)
337 */
338 if (localGenerationId>0)
339 {
340 if (generationId != localGenerationId)
341 {
342 Message message = NOTE_BAD_GENERATION_ID.get(
343 receivedMsg.getBaseDn().toNormalizedString(),
344 Short.toString(receivedMsg.getServerId()),
345 Long.toString(generationId),
346 Long.toString(localGenerationId));
347
348 ErrorMessage errorMsg =
349 new ErrorMessage(replicationServerId, serverId, message);
350 session.publish(errorMsg);
351 }
352 }
353 else
354 {
355 // We are an empty Replicationserver
356 if ((generationId>0)&&(!serverState.isEmpty()))
357 {
358 // If the LDAP server has already sent changes
359 // it is not expected to connect to an empty RS
360 Message message = NOTE_BAD_GENERATION_ID.get(
361 receivedMsg.getBaseDn().toNormalizedString(),
362 Short.toString(receivedMsg.getServerId()),
363 Long.toString(generationId),
364 Long.toString(localGenerationId));
365
366 ErrorMessage errorMsg =
367 new ErrorMessage(replicationServerId, serverId, message);
368 session.publish(errorMsg);
369 }
370 else
371 {
372 replicationServerDomain.setGenerationId(generationId, false);
373 }
374 }
375 }
376 else if (msg instanceof ReplServerStartMessage)
377 {
378 // The remote server is a replication server
379 ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
380 protocolVersion = ProtocolVersion.minWithCurrent(
381 receivedMsg.getVersion());
382 generationId = receivedMsg.getGenerationId();
383 serverId = receivedMsg.getServerId();
384 serverURL = receivedMsg.getServerURL();
385 int separator = serverURL.lastIndexOf(':');
386 serverAddressURL =
387 session.getRemoteAddress() + ":" + serverURL.substring(separator + 1);
388 serverIsLDAPserver = false;
389 this.baseDn = receivedMsg.getBaseDn();
390 if (baseDn == null)
391 {
392 // Get or create the ReplicationServerDomain
393 replicationServerDomain = replicationServer.
394 getReplicationServerDomain(this.baseDn, true);
395 localGenerationId = replicationServerDomain.getGenerationId();
396 ServerState serverState = replicationServerDomain.getDbServerState();
397
398 // The session initiator decides whether to use SSL.
399 sslEncryption = receivedMsg.getSSLEncryption();
400
401 // Publish our start message
402 ReplServerStartMessage outMsg =
403 new ReplServerStartMessage(replicationServerId,
404 replicationServerURL,
405 this.baseDn, windowSize, serverState,
406 protocolVersion,
407 localGenerationId,
408 sslEncryption);
409 session.publish(outMsg);
410 }
411 else
412 {
413 this.baseDn = baseDn;
414 }
415 this.serverState = receivedMsg.getServerState();
416 sendWindowSize = receivedMsg.getWindowSize();
417
418 /* Until here session is encrypted then it depends on the negociation */
419 if (!sslEncryption)
420 {
421 session.stopEncryption();
422 }
423
424 if (debugEnabled())
425 {
426 Set<String> ss = this.serverState.toStringSet();
427 Set<String> lss =
428 replicationServerDomain.getDbServerState().toStringSet();
429 TRACER.debugInfo("In " + replicationServerDomain.
430 getReplicationServer().getMonitorInstanceName() +
431 ", SH received START from RS serverId=" + serverId +
432 " baseDN=" + this.baseDn +
433 " generationId=" + generationId +
434 " localGenerationId=" + localGenerationId +
435 " state=" + ss +
436 " and sent ReplServerStart with state=" + lss);
437 }
438
439 // if the remote RS and the local RS have the same genID
440 // then it's ok and nothing else to do
441 if (generationId == localGenerationId)
442 {
443 if (debugEnabled())
444 {
445 TRACER.debugInfo("In " +
446 replicationServerDomain.getReplicationServer().
447 getMonitorInstanceName() + " RS with serverID=" + serverId +
448 " is connected with the right generation ID");
449 }
450 }
451 else
452 {
453 if (localGenerationId>0)
454 {
455 // if the local RS is initialized
456 if (generationId>0)
457 {
458 // if the remote RS is initialized
459 if (generationId != localGenerationId)
460 {
461 // if the 2 RS have different generationID
462 if (replicationServerDomain.getGenerationIdSavedStatus())
463 {
464 // it the present RS has received changes regarding its
465 // gen ID and so won't change without a reset
466 // then we are just degrading the peer.
467 Message message = NOTE_BAD_GENERATION_ID.get(
468 this.baseDn.toNormalizedString(),
469 Short.toString(receivedMsg.getServerId()),
470 Long.toString(generationId),
471 Long.toString(localGenerationId));
472
473 ErrorMessage errorMsg =
474 new ErrorMessage(replicationServerId, serverId, message);
475 session.publish(errorMsg);
476 }
477 else
478 {
479 // The present RS has never received changes regarding its
480 // gen ID.
481 //
482 // Example case:
483 // - we are in RS1
484 // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
485 // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
486 // - we are in RS1 and we receive a START msg from RS2
487 // - Each RS keeps its genID / is degraded and when LS2 will
488 // be populated from LS1 everything will becomes ok.
489 //
490 // Issue:
491 // FIXME : Would it be a good idea in some cases to just
492 // set the gen ID received from the peer RS
493 // specially if the peer has a non nul state and
494 // we have a nul state ?
495 // replicationServerDomain.
496 // setGenerationId(generationId, false);
497 Message message = NOTE_BAD_GENERATION_ID.get(
498 this.baseDn.toNormalizedString(),
499 Short.toString(receivedMsg.getServerId()),
500 Long.toString(generationId),
501 Long.toString(localGenerationId));
502
503 ErrorMessage errorMsg =
504 new ErrorMessage(replicationServerId, serverId, message);
505 session.publish(errorMsg);
506 }
507 }
508 }
509 else
510 {
511 // The remote has no genId. We don't change anything for the
512 // current RS.
513 }
514 }
515 else
516 {
517 // The local RS is not initialized - take the one received
518 replicationServerDomain.setGenerationId(generationId, false);
519 }
520 }
521 }
522 else
523 {
524 // TODO : log error
525 return; // we did not recognize the message, ignore it
526 }
527
528 // Get or create the ReplicationServerDomain
529 replicationServerDomain = replicationServer.
530 getReplicationServerDomain(this.baseDn,true);
531
532 if (!handshakeOnly)
533 {
534 boolean started;
535 if (serverIsLDAPserver)
536 {
537 started = replicationServerDomain.startServer(this);
538 }
539 else
540 {
541 started = replicationServerDomain.startReplicationServer(this);
542 }
543
544 if (started)
545 {
546 // sendWindow MUST be created before starting the writer
547 sendWindow = new Semaphore(sendWindowSize);
548
549 writer = new ServerWriter(session, serverId,
550 this, replicationServerDomain);
551 reader = new ServerReader(session, serverId,
552 this, replicationServerDomain);
553
554 reader.start();
555 writer.start();
556
557 // Create a thread to send heartbeat messages.
558 if (heartbeatInterval > 0)
559 {
560 heartbeatThread = new HeartbeatThread(
561 "replication Heartbeat to " + serverURL +
562 " for " + this.baseDn,
563 session, heartbeatInterval/3);
564 heartbeatThread.start();
565 }
566
567 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
568 DirectoryServer.registerMonitorProvider(this);
569 }
570 else
571 {
572 // the connection is not valid, close it.
573 try
574 {
575 if (debugEnabled())
576 {
577 TRACER.debugInfo("In " +
578 replicationServerDomain.getReplicationServer().
579 getMonitorInstanceName() + " RS failed to start locally " +
580 " the connection from serverID="+serverId);
581 }
582 session.close();
583 } catch (IOException e1)
584 {
585 // ignore
586 }
587 }
588 }
589 else
590 {
591 // For a hanshakeOnly connection, let's only create a reader
592 // in order to detect the connection closure.
593 reader = new ServerReader(session, serverId,
594 this, replicationServerDomain);
595 reader.start();
596 }
597 }
598 catch (Exception e)
599 {
600 // some problem happened, reject the connection
601 MessageBuilder mb = new MessageBuilder();
602 mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
603 this.getMonitorInstanceName()));
604 mb.append(stackTraceToSingleLineString(e));
605 logError(mb.toMessage());
606 try
607 {
608 session.close();
609 } catch (IOException e1)
610 {
611 // ignore
612 }
613 }
614 }
615
616 /**
617 * get the Server Id.
618 *
619 * @return the ID of the server to which this object is linked
620 */
621 public short getServerId()
622 {
623 return serverId;
624 }
625
626 /**
627 * Retrieves the Address URL for this server handler.
628 *
629 * @return The Address URL for this server handler,
630 * in the form of an IP address and port separated by a colon.
631 */
632 public String getServerAddressURL()
633 {
634 return serverAddressURL;
635 }
636
637 /**
638 * Retrieves the URL for this server handler.
639 *
640 * @return The URL for this server handler, in the form of an address and
641 * port separated by a colon.
642 */
643 public String getServerURL()
644 {
645 return serverURL;
646 }
647
648 /**
649 * Increase the counter of updates sent to the server.
650 */
651 public void incrementOutCount()
652 {
653 outCount++;
654 }
655
656 /**
657 * Increase the counter of update received from the server.
658 */
659 public void incrementInCount()
660 {
661 inCount++;
662 }
663
664 /**
665 * Get the count of updates received from the server.
666 * @return the count of update received from the server.
667 */
668 public int getInCount()
669 {
670 return inCount;
671 }
672
673 /**
674 * Get the count of updates sent to this server.
675 * @return The count of update sent to this server.
676 */
677 public int getOutCount()
678 {
679 return outCount;
680 }
681
682 /**
683 * Get the number of Ack received from the server managed by this handler.
684 *
685 * @return Returns the inAckCount.
686 */
687 public int getInAckCount()
688 {
689 return inAckCount;
690 }
691
692 /**
693 * Get the number of Ack sent to the server managed by this handler.
694 *
695 * @return Returns the outAckCount.
696 */
697 public int getOutAckCount()
698 {
699 return outAckCount;
700 }
701
702 /**
703 * Check is this server is saturated (this server has already been
704 * sent a bunch of updates and has not processed them so they are staying
705 * in the message queue for this server an the size of the queue
706 * for this server is above the configured limit.
707 *
708 * The limit can be defined in number of updates or with a maximum delay
709 *
710 * @param changeNumber The changenumber to use to make the delay calculations.
711 * @param sourceHandler The ServerHandler which is sending the update.
712 * @return true is saturated false if not saturated.
713 */
714 public boolean isSaturated(ChangeNumber changeNumber,
715 ServerHandler sourceHandler)
716 {
717 synchronized (msgQueue)
718 {
719 int size = msgQueue.size();
720
721 if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
722 return true;
723
724 if ((sourceHandler.maxSendQueue > 0) &&
725 (size >= sourceHandler.maxSendQueue))
726 return true;
727
728 if (!msgQueue.isEmpty())
729 {
730 UpdateMessage firstUpdate = msgQueue.first();
731
732 if (firstUpdate != null)
733 {
734 long timeDiff = changeNumber.getTimeSec() -
735 firstUpdate.getChangeNumber().getTimeSec();
736
737 if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
738 return true;
739
740 if ((sourceHandler.maxSendDelay > 0) &&
741 (timeDiff >= sourceHandler.maxSendDelay))
742 return true;
743 }
744 }
745 return false;
746 }
747 }
748
749 /**
750 * Check that the size of the Server Handler messages Queue has lowered
751 * below the limit and therefore allowing the reception of messages
752 * from other servers to restart.
753 * @param source The ServerHandler which was sending the update.
754 * can be null.
755 * @return true if the processing can restart
756 */
757 public boolean restartAfterSaturation(ServerHandler source)
758 {
759 synchronized (msgQueue)
760 {
761 int queueSize = msgQueue.size();
762 if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
763 return false;
764 if ((source != null) && (source.maxSendQueue > 0) &&
765 (queueSize >= source.restartSendQueue))
766 return false;
767
768 if (!msgQueue.isEmpty())
769 {
770 UpdateMessage firstUpdate = msgQueue.first();
771 UpdateMessage lastUpdate = msgQueue.last();
772
773 if ((firstUpdate != null) && (lastUpdate != null))
774 {
775 long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
776 firstUpdate.getChangeNumber().getTimeSec();
777 if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
778 return false;
779 if ((source != null) && (source.maxSendDelay > 0)
780 && (timeDiff >= source.restartSendDelay))
781 return false;
782 }
783 }
784 }
785 return true;
786 }
787
788 /**
789 * Check if the server associated to this ServerHandler is a replication
790 * server.
791 * @return true if the server associated to this ServerHandler is a
792 * replication server.
793 */
794 public boolean isReplicationServer()
795 {
796 return (!serverIsLDAPserver);
797 }
798
799 /**
800 * Get the number of message in the receive message queue.
801 * @return Size of the receive message queue.
802 */
803 public int getRcvMsgQueueSize()
804 {
805 synchronized (msgQueue)
806 {
807 /*
808 * When the server is up to date or close to be up to date,
809 * the number of updates to be sent is the size of the receive queue.
810 */
811 if (isFollowing())
812 return msgQueue.size();
813 else
814 {
815 /*
816 * When the server is not able to follow, the msgQueue
817 * may become too large and therefore won't contain all the
818 * changes. Some changes may only be stored in the backing DB
819 * of the servers.
820 * The total size of teh receieve queue is calculated by doing
821 * the sum of the number of missing changes for every dbHandler.
822 */
823 int totalCount = 0;
824 ServerState dbState = replicationServerDomain.getDbServerState();
825 for (short id : dbState)
826 {
827 totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
828 serverState.getMaxChangeNumber(id));
829 }
830 return totalCount;
831 }
832 }
833 }
834
835 /**
836 * Get an approximation of the delay by looking at the age of the oldest
837 * message that has not been sent to this server.
838 * This is an approximation because the age is calculated using the
839 * clock of the servee where the replicationServer is currently running
840 * while it should be calculated using the clock of the server
841 * that originally processed the change.
842 *
843 * The approximation error is therefore the time difference between
844 *
845 * @return the approximate delay for the connected server.
846 */
847 public long getApproxDelay()
848 {
849 long olderUpdateTime = getOlderUpdateTime();
850 if (olderUpdateTime == 0)
851 return 0;
852
853 long currentTime = TimeThread.getTime();
854 return ((currentTime - olderUpdateTime)/1000);
855 }
856
857 /**
858 * Get the age of the older change that has not yet been replicated
859 * to the server handled by this ServerHandler.
860 * @return The age if the older change has not yet been replicated
861 * to the server handled by this ServerHandler.
862 */
863 public Long getApproxFirstMissingDate()
864 {
865 Long result = (long)0;
866
867 // Get the older CN received
868 ChangeNumber olderUpdateCN = getOlderUpdateCN();
869 if (olderUpdateCN != null)
870 {
871 // If not present in the local RS db,
872 // then approximate with the older update time
873 result=olderUpdateCN.getTime();
874 }
875 return result;
876 }
877
878 /**
879 * Get the older update time for that server.
880 * @return The older update time.
881 */
882 public long getOlderUpdateTime()
883 {
884 ChangeNumber olderUpdateCN = getOlderUpdateCN();
885 if (olderUpdateCN == null)
886 return 0;
887 return olderUpdateCN.getTime();
888 }
889
890 /**
891 * Get the older Change Number for that server.
892 * Returns null when the queue is empty.
893 * @return The older change number.
894 */
895 public ChangeNumber getOlderUpdateCN()
896 {
897 ChangeNumber result = null;
898 synchronized (msgQueue)
899 {
900 if (isFollowing())
901 {
902 if (msgQueue.isEmpty())
903 {
904 result=null;
905 }
906 else
907 {
908 UpdateMessage msg = msgQueue.first();
909 result = msg.getChangeNumber();
910 }
911 }
912 else
913 {
914 if (lateQueue.isEmpty())
915 {
916 // isFollowing is false AND lateQueue is empty
917 // We may be at the very moment when the writer has emptyed the
918 // lateQueue when it sent the last update. The writer will fill again
919 // the lateQueue when it will send the next update but we are not yet
920 // there. So let's take the last change not sent directly from
921 // the db.
922
923 ReplicationIteratorComparator comparator =
924 new ReplicationIteratorComparator();
925 SortedSet<ReplicationIterator> iteratorSortedSet =
926 new TreeSet<ReplicationIterator>(comparator);
927 try
928 {
929 // Build a list of candidates iterator (i.e. db i.e. server)
930 for (short serverId : replicationServerDomain.getServers())
931 {
932 // get the last already sent CN from that server
933 ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
934 // get an iterator in this server db from that last change
935 ReplicationIterator iterator =
936 replicationServerDomain.getChangelogIterator(serverId, lastCsn);
937 // if that iterator has changes, then it is a candidate
938 // it is added in the sorted list at a position given by its
939 // current change (see ReplicationIteratorComparator).
940 if ((iterator != null) && (iterator.getChange() != null))
941 {
942 iteratorSortedSet.add(iterator);
943 }
944 }
945 UpdateMessage msg = iteratorSortedSet.first().getChange();
946 result = msg.getChangeNumber();
947 }
948 catch(Exception e)
949 {
950 result=null;
951 }
952 finally
953 {
954 for (ReplicationIterator iterator : iteratorSortedSet)
955 {
956 iterator.releaseCursor();
957 }
958 }
959 }
960 else
961 {
962 UpdateMessage msg = lateQueue.first();
963 result = msg.getChangeNumber();
964 }
965 }
966 }
967 return result;
968 }
969
970 /**
971 * Check if the LDAP server can follow the speed of the other servers.
972 * @return true when the server has all the not yet sent changes
973 * in its queue.
974 */
975 public boolean isFollowing()
976 {
977 return following;
978 }
979
980 /**
981 * Set the following flag of this server.
982 * @param following the value that should be set.
983 */
984 public void setFollowing(boolean following)
985 {
986 this.following = following;
987 }
988
989 /**
990 * Add an update the list of updates that must be sent to the server
991 * managed by this ServerHandler.
992 *
993 * @param update The update that must be added to the list of updates.
994 * @param sourceHandler The server that sent the update.
995 */
996 public void add(UpdateMessage update, ServerHandler sourceHandler)
997 {
998 /*
999 * Ignore updates from a server that is degraded due to
1000 * its inconsistent generationId
1001 */
1002 long referenceGenerationId = replicationServerDomain.getGenerationId();
1003 if ((referenceGenerationId>0) &&
1004 (referenceGenerationId != generationId))
1005 {
1006 logError(ERR_IGNORING_UPDATE_TO.get(
1007 update.getDn(),
1008 this.getMonitorInstanceName()));
1009
1010 return;
1011 }
1012
1013 synchronized (msgQueue)
1014 {
1015 /*
1016 * If queue was empty the writer thread was probably asleep
1017 * waiting for some changes, wake it up
1018 */
1019 if (msgQueue.isEmpty())
1020 msgQueue.notify();
1021
1022 msgQueue.add(update);
1023
1024 /* TODO : size should be configurable
1025 * and larger than max-receive-queue-size
1026 */
1027 while (msgQueue.size() > maxQueueSize)
1028 {
1029 setFollowing(false);
1030 msgQueue.removeFirst();
1031 }
1032 }
1033
1034 if (isSaturated(update.getChangeNumber(), sourceHandler))
1035 {
1036 sourceHandler.setSaturated(true);
1037 }
1038
1039 }
1040
1041 private void setSaturated(boolean value)
1042 {
1043 flowControl = value;
1044 }
1045
1046 /**
1047 * Select the next update that must be sent to the server managed by this
1048 * ServerHandler.
1049 *
1050 * @return the next update that must be sent to the server managed by this
1051 * ServerHandler.
1052 */
1053 public UpdateMessage take()
1054 {
1055 boolean interrupted = true;
1056 UpdateMessage msg = getnextMessage();
1057
1058 /*
1059 * When we remove a message from the queue we need to check if another
1060 * server is waiting in flow control because this queue was too long.
1061 * This check might cause a performance penalty an therefore it
1062 * is not done for every message removed but only every few messages.
1063 */
1064 if (++saturationCount > 10)
1065 {
1066 saturationCount = 0;
1067 try
1068 {
1069 replicationServerDomain.checkAllSaturation();
1070 }
1071 catch (IOException e)
1072 {
1073 }
1074 }
1075 boolean acquired = false;
1076 do
1077 {
1078 try
1079 {
1080 acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
1081 interrupted = false;
1082 } catch (InterruptedException e)
1083 {
1084 // loop until not interrupted
1085 }
1086 } while (((interrupted) || (!acquired )) && (!shutdown));
1087 this.incrementOutCount();
1088 return msg;
1089 }
1090
/**
 * Get the next update that must be sent to the server
 * from the message queue or from the database.
 * <p>
 * While the following flag is false, updates are served from the
 * lateQueue, which is refilled from the changelog iterators of the
 * domain. While it is true, updates are taken from msgQueue, waiting
 * for one to be added when that queue is empty.
 *
 * @return The next update that must be sent to the server, or null when
 *         this handler is no longer active or when the wait on the
 *         message queue was interrupted.
 */
private UpdateMessage getnextMessage()
{
  UpdateMessage msg;
  while (active == true)
  {
    if (following == false)
    {
      /* this server is late with regard to some other masters
       * in the topology or just joined the topology.
       * In such cases, we can't keep all changes in the queue
       * without saturating the memory, we therefore use
       * a lateQueue that is filled with a few changes from the changelogDB
       * If this server is able to close the gap, it will start using again
       * the regular msgQueue later.
       */
      if (lateQueue.isEmpty())
      {
        /*
         * Start from the server State
         * Loop until the queue high mark or until no more changes
         *   for each known LDAP master
         *      get the next CSN after this last one :
         *         - try to get next from the file
         *         - if not found in the file
         *             - try to get the next from the queue
         *   select the smallest of changes
         *   check if it is in the memory tree
         *     yes : lock memory tree.
         *           check all changes from the list, remove the ones that
         *           are already sent
         *           unlock memory tree
         *           restart as usual
         *   load this change on the delayList
         *
         */
        ReplicationIteratorComparator comparator =
          new ReplicationIteratorComparator();
        SortedSet<ReplicationIterator> iteratorSortedSet =
          new TreeSet<ReplicationIterator>(comparator);
        /* fill the lateQueue */
        for (short serverId : replicationServerDomain.getServers())
        {
          // One iterator per server, positioned from the last change
          // recorded for that server in the state of this handler.
          ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
          ReplicationIterator iterator =
            replicationServerDomain.getChangelogIterator(serverId, lastCsn);
          if (iterator != null)
          {
            if (iterator.getChange() != null)
            {
              iteratorSortedSet.add(iterator);
            }
            else
            {
              // No change available from this iterator: release it now.
              iterator.releaseCursor();
            }
          }
        }

        // The loop below relies on the fact that it is sorted based
        // on the currentChange of each iterator to consider the next
        // change across all servers.
        // Hence it is necessary to remove and eventually add again an
        // iterator when looping in order to keep consistent the order of
        // the iterators (see ReplicationIteratorComparator).
        // At most 100 changes are loaded in the lateQueue per refill.
        while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
        {
          ReplicationIterator iterator = iteratorSortedSet.first();
          iteratorSortedSet.remove(iterator);
          lateQueue.add(iterator.getChange());
          if (iterator.next())
            iteratorSortedSet.add(iterator);
          else
            iterator.releaseCursor();
        }
        // Release the cursors of the iterators that were not exhausted.
        for (ReplicationIterator iterator : iteratorSortedSet)
        {
          iterator.releaseCursor();
        }
        /*
         * Check if the first change in the lateQueue is also on the regular
         * queue
         */
        if (lateQueue.isEmpty())
        {
          // Nothing could be loaded: go back to the following mode,
          // unless the regular queue has reached maxQueueSize.
          synchronized (msgQueue)
          {
            if (msgQueue.size() < maxQueueSize)
            {
              setFollowing(true);
            }
          }
        }
        else
        {
          msg = lateQueue.first();
          synchronized (msgQueue)
          {
            if (msgQueue.contains(msg))
            {
              /* we finally caught up with the regular queue */
              setFollowing(true);
              lateQueue.clear();
              // Drop from the regular queue every message up to and
              // including the one that has the change number of msg.
              UpdateMessage msg1;
              do
              {
                msg1 = msgQueue.removeFirst();
              } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
              this.updateServerState(msg);
              return msg;
            }
          }
        }
      }
      else
      {
        /* get the next change from the lateQueue */
        msg = lateQueue.removeFirst();
        this.updateServerState(msg);
        return msg;
      }
    }
    synchronized (msgQueue)
    {
      if (following == true)
      {
        try
        {
          // Wait by slices of 500 ms so that a stopped handler is
          // noticed even when nothing is added to the queue.
          while (msgQueue.isEmpty())
          {
            msgQueue.wait(500);
            if (!active)
              return null;
          }
        } catch (InterruptedException e)
        {
          // NOTE(review): the interrupt status is not restored here.
          return null;
        }
        msg = msgQueue.removeFirst();
        if (this.updateServerState(msg))
        {
          /*
           * Only push the message if it has not yet been seen
           * by the other server.
           * Otherwise just loop to select the next message.
           */
          return msg;
        }
      }
    }
    /*
     * Need to loop because following flag may have gone to false between
     * the first check at the beginning of this method
     * and the second check just above.
     */
  }
  return null;
}
1254
1255 /**
1256 * Update the serverState with the last message sent.
1257 *
1258 * @param msg the last update sent.
1259 * @return boolean indicating if the update was meaningfull.
1260 */
1261 public boolean updateServerState(UpdateMessage msg)
1262 {
1263 return serverState.update(msg.getChangeNumber());
1264 }
1265
1266 /**
1267 * Get the state of this server.
1268 *
1269 * @return ServerState the state for this server..
1270 */
1271 public ServerState getServerState()
1272 {
1273 return serverState;
1274 }
1275
1276 /**
1277 * Stop this server handler processing.
1278 */
1279 public void stopHandler()
1280 {
1281 active = false;
1282
1283 // Stop the remote LSHandler
1284 for (LightweightServerHandler lsh : connectedServers.values())
1285 {
1286 lsh.stopHandler();
1287 }
1288 connectedServers.clear();
1289
1290 try
1291 {
1292 session.close();
1293 } catch (IOException e)
1294 {
1295 // ignore.
1296 }
1297
1298 synchronized (msgQueue)
1299 {
1300 /* wake up the writer thread on an empty queue so that it disappear */
1301 msgQueue.clear();
1302 msgQueue.notify();
1303 msgQueue.notifyAll();
1304 }
1305
1306 // Stop the heartbeat thread.
1307 if (heartbeatThread != null)
1308 {
1309 heartbeatThread.shutdown();
1310 }
1311
1312 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
1313 }
1314
1315 /**
1316 * Send the ack to the server that did the original modification.
1317 *
1318 * @param changeNumber The ChangeNumber of the update that is acked.
1319 * @throws IOException In case of Exception thrown sending the ack.
1320 */
1321 public void sendAck(ChangeNumber changeNumber) throws IOException
1322 {
1323 AckMessage ack = new AckMessage(changeNumber);
1324 session.publish(ack);
1325 outAckCount++;
1326 }
1327
1328 /**
1329 * Do the work when an ack message has been received from another server.
1330 *
1331 * @param message The ack message that was received.
1332 * @param ackingServerId The id of the server that acked the change.
1333 */
1334 public void ack(AckMessage message, short ackingServerId)
1335 {
1336 ChangeNumber changeNumber = message.getChangeNumber();
1337 AckMessageList ackList;
1338 boolean completedFlag;
1339 synchronized (waitingAcks)
1340 {
1341 ackList = waitingAcks.get(changeNumber);
1342 if (ackList == null)
1343 return;
1344 ackList.addAck(ackingServerId);
1345 completedFlag = ackList.completed();
1346 if (completedFlag)
1347 {
1348 waitingAcks.remove(changeNumber);
1349 }
1350 }
1351 if (completedFlag)
1352 {
1353 replicationServerDomain.sendAck(changeNumber, true);
1354 }
1355 }
1356
1357 /**
1358 * Process reception of an for an update that was received from a
1359 * ReplicationServer.
1360 *
1361 * @param message the ack message that was received.
1362 * @param ackingServerId The id of the server that acked the change.
1363 */
1364 public static void ackChangelog(AckMessage message, short ackingServerId)
1365 {
1366 ChangeNumber changeNumber = message.getChangeNumber();
1367 ReplServerAckMessageList ackList;
1368 boolean completedFlag;
1369 synchronized (changelogsWaitingAcks)
1370 {
1371 ackList = changelogsWaitingAcks.get(changeNumber);
1372 if (ackList == null)
1373 return;
1374 ackList.addAck(ackingServerId);
1375 completedFlag = ackList.completed();
1376 if (completedFlag)
1377 {
1378 changelogsWaitingAcks.remove(changeNumber);
1379 }
1380 }
1381 if (completedFlag)
1382 {
1383 ReplicationServerDomain replicationServerDomain =
1384 ackList.getChangelogCache();
1385 replicationServerDomain.sendAck(changeNumber, false,
1386 ackList.getReplicationServerId());
1387 }
1388 }
1389
1390 /**
1391 * Add an update to the list of update waiting for acks.
1392 *
1393 * @param update the update that must be added to the list
1394 * @param nbWaitedAck The number of ack that must be received before
1395 * the update is fully acked.
1396 */
1397 public void addWaitingAck(UpdateMessage update, int nbWaitedAck)
1398 {
1399 AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
1400 nbWaitedAck);
1401 synchronized(waitingAcks)
1402 {
1403 waitingAcks.put(update.getChangeNumber(), ackList);
1404 }
1405 }
1406
1407 /**
1408 * Add an update to the list of update received from a replicationServer and
1409 * waiting for acks.
1410 *
1411 * @param update The update that must be added to the list.
1412 * @param ChangelogServerId The identifier of the replicationServer that sent
1413 * the update.
1414 * @param replicationServerDomain The ReplicationServerDomain from which the
1415 * change was processed and to which the ack
1416 * must later be sent.
1417 * @param nbWaitedAck The number of ack that must be received before
1418 * the update is fully acked.
1419 */
1420 public static void addWaitingAck(
1421 UpdateMessage update,
1422 short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
1423 int nbWaitedAck)
1424 {
1425 ReplServerAckMessageList ackList =
1426 new ReplServerAckMessageList(update.getChangeNumber(),
1427 nbWaitedAck,
1428 ChangelogServerId,
1429 replicationServerDomain);
1430 synchronized(changelogsWaitingAcks)
1431 {
1432 changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
1433 }
1434 }
1435
1436 /**
1437 * Get the size of the list of update waiting for acks.
1438 *
1439 * @return the size of the list of update waiting for acks.
1440 */
1441 public int getWaitingAckSize()
1442 {
1443 synchronized (waitingAcks)
1444 {
1445 return waitingAcks.size();
1446 }
1447 }
1448
1449 /**
1450 * Increment the count of Acks received from this server.
1451 */
1452 public void incrementInAckCount()
1453 {
1454 inAckCount++;
1455 }
1456
1457 /**
1458 * Check type of server handled.
1459 *
1460 * @return true if the handled server is an LDAP server.
1461 * false if the handled server is a replicationServer
1462 */
1463 public boolean isLDAPserver()
1464 {
1465 return serverIsLDAPserver;
1466 }
1467
1468 /**
1469 * {@inheritDoc}
1470 */
1471 @Override
1472 public void initializeMonitorProvider(MonitorProviderCfg configuration)
1473 throws ConfigException,InitializationException
1474 {
1475 // Nothing to do for now
1476 }
1477
1478 /**
1479 * Retrieves the name of this monitor provider. It should be unique among all
1480 * monitor providers, including all instances of the same monitor provider.
1481 *
1482 * @return The name of this monitor provider.
1483 */
1484 @Override
1485 public String getMonitorInstanceName()
1486 {
1487 String str = baseDn.toString() +
1488 " " + serverURL + " " + String.valueOf(serverId);
1489
1490 if (serverIsLDAPserver)
1491 return "Direct LDAP Server " + str;
1492 else
1493 return "Remote Repl Server " + str;
1494 }
1495
1496 /**
1497 * Retrieves the length of time in milliseconds that should elapse between
1498 * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
1499 * return value indicates that the <CODE>updateMonitorData()</CODE> method
1500 * should not be periodically invoked.
1501 *
1502 * @return The length of time in milliseconds that should elapse between
1503 * calls to the <CODE>updateMonitorData()</CODE> method.
1504 */
1505 @Override
1506 public long getUpdateInterval()
1507 {
1508 /* we don't wont to do polling on this monitor */
1509 return 0;
1510 }
1511
1512 /**
1513 * Performs any processing periodic processing that may be desired to update
1514 * the information associated with this monitor. Note that best-effort
1515 * attempts will be made to ensure that calls to this method come
1516 * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
1517 * be made.
1518 */
1519 @Override
1520 public void updateMonitorData()
1521 {
1522 // As long as getUpdateInterval() returns 0, this will never get called
1523
1524 }
1525
1526 /**
1527 * Retrieves a set of attributes containing monitor data that should be
1528 * returned to the client if the corresponding monitor entry is requested.
1529 *
1530 * @return A set of attributes containing monitor data that should be
1531 * returned to the client if the corresponding monitor entry is
1532 * requested.
1533 */
1534 @Override
1535 public ArrayList<Attribute> getMonitorData()
1536 {
1537 ArrayList<Attribute> attributes = new ArrayList<Attribute>();
1538 if (serverIsLDAPserver)
1539 {
1540 attributes.add(new Attribute("LDAP-Server", serverURL));
1541 attributes.add(new Attribute("connected-to", this.replicationServerDomain.
1542 getReplicationServer().getMonitorInstanceName()));
1543
1544 }
1545 else
1546 {
1547 attributes.add(new Attribute("ReplicationServer-Server", serverURL));
1548 }
1549 attributes.add(new Attribute("server-id",
1550 String.valueOf(serverId)));
1551 attributes.add(new Attribute("base-dn",
1552 baseDn.toString()));
1553
1554 if (serverIsLDAPserver)
1555 {
1556 MonitorData md;
1557 try
1558 {
1559 md = replicationServerDomain.getMonitorData();
1560
1561 // Oldest missing update
1562 Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
1563 if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
1564 {
1565 Date date = new Date(approxFirstMissingDate);
1566 attributes.add(new Attribute("approx-older-change-not-synchronized",
1567 date.toString()));
1568 attributes.add(
1569 new Attribute("approx-older-change-not-synchronized-millis",
1570 String.valueOf(approxFirstMissingDate)));
1571 }
1572
1573 // Missing changes
1574 long missingChanges = md.getMissingChanges(serverId);
1575 attributes.add(new Attribute("missing-changes",
1576 String.valueOf(missingChanges)));
1577
1578 // Replication delay
1579 long delay = md.getApproxDelay(serverId);
1580 attributes.add(new Attribute("approximate-delay",
1581 String.valueOf(delay)));
1582 }
1583 catch(Exception e)
1584 {
1585 // TODO: improve the log
1586 // We failed retrieving the remote monitor data.
1587 attributes.add(new Attribute("error",
1588 stackTraceToSingleLineString(e)));
1589 }
1590 }
1591
1592 // Deprecated
1593 attributes.add(new Attribute("max-waiting-changes",
1594 String.valueOf(maxQueueSize)));
1595 attributes.add(new Attribute("update-sent",
1596 String.valueOf(getOutCount())));
1597 attributes.add(new Attribute("update-received",
1598 String.valueOf(getInCount())));
1599
1600 // Deprecated as long as assured is not exposed
1601 attributes.add(new Attribute("update-waiting-acks",
1602 String.valueOf(getWaitingAckSize())));
1603 attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
1604 attributes.add(new Attribute("ack-received",
1605 String.valueOf(getInAckCount())));
1606
1607 // Window stats
1608 attributes.add(new Attribute("max-send-window",
1609 String.valueOf(sendWindowSize)));
1610 attributes.add(new Attribute("current-send-window",
1611 String.valueOf(sendWindow.availablePermits())));
1612 attributes.add(new Attribute("max-rcv-window",
1613 String.valueOf(maxRcvWindow)));
1614 attributes.add(new Attribute("current-rcv-window",
1615 String.valueOf(rcvWindow)));
1616
1617 /*
1618 * FIXME:PGB DEPRECATED
1619 *
1620 // Missing changes
1621 attributes.add(new Attribute("waiting-changes",
1622 String.valueOf(getRcvMsgQueueSize())));
1623 // Age of oldest missing change
1624
1625 // Date of the oldest missing change
1626 long olderUpdateTime = getOlderUpdateTime();
1627 if (olderUpdateTime != 0)
1628 {
1629 Date date = new Date(getOlderUpdateTime());
1630 attributes.add(new Attribute("older-change-not-synchronized",
1631 String.valueOf(date.toString())));
1632 }
1633 */
1634
1635 /* get the Server State */
1636 final String ATTR_SERVER_STATE = "server-state";
1637 AttributeType type =
1638 DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
1639 LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
1640 for (String str : serverState.toStringSet())
1641 {
1642 values.add(new AttributeValue(type,str));
1643 }
1644 Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
1645 attributes.add(attr);
1646
1647 // Encryption
1648 attributes.add(new Attribute("ssl-encryption",
1649 String.valueOf(session.isEncrypted())));
1650
1651 // Data generation
1652 attributes.add(new Attribute("generation-id",
1653 String.valueOf(generationId)));
1654
1655 return attributes;
1656 }
1657
1658 /**
1659 * Shutdown This ServerHandler.
1660 */
1661 public void shutdown()
1662 {
1663 shutdown = true;
1664 try
1665 {
1666 session.close();
1667 } catch (IOException e)
1668 {
1669 // Service is closing.
1670 }
1671
1672 stopHandler();
1673
1674 try
1675 {
1676 if (writer != null) {
1677 writer.join(SHUTDOWN_JOIN_TIMEOUT);
1678 }
1679 if (reader != null) {
1680 reader.join(SHUTDOWN_JOIN_TIMEOUT);
1681 }
1682 } catch (InterruptedException e)
1683 {
1684 // don't try anymore to join and return.
1685 }
1686 }
1687
1688 /**
1689 * {@inheritDoc}
1690 */
1691 @Override
1692 public String toString()
1693 {
1694 String localString;
1695 if (serverId != 0)
1696 {
1697 if (serverIsLDAPserver)
1698 localString = "Directory Server ";
1699 else
1700 localString = "Replication Server ";
1701
1702
1703 localString += serverId + " " + serverURL + " " + baseDn;
1704 }
1705 else
1706 localString = "Unknown server";
1707
1708 return localString;
1709 }
1710
1711 /**
1712 * Decrement the protocol window, then check if it is necessary
1713 * to send a WindowMessage and send it.
1714 *
1715 * @throws IOException when the session becomes unavailable.
1716 */
1717 public synchronized void decAndCheckWindow() throws IOException
1718 {
1719 rcvWindow--;
1720 checkWindow();
1721 }
1722
1723 /**
1724 * Check the protocol window and send WindowMessage if necessary.
1725 *
1726 * @throws IOException when the session becomes unavailable.
1727 */
1728 public synchronized void checkWindow() throws IOException
1729 {
1730 if (rcvWindow < rcvWindowSizeHalf)
1731 {
1732 if (flowControl)
1733 {
1734 if (replicationServerDomain.restartAfterSaturation(this))
1735 {
1736 flowControl = false;
1737 }
1738 }
1739 if (!flowControl)
1740 {
1741 WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
1742 session.publish(msg);
1743 outAckCount++;
1744 rcvWindow += rcvWindowSizeHalf;
1745 }
1746 }
1747 }
1748
1749 /**
1750 * Update the send window size based on the credit specified in the
1751 * given window message.
1752 *
1753 * @param windowMsg The Window Message containing the information
1754 * necessary for updating the window size.
1755 */
1756 public void updateWindow(WindowMessage windowMsg)
1757 {
1758 sendWindow.release(windowMsg.getNumAck());
1759 }
1760
1761 /**
1762 * Get our heartbeat interval.
1763 * @return Our heartbeat interval.
1764 */
1765 public long getHeartbeatInterval()
1766 {
1767 return heartbeatInterval;
1768 }
1769
1770 /**
1771 * Processes a routable message.
1772 *
1773 * @param msg The message to be processed.
1774 */
1775 public void process(RoutableMessage msg)
1776 {
1777 if (debugEnabled())
1778 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1779 getMonitorInstanceName() +
1780 " SH for remote server " + this.getMonitorInstanceName() +
1781 " processes received msg=" + msg);
1782 replicationServerDomain.process(msg, this);
1783 }
1784
1785 /**
1786 * Sends the provided ReplServerInfoMessage.
1787 *
1788 * @param info The ReplServerInfoMessage message to be sent.
1789 * @throws IOException When it occurs while sending the message,
1790 *
1791 */
1792 public void sendInfo(ReplServerInfoMessage info)
1793 throws IOException
1794 {
1795 if (debugEnabled())
1796 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1797 getMonitorInstanceName() +
1798 " SH for remote server " + this.getMonitorInstanceName() +
1799 " sends message=" + info);
1800
1801 session.publish(info);
1802 }
1803
1804 /**
1805 *
1806 * Sets the replication server from the message provided.
1807 *
1808 * @param infoMsg The information message.
1809 */
1810 public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
1811 {
1812 if (debugEnabled())
1813 TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
1814 getMonitorInstanceName() +
1815 " SH for remote server " + this.getMonitorInstanceName() +
1816 " sets replServerInfo " + "<" + infoMsg + ">");
1817
1818 List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
1819 generationId = infoMsg.getGenerationId();
1820
1821 synchronized(connectedServers)
1822 {
1823 // Removes the existing structures
1824 for (LightweightServerHandler lsh : connectedServers.values())
1825 {
1826 lsh.stopHandler();
1827 }
1828 connectedServers.clear();
1829
1830 // Creates the new structure according to the message received.
1831 for (String newConnectedServer : newRemoteLDAPservers)
1832 {
1833 LightweightServerHandler lsh
1834 = new LightweightServerHandler(newConnectedServer, this);
1835 lsh.startHandler();
1836 connectedServers.put(lsh.getServerId(), lsh);
1837 }
1838 }
1839 }
1840
1841 /**
1842 * When this handler is connected to a replication server, specifies if
1843 * a wanted server is connected to this replication server.
1844 *
1845 * @param wantedServer The server we want to know if it is connected
1846 * to the replication server represented by this handler.
1847 * @return boolean True is the wanted server is connected to the server
1848 * represented by this handler.
1849 */
1850 public boolean isRemoteLDAPServer(short wantedServer)
1851 {
1852 synchronized(connectedServers)
1853 {
1854 for (LightweightServerHandler server : connectedServers.values())
1855 {
1856 if (wantedServer == server.getServerId())
1857 {
1858 return true;
1859 }
1860 }
1861 return false;
1862 }
1863 }
1864
1865 /**
1866 * When the handler is connected to a replication server, specifies the
1867 * replication server has remote LDAP servers connected to it.
1868 *
1869 * @return boolean True is the replication server has remote LDAP servers
1870 * connected to it.
1871 */
1872 public boolean hasRemoteLDAPServers()
1873 {
1874 return !connectedServers.isEmpty();
1875 }
1876
1877 /**
1878 * Send an InitializeRequestMessage to the server connected through this
1879 * handler.
1880 *
1881 * @param msg The message to be processed
1882 * @throws IOException when raised by the underlying session
1883 */
1884 public void send(RoutableMessage msg) throws IOException
1885 {
1886 if (debugEnabled())
1887 TRACER.debugInfo("In " +
1888 replicationServerDomain.getReplicationServer().
1889 getMonitorInstanceName() +
1890 " SH for remote server " + this.getMonitorInstanceName() +
1891 " sends message=" + msg);
1892 session.publish(msg);
1893 }
1894
1895 /**
1896 * Send an ErrorMessage to the peer.
1897 *
1898 * @param errorMsg The message to be sent
1899 * @throws IOException when raised by the underlying session
1900 */
1901 public void sendError(ErrorMessage errorMsg) throws IOException
1902 {
1903 session.publish(errorMsg);
1904 }
1905
1906 /**
1907 * Process the reception of a WindowProbe message.
1908 *
1909 * @param windowProbeMsg The message to process.
1910 *
1911 * @throws IOException When the session becomes unavailable.
1912 */
1913 public void process(WindowProbe windowProbeMsg) throws IOException
1914 {
1915 if (rcvWindow > 0)
1916 {
1917 // The LDAP server believes that its window is closed
1918 // while it is not, this means that some problem happened in the
1919 // window exchange procedure !
1920 // lets update the LDAP server with out current window size and hope
1921 // that everything will work better in the futur.
1922 // TODO also log an error message.
1923 WindowMessage msg = new WindowMessage(rcvWindow);
1924 session.publish(msg);
1925 outAckCount++;
1926 }
1927 else
1928 {
1929 // Both the LDAP server and the replication server believes that the
1930 // window is closed. Lets check the flowcontrol in case we
1931 // can now resume operations and send a windowMessage if necessary.
1932 checkWindow();
1933 }
1934 }
1935
1936 /**
1937 * Returns the value of generationId for that handler.
1938 * @return The value of the generationId.
1939 */
1940 public long getGenerationId()
1941 {
1942 return generationId;
1943 }
1944
1945 /**
1946 * Resets the generationId for this domain.
1947 */
1948 public void warnBadGenerationId()
1949 {
1950 // Notify the peer that it is now invalid regarding the generationId
1951 // We are now waiting a startServer message from this server with
1952 // a valid generationId.
1953 try
1954 {
1955 Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
1956 ErrorMessage errorMsg =
1957 new ErrorMessage(serverId, replicationServerId, message);
1958 session.publish(errorMsg);
1959 }
1960 catch (Exception e)
1961 {
1962 // FIXME Log exception when sending reset error message
1963 }
1964 }
1965
1966 /**
1967 * Sends a message containing a generationId to a peer server.
1968 * The peer is expected to be a replication server.
1969 *
1970 * @param msg The GenerationIdMessage message to be sent.
1971 * @throws IOException When it occurs while sending the message,
1972 *
1973 */
1974 public void forwardGenerationIdToRS(ResetGenerationId msg)
1975 throws IOException
1976 {
1977 session.publish(msg);
1978 }
1979
1980 /**
1981 * Set a new generation ID.
1982 *
1983 * @param generationId The new generation ID
1984 *
1985 */
1986 public void setGenerationId(long generationId)
1987 {
1988 this.generationId = generationId;
1989 }
1990
1991 /**
1992 * Returns the Replication Server Domain to which belongs this server handler.
1993 *
1994 * @return The replication server domain.
1995 */
1996 public ReplicationServerDomain getDomain()
1997 {
1998 return this.replicationServerDomain;
1999 }
2000
2001 /**
2002 * Return a Set containing the servers known by this replicationServer.
2003 * @return a set containing the servers known by this replicationServer.
2004 */
2005 public Set<Short> getConnectedServerIds()
2006 {
2007 return connectedServers.keySet();
2008 }
2009 }