001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028 import java.util.concurrent.LinkedBlockingQueue;
029 import java.util.concurrent.TimeUnit;
030 import org.opends.messages.Message;
031
032 import static org.opends.server.loggers.ErrorLogger.logError;
033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
035 import static org.opends.messages.ReplicationMessages.*;
036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037
038 import org.opends.server.api.DirectoryThread;
039 import org.opends.server.loggers.debug.DebugTracer;
040 import org.opends.server.replication.protocol.UpdateMessage;
041
042 /**
043 * Thread that is used to get messages from the Replication servers
044 * and replay them in the current server.
045 */
046 public class ListenerThread extends DirectoryThread
047 {
048 /**
049 * The tracer object for the debug logger.
050 */
051 private static final DebugTracer TRACER = getTracer();
052
053 private ReplicationDomain repDomain;
054 private boolean shutdown = false;
055 private boolean done = false;
056 private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
057
058
059 /**
060 * Constructor for the ListenerThread.
061 *
062 * @param repDomain the replication domain that created this thread
063 * @param updateToReplayQueue The update messages queue we must
064 * store messages in
065 */
066 public ListenerThread(ReplicationDomain repDomain,
067 LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
068 {
069 super("Replication Listener thread " +
070 "serverID=" + repDomain.serverId +
071 " domain=" + repDomain.getName());
072 this.repDomain = repDomain;
073 this.updateToReplayQueue = updateToReplayQueue;
074 }
075
076 /**
077 * Shutdown this listener thread.
078 */
079 public void shutdown()
080 {
081 shutdown = true;
082 }
083
084 /**
085 * Run method for this class.
086 */
087 @Override
088 public void run()
089 {
090 UpdateMessage updateMsg = null;
091
092 if (debugEnabled())
093 {
094 TRACER.debugInfo("Replication Listener thread starting.");
095 }
096
097 while (!shutdown)
098 {
099 try
100 {
101 // Loop receiving update messages and puting them in the update message
102 // queue
103 while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
104 {
105 // Put update message into the queue (block until some place in the
106 // queue is available)
107 UpdateToReplay updateToReplay =
108 new UpdateToReplay(updateMsg, repDomain);
109 boolean queued = false;
110 while (!queued && !shutdown)
111 {
112 // Use timedout method (offer) instead of put for being able to
113 // shutdown the thread
114 queued = updateToReplayQueue.offer(updateToReplay,
115 1L, TimeUnit.SECONDS);
116 }
117 if (!queued)
118 {
119 // Shutdown requested but could not push message: ensure this one is
120 // not lost and put it in the queue before dying
121 updateToReplayQueue.offer(updateToReplay);
122 }
123 }
124 if (updateMsg == null)
125 shutdown = true;
126 } catch (Exception e)
127 {
128 /*
129 * catch all exceptions happening in repDomain.receive so that the
130 * thread never dies even in case of problems.
131 */
132 Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
133 stackTraceToSingleLineString(e));
134 logError(message);
135 }
136 }
137
138 // Stop the HeartBeat thread
139 repDomain.getBroker().stopHeartBeat();
140
141 done = true;
142
143 if (debugEnabled())
144 {
145 TRACER.debugInfo("Replication Listener thread stopping.");
146 }
147 }
148
149 /**
150 * Wait for the completion of this thread.
151 */
152 public void waitForShutdown()
153 {
154 try
155 {
156 while (done == false)
157 {
158 Thread.sleep(50);
159 }
160 } catch (InterruptedException e)
161 {
162 // exit the loop if this thread is interrupted.
163 }
164 }
165 }