001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028 import java.util.concurrent.BlockingQueue;
029 import java.util.concurrent.TimeUnit;
030 import org.opends.messages.Message;
031
032 import static org.opends.server.loggers.ErrorLogger.logError;
033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
035 import static org.opends.messages.ReplicationMessages.*;
036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
037
038 import org.opends.server.api.DirectoryThread;
039 import org.opends.server.loggers.debug.DebugTracer;
040 import org.opends.server.replication.protocol.UpdateMessage;
041
042 /**
043 * Thread that is used to get message from the replication servers (stored
044 * in the updates queue) and replay them in the current server. A configurable
045 * number of this thread is created for the whole MultimasterReplication object
046 * (i.e: these threads are shared accross the ReplicationDomain objects for
047 * replaying the updates they receive)
048 */
049 public class ReplayThread extends DirectoryThread
050 {
051 /**
052 * The tracer object for the debug logger.
053 */
054 private static final DebugTracer TRACER = getTracer();
055
056 private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
057 private boolean shutdown = false;
058 private boolean done = false;
059 private static int count = 0;
060
061 /**
062 * Constructor for the ReplayThread.
063 *
064 * @param updateToReplayQueue The queue of update messages we have to replay
065 */
066 public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
067 {
068 super("Replication Replay thread " + count++);
069 this.updateToReplayQueue = updateToReplayQueue;
070 }
071
072 /**
073 * Shutdown this replay thread.
074 */
075 public void shutdown()
076 {
077 shutdown = true;
078 }
079
080 /**
081 * Run method for this class.
082 */
083 @Override
084 public void run()
085 {
086
087 if (debugEnabled())
088 {
089 TRACER.debugInfo("Replication Replay thread starting.");
090 }
091
092 UpdateToReplay updateToreplay = null;
093
094 while (!shutdown)
095 {
096 try
097 {
098 // Loop getting an updateToReplayQueue from the update message queue and
099 // replaying matching changes
100 while ( (!shutdown) &&
101 ((updateToreplay = updateToReplayQueue.poll(1L,
102 TimeUnit.SECONDS)) != null))
103 {
104 // Find replication domain for that update message
105 UpdateMessage updateMsg = updateToreplay.getUpdateMessage();
106 ReplicationDomain domain = updateToreplay.getReplicationDomain();
107 domain.replay(updateMsg);
108 }
109 } catch (Exception e)
110 {
111 /*
112 * catch all exceptions happening so that the thread never dies even
113 * in case of problems.
114 */
115 Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
116 stackTraceToSingleLineString(e));
117 logError(message);
118 }
119 }
120 done = true;
121 if (debugEnabled())
122 {
123 TRACER.debugInfo("Replication Replay thread stopping.");
124 }
125 }
126
127 /**
128 * Wait for the completion of this thread.
129 */
130 public void waitForShutdown()
131 {
132 try
133 {
134 while ((done == false) && (this.isAlive()))
135 {
136 Thread.sleep(50);
137 }
138 } catch (InterruptedException e)
139 {
140 // exit the loop if this thread is interrupted.
141 }
142 }
143 }