001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.extensions;
028 import org.opends.messages.Message;
029
030
031
032 import java.util.Map;
033
034 import org.opends.server.api.DirectoryThread;
035 import org.opends.server.core.DirectoryServer;
036 import org.opends.server.types.AbstractOperation;
037 import org.opends.server.types.CancelRequest;
038 import org.opends.server.types.DebugLogLevel;
039 import org.opends.server.types.DisconnectReason;
040
041 import static org.opends.server.loggers.ErrorLogger.*;
042 import static org.opends.server.loggers.debug.DebugLogger.*;
043 import org.opends.server.loggers.debug.DebugTracer;
044 import static org.opends.messages.CoreMessages.*;
045 import static org.opends.server.util.StaticUtils.*;
046
047
048 /**
049 * This class defines a data structure for storing and interacting with a
050 * Directory Server worker thread.
051 */
052 public class TraditionalWorkerThread
053 extends DirectoryThread
054 {
055 /**
056 * The tracer object for the debug logger.
057 */
058 private static final DebugTracer TRACER = getTracer();
059
060 // Indicates whether the Directory Server is shutting down and this thread
061 // should stop running.
062 private boolean shutdownRequested;
063
064 // Indicates whether this thread was stopped because the server threadnumber
065 // was reduced.
066 private boolean stoppedByReducedThreadNumber;
067
068 // Indicates whether this thread is currently waiting for work.
069 private boolean waitingForWork;
070
071 // The operation that this worker thread is currently processing.
072 private AbstractOperation operation;
073
074 // The handle to the actual thread for this worker thread.
075 private Thread workerThread;
076
077 // The work queue that this worker thread will service.
078 private TraditionalWorkQueue workQueue;
079
080
081
082 /**
083 * Creates a new worker thread that will service the provided work queue and
084 * process any new requests that are submitted.
085 *
086 * @param workQueue The work queue with which this worker thread is
087 * associated.
088 * @param threadID The thread ID for this worker thread.
089 */
090 public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID)
091 {
092 super("Worker Thread " + threadID);
093
094
095 this.workQueue = workQueue;
096
097 stoppedByReducedThreadNumber = false;
098 shutdownRequested = false;
099 waitingForWork = false;
100 operation = null;
101 workerThread = null;
102 }
103
104
105
106 /**
107 * Indicates that this thread is about to be stopped because the Directory
108 * Server configuration has been updated to reduce the number of worker
109 * threads.
110 */
111 public void setStoppedByReducedThreadNumber()
112 {
113 stoppedByReducedThreadNumber = true;
114 }
115
116
117
118 /**
119 * Indicates whether this worker thread is actively processing a request.
120 * Note that this is a point-in-time determination and if a reliable answer is
121 * expected then the server should impose some external constraint to ensure
122 * that no new requests are enqueued.
123 *
124 * @return {@code true} if this worker thread is actively processing a
125 * request, or {@code false} if it is idle.
126 */
127 public boolean isActive()
128 {
129 return (isAlive() && (operation != null));
130 }
131
132
133
134 /**
135 * Operates in a loop, retrieving the next request from the work queue,
136 * processing it, and then going back to the queue for more.
137 */
138 public void run()
139 {
140 workerThread = currentThread();
141
142 while (! shutdownRequested)
143 {
144 try
145 {
146 waitingForWork = true;
147 operation = null;
148 operation = workQueue.nextOperation(this);
149 waitingForWork = false;
150
151
152 if (operation == null)
153 {
154 // The operation may be null if the server is shutting down. If that
155 // is the case, then break out of the while loop.
156 break;
157 }
158 else
159 {
160 // The operation is not null, so process it. Make sure that when
161 // processing is complete.
162 operation.run();
163 operation.operationCompleted();
164 }
165 }
166 catch (Throwable t)
167 {
168 if (debugEnabled())
169 {
170 TRACER.debugWarning(
171 "Uncaught exception in worker thread while processing " +
172 "operation %s: %s", String.valueOf(operation), t);
173
174 TRACER.debugCaught(DebugLogLevel.ERROR, t);
175 }
176
177 try
178 {
179 Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.
180 get(getName(), String.valueOf(operation),
181 stackTraceToSingleLineString(t));
182 logError(message);
183
184 operation.setResultCode(DirectoryServer.getServerErrorResultCode());
185 operation.appendErrorMessage(message);
186 operation.getClientConnection().sendResponse(operation);
187 }
188 catch (Throwable t2)
189 {
190 if (debugEnabled())
191 {
192 TRACER.debugWarning(
193 "Exception in worker thread while trying to log a " +
194 "message about an uncaught exception %s: %s", t, t2);
195
196 TRACER.debugCaught(DebugLogLevel.ERROR, t2);
197 }
198 }
199
200
201 try
202 {
203 Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(),
204 String.valueOf(operation),
205 stackTraceToSingleLineString(t));
206
207 operation.disconnectClient(DisconnectReason.SERVER_ERROR,
208 true, message);
209 }
210 catch (Throwable t2)
211 {
212 if (debugEnabled())
213 {
214 TRACER.debugCaught(DebugLogLevel.ERROR, t2);
215 }
216 }
217 }
218 }
219
220 // If we have gotten here, then we presume that the server thread is
221 // shutting down. However, if that's not the case then that is a problem
222 // and we will want to log a message.
223 if (stoppedByReducedThreadNumber)
224 {
225 logError(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER.get(getName()));
226 }
227 else if (! workQueue.shutdownRequested())
228 {
229 logError(WARN_UNEXPECTED_WORKER_THREAD_EXIT.get(getName()));
230 }
231
232
233 if (debugEnabled())
234 {
235 TRACER.debugInfo(getName() + " exiting.");
236 }
237 }
238
239
240
241 /**
242 * Indicates that the Directory Server has received a request to stop running
243 * and that this thread should stop running as soon as possible.
244 */
245 public void shutDown()
246 {
247 if (debugEnabled())
248 {
249 TRACER.debugInfo(getName() + " being signaled to shut down.");
250 }
251
252 // Set a flag that indicates that the thread should stop running.
253 shutdownRequested = true;
254
255
256 // Check to see if the thread is waiting for work. If so, then interrupt
257 // it.
258 if (waitingForWork)
259 {
260 try
261 {
262 workerThread.interrupt();
263 }
264 catch (Exception e)
265 {
266 if (debugEnabled())
267 {
268 TRACER.debugWarning(
269 "Caught an exception while trying to interrupt the worker " +
270 "thread waiting for work: %s", e);
271 TRACER.debugCaught(DebugLogLevel.ERROR, e);
272 }
273 }
274 }
275 else
276 {
277 try
278 {
279 CancelRequest cancelRequest =
280 new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
281 operation.cancel(cancelRequest);
282 }
283 catch (Exception e)
284 {
285 if (debugEnabled())
286 {
287 TRACER.debugWarning(
288 "Caught an exception while trying to abandon the " +
289 "operation in progress for the worker thread: %s", e);
290 TRACER.debugCaught(DebugLogLevel.ERROR, e);
291 }
292 }
293 }
294 }
295
296 /**
297 * Retrieves any relevent debug information with which this tread is
298 * associated so they can be included in debug messages.
299 *
300 * @return debug information about this thread as a string.
301 */
302 public Map<String, String> getDebugProperties()
303 {
304 Map<String, String> properties = super.getDebugProperties();
305 properties.put("clientConnection",
306 operation.getClientConnection().toString());
307 properties.put("operation", operation.toString());
308
309 return properties;
310 }
311 }
312