001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028 import org.opends.messages.Message;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.messages.ReplicationMessages.*;
032
033 import java.util.ArrayList;
034 import java.util.LinkedHashSet;
035 import java.util.LinkedList;
036 import java.util.List;
037 import java.util.Iterator;
038
039 import org.opends.server.core.DirectoryServer;
040 import org.opends.server.core.ModifyOperationBasis;
041 import org.opends.server.protocols.asn1.ASN1OctetString;
042 import org.opends.server.protocols.internal.InternalClientConnection;
043 import org.opends.server.protocols.internal.InternalSearchOperation;
044 import org.opends.server.protocols.ldap.LDAPAttribute;
045 import org.opends.server.protocols.ldap.LDAPFilter;
046 import org.opends.server.protocols.ldap.LDAPModification;
047 import org.opends.server.replication.common.ChangeNumber;
048 import org.opends.server.replication.common.ServerState;
049 import org.opends.server.types.Attribute;
050 import org.opends.server.types.AttributeType;
051 import org.opends.server.types.AttributeValue;
052 import org.opends.server.types.Control;
053 import org.opends.server.types.DN;
054 import org.opends.server.types.DereferencePolicy;
055 import org.opends.server.types.DirectoryException;
056 import org.opends.server.types.LDAPException;
057 import org.opends.server.types.ModificationType;
058 import org.opends.server.types.RawModification;
059 import org.opends.server.types.ResultCode;
060 import org.opends.server.types.SearchFilter;
061 import org.opends.server.types.SearchResultEntry;
062 import org.opends.server.types.SearchScope;
063
064 /**
065 * This class implements a ServerState that is stored on the backends
066 * used to store the synchronized data and that is therefore persistent
067 * accross server reboot.
068 */
069 public class PersistentServerState extends ServerState
070 {
071 private DN baseDn;
072 private boolean savedStatus = true;
073 private InternalClientConnection conn =
074 InternalClientConnection.getRootConnection();
075 private ASN1OctetString asn1BaseDn;
076 private short serverId;
077
078 /**
079 * The attribute name used to store the state in the backend.
080 */
081 protected static final String REPLICATION_STATE = "ds-sync-state";
082
083 /**
084 * create a new ServerState.
085 * @param baseDn The baseDN for which the ServerState is created
086 * @param serverId The serverId
087 */
088 public PersistentServerState(DN baseDn, short serverId)
089 {
090 this.baseDn = baseDn;
091 this.serverId = serverId;
092 asn1BaseDn = new ASN1OctetString(baseDn.toString());
093 loadState();
094 }
095
096 /**
097 * {@inheritDoc}
098 */
099 @Override
100 public boolean update(ChangeNumber changeNumber)
101 {
102 savedStatus = false;
103 return super.update(changeNumber);
104 }
105
106 /**
107 * Save this object to persistent storage.
108 */
109 public void save()
110 {
111 if (savedStatus)
112 return;
113
114 savedStatus = true;
115 ResultCode resultCode = updateStateEntry();
116 if (resultCode != ResultCode.SUCCESS)
117 {
118 savedStatus = false;
119 }
120 }
121
122 /**
123 * Load the ServerState from the backing entry in database to memory.
124 */
125 public void loadState()
126 {
127 SearchResultEntry stateEntry = null;
128
129 // try to load the state from the base entry.
130 stateEntry = searchBaseEntry();
131
132 if (stateEntry == null)
133 {
134 // The base entry does not exist yet
135 // in the database or was deleted. Try to read the ServerState
136 // from the configuration instead.
137 stateEntry = searchConfigEntry();
138 }
139
140 if (stateEntry != null)
141 {
142 updateStateFromEntry(stateEntry);
143 }
144
145 /*
146 * In order to make sure that the replication never looses changes,
147 * the server needs to search all the entries that have been
148 * updated after the last write of the ServerState.
149 * Inconsistencies may append after a crash.
150 */
151 checkAndUpdateServerState();
152 }
153
154 /**
155 * Run a search operation to find the base entry
156 * of the replication domain for which this ServerState was created.
157 *
158 * @return Thebasen Entry or null if no entry was found;
159 */
160 private SearchResultEntry searchBaseEntry()
161 {
162 LDAPFilter filter;
163
164 try
165 {
166 filter = LDAPFilter.decode("objectclass=*");
167 } catch (LDAPException e)
168 {
169 // can not happen
170 return null;
171 }
172
173 /*
174 * Search the database entry that is used to periodically
175 * save the ServerState
176 */
177 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
178 attributes.add(REPLICATION_STATE);
179 InternalSearchOperation search = conn.processSearch(asn1BaseDn,
180 SearchScope.BASE_OBJECT,
181 DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
182 filter,attributes);
183 if (((search.getResultCode() != ResultCode.SUCCESS)) &&
184 ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
185 {
186 Message message = ERR_ERROR_SEARCHING_RUV.
187 get(search.getResultCode().getResultCodeName(), search.toString(),
188 search.getErrorMessage(), baseDn.toString());
189 logError(message);
190 return null;
191 }
192
193 SearchResultEntry stateEntry = null;
194 if (search.getResultCode() == ResultCode.SUCCESS)
195 {
196 /*
197 * Read the serverState from the REPLICATION_STATE attribute
198 */
199 LinkedList<SearchResultEntry> result = search.getSearchEntries();
200 if (!result.isEmpty())
201 {
202 stateEntry = result.getFirst();
203 }
204 }
205 return stateEntry;
206 }
207
208 /**
209 * Run a search operation to find the entry with the configuration
210 * of the replication domain for which this ServerState was created.
211 *
212 * @return The configuration Entry or null if no entry was found;
213 */
214 private SearchResultEntry searchConfigEntry()
215 {
216 try
217 {
218 SearchFilter filter =
219 SearchFilter.createFilterFromString(
220 "(&(objectclass=ds-cfg-replication-domain)"
221 +"(ds-cfg-base-dn="+baseDn+"))");
222
223 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
224 attributes.add(REPLICATION_STATE);
225 InternalSearchOperation op =
226 conn.processSearch(DN.decode("cn=config"),
227 SearchScope.SUBORDINATE_SUBTREE,
228 DereferencePolicy.NEVER_DEREF_ALIASES,
229 1, 0, false, filter, attributes);
230
231 if (op.getResultCode() == ResultCode.SUCCESS)
232 {
233 /*
234 * Read the serverState from the REPLICATION_STATE attribute
235 */
236 LinkedList<SearchResultEntry> resultEntries =
237 op.getSearchEntries();
238 if (!resultEntries.isEmpty())
239 {
240 SearchResultEntry resultEntry = resultEntries.getFirst();
241 return resultEntry;
242 }
243 }
244 return null;
245 } catch (DirectoryException e)
246 {
247 // can not happen
248 return null;
249 }
250 }
251
252 /**
253 * Update this ServerState from the provided entry.
254 *
255 * @param resultEntry The entry that should be used to update this
256 * ServerState.
257 */
258 private void updateStateFromEntry(SearchResultEntry resultEntry)
259 {
260 AttributeType synchronizationStateType =
261 DirectoryServer.getAttributeType(REPLICATION_STATE);
262 List<Attribute> attrs =
263 resultEntry.getAttribute(synchronizationStateType);
264 if (attrs != null)
265 {
266 Attribute attr = attrs.get(0);
267 LinkedHashSet<AttributeValue> values = attr.getValues();
268 for (AttributeValue value : values)
269 {
270 ChangeNumber changeNumber =
271 new ChangeNumber(value.getStringValue());
272 update(changeNumber);
273 }
274 }
275 }
276
277 /**
278 * Save the current values of this PersistentState object
279 * in the appropiate entry of the database.
280 *
281 * @return a ResultCode indicating if the method was successfull.
282 */
283 private ResultCode updateStateEntry()
284 {
285 /*
286 * Generate a modify operation on the Server State baseD Entry.
287 */
288 ResultCode result = runUpdateStateEntry(baseDn);
289
290 if (result == ResultCode.NO_SUCH_OBJECT)
291 {
292 // The base entry does not exist yet in the database or
293 // has been deleted, save the state to the config entry instead.
294 SearchResultEntry configEntry = searchConfigEntry();
295 if (configEntry != null)
296 {
297 DN configDN = configEntry.getDN();
298 result = runUpdateStateEntry(configDN);
299 }
300 }
301 return result;
302 }
303
304 /**
305 * Run a modify operation to update the entry whose DN is given as
306 * a parameter with the serverState information.
307 *
308 * @param serverStateEntryDN The DN of the entry to be updated.
309 *
310 * @return A ResultCode indicating if the operation was successful.
311 */
312 private ResultCode runUpdateStateEntry(DN serverStateEntryDN)
313 {
314 ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
315
316 LDAPAttribute attr =
317 new LDAPAttribute(REPLICATION_STATE, values);
318 LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
319 ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
320 mods.add(mod);
321
322 ModifyOperationBasis op =
323 new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
324 InternalClientConnection.nextMessageID(),
325 new ArrayList<Control>(0),
326 new ASN1OctetString(serverStateEntryDN.toString()),
327 mods);
328 op.setInternalOperation(true);
329 op.setSynchronizationOperation(true);
330 op.setDontSynchronize(true);
331 op.run();
332 if (op.getResultCode() != ResultCode.SUCCESS)
333 {
334 Message message = DEBUG_ERROR_UPDATING_RUV.get(
335 op.getResultCode().getResultCodeName().toString(),
336 op.toString(),
337 op.getErrorMessage().toString(),
338 baseDn.toString());
339 logError(message);
340 }
341 return op.getResultCode();
342 }
343
344 /**
345 * Empty the ServerState.
346 * After this call the Server State will be in the same state
347 * as if it was just created.
348 */
349 public void clearInMemory()
350 {
351 super.clear();
352 this.savedStatus = false;
353 }
354
355 /**
356 * Empty the ServerState.
357 * After this call the Server State will be in the same state
358 * as if it was just created.
359 */
360 public void clear()
361 {
362 clearInMemory();
363 save();
364 }
365
366 /**
367 * The ServerState is saved to the database periodically,
368 * therefore in case of crash it is possible that is does not contain
369 * the latest changes that have been processed and saved to the
370 * database.
371 * In order to make sure that we don't loose them, search all the entries
372 * that have been updated after this entry.
373 * This is done by using the HistoricalCsnOrderingMatchingRule
374 * and an ordering index for historical attribute
375 */
376 public final void checkAndUpdateServerState() {
377 Message message;
378 InternalSearchOperation op;
379 ChangeNumber serverStateMaxCn;
380 ChangeNumber dbMaxCn;
381 final AttributeType histType =
382 DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
383
384 // Retrieves the entries that have changed since the
385 // maxCn stored in the serverState
386 synchronized (this)
387 {
388 serverStateMaxCn = this.getMaxChangeNumber(serverId);
389
390 if (serverStateMaxCn == null)
391 return;
392
393 try {
394 op = ReplicationBroker.searchForChangedEntries(baseDn,
395 serverStateMaxCn, null);
396 }
397 catch (Exception e)
398 {
399 return;
400 }
401 if (op.getResultCode() != ResultCode.SUCCESS)
402 {
403 // An error happened trying to search for the updates
404 // Log an error
405 message = ERR_CANNOT_RECOVER_CHANGES.get(
406 baseDn.toNormalizedString());
407 logError(message);
408 }
409 else
410 {
411 dbMaxCn = serverStateMaxCn;
412 for (SearchResultEntry resEntry : op.getSearchEntries())
413 {
414 List<Attribute> attrs = resEntry.getAttribute(histType);
415 Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
416 try
417 {
418 while (true)
419 {
420 AttributeValue attrVal = iav.next();
421 HistVal histVal = new HistVal(attrVal.getStringValue());
422 ChangeNumber cn = histVal.getCn();
423
424 if ((cn != null) && (cn.getServerId() == serverId))
425 {
426 // compare the csn regarding the maxCn we know and
427 // store the biggest
428 if (ChangeNumber.compare(dbMaxCn, cn) < 0)
429 {
430 dbMaxCn = cn;
431 }
432 }
433 }
434 }
435 catch(Exception e)
436 {
437 }
438 }
439
440 if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
441 {
442 // Update the serverState with the new maxCn
443 // present in the database
444 this.update(dbMaxCn);
445 message = NOTE_SERVER_STATE_RECOVERY.get(
446 baseDn.toNormalizedString(), dbMaxCn.toString());
447 logError(message);
448 }
449 }
450 }
451 }
452 }