001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.loggers;
028
029
030
031 import java.util.ArrayList;
032 import java.util.concurrent.LinkedBlockingQueue;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicBoolean;
035
036 import org.opends.messages.Message;
037 import org.opends.server.api.DirectoryThread;
038 import org.opends.server.api.ServerShutdownListener;
039 import org.opends.server.core.DirectoryServer;
040
041
042
043 /**
044 * A Text Writer which writes log records asynchronously to
045 * character-based stream.
046 */
047 public class AsyncronousTextWriter
048 implements ServerShutdownListener, TextWriter
049 {
050 /**
051 * The wrapped Text Writer.
052 */
053 private final TextWriter writer;
054
055 /** Queue to store unpublished records. */
056 private final LinkedBlockingQueue<String> queue;
057
058 /** The capacity for the queue. */
059 private final int capacity;
060
061 private String name;
062 private AtomicBoolean stopRequested;
063 private WriterThread writerThread;
064
065 private boolean autoFlush;
066
067 /**
068 * Construct a new AsyncronousTextWriter wrapper.
069 *
070 * @param name the name of the thread.
071 * @param capacity the size of the queue before it gets flushed.
072 * @param autoFlush indicates if the underlying writer should be flushed
073 * after the queue is flushed.
074 * @param writer a character stream used for output.
075 */
076 public AsyncronousTextWriter(String name, int capacity, boolean autoFlush,
077 TextWriter writer)
078 {
079 this.name = name;
080 this.autoFlush = autoFlush;
081 this.writer = writer;
082
083 this.queue = new LinkedBlockingQueue<String>(capacity);
084 this.capacity = capacity;
085 this.writerThread = null;
086 this.stopRequested = new AtomicBoolean(false);
087
088 writerThread = new WriterThread();
089 writerThread.start();
090
091 DirectoryServer.registerShutdownListener(this);
092 }
093
094 /**
095 * The publisher thread is responsible for emptying the queue of log records
096 * waiting to published.
097 */
098 private class WriterThread extends DirectoryThread
099 {
100 public WriterThread()
101 {
102 super(name);
103 }
104 /**
105 * the run method of the writerThread. Run until queue is empty
106 * AND we've been asked to terminate
107 */
108 public void run()
109 {
110 ArrayList<String> drainList = new ArrayList<String>(capacity);
111
112 String message = null;
113 while (!stopRequested.get() || !queue.isEmpty()) {
114 try
115 {
116 queue.drainTo(drainList, capacity);
117 if (drainList.isEmpty())
118 {
119 message = queue.poll(10, TimeUnit.SECONDS);
120 if(message != null)
121 {
122 do
123 {
124 writer.writeRecord(message);
125 message = queue.poll();
126 }
127 while(message != null);
128
129 if(autoFlush)
130 {
131 flush();
132 }
133 }
134 }
135 else
136 {
137 for (String record : drainList)
138 {
139 writer.writeRecord(record);
140 }
141 drainList.clear();
142
143 if (autoFlush)
144 {
145 flush();
146 }
147 }
148 }
149 catch (InterruptedException ex) {
150 // Ignore. We'll rerun the loop
151 // and presumably fall out.
152 }
153 }
154 }
155 }
156
157 /**
158 * Write the log record asyncronously.
159 *
160 * @param record the log record to write.
161 */
162 public void writeRecord(String record)
163 {
164 // No writer? Off to the bit bucket.
165 if (writer != null) {
166 while (!stopRequested.get())
167 {
168 // Put request on queue for writer
169 try
170 {
171 queue.put(record);
172 break;
173 }
174 catch(InterruptedException e)
175 {
176 // We expect this to happen. Just ignore it and hopefully
177 // drop out in the next try.
178 }
179 }
180 }
181 }
182
183 /**
184 * {@inheritDoc}
185 */
186 public void flush()
187 {
188 writer.flush();
189 }
190
191 /**
192 * {@inheritDoc}
193 */
194 public long getBytesWritten()
195 {
196 return writer.getBytesWritten();
197 }
198
199 /**
200 * Retrieves the wrapped writer.
201 *
202 * @return The wrapped writer used by this asyncronous writer.
203 */
204 public TextWriter getWrappedWriter()
205 {
206 return writer;
207 }
208
209 /**
210 * {@inheritDoc}
211 */
212 public String getShutdownListenerName()
213 {
214 return "AsyncronousTextWriter Thread " + name;
215 }
216
217 /**
218 * {@inheritDoc}
219 */
220 public void processServerShutdown(Message reason)
221 {
222 // Don't shutdown the wrapped writer on server shutdown as it
223 // might get more write requests before the log publishers are
224 // manually shutdown just before the server process exists.
225 shutdown(false);
226 }
227
228 /**
229 * {@inheritDoc}
230 */
231 public void shutdown()
232 {
233 shutdown(true);
234 }
235
236 /**
237 * Releases any resources held by the writer.
238 *
239 * @param shutdownWrapped If the wrapped writer should be closed as well.
240 */
241 public void shutdown(boolean shutdownWrapped)
242 {
243 stopRequested.set(true);
244
245 // Wait for publisher thread to terminate
246 while (writerThread != null && writerThread.isAlive()) {
247 try {
248 // Interrupt the thread if its blocking
249 writerThread.interrupt();
250 writerThread.join();
251 }
252 catch (InterruptedException ex) {
253 // Ignore; we gotta wait..
254 }
255 }
256
257 // The writer writerThread SHOULD have drained the queue.
258 // If not, handle outstanding requests ourselves,
259 // and push them to the writer.
260 while (!queue.isEmpty()) {
261 String message = queue.poll();
262 writer.writeRecord(message);
263 }
264
265 // Shutdown the wrapped writer.
266 if (shutdownWrapped && writer != null) writer.shutdown();
267
268 DirectoryServer.deregisterShutdownListener(this);
269 }
270
271 /**
272 * Set the auto flush setting for this writer.
273 *
274 * @param autoFlush If the writer should flush the buffer after every line.
275 */
276 public void setAutoFlush(boolean autoFlush)
277 {
278 this.autoFlush = autoFlush;
279 }
280 }