001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.protocol;
028
029 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
030 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
031
032 import java.io.IOException;
033 import java.io.InputStream;
034 import java.io.OutputStream;
035 import java.net.Socket;
036 import java.net.SocketException;
037 import java.util.zip.DataFormatException;
038
039 import org.opends.server.loggers.debug.DebugTracer;
040
041 import javax.net.ssl.SSLSocket;
042
043 /**
044 * This class implements a protocol session using TLS.
045 */
046 public class TLSSocketSession implements ProtocolSession
047 {
048 /**
049 * The tracer object for the debug logger.
050 */
051 private static final DebugTracer TRACER = getTracer();
052
053 private Socket plainSocket;
054 private SSLSocket secureSocket;
055 private InputStream input;
056 private OutputStream output;
057 private InputStream plainInput;
058 private OutputStream plainOutput;
059 byte[] rcvLengthBuf = new byte[8];
060
061 /**
062 * The time the last message published to this session.
063 */
064 private long lastPublishTime = 0;
065
066
067 /**
068 * The time the last message was received on this session.
069 */
070 private long lastReceiveTime = 0;
071
072
073 /**
074 * Creates a new TLSSocketSession.
075 *
076 * @param socket The regular Socket on which the SocketSession will be
077 * based.
078 * @param secureSocket The secure Socket on which the SocketSession will be
079 * based.
080 * @throws IOException When an IException happens on the socket.
081 */
082 public TLSSocketSession(Socket socket, SSLSocket secureSocket)
083 throws IOException
084 {
085 plainSocket = socket;
086 this.secureSocket = secureSocket;
087 plainInput = plainSocket.getInputStream();
088 plainOutput = plainSocket.getOutputStream();
089 input = secureSocket.getInputStream();
090 output = secureSocket.getOutputStream();
091 }
092
093
094 /**
095 * {@inheritDoc}
096 */
097 public void close() throws IOException
098 {
099 if (debugEnabled())
100 {
101 TRACER.debugInfo("Closing SocketSession." +
102 Thread.currentThread().getStackTrace());
103 }
104 if (plainSocket != null && !plainSocket.isClosed())
105 {
106 plainSocket.close();
107 }
108 if (secureSocket != null && !secureSocket.isClosed())
109 {
110 secureSocket.close();
111 }
112 }
113
114 /**
115 * {@inheritDoc}
116 */
117 public synchronized void publish(ReplicationMessage msg)
118 throws IOException
119 {
120 byte[] buffer = msg.getBytes();
121 String str = String.format("%08x", buffer.length);
122 byte[] sendLengthBuf = str.getBytes();
123
124 output.write(sendLengthBuf);
125 output.write(buffer);
126 output.flush();
127
128 lastPublishTime = System.currentTimeMillis();
129 }
130
131 /**
132 * {@inheritDoc}
133 */
134 public ReplicationMessage receive() throws IOException,
135 ClassNotFoundException, DataFormatException
136 {
137 /* Read the first 8 bytes containing the packet length */
138 int length = 0;
139
140 /* Let's start the stop-watch before waiting on read */
141 /* for the heartbeat check to be operationnal */
142 lastReceiveTime = System.currentTimeMillis();
143
144 while (length<8)
145 {
146 int read = input.read(rcvLengthBuf, length, 8-length);
147 if (read == -1)
148 {
149 lastReceiveTime=0;
150 throw new IOException("no more data");
151 }
152 else
153 {
154 length += read;
155 }
156 }
157
158 int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
159
160 try
161 {
162 length = 0;
163 byte[] buffer = new byte[totalLength];
164 while (length < totalLength)
165 {
166 length += input.read(buffer, length, totalLength - length);
167 }
168 /* We do not want the heartbeat to close the session when */
169 /* we are processing a message even a time consuming one. */
170 lastReceiveTime=0;
171 return ReplicationMessage.generateMsg(buffer);
172 }
173 catch (OutOfMemoryError e)
174 {
175 throw new IOException("Packet too large, can't allocate "
176 + totalLength + " bytes.");
177 }
178 }
179
180 /**
181 * {@inheritDoc}
182 */
183 public void stopEncryption()
184 {
185 input = plainInput;
186 output = plainOutput;
187 }
188
189 /**
190 * {@inheritDoc}
191 */
192 public boolean isEncrypted()
193 {
194 return !(input == plainInput);
195 }
196
197 /**
198 * {@inheritDoc}
199 */
200 public long getLastPublishTime()
201 {
202 return lastPublishTime;
203 }
204
205 /**
206 * {@inheritDoc}
207 */
208 public long getLastReceiveTime()
209 {
210 if (lastReceiveTime==0)
211 {
212 return System.currentTimeMillis();
213 }
214 return lastReceiveTime;
215 }
216
217 /**
218 * {@inheritDoc}
219 */
220 public String getRemoteAddress()
221 {
222 return plainSocket.getInetAddress().getHostAddress();
223 }
224
225 /**
226 * {@inheritDoc}
227 */
228 public void setSoTimeout(int timeout) throws SocketException
229 {
230 plainSocket.setSoTimeout(timeout);
231 }
232 }