001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027
028 package org.opends.server.replication.protocol;
029
030 import org.opends.server.api.DirectoryThread;
031 import static org.opends.server.loggers.debug.DebugLogger.*;
032
033 import org.opends.server.loggers.debug.DebugTracer;
034
035 import java.io.IOException;
036
037 /**
038 * This thread publishes a heartbeat message on a given protocol session at
039 * regular intervals when there are no other replication messages being
040 * published.
041 */
042 public class HeartbeatThread extends DirectoryThread
043 {
044 /**
045 * The tracer object for the debug logger.
046 */
047 private static final DebugTracer TRACER = getTracer();
048
049
050 /**
051 * For test purposes only to simulate loss of heartbeats.
052 */
053 static private boolean heartbeatsDisabled = false;
054
055 /**
056 * The session on which heartbeats are to be sent.
057 */
058 private ProtocolSession session;
059
060
061 /**
062 * The time in milliseconds between heartbeats.
063 */
064 private long heartbeatInterval;
065
066
067 /**
068 * Set this to stop the thread.
069 */
070 private Boolean shutdown = false;
071 private final Object shutdown_lock = new Object();
072
073
074 /**
075 * Create a heartbeat thread.
076 * @param threadName The name of the heartbeat thread.
077 * @param session The session on which heartbeats are to be sent.
078 * @param heartbeatInterval The desired interval between heartbeats in
079 * milliseconds.
080 */
081 public HeartbeatThread(String threadName, ProtocolSession session,
082 long heartbeatInterval)
083 {
084 super(threadName);
085 this.session = session;
086 this.heartbeatInterval = heartbeatInterval;
087 }
088
089 /**
090 * {@inheritDoc}
091 */
092 @Override
093 public void run()
094 {
095 try
096 {
097 if (debugEnabled())
098 {
099 TRACER.debugInfo("Heartbeat thread is starting, interval is %d",
100 heartbeatInterval);
101 }
102 HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
103
104 while (!shutdown)
105 {
106 long now = System.currentTimeMillis();
107 if (debugEnabled())
108 {
109 TRACER.debugVerbose("Heartbeat thread awoke at %d, last message " +
110 "was sent at %d", now, session.getLastPublishTime());
111 }
112
113 if (now > session.getLastPublishTime() + heartbeatInterval)
114 {
115 if (!heartbeatsDisabled)
116 {
117 if (debugEnabled())
118 {
119 TRACER.debugVerbose("Heartbeat sent at %d", now);
120 }
121 session.publish(heartbeatMessage);
122 }
123 }
124
125 try
126 {
127 long sleepTime = session.getLastPublishTime() +
128 heartbeatInterval - now;
129 if (sleepTime <= 0)
130 {
131 sleepTime = heartbeatInterval;
132 }
133
134 if (debugEnabled())
135 {
136 TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
137 }
138
139 synchronized (shutdown_lock)
140 {
141 if (!shutdown)
142 {
143 shutdown_lock.wait(sleepTime);
144 }
145 }
146 }
147 catch (InterruptedException e)
148 {
149 // Keep looping.
150 }
151 }
152 }
153 catch (IOException e)
154 {
155 if (debugEnabled())
156 {
157 TRACER.debugInfo("Heartbeat thread could not send a heartbeat.");
158 }
159 // This will be caught in another thread.
160 }
161 finally
162 {
163 if (debugEnabled())
164 {
165 TRACER.debugInfo("Heartbeat thread is exiting.");
166 }
167 }
168 }
169
170
171 /**
172 * Call this method to stop the thread.
173 * This method is blocking until the thread has stopped.
174 */
175 public void shutdown()
176 {
177 synchronized (shutdown_lock)
178 {
179 shutdown = true;
180 shutdown_lock.notifyAll();
181 if (debugEnabled())
182 {
183 TRACER.debugInfo("Going to notify Heartbeat thread.");
184 }
185 }
186 if (debugEnabled())
187 {
188 TRACER.debugInfo("Returning from Heartbeat shutdown.");
189 }
190 }
191
192
193 /**
194 * For testing purposes only to simulate loss of heartbeats.
195 * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
196 */
197 public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
198 {
199 HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
200 }
201 }