001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.Message;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.messages.ReplicationMessages.*;
032 import static org.opends.server.loggers.debug.DebugLogger.*;
033 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
034
035
036 import java.io.IOException;
037
038 import org.opends.server.api.DirectoryThread;
039 import org.opends.server.replication.protocol.AckMessage;
040 import org.opends.server.replication.protocol.DoneMessage;
041 import org.opends.server.replication.protocol.EntryMessage;
042 import org.opends.server.replication.protocol.ErrorMessage;
043 import org.opends.server.replication.protocol.ResetGenerationId;
044 import org.opends.server.replication.protocol.InitializeRequestMessage;
045 import org.opends.server.replication.protocol.InitializeTargetMessage;
046 import org.opends.server.replication.protocol.ProtocolSession;
047 import org.opends.server.replication.protocol.ReplicationMessage;
048 import org.opends.server.replication.protocol.UpdateMessage;
049 import org.opends.server.replication.protocol.WindowMessage;
050 import org.opends.server.replication.protocol.WindowProbe;
051 import org.opends.server.replication.protocol.ReplServerInfoMessage;
052 import org.opends.server.replication.protocol.MonitorMessage;
053 import org.opends.server.replication.protocol.MonitorRequestMessage;
054 import org.opends.server.loggers.debug.DebugTracer;
055
056
057 /**
058 * This class implements the part of the replicationServer that reads
059 * the connection from the LDAP servers to get all the updates that
060 * were done on this replica and forwards them to other servers.
061 *
062 * A single thread is dedicated to this work.
063 * It waits in blocking mode on the connection from the LDAP server
064 * and, upon receiving an update, puts it into the replicationServer cache
065 * from where the other servers will grab it.
066 */
067 public class ServerReader extends DirectoryThread
068 {
069 /**
070 * The tracer object for the debug logger.
071 */
072 private static final DebugTracer TRACER = getTracer();
073
074 private short serverId;
075 private ProtocolSession session;
076 private ServerHandler handler;
077 private ReplicationServerDomain replicationServerDomain;
078
079 /**
080 * Constructor for the LDAP server reader part of the replicationServer.
081 *
082 * @param session The ProtocolSession from which to read the data.
083 * @param serverId The server ID of the server from which we read messages.
084 * @param handler The server handler for this server reader.
085 * @param replicationServerDomain The ReplicationServerDomain for this server
086 * reader.
087 */
088 public ServerReader(ProtocolSession session, short serverId,
089 ServerHandler handler,
090 ReplicationServerDomain replicationServerDomain)
091 {
092 super(handler.toString() + " reader");
093 this.session = session;
094 this.serverId = serverId;
095 this.handler = handler;
096 this.replicationServerDomain = replicationServerDomain;
097 }
098
099 /**
100 * Create a loop that reads changes and hands them off to be processed.
101 */
102 public void run()
103 {
104 if (debugEnabled())
105 {
106 TRACER.debugInfo(
107 "In RS " + replicationServerDomain.getReplicationServer().
108 getMonitorInstanceName() +
109 (handler.isReplicationServer()?" RS ":" LS")+
110 " reader starting for serverId=" + serverId);
111 }
112 /*
113 * wait on input stream
114 * grab all incoming messages and publish them to the
115 * replicationServerDomain
116 */
117 try
118 {
119 while (true)
120 {
121 ReplicationMessage msg = session.receive();
122
123 /*
124 if (debugEnabled())
125 {
126 TRACER.debugInfo(
127 "In RS " + replicationServerDomain.getReplicationServer().
128 getMonitorInstanceName() +
129 (handler.isReplicationServer()?" From RS ":" From LS")+
130 " with serverId=" + serverId + " receives " + msg);
131 }
132 */
133 if (msg instanceof AckMessage)
134 {
135 AckMessage ack = (AckMessage) msg;
136 handler.checkWindow();
137 replicationServerDomain.ack(ack, serverId);
138 }
139 else if (msg instanceof UpdateMessage)
140 {
141 // Ignore update received from a replica with
142 // a bad generation ID
143 long referenceGenerationId =
144 replicationServerDomain.getGenerationId();
145 if ((referenceGenerationId>0) &&
146 (referenceGenerationId != handler.getGenerationId()))
147 {
148 logError(ERR_IGNORING_UPDATE_FROM.get(
149 msg.toString(),
150 handler.getMonitorInstanceName()));
151 }
152 else
153 {
154 UpdateMessage update = (UpdateMessage) msg;
155 handler.decAndCheckWindow();
156 replicationServerDomain.put(update, handler);
157 }
158 }
159 else if (msg instanceof WindowMessage)
160 {
161 WindowMessage windowMsg = (WindowMessage) msg;
162 handler.updateWindow(windowMsg);
163 }
164 else if (msg instanceof InitializeRequestMessage)
165 {
166 InitializeRequestMessage initializeMsg =
167 (InitializeRequestMessage) msg;
168 handler.process(initializeMsg);
169 }
170 else if (msg instanceof InitializeTargetMessage)
171 {
172 InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
173 handler.process(initializeMsg);
174 }
175 else if (msg instanceof EntryMessage)
176 {
177 EntryMessage entryMsg = (EntryMessage) msg;
178 handler.process(entryMsg);
179 }
180 else if (msg instanceof DoneMessage)
181 {
182 DoneMessage doneMsg = (DoneMessage) msg;
183 handler.process(doneMsg);
184 }
185 else if (msg instanceof ErrorMessage)
186 {
187 ErrorMessage errorMsg = (ErrorMessage) msg;
188 handler.process(errorMsg);
189 }
190 else if (msg instanceof ResetGenerationId)
191 {
192 ResetGenerationId genIdMsg = (ResetGenerationId) msg;
193 replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
194 }
195 else if (msg instanceof WindowProbe)
196 {
197 WindowProbe windowProbeMsg = (WindowProbe) msg;
198 handler.process(windowProbeMsg);
199 }
200 else if (msg instanceof ReplServerInfoMessage)
201 {
202 ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
203 handler.receiveReplServerInfo(infoMsg);
204 replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
205 }
206 else if (msg instanceof MonitorRequestMessage)
207 {
208 MonitorRequestMessage replServerMonitorRequestMsg =
209 (MonitorRequestMessage) msg;
210 handler.process(replServerMonitorRequestMsg);
211 }
212 else if (msg instanceof MonitorMessage)
213 {
214 MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
215 handler.process(replServerMonitorMsg);
216 }
217 else if (msg == null)
218 {
219 /*
220 * The remote server has sent an unknown message,
221 * close the conenction.
222 */
223 Message message = NOTE_READER_NULL_MSG.get(handler.toString());
224 logError(message);
225 return;
226 }
227 }
228 } catch (IOException e)
229 {
230 /*
231 * The connection has been broken
232 * Log a message and exit from this loop
233 * So that this handler is stopped.
234 */
235 if (debugEnabled())
236 TRACER.debugInfo(
237 "In RS " + replicationServerDomain.getReplicationServer().
238 getMonitorInstanceName() +
239 " reader IO EXCEPTION for serverID=" + serverId
240 + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
241 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
242 logError(message);
243 } catch (ClassNotFoundException e)
244 {
245 if (debugEnabled())
246 TRACER.debugInfo(
247 "In RS <" + replicationServerDomain.getReplicationServer().
248 getMonitorInstanceName() +
249 " reader CNF EXCEPTION serverID=" + serverId
250 + stackTraceToSingleLineString(e));
251 /*
252 * The remote server has sent an unknown message,
253 * close the connection.
254 */
255 Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
256 logError(message);
257 } catch (Exception e)
258 {
259 if (debugEnabled())
260 TRACER.debugInfo(
261 "In RS <" + replicationServerDomain.getReplicationServer().
262 getMonitorInstanceName() +
263 " server reader EXCEPTION serverID=" + serverId
264 + stackTraceToSingleLineString(e));
265 /*
266 * The remote server has sent an unknown message,
267 * close the connection.
268 */
269 Message message = NOTE_READER_EXCEPTION.get(handler.toString());
270 logError(message);
271 }
272 finally
273 {
274 /*
275 * The thread only exit the loop above is some error condition
276 * happen.
277 * Attempt to close the socket and stop the server handler.
278 */
279 if (debugEnabled())
280 TRACER.debugInfo(
281 "In RS " + replicationServerDomain.getReplicationServer().
282 getMonitorInstanceName() +
283 " server reader for serverID=" + serverId +
284 " is closing the session");
285 try
286 {
287 session.close();
288 } catch (IOException e)
289 {
290 // ignore
291 }
292 replicationServerDomain.stopServer(handler);
293 }
294 if (debugEnabled())
295 TRACER.debugInfo(
296 "In RS " + replicationServerDomain.getReplicationServer().
297 getMonitorInstanceName() +
298 (handler.isReplicationServer()?" RS":" LDAP") +
299 " server reader stopped for serverID=" + serverId);
300 }
301 }