001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.protocol;
028
029 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
030 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
031
032 import java.io.IOException;
033 import java.io.InputStream;
034 import java.io.OutputStream;
035 import java.net.Socket;
036 import java.net.SocketException;
037 import java.util.zip.DataFormatException;
038
039 import org.opends.server.loggers.debug.DebugTracer;
040 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
041
042 /**
043 * This class Implement a protocol session using a basic socket and relying on
044 * the innate encoding/decoding capabilities of the ReplicationMessage
045 * by using the getBytes() and generateMsg() methods of those classes.
046 */
047 public class SocketSession implements ProtocolSession
048 {
049 /**
050 * The tracer object for the debug logger.
051 */
052 private static final DebugTracer TRACER = getTracer();
053
054 private Socket socket;
055 private InputStream input;
056 private OutputStream output;
057 byte[] rcvLengthBuf = new byte[8];
058
059 /**
060 * The time the last message published to this session.
061 */
062 private long lastPublishTime = 0;
063
064
065 /**
066 * The time the last message was received on this session.
067 */
068 private long lastReceiveTime = 0;
069
070
071 /**
072 * Creates a new SocketSession based on the provided socket.
073 *
074 * @param socket The Socket on which the SocketSession will be based.
075 * @throws IOException When an IException happens on the socket.
076 */
077 public SocketSession(Socket socket) throws IOException
078 {
079 this.socket = socket;
080 /*
081 * Use a window instead of the TCP flow control.
082 * Therefore set a very large value for send and receive buffer sizes.
083 */
084 input = socket.getInputStream();
085 output = socket.getOutputStream();
086 }
087
088 /**
089 * {@inheritDoc}
090 */
091 public void close() throws IOException
092 {
093 if (debugEnabled())
094 {
095 TRACER.debugInfo("Closing SocketSession."
096 + stackTraceToSingleLineString(new Exception()));
097 }
098 socket.close();
099 }
100
101 /**
102 * {@inheritDoc}
103 */
104 public synchronized void publish(ReplicationMessage msg)
105 throws IOException
106 {
107 byte[] buffer = msg.getBytes();
108 String str = String.format("%08x", buffer.length);
109
110 if (debugEnabled())
111 {
112 TRACER.debugInfo("SocketSession publish <" + str + ">");
113 }
114
115 byte[] sendLengthBuf = str.getBytes();
116
117 output.write(sendLengthBuf);
118 output.write(buffer);
119 output.flush();
120
121 lastPublishTime = System.currentTimeMillis();
122 }
123
124 /**
125 * {@inheritDoc}
126 */
127 public ReplicationMessage receive() throws IOException,
128 ClassNotFoundException, DataFormatException
129 {
130 /* Read the first 8 bytes containing the packet length */
131 int length = 0;
132
133 /* Let's start the stop-watch before waiting on read */
134 /* for the heartbeat check to be operationnal */
135 lastReceiveTime = System.currentTimeMillis();
136
137 while (length<8)
138 {
139 int read = input.read(rcvLengthBuf, length, 8-length);
140 if (read == -1)
141 {
142 lastReceiveTime=0;
143 throw new IOException("no more data");
144 }
145 else
146 {
147 length += read;
148 }
149 }
150
151 int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
152
153 try
154 {
155 length = 0;
156 byte[] buffer = new byte[totalLength];
157 while (length < totalLength)
158 {
159 length += input.read(buffer, length, totalLength - length);
160 }
161 /* We do not want the heartbeat to close the session when */
162 /* we are processing a message even a time consuming one. */
163 lastReceiveTime=0;
164 return ReplicationMessage.generateMsg(buffer);
165 }
166 catch (OutOfMemoryError e)
167 {
168 throw new IOException("Packet too large, can't allocate "
169 + totalLength + " bytes.");
170 }
171 }
172
173 /**
174 * {@inheritDoc}
175 */
176 public void stopEncryption()
177 {
178 // There is no security layer.
179 }
180
181 /**
182 * {@inheritDoc}
183 */
184 public boolean isEncrypted()
185 {
186 return false;
187 }
188
189 /**
190 * {@inheritDoc}
191 */
192 public long getLastPublishTime()
193 {
194 return lastPublishTime;
195 }
196
197 /**
198 * {@inheritDoc}
199 */
200 public long getLastReceiveTime()
201 {
202 if (lastReceiveTime==0)
203 {
204 return System.currentTimeMillis();
205 }
206 return lastReceiveTime;
207 }
208
209 /**
210 * {@inheritDoc}
211 */
212 public String getRemoteAddress()
213 {
214 return socket.getInetAddress().getHostAddress();
215 }
216
217 /**
218 * {@inheritDoc}
219 */
220 public void setSoTimeout(int timeout) throws SocketException
221 {
222 socket.setSoTimeout(timeout);
223 }
224 }