001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.Message;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
034 import static org.opends.messages.ReplicationMessages.*;
035
036 import java.io.IOException;
037 import java.net.SocketException;
038 import java.util.NoSuchElementException;
039
040 import org.opends.server.api.DirectoryThread;
041 import org.opends.server.loggers.debug.DebugTracer;
042 import org.opends.server.replication.protocol.ProtocolSession;
043 import org.opends.server.replication.protocol.UpdateMessage;
044
045
046 /**
047 * This class defines a server writer, which is used to send changes to a
048 * directory server.
049 */
050 public class ServerWriter extends DirectoryThread
051 {
052 /**
053 * The tracer object for the debug logger.
054 */
055 private static final DebugTracer TRACER = getTracer();
056
057 private ProtocolSession session;
058 private ServerHandler handler;
059 private ReplicationServerDomain replicationServerDomain;
060 private short serverId;
061
062 /**
063 * Create a ServerWriter.
064 * Then ServerWriter then waits on the ServerHandler for new updates
065 * and forward them to the server
066 *
067 * @param session the ProtocolSession that will be used to send updates.
068 * @param serverId the Identifier of the server.
069 * @param handler handler for which the ServerWriter is created.
070 * @param replicationServerDomain The ReplicationServerDomain of this
071 * ServerWriter.
072 */
073 public ServerWriter(ProtocolSession session, short serverId,
074 ServerHandler handler,
075 ReplicationServerDomain replicationServerDomain)
076 {
077 super(handler.toString() + " writer");
078
079 this.serverId = serverId;
080 this.session = session;
081 this.handler = handler;
082 this.replicationServerDomain = replicationServerDomain;
083 }
084
085 /**
086 * Run method for the ServerWriter.
087 * Loops waiting for changes from the ReplicationServerDomain and forward them
088 * to the other servers
089 */
090 public void run()
091 {
092 if (debugEnabled())
093 {
094 if (handler.isReplicationServer())
095 {
096 TRACER.debugInfo("Replication server writer starting " + serverId);
097 }
098 else
099 {
100 TRACER.debugInfo("LDAP server writer starting " + serverId);
101 }
102 }
103 try
104 {
105 while (true)
106 {
107 UpdateMessage update = replicationServerDomain.take(this.handler);
108 if (update == null)
109 return; /* this connection is closing */
110
111 // Ignore update to be sent to a replica with a bad generation ID
112 long referenceGenerationId = replicationServerDomain.getGenerationId();
113 if ((referenceGenerationId != handler.getGenerationId())
114 || (referenceGenerationId == -1)
115 || (handler.getGenerationId() == -1))
116 {
117 logError(ERR_IGNORING_UPDATE_TO.get(
118 update.getDn(),
119 this.handler.getMonitorInstanceName()));
120 continue;
121 }
122
123 /*
124 if (debugEnabled())
125 {
126 TRACER.debugInfo(
127 "In " + replicationServerDomain.getReplicationServer().
128 getMonitorInstanceName() +
129 ", writer to " + this.handler.getMonitorInstanceName() +
130 " publishes msg=[" + update.toString() + "]"+
131 " refgenId=" + referenceGenerationId +
132 " server=" + handler.getServerId() +
133 " generationId=" + handler.getGenerationId());
134 }
135 */
136 session.publish(update);
137 }
138 }
139 catch (NoSuchElementException e)
140 {
141 /*
142 * The remote host has disconnected and this particular Tree is going to
143 * be removed, just ignore the exception and let the thread die as well
144 */
145 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
146 logError(message);
147 }
148 catch (SocketException e)
149 {
150 /*
151 * The remote host has disconnected and this particular Tree is going to
152 * be removed, just ignore the exception and let the thread die as well
153 */
154 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
155 logError(message);
156 }
157 catch (Exception e)
158 {
159 /*
160 * An unexpected error happened.
161 * Log an error and close the connection.
162 */
163 Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
164 " " + stackTraceToSingleLineString(e));
165 logError(message);
166 }
167 finally {
168 try
169 {
170 session.close();
171 } catch (IOException e)
172 {
173 // Can't do much more : ignore
174 }
175 replicationServerDomain.stopServer(handler);
176
177 if (debugEnabled())
178 {
179 if (handler.isReplicationServer())
180 {
181 TRACER.debugInfo("Replication server writer stopping " + serverId);
182 }
183 else
184 {
185 TRACER.debugInfo("LDAP server writer stopping " + serverId);
186 }
187 }
188 }
189 }
190 }