001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.tools.makeldif;
028
029
030
031 import java.io.ByteArrayOutputStream;
032 import java.io.InputStream;
033 import java.io.IOException;
034 import java.nio.ByteBuffer;
035 import java.util.concurrent.LinkedBlockingQueue;
036 import java.util.concurrent.TimeUnit;
037
038 import org.opends.server.types.Entry;
039 import org.opends.server.types.LDIFExportConfig;
040 import org.opends.server.util.LDIFException;
041 import org.opends.server.util.LDIFWriter;
042
043
044
045 /**
046 * This class creates an input stream that can be used to read entries generated
047 * by MakeLDIF as if they were being read from another source like a file. It
048 * has a fixed-size queue that dictates how many entries may be held in memory
049 * at any given time.
050 */
051 public class MakeLDIFInputStream
052 extends InputStream
053 implements EntryWriter
054 {
055 // Indicates whether all of the entries have been generated.
056 private boolean allGenerated;
057
058 // Indicates whether this input stream has been closed.
059 private boolean closed;
060
061 // The byte array output stream that will be used to convert entries to byte
062 // arrays with their LDIF representations.
063 private ByteArrayOutputStream entryOutputStream;
064
065 // The byte array that will hold the LDIF representation of the next entry to
066 // be read.
067 private ByteBuffer entryBytes;
068
069 // The IOException that should be thrown the next time a read is requested.
070 private IOException ioException;
071
072 // The LDIF writer that will be used to write the entries to LDIF.
073 private LDIFWriter ldifWriter;
074
075 // The queue used to hold generated entries until they can be read.
076 private LinkedBlockingQueue<Entry> entryQueue;
077
078 // The background thread being used to actually generate the entries.
079 private MakeLDIFInputStreamThread generatorThread;
080
081 // The template file to use to generate the entries.
082 private TemplateFile templateFile;
083
084
085
086 /**
087 * Creates a new MakeLDIF input stream that will generate entries based on the
088 * provided template file.
089 *
090 * @param templateFile The template file to use to generate the entries.
091 */
092 public MakeLDIFInputStream(TemplateFile templateFile)
093 {
094 this.templateFile = templateFile;
095
096 allGenerated = false;
097 closed = false;
098 entryQueue = new LinkedBlockingQueue<Entry>(10);
099 ioException = null;
100 entryBytes = null;
101
102 entryOutputStream = new ByteArrayOutputStream(8192);
103 LDIFExportConfig exportConfig = new LDIFExportConfig(entryOutputStream);
104
105 try
106 {
107 ldifWriter = new LDIFWriter(exportConfig);
108 }
109 catch (IOException ioe)
110 {
111 // This should never happen.
112 ioException = ioe;
113 }
114
115 generatorThread = new MakeLDIFInputStreamThread(this, templateFile);
116 generatorThread.start();
117 }
118
119
120
121 /**
122 * Closes this input stream so that no more data may be read from it.
123 */
124 public void close()
125 {
126 closed = true;
127 ioException = null;
128 }
129
130
131
132 /**
133 * Reads a single byte of data from this input stream.
134 *
135 * @return The byte read from the input stream, or -1 if the end of the
136 * stream has been reached.
137 *
138 * @throws IOException If a problem has occurred while generating data for
139 * use by this input stream.
140 */
141 public int read()
142 throws IOException
143 {
144 if (closed)
145 {
146 return -1;
147 }
148 else if (ioException != null)
149 {
150 throw ioException;
151 }
152
153 if ((entryBytes == null) || (! entryBytes.hasRemaining()))
154 {
155 if (! getNextEntry())
156 {
157 closed = true;
158 return -1;
159 }
160 }
161
162 return (0xFF & entryBytes.get());
163 }
164
165
166
167 /**
168 * Reads data from this input stream.
169 *
170 * @param b The array into which the data should be read.
171 * @param off The position in the array at which point the data read may be
172 * placed.
173 * @param len The maximum number of bytes that may be read into the
174 * provided array.
175 *
176 * @return The number of bytes read from the input stream into the provided
177 * array, or -1 if the end of the stream has been reached.
178 *
179 * @throws IOException If a problem has occurred while generating data for
180 * use by this input stream.
181 */
182 public int read(byte[] b, int off, int len)
183 throws IOException
184 {
185 if (closed)
186 {
187 return -1;
188 }
189 else if (ioException != null)
190 {
191 throw ioException;
192 }
193
194 if ((entryBytes == null) || (! entryBytes.hasRemaining()))
195 {
196 if (! getNextEntry())
197 {
198 closed = true;
199 return -1;
200 }
201 }
202
203 int bytesRead = Math.min(len, entryBytes.remaining());
204 entryBytes.get(b, off, bytesRead);
205 return bytesRead;
206 }
207
208
209
210 /**
211 * {@inheritDoc}
212 */
213 public boolean writeEntry(Entry entry)
214 throws IOException, MakeLDIFException
215 {
216 while (! closed)
217 {
218 try
219 {
220 if (entryQueue.offer(entry, 500, TimeUnit.MILLISECONDS))
221 {
222 return true;
223 }
224 } catch (InterruptedException ie) {}
225 }
226
227 return false;
228 }
229
230
231
232 /**
233 * {@inheritDoc}
234 */
235 public void closeEntryWriter()
236 {
237 allGenerated = true;
238 }
239
240
241
242 /**
243 * Sets the I/O exception that should be thrown on any subsequent calls to
244 * <CODE>available</CODE> or <CODE>read</CODE>.
245 *
246 * @param ioException The I/O exception that should be thrown.
247 */
248 void setIOException(IOException ioException)
249 {
250 this.ioException = ioException;
251 }
252
253
254
255 /**
256 * Retrieves the next entry and puts it in the entry byte buffer.
257 *
258 * @return <CODE>true</CODE> if the next entry is available, or
259 * <CODE>false</CODE> if there are no more entries or if the input
260 * stream has been closed.
261 */
262 private boolean getNextEntry()
263 {
264 Entry entry = entryQueue.poll();
265 while (entry == null)
266 {
267 if (closed)
268 {
269 return false;
270 }
271 else if (allGenerated)
272 {
273 entry = entryQueue.poll();
274 if (entry == null)
275 {
276 return false;
277 }
278 }
279 else
280 {
281 try
282 {
283 entry = entryQueue.poll(500, TimeUnit.MILLISECONDS);
284 } catch (InterruptedException ie) {}
285 }
286 }
287
288 try
289 {
290 entryOutputStream.reset();
291 ldifWriter.writeEntry(entry);
292 ldifWriter.flush();
293 entryBytes = ByteBuffer.wrap(entryOutputStream.toByteArray());
294 }
295 catch (LDIFException le)
296 {
297 // This should never happen.
298 ioException = new IOException(le.getMessage());
299 return false;
300 }
301 catch (IOException ioe)
302 {
303 // Neither should this.
304 ioException = ioe;
305 return false;
306 }
307
308 return true;
309 }
310 }
311