001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.protocols.internal;
028
029
030
031 import java.io.InputStream;
032 import java.io.IOException;
033 import java.nio.ByteBuffer;
034 import java.util.concurrent.ArrayBlockingQueue;
035
036 import org.opends.server.protocols.ldap.LDAPMessage;
037
038
039
040 /**
041 * This class provides an implementation of a
042 * {@code java.io.InputStream} that can be used to facilitate internal
043 * communication with the Directory Server. On the backend, this
044 * input stream will be populated by ASN.1 elements encoded from LDAP
045 * messages created from internal operation responses.
046 */
047 @org.opends.server.types.PublicAPI(
048 stability=org.opends.server.types.StabilityLevel.UNCOMMITTED,
049 mayInstantiate=false,
050 mayExtend=false,
051 mayInvoke=true)
052 public final class InternalLDAPInputStream
053 extends InputStream
054 {
055 // The queue of LDAP messages providing the data to be made
056 // available to the client.
057 private ArrayBlockingQueue<LDAPMessage> messageQueue;
058
059 // Indicates whether this stream has been closed.
060 private boolean closed;
061
062 // The byte buffer with partial data to be written to the client.
063 private ByteBuffer partialMessageBuffer;
064
065 // The internal LDAP socket serviced by this input stream.
066 private InternalLDAPSocket socket;
067
068
069
070 /**
071 * Creates a new internal LDAP input stream that will service the
072 * provided internal LDAP socket.
073 *
074 * @param socket The internal LDAP socket serviced by this
075 * internal LDAP input stream.
076 */
077 public InternalLDAPInputStream(InternalLDAPSocket socket)
078 {
079 this.socket = socket;
080
081 messageQueue = new ArrayBlockingQueue<LDAPMessage>(10);
082 partialMessageBuffer = null;
083 closed = false;
084 }
085
086
087
088 /**
089 * Adds the provided LDAP message to the set of messages to be
090 * returned to the client. Note that this may block if there is
091 * already a significant backlog of messages to be returned.
092 *
093 * @param message The message to add to the set of messages to be
094 * returned to the client.
095 */
096 @org.opends.server.types.PublicAPI(
097 stability=org.opends.server.types.StabilityLevel.PRIVATE,
098 mayInstantiate=false,
099 mayExtend=false,
100 mayInvoke=false)
101 void addLDAPMessage(LDAPMessage message)
102 {
103 // If the stream is closed, then simply drop the message.
104 if (closed)
105 {
106 return;
107 }
108
109 try
110 {
111 messageQueue.put(message);
112 return;
113 }
114 catch (Exception e)
115 {
116 // This shouldn't happen, but if it does then try three more
117 // times before giving up and dropping the message.
118 for (int i=0; i < 3; i++)
119 {
120 try
121 {
122 messageQueue.put(message);
123 break;
124 } catch (Exception e2) {}
125 }
126
127 return;
128 }
129 }
130
131
132
133 /**
134 * Retrieves the number of bytes that can be read (or skipped over)
135 * from this input stream without blocking.
136 *
137 * @return The number of bytes that can be read (or skipped over)
138 * from this input stream wihtout blocking.
139 */
140 @Override()
141 public synchronized int available()
142 {
143 if (partialMessageBuffer == null)
144 {
145 LDAPMessage message = messageQueue.poll();
146 if ((message == null) || (message instanceof NullLDAPMessage))
147 {
148 if (message instanceof NullLDAPMessage)
149 {
150 closed = true;
151 }
152
153 return 0;
154 }
155 else
156 {
157 partialMessageBuffer =
158 ByteBuffer.wrap(message.encode().encode());
159 return partialMessageBuffer.remaining();
160 }
161 }
162 else
163 {
164 return partialMessageBuffer.remaining();
165 }
166 }
167
168
169
170 /**
171 * Closes this input stream. This will add a special marker
172 * element to the message queue indicating that the end of the
173 * stream has been reached. If the queue is full, thenit will be
174 * cleared before adding the marker element.
175 */
176 @Override()
177 public void close()
178 {
179 socket.close();
180 }
181
182
183
184 /**
185 * Closes this input stream through an internal mechanism that will
186 * not cause an infinite recursion loop by trying to also close the
187 * input stream.
188 */
189 @org.opends.server.types.PublicAPI(
190 stability=org.opends.server.types.StabilityLevel.PRIVATE,
191 mayInstantiate=false,
192 mayExtend=false,
193 mayInvoke=false)
194 void closeInternal()
195 {
196 if (closed)
197 {
198 return;
199 }
200
201 closed = true;
202 NullLDAPMessage nullMessage = new NullLDAPMessage();
203
204 while (! messageQueue.offer(nullMessage))
205 {
206 messageQueue.clear();
207 }
208 }
209
210
211
212 /**
213 * Marks the current position in the input stream. This will not
214 * have any effect, as this input stream inplementation does not
215 * support marking.
216 *
217 * @param readLimit The maximum limit of bytes that can be read
218 * before the mark position becomes invalid.
219 */
220 @Override()
221 public void mark(int readLimit)
222 {
223 // No implementation is required.
224 }
225
226
227
228 /**
229 * Indicates whether this input stream inplementation supports the
230 * use of the {@code mark} and {@code reset} methods. This
231 * implementation does not support that functionality.
232 *
233 * @return {@code false} because this implementation does not
234 * support the use of the {@code mark} and {@code reset}
235 * methods.
236 */
237 @Override()
238 public boolean markSupported()
239 {
240 return false;
241 }
242
243
244
245 /**
246 * Reads the next byte of data from the input stream, blocking if
247 * necessary until there is data available.
248 *
249 * @return The next byte of data read from the input stream, or -1
250 * if the end of the input stream has been reached.
251 *
252 * @throws IOException If a problem occurs while trying to read
253 * data from the stream.
254 */
255 @Override()
256 public synchronized int read()
257 throws IOException
258 {
259 if (partialMessageBuffer != null)
260 {
261 if (partialMessageBuffer.remaining() > 0)
262 {
263 int i = (0xFF & partialMessageBuffer.get());
264 if (partialMessageBuffer.remaining() == 0)
265 {
266 partialMessageBuffer = null;
267 }
268
269 return i;
270 }
271 else
272 {
273 partialMessageBuffer = null;
274 }
275 }
276
277 if (closed)
278 {
279 return -1;
280 }
281
282 try
283 {
284 LDAPMessage message = messageQueue.take();
285 if (message instanceof NullLDAPMessage)
286 {
287 messageQueue.clear();
288 closed = true;
289 return -1;
290 }
291
292 partialMessageBuffer =
293 ByteBuffer.wrap(message.encode().encode());
294 return (0xFF & partialMessageBuffer.get());
295 }
296 catch (Exception e)
297 {
298 throw new IOException(e.getMessage());
299 }
300 }
301
302
303
304 /**
305 * Reads some number of bytes from the input stream, blocking if
306 * necessary until there is data available, and adds them to the
307 * provided array starting at position 0.
308 *
309 * @param b The array to which the data is to be written.
310 *
311 * @return The number of bytes actually written into the
312 * provided array, or -1 if the end of the stream has been
313 * reached.
314 *
315 * @throws IOException If a problem occurs while trying to read
316 * data from the stream.
317 */
318 @Override()
319 public int read(byte[] b)
320 throws IOException
321 {
322 return read(b, 0, b.length);
323 }
324
325
326
327 /**
328 * Reads some number of bytes from the input stream, blocking if
329 * necessary until there is data available, and adds them to the
330 * provided array starting at the specified position.
331 *
332 * @param b The array to which the data is to be written.
333 * @param off The offset in the array at which to start writing
334 * data.
335 * @param len The maximum number of bytes that may be added to the
336 * array.
337 *
338 * @return The number of bytes actually written into the
339 * provided array, or -1 if the end of the stream has been
340 * reached.
341 *
342 * @throws IOException If a problem occurs while trying to read
343 * data from the stream.
344 */
345 @Override()
346 public synchronized int read(byte[] b, int off, int len)
347 throws IOException
348 {
349 if (partialMessageBuffer != null)
350 {
351 int remaining = partialMessageBuffer.remaining();
352 if (remaining > 0)
353 {
354 if (remaining <= len)
355 {
356 // We can fit all the remaining data in the provided array,
357 // so that's all we'll try to put in it.
358 partialMessageBuffer.get(b, off, remaining);
359 partialMessageBuffer = null;
360 return remaining;
361 }
362 else
363 {
364 // The array is too small to hold the rest of the data, so
365 // only take as much as we can.
366 partialMessageBuffer.get(b, off, len);
367 return len;
368 }
369 }
370 else
371 {
372 partialMessageBuffer = null;
373 }
374 }
375
376 if (closed)
377 {
378 return -1;
379 }
380
381 try
382 {
383 LDAPMessage message = messageQueue.take();
384 if (message instanceof NullLDAPMessage)
385 {
386 messageQueue.clear();
387 closed = true;
388 return -1;
389 }
390
391 byte[] encodedMessage = message.encode().encode();
392 if (encodedMessage.length <= len)
393 {
394 // We can fit the entire message in the array.
395 System.arraycopy(encodedMessage, 0, b, off,
396 encodedMessage.length);
397 return encodedMessage.length;
398 }
399 else
400 {
401 // We can only fit part of the message in the array,
402 // so we need to save the rest for later.
403 System.arraycopy(encodedMessage, 0, b, off, len);
404 partialMessageBuffer = ByteBuffer.wrap(encodedMessage);
405 partialMessageBuffer.position(len);
406 return len;
407 }
408 }
409 catch (Exception e)
410 {
411 throw new IOException(e.getMessage());
412 }
413 }
414
415
416
417 /**
418 * Repositions this stream to the position at the time that the
419 * {@code mark} method was called on this stream. This will not
420 * have any effect, as this input stream inplementation does not
421 * support marking.
422 */
423 @Override()
424 public void reset()
425 {
426 // No implementation is required.
427 }
428
429
430
431 /**
432 * Skips over and discards up to the specified number of bytes of
433 * data from this input stream. This implementation will always
434 * skip the requested number of bytes unless the end of the stream
435 * is reached.
436 *
437 * @param n The maximum number of bytes to skip.
438 *
439 * @return The number of bytes actually skipped.
440 *
441 * @throws IOException If a problem occurs while trying to read
442 * data from the input stream.
443 */
444 @Override()
445 public synchronized long skip(long n)
446 throws IOException
447 {
448 byte[] b;
449 if (n > 8192)
450 {
451 b = new byte[8192];
452 }
453 else
454 {
455 b = new byte[(int) n];
456 }
457
458 long totalBytesRead = 0L;
459 while (totalBytesRead < n)
460 {
461 int maxLen = (int) Math.min((n - totalBytesRead), b.length);
462
463 int bytesRead = read(b, 0, maxLen);
464 if (bytesRead < 0)
465 {
466 if (totalBytesRead > 0)
467 {
468 return totalBytesRead;
469 }
470 else
471 {
472 return bytesRead;
473 }
474 }
475 else
476 {
477 totalBytesRead += bytesRead;
478 }
479 }
480
481 return totalBytesRead;
482 }
483
484
485
486 /**
487 * Retrieves a string representation of this internal LDAP socket.
488 *
489 * @return A string representation of this internal LDAP socket.
490 */
491 @Override()
492 public String toString()
493 {
494 return "InternalLDAPInputStream";
495 }
496 }
497