001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028
029 import java.util.NoSuchElementException;
030 import java.util.SortedMap;
031 import java.util.TreeMap;
032
033 import org.opends.server.replication.common.ChangeNumber;
034 import org.opends.server.replication.common.ChangeNumberGenerator;
035 import org.opends.server.replication.common.ServerState;
036 import org.opends.server.replication.protocol.UpdateMessage;
037 import org.opends.server.types.operation.PluginOperation;
038
039 /**
040 * This class is use to store the list of local operations currently
041 * in progress and not yet committed in the database.
042 *
043 * It is used to make sure that operations are sent to the Replication
044 * Server in the order defined by their ChangeNumber.
045 * It is also used to update the ServerState at the appropriate time.
046 *
047 * On object of this class is instanciated for each ReplicationDomain.
048 */
049 public class PendingChanges
050 {
051 /**
052 * A map used to store the pending changes.
053 */
054 private SortedMap<ChangeNumber, PendingChange> pendingChanges =
055 new TreeMap<ChangeNumber, PendingChange>();
056
057 /**
058 * The ChangeNumberGenerator to use to create new unique ChangeNumbers
059 * for each operation done on the replication domain.
060 */
061 private ChangeNumberGenerator changeNumberGenerator;
062
063 /**
064 * The Replicationbroker that will be used to send UpdateMessage.
065 */
066 private ReplicationBroker broker;
067
068 /**
069 * The ServerState that will be updated when UpdateMessage are committed.
070 */
071 private ServerState state;
072
073 /**
074 * Creates a new PendingChanges using the provided ChangeNumberGenerator.
075 *
076 * @param changeNumberGenerator The ChangeNumberGenerator to use to create
077 * new unique ChangeNumbers.
078 * @param broker The Replicationbroker that will be used to send
079 * UpdateMessage.
080 * @param state The ServerState that will be updated when UpdateMessage
081 * are committed.
082 */
083 public PendingChanges(
084 ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
085 ServerState state)
086 {
087 this.changeNumberGenerator = changeNumberGenerator;
088 this.broker = broker;
089 this.state = state;
090 }
091
092 /**
093 * Remove and return an update form the pending changes list.
094 *
095 * @param changeNumber The ChangeNumber of the update to remove.
096 *
097 * @return The UpdateMessage that was just removed.
098 */
099 public synchronized UpdateMessage remove(ChangeNumber changeNumber)
100 {
101 return pendingChanges.remove(changeNumber).getMsg();
102 }
103
104 /**
105 * Returns the number of update currently in the list.
106 *
107 * @return The number of update currently in the list.
108 */
109 public int size()
110 {
111 return pendingChanges.size();
112 }
113
114 /**
115 * Mark an update message as committed.
116 *
117 * @param changeNumber The ChangeNumber of the update message that must be
118 * set as committed.
119 * @param msg The message associated to the update.
120 */
121 public synchronized void commit(ChangeNumber changeNumber,
122 UpdateMessage msg)
123 {
124 PendingChange curChange = pendingChanges.get(changeNumber);
125 if (curChange == null)
126 {
127 throw new NoSuchElementException();
128 }
129 curChange.setCommitted(true);
130
131 curChange.setMsg(msg);
132 }
133
134 /**
135 * Mark an update message as committed.
136 *
137 * @param changeNumber The ChangeNumber of the update message that must be
138 * set as committed.
139 */
140 public synchronized void commit(ChangeNumber changeNumber)
141 {
142 PendingChange curChange = pendingChanges.get(changeNumber);
143 if (curChange == null)
144 {
145 throw new NoSuchElementException();
146 }
147 curChange.setCommitted(true);
148 }
149
150 /**
151 * Add a new UpdateMessage to the pending list from the provided local
152 * operation.
153 *
154 * @param operation The local operation for which an UpdateMessage mus
155 * be added in the pending list.
156 * @return The ChangeNumber now associated to the operation.
157 */
158 public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
159 {
160 ChangeNumber changeNumber;
161
162 changeNumber = changeNumberGenerator.newChangeNumber();
163 PendingChange change = new PendingChange(changeNumber, operation, null);
164 pendingChanges.put(changeNumber, change);
165 return changeNumber;
166
167 }
168
169 /**
170 * Push all committed local changes to the replicationServer service.
171 *
172 * @return The number of pushed updates.
173 */
174 public synchronized int pushCommittedChanges()
175 {
176 int numSentUpdates = 0;
177 if (pendingChanges.isEmpty())
178 return numSentUpdates;
179
180 ChangeNumber firstChangeNumber = pendingChanges.firstKey();
181 PendingChange firstChange = pendingChanges.get(firstChangeNumber);
182
183 while ((firstChange != null) && firstChange.isCommitted())
184 {
185 if ((firstChange.getOp() != null ) &&
186 (firstChange.getOp().isSynchronizationOperation() == false))
187 {
188 numSentUpdates++;
189 broker.publish(firstChange.getMsg());
190 }
191 state.update(firstChangeNumber);
192 pendingChanges.remove(firstChangeNumber);
193
194 if (pendingChanges.isEmpty())
195 {
196 firstChange = null;
197 }
198 else
199 {
200 firstChangeNumber = pendingChanges.firstKey();
201 firstChange = pendingChanges.get(firstChangeNumber);
202 }
203 }
204 return numSentUpdates;
205 }
206 }