001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028 import static org.opends.messages.ReplicationMessages.*;
029 import static org.opends.messages.ToolMessages.*;
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033 import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
034 import static org.opends.server.replication.protocol.OperationContext.*;
035 import static org.opends.server.util.ServerConstants.*;
036 import static org.opends.server.util.StaticUtils.createEntry;
037 import static org.opends.server.util.StaticUtils.getFileForPath;
038 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
039
040 import java.io.File;
041 import java.io.IOException;
042 import java.io.OutputStream;
043 import java.net.SocketTimeoutException;
044 import java.util.ArrayList;
045 import java.util.Collection;
046 import java.util.HashSet;
047 import java.util.LinkedHashMap;
048 import java.util.LinkedHashSet;
049 import java.util.LinkedList;
050 import java.util.List;
051 import java.util.NoSuchElementException;
052 import java.util.SortedMap;
053 import java.util.TreeMap;
054 import java.util.concurrent.LinkedBlockingQueue;
055 import java.util.concurrent.atomic.AtomicInteger;
056 import java.util.zip.Adler32;
057 import java.util.zip.CheckedOutputStream;
058 import java.util.zip.DataFormatException;
059
060 import org.opends.messages.Message;
061 import org.opends.messages.MessageBuilder;
062 import org.opends.server.admin.server.ConfigurationChangeListener;
063 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
064 import org.opends.server.admin.std.server.ReplicationDomainCfg;
065 import org.opends.server.api.AlertGenerator;
066 import org.opends.server.api.Backend;
067 import org.opends.server.api.DirectoryThread;
068 import org.opends.server.api.SynchronizationProvider;
069 import org.opends.server.backends.jeb.BackendImpl;
070 import org.opends.server.backends.task.Task;
071 import org.opends.server.config.ConfigException;
072 import org.opends.server.core.AddOperation;
073 import org.opends.server.core.DeleteOperation;
074 import org.opends.server.core.DirectoryServer;
075 import org.opends.server.core.LockFileManager;
076 import org.opends.server.core.ModifyDNOperation;
077 import org.opends.server.core.ModifyDNOperationBasis;
078 import org.opends.server.core.ModifyOperation;
079 import org.opends.server.core.ModifyOperationBasis;
080 import org.opends.server.loggers.debug.DebugTracer;
081 import org.opends.server.protocols.asn1.ASN1Exception;
082 import org.opends.server.protocols.asn1.ASN1OctetString;
083 import org.opends.server.protocols.internal.InternalClientConnection;
084 import org.opends.server.protocols.internal.InternalSearchOperation;
085 import org.opends.server.protocols.ldap.LDAPAttribute;
086 import org.opends.server.protocols.ldap.LDAPFilter;
087 import org.opends.server.protocols.ldap.LDAPModification;
088 import org.opends.server.replication.common.ChangeNumber;
089 import org.opends.server.replication.common.ChangeNumberGenerator;
090 import org.opends.server.replication.common.ServerState;
091 import org.opends.server.replication.protocol.AckMessage;
092 import org.opends.server.replication.protocol.AddContext;
093 import org.opends.server.replication.protocol.AddMsg;
094 import org.opends.server.replication.protocol.DeleteContext;
095 import org.opends.server.replication.protocol.DoneMessage;
096 import org.opends.server.replication.protocol.EntryMessage;
097 import org.opends.server.replication.protocol.ErrorMessage;
098 import org.opends.server.replication.protocol.HeartbeatMessage;
099 import org.opends.server.replication.protocol.InitializeRequestMessage;
100 import org.opends.server.replication.protocol.InitializeTargetMessage;
101 import org.opends.server.replication.protocol.ModifyContext;
102 import org.opends.server.replication.protocol.ModifyDNMsg;
103 import org.opends.server.replication.protocol.ModifyDnContext;
104 import org.opends.server.replication.protocol.OperationContext;
105 import org.opends.server.replication.protocol.ReplSessionSecurity;
106 import org.opends.server.replication.protocol.ReplicationMessage;
107 import org.opends.server.replication.protocol.ResetGenerationId;
108 import org.opends.server.replication.protocol.RoutableMessage;
109 import org.opends.server.replication.protocol.UpdateMessage;
110 import org.opends.server.tasks.InitializeTargetTask;
111 import org.opends.server.tasks.InitializeTask;
112 import org.opends.server.tasks.TaskUtils;
113 import org.opends.server.types.ExistingFileBehavior;
114 import org.opends.server.types.AbstractOperation;
115 import org.opends.server.types.Attribute;
116 import org.opends.server.types.AttributeType;
117 import org.opends.server.types.AttributeValue;
118 import org.opends.server.types.ConfigChangeResult;
119 import org.opends.server.types.Control;
120 import org.opends.server.types.DN;
121 import org.opends.server.types.DereferencePolicy;
122 import org.opends.server.types.DirectoryException;
123 import org.opends.server.types.Entry;
124 import org.opends.server.types.LDAPException;
125 import org.opends.server.types.LDIFExportConfig;
126 import org.opends.server.types.LDIFImportConfig;
127 import org.opends.server.types.Modification;
128 import org.opends.server.types.ModificationType;
129 import org.opends.server.types.Operation;
130 import org.opends.server.types.RDN;
131 import org.opends.server.types.RawModification;
132 import org.opends.server.types.ResultCode;
133 import org.opends.server.types.SearchFilter;
134 import org.opends.server.types.SearchResultEntry;
135 import org.opends.server.types.SearchScope;
136 import org.opends.server.types.SynchronizationProviderResult;
137 import org.opends.server.types.operation.PluginOperation;
138 import org.opends.server.types.operation.PostOperationOperation;
139 import org.opends.server.types.operation.PreOperationAddOperation;
140 import org.opends.server.types.operation.PreOperationDeleteOperation;
141 import org.opends.server.types.operation.PreOperationModifyDNOperation;
142 import org.opends.server.types.operation.PreOperationModifyOperation;
143 import org.opends.server.types.operation.PreOperationOperation;
144 import org.opends.server.workflowelement.localbackend.*;
145
146 /**
 * This class implements the bulk of the Directory Server side
 * of the replication code.
149 * It contains the root method for publishing a change,
150 * processing a change received from the replicationServer service,
151 * handle conflict resolution,
152 * handle protocol messages from the replicationServer.
153 */
154 public class ReplicationDomain extends DirectoryThread
155 implements ConfigurationChangeListener<ReplicationDomainCfg>,
156 AlertGenerator
157 {
/**
 * The fully-qualified name of this class.
 */
private static final String CLASS_NAME =
  "org.opends.server.replication.plugin.ReplicationDomain";

/**
 * The attribute used to mark conflicting entries.
 * The value of this attribute should be the dn that this entry was
 * supposed to have when it was marked as conflicting.
 */
public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";

/**
 * The tracer object for the debug logger.
 */
private static final DebugTracer TRACER = getTracer();

// The monitor provider registered by the constructor to publish
// monitoring information about this domain.
private ReplicationMonitor monitor;

// The broker used to publish and receive changes.
private ReplicationBroker broker;
// Thread waiting for incoming update messages for this domain and pushing
// them to the global incoming update message queue for later processing by
// replay threads.
private ListenerThread listenerThread;
// The update to replay message queue where the listener thread is going to
// push incoming update messages.
private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
// Update messages indexed by change number; presumably the messages for
// which an acknowledgement is still expected (confirm against the users
// of this map).
private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
  new TreeMap<ChangeNumber, UpdateMessage>();
// Activity counters. Only numResolvedModifyConflicts is updated in the
// visible part of the class (when a replayed modify had to be resolved);
// the others are presumably maintained elsewhere in this class.
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
private AtomicInteger numSentUpdates = new AtomicInteger(0);
private AtomicInteger numProcessedUpdates = new AtomicInteger();
private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
private int debugCount = 0;
// The persistent server state created by the constructor to store the last
// ChangeNumber seen from all LDAP servers in the topology.
private PersistentServerState state;
private int numReplayedPostOpCalled = 0;

// Queue and delay limits handed over to the ReplicationBroker when it is
// created by the constructor.
private int maxReceiveQueue = 0;
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;

// The generation id of this domain. It keeps the value -1 when it could not
// be loaded by the constructor.
private long generationId = -1;
private boolean generationIdSavedStatus = false;

// Generator used to create new unique ChangeNumbers for each operation
// done on this replication domain.
ChangeNumberGenerator generator;

/**
 * This object is used to store the list of updates currently being
 * done on the local database.
 * It is useful to make sure that the local operations are sent in a
 * correct order to the replication server and that the ServerState
 * is not updated too early.
 */
private PendingChanges pendingChanges;

/**
 * It contains the updates that were done on other servers, transmitted
 * by the replication server and that are currently replayed.
 * It is useful to make sure that dependencies between operations
 * are correctly fulfilled and to make sure that the ServerState is
 * not updated too early.
 */
private RemotePendingChanges remotePendingChanges;

/**
 * The time in milliseconds between heartbeats from the replication
 * server. Zero means heartbeats are off.
 */
private long heartbeatInterval = 0;
// The server id of this domain, read from the configuration.
short serverId;

// The context related to an import or export being processed
// Null when none is being processed.
private IEContext ieContext = null;

// The replication servers read from the configuration; the broker is
// started with this collection.
private Collection<String> replicationServers;

// The base DN of this replication domain.
private DN baseDN;

private boolean shutdown = false;

// Root internal connection owned by this domain.
private InternalClientConnection conn =
  InternalClientConnection.getRootConnection();

// False for the schema suffix, for which modify conflicts are not solved,
// true for every other suffix.
private boolean solveConflictFlag = true;

private boolean disabled = false;
private boolean stateSavingDisabled = false;

// The window size handed over to the broker; overwritten with the
// configured value by the constructor.
private int window = 100;

/**
 * The isolation policy that this domain is going to use.
 * This field describes the behavior of the domain when an update is
 * attempted and the domain could not connect to any Replication Server.
 * Possible values are accept-updates or deny-updates, but other values
 * may be added in the future.
 */
private IsolationPolicy isolationpolicy;

/**
 * The DN of the configuration entry of this domain.
 */
private DN configDn;

/**
 * A boolean indicating if the thread used to save the persistentServerState
 * is terminated.
 */
private boolean done = true;
272
273 /**
274 * This class contain the context related to an import or export
275 * launched on the domain.
276 */
277 private class IEContext
278 {
279 // The task that initiated the operation.
280 Task initializeTask;
281 // The input stream for the import
282 ReplLDIFInputStream ldifImportInputStream = null;
283 // The target in the case of an export
284 short exportTarget = RoutableMessage.UNKNOWN_SERVER;
285 // The source in the case of an import
286 short importSource = RoutableMessage.UNKNOWN_SERVER;
287
288 // The total entry count expected to be processed
289 long entryCount = 0;
290 // The count for the entry not yet processed
291 long entryLeftCount = 0;
292
293 boolean checksumOutput = false;
294
295 // The exception raised when any
296 DirectoryException exception = null;
297 long checksumOutputValue = (long)0;
298
299 /**
300 * Initializes the import/export counters with the provider value.
301 * @param count The value with which to initialize the counters.
302 */
303 public void setCounters(long total, long left)
304 throws DirectoryException
305 {
306 entryCount = total;
307 entryLeftCount = left;
308
309 if (initializeTask != null)
310 {
311 if (initializeTask instanceof InitializeTask)
312 {
313 ((InitializeTask)initializeTask).setTotal(entryCount);
314 ((InitializeTask)initializeTask).setLeft(entryCount);
315 }
316 else if (initializeTask instanceof InitializeTargetTask)
317 {
318 ((InitializeTargetTask)initializeTask).setTotal(entryCount);
319 ((InitializeTargetTask)initializeTask).setLeft(entryCount);
320 }
321 }
322 }
323
324 /**
325 * Update the counters of the task for each entry processed during
326 * an import or export.
327 */
328 public void updateCounters()
329 throws DirectoryException
330 {
331 entryLeftCount--;
332
333 if (initializeTask != null)
334 {
335 if (initializeTask instanceof InitializeTask)
336 {
337 ((InitializeTask)initializeTask).setLeft(entryLeftCount);
338 }
339 else if (initializeTask instanceof InitializeTargetTask)
340 {
341 ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
342 }
343 }
344 }
345
346 /**
347 * {@inheritDoc}
348 */
349 public String toString()
350 {
351 return new String("[ Entry count=" + this.entryCount +
352 ", Entry left count=" + this.entryLeftCount + "]");
353 }
354 }
355
356 /**
357 * This thread is launched when we want to export data to another server that
358 * has requested to be initialized with the data of our backend.
359 */
360 private class ExportThread extends DirectoryThread
361 {
362 // Id of server that will receive updates
363 private short target;
364
365 /**
366 * Constructor for the ExportThread.
367 *
368 * @param target Id of server that will receive updates
369 */
370 public ExportThread(short target)
371 {
372 super("Export thread");
373 this.target = target;
374 }
375
376 /**
377 * Run method for this class.
378 */
379 public void run()
380 {
381 if (debugEnabled())
382 {
383 TRACER.debugInfo("Export thread starting.");
384 }
385
386 try
387 {
388 initializeRemote(target, target, null);
389 } catch (DirectoryException de)
390 {
391 // An error message has been sent to the peer
392 // Nothing more to do locally
393 }
394 if (debugEnabled())
395 {
396 TRACER.debugInfo("Export thread stopping.");
397 }
398 }
399 }
400
/**
 * Creates a new ReplicationDomain using configuration from configEntry.
 * <p>
 * The backend holding the base DN is checked before anything is registered
 * with the Directory Server, so that a rejected configuration does not
 * leave a monitor provider registered for a domain that was never built.
 *
 * @param configuration    The configuration of this ReplicationDomain.
 * @param updateToReplayQueue The queue for update messages to replay.
 * @throws ConfigException In case of invalid configuration, in particular
 *                         when no backend can be found for the base DN.
 */
public ReplicationDomain(ReplicationDomainCfg configuration,
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
{
  super("replicationDomain_" + configuration.getBaseDN());

  // Read the configuration parameters.
  replicationServers = configuration.getReplicationServer();
  serverId = (short) configuration.getServerId();
  baseDN = configuration.getBaseDN();
  window = configuration.getWindowSize();
  heartbeatInterval = configuration.getHeartbeatInterval();
  isolationpolicy = configuration.getIsolationPolicy();
  configDn = configuration.dn();
  this.updateToReplayQueue = updateToReplayQueue;

  /*
   * Fail fast when the base DN is not served by any backend: this must
   * happen before the monitor is registered, otherwise the registration
   * would outlive the failed construction.
   */
  if (retrievesBackend(baseDN) == null)
  {
    throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
        baseDN.toNormalizedString()));
  }

  /*
   * Modify conflicts are solved for all suffixes but the schema suffix
   * because we don't want to store extra information in the schema
   * ldif files.
   * This has no negative impact because the changes on schema should
   * not produce conflicts.
   */
  solveConflictFlag = (baseDN.compareTo(DirectoryServer.getSchemaDN()) != 0);

  /*
   * Create a new Persistent Server State that will be used to store
   * the last ChangeNumber seen from all LDAP servers in the topology.
   */
  state = new PersistentServerState(baseDN, serverId);

  /*
   * Create a replication monitor object responsible for publishing
   * monitoring information below cn=monitor.
   */
  monitor = new ReplicationMonitor(this);
  DirectoryServer.registerMonitorProvider(monitor);

  try
  {
    generationId = loadGenerationId();
  }
  catch (DirectoryException e)
  {
    // Best effort: the failure is logged and the domain keeps the
    // generation id it was initialized with.
    logError(ERR_LOADING_GENERATION_ID.get(
        baseDN.toNormalizedString(), e.getLocalizedMessage()));
  }

  /*
   * create the broker object used to publish and receive changes
   */
  broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
      maxReceiveDelay, maxSendQueue, maxSendDelay, window,
      heartbeatInterval, generationId,
      new ReplSessionSecurity(configuration));

  broker.start(replicationServers);

  /*
   * ChangeNumberGenerator is used to create new unique ChangeNumbers
   * for each operation done on this replication domain.
   *
   * The generator time is adjusted to the time of the last CN received from
   * remote other servers.
   */
  generator =
    new ChangeNumberGenerator(serverId, state);

  pendingChanges =
    new PendingChanges(generator,
                       broker, state);

  remotePendingChanges = new RemotePendingChanges(generator, state);

  // listen for changes on the configuration
  configuration.addChangeListener(this);

  // register as an AlertGenerator
  DirectoryServer.registerAlertGenerator(this);
}
502
503
504 /**
505 * Returns the base DN of this ReplicationDomain.
506 *
507 * @return The base DN of this ReplicationDomain
508 */
509 public DN getBaseDN()
510 {
511 return baseDN;
512 }
513
514 /**
515 * Implement the handleConflictResolution phase of the deleteOperation.
516 *
517 * @param deleteOperation The deleteOperation.
518 * @return A SynchronizationProviderResult indicating if the operation
519 * can continue.
520 */
521 public SynchronizationProviderResult handleConflictResolution(
522 PreOperationDeleteOperation deleteOperation)
523 {
524 if ((!deleteOperation.isSynchronizationOperation())
525 && (!brokerIsConnected(deleteOperation)))
526 {
527 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
528 return new SynchronizationProviderResult.StopProcessing(
529 ResultCode.UNWILLING_TO_PERFORM, msg);
530 }
531
532 DeleteContext ctx =
533 (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
534 Entry deletedEntry = deleteOperation.getEntryToDelete();
535
536 if (ctx != null)
537 {
538 /*
539 * This is a replication operation
540 * Check that the modified entry has the same entryuuid
541 * has was in the original message.
542 */
543 String operationEntryUUID = ctx.getEntryUid();
544 String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
545 if (!operationEntryUUID.equals(modifiedEntryUUID))
546 {
547 /*
548 * The changes entry is not the same entry as the one on
549 * the original change was performed.
550 * Probably the original entry was renamed and replaced with
551 * another entry.
552 * We must not let the change proceed, return a negative
553 * result and set the result code to NO_SUCH_OBJET.
554 * When the operation will return, the thread that started the
555 * operation will try to find the correct entry and restart a new
556 * operation.
557 */
558 return new SynchronizationProviderResult.StopProcessing(
559 ResultCode.NO_SUCH_OBJECT, null);
560 }
561 }
562 else
563 {
564 // There is no replication context attached to the operation
565 // so this is not a replication operation.
566 ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
567 String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
568 ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
569 deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
570 }
571 return new SynchronizationProviderResult.ContinueProcessing();
572 }
573
574 /**
575 * Implement the handleConflictResolution phase of the addOperation.
576 *
577 * @param addOperation The AddOperation.
578 * @return A SynchronizationProviderResult indicating if the operation
579 * can continue.
580 */
581 public SynchronizationProviderResult handleConflictResolution(
582 PreOperationAddOperation addOperation)
583 {
584 if ((!addOperation.isSynchronizationOperation())
585 && (!brokerIsConnected(addOperation)))
586 {
587 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
588 return new SynchronizationProviderResult.StopProcessing(
589 ResultCode.UNWILLING_TO_PERFORM, msg);
590 }
591
592 if (addOperation.isSynchronizationOperation())
593 {
594 AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
595 /*
596 * If an entry with the same entry uniqueID already exist then
597 * this operation has already been replayed in the past.
598 */
599 String uuid = ctx.getEntryUid();
600 if (findEntryDN(uuid) != null)
601 {
602 return new SynchronizationProviderResult.StopProcessing(
603 ResultCode.CANCELED, null);
604 }
605
606 /* The parent entry may have been renamed here since the change was done
607 * on the first server, and another entry have taken the former dn
608 * of the parent entry
609 */
610
611 String parentUid = ctx.getParentUid();
612 // root entry have no parent,
613 // there is no need to check for it.
614 if (parentUid != null)
615 {
616 // There is a potential of perfs improvement here
617 // if we could avoid the following parent entry retrieval
618 DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
619
620 if (parentDnFromCtx == null)
621 {
622 // The parent does not exist with the specified unique id
623 // stop the operation with NO_SUCH_OBJECT and let the
624 // conflict resolution or the dependency resolution solve this.
625 return new SynchronizationProviderResult.StopProcessing(
626 ResultCode.NO_SUCH_OBJECT, null);
627 }
628 else
629 {
630 DN entryDN = addOperation.getEntryDN();
631 DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
632 if ((parentDnFromEntryDn != null)
633 && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
634 {
635 // parentEntry has been renamed
636 // replication name conflict resolution is expected to fix that
637 // later in the flow
638 return new SynchronizationProviderResult.StopProcessing(
639 ResultCode.NO_SUCH_OBJECT, null);
640 }
641 }
642 }
643 }
644 return new SynchronizationProviderResult.ContinueProcessing();
645 }
646
647 /**
648 * Check that the broker associated to this ReplicationDomain has found
649 * a Replication Server and that this LDAP server is therefore able to
650 * process operations.
651 * If not set the ResultCode and the response message,
652 * interrupt the operation, and return false
653 *
654 * @param op The Operation that needs to be checked.
655 *
656 * @return true when it OK to process the Operation, false otherwise.
657 * When false is returned the resultCode and the reponse message
658 * is also set in the Operation.
659 */
660 private boolean brokerIsConnected(PreOperationOperation op)
661 {
662 if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
663 {
664 // this policy imply that we always accept updates.
665 return true;
666 }
667 if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
668 {
669 // this isolation policy specifies that the updates are denied
670 // when the broker is not connected.
671 return broker.isConnected();
672 }
673 // we should never get there as the only possible policies are
674 // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
675 return true;
676 }
677
678
679 /**
680 * Implement the handleConflictResolution phase of the ModifyDNOperation.
681 *
682 * @param modifyDNOperation The ModifyDNOperation.
683 * @return A SynchronizationProviderResult indicating if the operation
684 * can continue.
685 */
686 public SynchronizationProviderResult handleConflictResolution(
687 PreOperationModifyDNOperation modifyDNOperation)
688 {
689 if ((!modifyDNOperation.isSynchronizationOperation())
690 && (!brokerIsConnected(modifyDNOperation)))
691 {
692 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
693 return new SynchronizationProviderResult.StopProcessing(
694 ResultCode.UNWILLING_TO_PERFORM, msg);
695 }
696
697 ModifyDnContext ctx =
698 (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
699 if (ctx != null)
700 {
701 /*
702 * This is a replication operation
703 * Check that the modified entry has the same entryuuid
704 * as was in the original message.
705 */
706 String modifiedEntryUUID =
707 Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
708 if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
709 {
710 /*
711 * The modified entry is not the same entry as the one on
712 * the original change was performed.
713 * Probably the original entry was renamed and replaced with
714 * another entry.
715 * We must not let the change proceed, return a negative
716 * result and set the result code to NO_SUCH_OBJET.
717 * When the operation will return, the thread that started the
718 * operation will try to find the correct entry and restart a new
719 * operation.
720 */
721 return new SynchronizationProviderResult.StopProcessing(
722 ResultCode.NO_SUCH_OBJECT, null);
723 }
724 if (modifyDNOperation.getNewSuperior() != null)
725 {
726 /*
727 * Also check that the current id of the
728 * parent is the same as when the operation was performed.
729 */
730 String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
731 if ((newParentId != null) &&
732 (!newParentId.equals(ctx.getNewParentId())))
733 {
734 return new SynchronizationProviderResult.StopProcessing(
735 ResultCode.NO_SUCH_OBJECT, null);
736 }
737 }
738 }
739 else
740 {
741 // There is no replication context attached to the operation
742 // so this is not a replication operation.
743 ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
744 String newParentId = null;
745 if (modifyDNOperation.getNewSuperior() != null)
746 {
747 newParentId = findEntryId(modifyDNOperation.getNewSuperior());
748 }
749
750 Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
751 String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
752 ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
753 modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
754 }
755 return new SynchronizationProviderResult.ContinueProcessing();
756 }
757
758 /**
759 * Handle the conflict resolution.
760 * Called by the core server after locking the entry and before
761 * starting the actual modification.
762 * @param modifyOperation the operation
763 * @return code indicating is operation must proceed
764 */
765 public SynchronizationProviderResult handleConflictResolution(
766 PreOperationModifyOperation modifyOperation)
767 {
768 if ((!modifyOperation.isSynchronizationOperation())
769 && (!brokerIsConnected(modifyOperation)))
770 {
771 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
772 return new SynchronizationProviderResult.StopProcessing(
773 ResultCode.UNWILLING_TO_PERFORM, msg);
774 }
775
776 ModifyContext ctx =
777 (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
778
779 Entry modifiedEntry = modifyOperation.getModifiedEntry();
780 if (ctx == null)
781 {
782 // There is no replication context attached to the operation
783 // so this is not a replication operation.
784 ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
785 String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
786 if (modifiedEntryUUID == null)
787 modifiedEntryUUID = modifyOperation.getEntryDN().toString();
788 ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
789 modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
790 }
791 else
792 {
793 // This is a replayed operation, it is necessary to
794 // - check if the entry has been renamed
795 // - check for conflicts
796 String modifiedEntryUUID = ctx.getEntryUid();
797 String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
798 if ((currentEntryUUID != null) &&
799 (!currentEntryUUID.equals(modifiedEntryUUID)))
800 {
801 /*
802 * The current modified entry is not the same entry as the one on
803 * the original modification was performed.
804 * Probably the original entry was renamed and replaced with
805 * another entry.
806 * We must not let the modification proceed, return a negative
807 * result and set the result code to NO_SUCH_OBJET.
808 * When the operation will return, the thread that started the
809 * operation will try to find the correct entry and restart a new
810 * operation.
811 */
812 return new SynchronizationProviderResult.StopProcessing(
813 ResultCode.NO_SUCH_OBJECT, null);
814 }
815
816 /*
817 * Solve the conflicts between modify operations
818 */
819 Historical historicalInformation = Historical.load(modifiedEntry);
820 modifyOperation.setAttachment(Historical.HISTORICAL,
821 historicalInformation);
822
823 if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
824 {
825 numResolvedModifyConflicts.incrementAndGet();
826 }
827
828 if (modifyOperation.getModifications().isEmpty())
829 {
830 /*
831 * This operation becomes a no-op due to conflict resolution
832 * stop the processing and send an OK result
833 */
834 return new SynchronizationProviderResult.StopProcessing(
835 ResultCode.SUCCESS, null);
836 }
837 }
838 return new SynchronizationProviderResult.ContinueProcessing();
839 }
840
841 /**
842 * The preOperation phase for the add Operation.
843 * Its job is to generate the replication context associated to the
844 * operation. It is necessary to do it in this phase because contrary to
845 * the other operations, the entry uid is not set when the handleConflict
846 * phase is called.
847 *
848 * @param addOperation The Add Operation.
849 */
850 public void doPreOperation(PreOperationAddOperation addOperation)
851 {
852 AddContext ctx = new AddContext(generateChangeNumber(addOperation),
853 Historical.getEntryUuid(addOperation),
854 findEntryId(addOperation.getEntryDN().getParentDNInSuffix()));
855
856 addOperation.setAttachment(SYNCHROCONTEXT, ctx);
857 }
858
859 /**
860 * Receives an update message from the replicationServer.
861 * also responsible for updating the list of pending changes
862 * @return the received message - null if none
863 */
864 public UpdateMessage receive()
865 {
866 UpdateMessage update = null;
867
868 while (update == null)
869 {
870 InitializeRequestMessage initMsg = null;
871 ReplicationMessage msg;
872 try
873 {
874 msg = broker.receive();
875 if (msg == null)
876 {
877 // The server is in the shutdown process
878 return null;
879 }
880
881 if (debugEnabled())
882 if (!(msg instanceof HeartbeatMessage))
883 TRACER.debugVerbose("Message received <" + msg + ">");
884
885 if (msg instanceof AckMessage)
886 {
887 AckMessage ack = (AckMessage) msg;
888 receiveAck(ack);
889 }
890 else if (msg instanceof InitializeRequestMessage)
891 {
892 // Another server requests us to provide entries
893 // for a total update
894 initMsg = (InitializeRequestMessage)msg;
895 }
896 else if (msg instanceof InitializeTargetMessage)
897 {
898 // Another server is exporting its entries to us
899 InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
900
901 try
902 {
903 // This must be done while we are still holding the
904 // broker lock because we are now going to receive a
905 // bunch of entries from the remote server and we
906 // want the import thread to catch them and
907 // not the ListenerThread.
908 initialize(importMsg);
909 }
910 catch(DirectoryException de)
911 {
912 // Returns an error message to notify the sender
913 ErrorMessage errorMsg =
914 new ErrorMessage(importMsg.getsenderID(),
915 de.getMessageObject());
916 MessageBuilder mb = new MessageBuilder();
917 mb.append(de.getMessageObject());
918 TRACER.debugInfo(Message.toString(mb.toMessage()));
919 broker.publish(errorMsg);
920 }
921 }
922 else if (msg instanceof ErrorMessage)
923 {
924 if (ieContext != null)
925 {
926 // This is an error termination for the 2 following cases :
927 // - either during an export
928 // - or before an import really started
929 // For example, when we publish a request and the
930 // replicationServer did not find any import source.
931 abandonImportExport((ErrorMessage)msg);
932 }
933 else
934 {
935 /* We can receive an error message from the replication server
936 * in the following cases :
937 * - we connected with an incorrect generation id
938 */
939 ErrorMessage errorMsg = (ErrorMessage)msg;
940 logError(ERR_ERROR_MSG_RECEIVED.get(
941 errorMsg.getDetails()));
942 }
943 }
944 else if (msg instanceof UpdateMessage)
945 {
946 update = (UpdateMessage) msg;
947 receiveUpdate(update);
948 }
949 }
950 catch (SocketTimeoutException e)
951 {
952 // just retry
953 }
954 // Test if we have received and export request message and
955 // if that's the case handle it now.
956 // This must be done outside of the portion of code protected
957 // by the broker lock so that we keep receiveing update
958 // when we are doing and export and so that a possible
959 // closure of the socket happening when we are publishing the
960 // entries to the remote can be handled by the other
961 // replay thread when they call this method and therefore the
962 // broker.receive() method.
963 if (initMsg != null)
964 {
965 // Do this work in a thread to allow replay thread continue working
966 ExportThread exportThread = new ExportThread(initMsg.getsenderID());
967 exportThread.start();
968 }
969 }
970 return update;
971 }
972
973 /**
974 * Do the necessary processing when an UpdateMessage was received.
975 *
976 * @param update The received UpdateMessage.
977 */
978 public void receiveUpdate(UpdateMessage update)
979 {
980 remotePendingChanges.putRemoteUpdate(update);
981 numRcvdUpdates.incrementAndGet();
982 }
983
984 /**
985 * Do the necessary processing when an AckMessage is received.
986 *
987 * @param ack The AckMessage that was received.
988 */
989 public void receiveAck(AckMessage ack)
990 {
991 UpdateMessage update;
992 ChangeNumber changeNumber = ack.getChangeNumber();
993
994 synchronized (waitingAckMsgs)
995 {
996 update = waitingAckMsgs.remove(changeNumber);
997 }
998 if (update != null)
999 {
1000 synchronized (update)
1001 {
1002 update.notify();
1003 }
1004 }
1005 }
1006
1007 /**
1008 * Check if an operation must be synchronized.
1009 * Also update the list of pending changes and the server RUV
1010 * @param op the operation
1011 */
1012 public void synchronize(PostOperationOperation op)
1013 {
1014 ResultCode result = op.getResultCode();
1015 if ((result == ResultCode.SUCCESS) && op.isSynchronizationOperation())
1016 {
1017 numReplayedPostOpCalled++;
1018 }
1019 UpdateMessage msg = null;
1020
1021 // Note that a failed non-replication operation might not have a change
1022 // number.
1023 ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
1024
1025 boolean isAssured = isAssured(op);
1026
1027 if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
1028 {
1029 // Generate a replication message for a successful non-replication
1030 // operation.
1031 msg = UpdateMessage.generateMsg(op, isAssured);
1032
1033 if (msg == null)
1034 {
1035 /*
1036 * This is an operation type that we do not know about
1037 * It should never happen.
1038 */
1039 pendingChanges.remove(curChangeNumber);
1040 Message message =
1041 ERR_UNKNOWN_TYPE.get(op.getOperationType().toString());
1042 logError(message);
1043 return;
1044 }
1045 }
1046
1047 if (result == ResultCode.SUCCESS)
1048 {
1049 try
1050 {
1051 if (op.isSynchronizationOperation())
1052 {
1053 remotePendingChanges.commit(curChangeNumber);
1054 }
1055 else
1056 {
1057 pendingChanges.commit(curChangeNumber, msg);
1058 }
1059 }
1060 catch (NoSuchElementException e)
1061 {
1062 Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get(
1063 curChangeNumber.toString(), op.toString());
1064 logError(message);
1065 return;
1066 }
1067
1068 if (msg != null && isAssured)
1069 {
1070 synchronized (waitingAckMsgs)
1071 {
1072 // Add the assured message to the list of update that are
1073 // waiting acknowledgements
1074 waitingAckMsgs.put(curChangeNumber, msg);
1075 }
1076 }
1077
1078 if (generationIdSavedStatus != true)
1079 {
1080 this.saveGenerationId(generationId);
1081 }
1082 }
1083 else if (!op.isSynchronizationOperation())
1084 {
1085 // Remove an unsuccessful non-replication operation from the pending
1086 // changes list.
1087 if (curChangeNumber != null)
1088 {
1089 pendingChanges.remove(curChangeNumber);
1090 }
1091 }
1092
1093 if (!op.isSynchronizationOperation())
1094 {
1095 int pushedChanges = pendingChanges.pushCommittedChanges();
1096 numSentUpdates.addAndGet(pushedChanges);
1097 }
1098
1099 // Wait for acknowledgement of an assured message.
1100 if (msg != null && isAssured)
1101 {
1102 synchronized (msg)
1103 {
1104 while (waitingAckMsgs.containsKey(msg.getChangeNumber()))
1105 {
1106 // TODO : should have a configurable timeout to get
1107 // out of this loop
1108 try
1109 {
1110 msg.wait(1000);
1111 } catch (InterruptedException e)
1112 { }
1113 }
1114 }
1115 }
1116 }
1117
1118 /**
1119 * get the number of updates received by the replication plugin.
1120 *
1121 * @return the number of updates received
1122 */
1123 public int getNumRcvdUpdates()
1124 {
1125 if (numRcvdUpdates != null)
1126 return numRcvdUpdates.get();
1127 else
1128 return 0;
1129 }
1130
1131 /**
1132 * Get the number of updates sent by the replication plugin.
1133 *
1134 * @return the number of updates sent
1135 */
1136 public int getNumSentUpdates()
1137 {
1138 if (numSentUpdates != null)
1139 return numSentUpdates.get();
1140 else
1141 return 0;
1142 }
1143
1144 /**
1145 * Get the number of updates in the pending list.
1146 *
1147 * @return The number of updates in the pending list
1148 */
1149 public int getPendingUpdatesCount()
1150 {
1151 if (pendingChanges != null)
1152 return pendingChanges.size();
1153 else
1154 return 0;
1155 }
1156
1157 /**
1158 * Increment the number of processed updates.
1159 */
1160 public void incProcessedUpdates()
1161 {
1162 numProcessedUpdates.incrementAndGet();
1163 }
1164
1165 /**
1166 * get the number of updates replayed by the replication.
1167 *
1168 * @return The number of updates replayed by the replication
1169 */
1170 public int getNumProcessedUpdates()
1171 {
1172 if (numProcessedUpdates != null)
1173 return numProcessedUpdates.get();
1174 else
1175 return 0;
1176 }
1177
1178 /**
1179 * get the number of updates replayed successfully by the replication.
1180 *
1181 * @return The number of updates replayed successfully
1182 */
1183 public int getNumReplayedPostOpCalled()
1184 {
1185 return numReplayedPostOpCalled;
1186 }
1187
1188 /**
1189 * get the ServerState.
1190 *
1191 * @return the ServerState
1192 */
1193 public ServerState getServerState()
1194 {
1195 return state;
1196 }
1197
1198 /**
1199 * Get the debugCount.
1200 *
1201 * @return Returns the debugCount.
1202 */
1203 public int getDebugCount()
1204 {
1205 return debugCount;
1206 }
1207
1208 /**
1209 * Send an Ack message.
1210 *
1211 * @param changeNumber The ChangeNumber for which the ack must be sent.
1212 */
1213 public void ack(ChangeNumber changeNumber)
1214 {
1215 broker.publish(new AckMessage(changeNumber));
1216 }
1217
1218 /**
1219 * {@inheritDoc}
1220 */
1221 @Override
1222 public void run()
1223 {
1224 done = false;
1225
1226 // Create the listener thread
1227 listenerThread = new ListenerThread(this, updateToReplayQueue);
1228 listenerThread.start();
1229
1230 while (shutdown == false)
1231 {
1232 try
1233 {
1234 synchronized (this)
1235 {
1236 this.wait(1000);
1237 if (!disabled && !stateSavingDisabled )
1238 {
1239 // save the RUV
1240 state.save();
1241 }
1242 }
1243 } catch (InterruptedException e)
1244 { }
1245 }
1246 state.save();
1247
1248 done = true;
1249 }
1250
1251 /**
1252 * Shutdown this ReplicationDomain.
1253 */
1254 public void shutdown()
1255 {
1256 // stop the flush thread
1257 shutdown = true;
1258
1259 // Stop the listener thread
1260 if (listenerThread != null)
1261 {
1262 listenerThread.shutdown();
1263 }
1264
1265 synchronized (this)
1266 {
1267 this.notify();
1268 }
1269
1270 DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
1271
1272 DirectoryServer.deregisterAlertGenerator(this);
1273
1274 // stop the ReplicationBroker
1275 broker.stop();
1276
1277 // Wait for the listener thread to stop
1278 if (listenerThread != null)
1279 listenerThread.waitForShutdown();
1280
1281 // wait for completion of the persistentServerState thread.
1282 try
1283 {
1284 while (!done)
1285 {
1286 Thread.sleep(50);
1287 }
1288 } catch (InterruptedException e)
1289 {
1290 // stop waiting when interrupted.
1291 }
1292 }
1293
1294 /**
1295 * Get the name of the replicationServer to which this domain is currently
1296 * connected.
1297 *
1298 * @return the name of the replicationServer to which this domain
1299 * is currently connected.
1300 */
1301 public String getReplicationServer()
1302 {
1303 if (broker != null)
1304 return broker.getReplicationServer();
1305 else
1306 return "Not connected";
1307 }
1308
1309 /**
1310 * Create and replay a synchronized Operation from an UpdateMessage.
1311 *
1312 * @param msg The UpdateMessage to be replayed.
1313 */
1314 public void replay(UpdateMessage msg)
1315 {
1316 Operation op = null;
1317 boolean done = false;
1318 boolean dependency = false;
1319 ChangeNumber changeNumber = null;
1320 int retryCount = 10;
1321 boolean firstTry = true;
1322
1323 // Try replay the operation, then flush (replaying) any pending operation
1324 // whose dependency has been replayed until no more left.
1325 do
1326 {
1327 try
1328 {
1329 while ((!dependency) && (!done) && (retryCount-- > 0))
1330 {
1331 op = msg.createOperation(conn);
1332
1333 op.setInternalOperation(true);
1334 op.setSynchronizationOperation(true);
1335 changeNumber = OperationContext.getChangeNumber(op);
1336 ((AbstractOperation) op).run();
1337
1338 // Try replay the operation
1339 ResultCode result = op.getResultCode();
1340
1341 if (result != ResultCode.SUCCESS)
1342 {
1343 if (op instanceof ModifyOperation)
1344 {
1345 ModifyOperation newOp = (ModifyOperation) op;
1346 dependency = remotePendingChanges.checkDependencies(newOp);
1347 if ((!dependency) && (!firstTry))
1348 {
1349 done = solveNamingConflict(newOp, msg);
1350 }
1351 } else if (op instanceof DeleteOperation)
1352 {
1353 DeleteOperation newOp = (DeleteOperation) op;
1354 dependency = remotePendingChanges.checkDependencies(newOp);
1355 if ((!dependency) && (!firstTry))
1356 {
1357 done = solveNamingConflict(newOp, msg);
1358 }
1359 } else if (op instanceof AddOperation)
1360 {
1361 AddOperation newOp = (AddOperation) op;
1362 AddMsg addMsg = (AddMsg) msg;
1363 dependency = remotePendingChanges.checkDependencies(newOp);
1364 if ((!dependency) && (!firstTry))
1365 {
1366 done = solveNamingConflict(newOp, addMsg);
1367 }
1368 } else if (op instanceof ModifyDNOperationBasis)
1369 {
1370 ModifyDNMsg newMsg = (ModifyDNMsg) msg;
1371 dependency = remotePendingChanges.checkDependencies(newMsg);
1372 if ((!dependency) && (!firstTry))
1373 {
1374 ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
1375 done = solveNamingConflict(newOp, msg);
1376 }
1377 } else
1378 {
1379 done = true; // unknown type of operation ?!
1380 }
1381 if (done)
1382 {
1383 // the update became a dummy update and the result
1384 // of the conflict resolution phase is to do nothing.
1385 // however we still need to push this change to the serverState
1386 updateError(changeNumber);
1387 }
1388 } else
1389 {
1390 done = true;
1391 }
1392 firstTry = false;
1393 }
1394
1395 if (!done && !dependency)
1396 {
1397 // Continue with the next change but the servers could now become
1398 // inconsistent.
1399 // Let the repair tool know about this.
1400 Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
1401 op.getErrorMessage().toString());
1402 logError(message);
1403 numUnresolvedNamingConflicts.incrementAndGet();
1404
1405 updateError(changeNumber);
1406 }
1407 } catch (ASN1Exception e)
1408 {
1409 Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1410 String.valueOf(msg) + stackTraceToSingleLineString(e));
1411 logError(message);
1412 } catch (LDAPException e)
1413 {
1414 Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1415 String.valueOf(msg) + stackTraceToSingleLineString(e));
1416 logError(message);
1417 } catch (DataFormatException e)
1418 {
1419 Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1420 String.valueOf(msg) + stackTraceToSingleLineString(e));
1421 logError(message);
1422 } catch (Exception e)
1423 {
1424 if (changeNumber != null)
1425 {
1426 /*
1427 * An Exception happened during the replay process.
1428 * Continue with the next change but the servers will now start
1429 * to be inconsistent.
1430 * Let the repair tool know about this.
1431 */
1432 Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
1433 stackTraceToSingleLineString(e), op.toString());
1434 logError(message);
1435 updateError(changeNumber);
1436 } else
1437 {
1438 Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
1439 String.valueOf(msg) + stackTraceToSingleLineString(e));
1440 logError(message);
1441 }
1442 } finally
1443 {
1444 if (!dependency)
1445 {
1446 broker.updateWindowAfterReplay();
1447 if (msg.isAssured())
1448 ack(msg.getChangeNumber());
1449 incProcessedUpdates();
1450 }
1451 }
1452
1453 // Now replay any pending update that had a dependency and whose
1454 // dependency has been replayed, do that until no more updates of that
1455 // type left...
1456 msg = remotePendingChanges.getNextUpdate();
1457
1458 // Prepare restart of loop
1459 done = false;
1460 dependency = false;
1461 changeNumber = null;
1462 retryCount = 10;
1463 firstTry = true;
1464
1465 } while (msg != null);
1466 }
1467
1468 /**
1469 * This method is called when an error happens while replaying
1470 * an operation.
1471 * It is necessary because the postOperation does not always get
1472 * called when error or Exceptions happen during the operation replay.
1473 *
1474 * @param changeNumber the ChangeNumber of the operation with error.
1475 */
1476 public void updateError(ChangeNumber changeNumber)
1477 {
1478 remotePendingChanges.commit(changeNumber);
1479 }
1480
1481 /**
1482 * Generate a new change number and insert it in the pending list.
1483 *
1484 * @param operation The operation for which the change number must be
1485 * generated.
1486 * @return The new change number.
1487 */
1488 private ChangeNumber generateChangeNumber(PluginOperation operation)
1489 {
1490 return pendingChanges.putLocalOperation(operation);
1491 }
1492
1493
1494 /**
1495 * Find the Unique Id of the entry with the provided DN by doing a
1496 * search of the entry and extracting its uniqueID from its attributes.
1497 *
1498 * @param dn The dn of the entry for which the unique Id is searched.
1499 *
1500 * @return The unique Id of the entry whith the provided DN.
1501 */
1502 private String findEntryId(DN dn)
1503 {
1504 if (dn == null)
1505 return null;
1506 try
1507 {
1508 LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
1509 attrs.add(ENTRYUIDNAME);
1510 InternalSearchOperation search = conn.processSearch(dn,
1511 SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
1512 0, 0, false,
1513 SearchFilter.createFilterFromString("objectclass=*"),
1514 attrs);
1515
1516 if (search.getResultCode() == ResultCode.SUCCESS)
1517 {
1518 LinkedList<SearchResultEntry> result = search.getSearchEntries();
1519 if (!result.isEmpty())
1520 {
1521 SearchResultEntry resultEntry = result.getFirst();
1522 if (resultEntry != null)
1523 {
1524 return Historical.getEntryUuid(resultEntry);
1525 }
1526 }
1527 }
1528 } catch (DirectoryException e)
1529 {
1530 // never happens because the filter is always valid.
1531 }
1532 return null;
1533 }
1534
1535 /**
1536 * find the current dn of an entry from its entry uuid.
1537 *
1538 * @param uuid the Entry Unique ID.
1539 * @return The curernt dn of the entry or null if there is no entry with
1540 * the specified uuid.
1541 */
1542 private DN findEntryDN(String uuid)
1543 {
1544 try
1545 {
1546 InternalSearchOperation search = conn.processSearch(baseDN,
1547 SearchScope.WHOLE_SUBTREE,
1548 SearchFilter.createFilterFromString("entryuuid="+uuid));
1549 if (search.getResultCode() == ResultCode.SUCCESS)
1550 {
1551 LinkedList<SearchResultEntry> result = search.getSearchEntries();
1552 if (!result.isEmpty())
1553 {
1554 SearchResultEntry resultEntry = result.getFirst();
1555 if (resultEntry != null)
1556 {
1557 return resultEntry.getDN();
1558 }
1559 }
1560 }
1561 } catch (DirectoryException e)
1562 {
1563 // never happens because the filter is always valid.
1564 }
1565 return null;
1566 }
1567
1568 /**
1569 * Solve a conflict detected when replaying a modify operation.
1570 *
1571 * @param op The operation that triggered the conflict detection.
1572 * @param msg The operation that triggered the conflict detection.
1573 * @return true if the process is completed, false if it must continue..
1574 */
1575 private boolean solveNamingConflict(ModifyOperation op,
1576 UpdateMessage msg)
1577 {
1578 ResultCode result = op.getResultCode();
1579 ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
1580 String entryUid = ctx.getEntryUid();
1581
1582 if (result == ResultCode.NO_SUCH_OBJECT)
1583 {
1584 /*
1585 * The operation is a modification but
1586 * the entry has been renamed on a different master in the same time.
1587 * search if the entry has been renamed, and return the new dn
1588 * of the entry.
1589 */
1590 DN newdn = findEntryDN(entryUid);
1591 if (newdn != null)
1592 {
1593 // There is an entry with the same unique id as this modify operation
1594 // replay the modify using the current dn of this entry.
1595 msg.setDn(newdn.toString());
1596 numResolvedNamingConflicts.incrementAndGet();
1597 return false;
1598 }
1599 else
1600 {
1601 // This entry does not exist anymore.
1602 // It has probably been deleted, stop the processing of this operation
1603 numResolvedNamingConflicts.incrementAndGet();
1604 return true;
1605 }
1606 }
1607 else
1608 {
1609 // The other type of errors can not be caused by naming conflicts.
1610 // Log a message for the repair tool.
1611 Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1612 op.toString(), ctx.getChangeNumber().toString(),
1613 result.toString(), op.getErrorMessage().toString());
1614 logError(message);
1615 return true;
1616 }
1617 }
1618
1619 /**
1620 * Solve a conflict detected when replaying a delete operation.
1621 *
1622 * @param op The operation that triggered the conflict detection.
1623 * @param msg The operation that triggered the conflict detection.
1624 * @return true if the process is completed, false if it must continue..
1625 */
1626 private boolean solveNamingConflict(DeleteOperation op,
1627 UpdateMessage msg)
1628 {
1629 ResultCode result = op.getResultCode();
1630 DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
1631 String entryUid = ctx.getEntryUid();
1632
1633 if (result == ResultCode.NO_SUCH_OBJECT)
1634 {
1635 /*
1636 * Find if the entry is still in the database.
1637 */
1638 DN currentDn = findEntryDN(entryUid);
1639 if (currentDn == null)
1640 {
1641 /*
1642 * The entry has already been deleted, either because this delete
1643 * has already been replayed or because another concurrent delete
1644 * has already done the job.
1645 * In any case, there is is nothing more to do.
1646 */
1647 numResolvedNamingConflicts.incrementAndGet();
1648 return true;
1649 }
1650 else
1651 {
1652 /*
1653 * This entry has been renamed, replay the delete using its new DN.
1654 */
1655 msg.setDn(currentDn.toString());
1656 numResolvedNamingConflicts.incrementAndGet();
1657 return false;
1658 }
1659 }
1660 else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
1661 {
1662 /*
1663 * This may happen when we replay a DELETE done on a master
1664 * but children of this entry have been added on another master.
1665 *
1666 * Rename all the children by adding entryuuid in dn and delete this entry.
1667 *
1668 * The action taken here must be consistent with the actions
1669 * done in the solveNamingConflict(AddOperation) method
1670 * when we are adding an entry whose parent entry has already been deleted.
1671 */
1672 findAndRenameChild(entryUid, op.getEntryDN(), op);
1673 numUnresolvedNamingConflicts.incrementAndGet();
1674 return false;
1675 }
1676 else
1677 {
1678 // The other type of errors can not be caused by naming conflicts.
1679 // Log a message for the repair tool.
1680 Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1681 op.toString(), ctx.getChangeNumber().toString(),
1682 result.toString(), op.getErrorMessage().toString());
1683 logError(message);
1684 return true;
1685 }
1686 }
1687
1688 /**
1689 * Solve a conflict detected when replaying a Modify DN operation.
1690 *
1691 * @param op The operation that triggered the conflict detection.
1692 * @param msg The operation that triggered the conflict detection.
1693 * @return true if the process is completed, false if it must continue.
1694 * @throws Exception When the operation is not valid.
1695 */
1696 private boolean solveNamingConflict(ModifyDNOperation op,
1697 UpdateMessage msg) throws Exception
1698 {
1699 ResultCode result = op.getResultCode();
1700 ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
1701 String entryUid = ctx.getEntryUid();
1702 String newSuperiorID = ctx.getNewParentId();
1703
1704 /*
1705 * four possible cases :
1706 * - the modified entry has been renamed
1707 * - the new parent has been renamed
1708 * - the operation is replayed for the second time.
1709 * - the entry has been deleted
1710 * action :
1711 * - change the target dn and the new parent dn and
1712 * restart the operation,
1713 * - don't do anything if the operation is replayed.
1714 */
1715
1716 // Construct the new DN to use for the entry.
1717 DN entryDN = op.getEntryDN();
1718 DN newSuperior = findEntryDN(newSuperiorID);
1719 RDN newRDN = op.getNewRDN();
1720 DN parentDN;
1721
1722 if (newSuperior == null)
1723 {
1724 parentDN = entryDN.getParent();
1725 }
1726 else
1727 {
1728 parentDN = newSuperior;
1729 }
1730
1731 if ((parentDN == null) || parentDN.isNullDN())
1732 {
1733 /* this should never happen
1734 * can't solve any conflict in this case.
1735 */
1736 throw new Exception("operation parameters are invalid");
1737 }
1738
1739 DN newDN = parentDN.concat(newRDN);
1740
1741 // get the current DN of this entry in the database.
1742 DN currentDN = findEntryDN(entryUid);
1743
1744 if (currentDN == null)
1745 {
1746 // The entry targetted by the Modify DN is not in the database
1747 // anymore.
1748 // This is a conflict between a delete and this modify DN.
1749 // The entry has been deleted anymore so we can safely assume
1750 // that the operation is completed.
1751 numResolvedNamingConflicts.incrementAndGet();
1752 return true;
1753 }
1754
1755 // if the newDN and the current DN match then the operation
1756 // is a no-op (this was probably a second replay)
1757 // don't do anything.
1758 if (newDN.equals(currentDN))
1759 {
1760 numResolvedNamingConflicts.incrementAndGet();
1761 return true;
1762 }
1763
1764 // If we could not find the new parent entry, we missed this entry
1765 // earlier or it has disappeared from the database
1766 // Log this information for the repair tool and mark the entry
1767 // as conflicting.
1768 // stop the processing.
1769 if (newSuperior == null)
1770 {
1771 markConflictEntry(op, currentDN, newDN);
1772 numUnresolvedNamingConflicts.incrementAndGet();
1773 return true;
1774 }
1775
1776 if ((result == ResultCode.NO_SUCH_OBJECT) ||
1777 (result == ResultCode.OBJECTCLASS_VIOLATION))
1778 {
1779 /*
1780 * The entry or it's new parent has not been found
1781 * reconstruct the operation with the DN we just built
1782 */
1783 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
1784 msg.setDn(currentDN.toString());
1785 modifyDnMsg.setNewSuperior(newSuperior.toString());
1786 numResolvedNamingConflicts.incrementAndGet();
1787 return false;
1788 }
1789 else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
1790 {
1791 /*
1792 * This may happen when two modifyDn operation
1793 * are done on different servers but with the same target DN
1794 * add the conflict object class to the entry
1795 * and rename it using its entryuuid.
1796 */
1797 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
1798 markConflictEntry(op, op.getEntryDN(), newDN);
1799 modifyDnMsg.setNewRDN(generateConflictRDN(entryUid,
1800 modifyDnMsg.getNewRDN()));
1801 modifyDnMsg.setNewSuperior(newSuperior.toString());
1802 numUnresolvedNamingConflicts.incrementAndGet();
1803 return false;
1804 }
1805 else
1806 {
1807 // The other type of errors can not be caused by naming conflicts.
1808 // Log a message for the repair tool.
1809 Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1810 op.toString(), ctx.getChangeNumber().toString(),
1811 result.toString(), op.getErrorMessage().toString());
1812 logError(message);
1813 return true;
1814 }
1815 }
1816
1817
1818 /**
1819 * Solve a conflict detected when replaying a ADD operation.
1820 *
1821 * @param op The operation that triggered the conflict detection.
1822 * @param msg The message that triggered the conflict detection.
1823 * @return true if the process is completed, false if it must continue.
1824 * @throws Exception When the operation is not valid.
1825 */
1826 private boolean solveNamingConflict(AddOperation op,
1827 AddMsg msg) throws Exception
1828 {
1829 ResultCode result = op.getResultCode();
1830 AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
1831 String entryUid = ctx.getEntryUid();
1832 String parentUniqueId = ctx.getParentUid();
1833
1834 if (result == ResultCode.NO_SUCH_OBJECT)
1835 {
1836 /*
1837 * This can happen if the parent has been renamed or deleted
1838 * find the parent dn and calculate a new dn for the entry
1839 */
1840 if (parentUniqueId == null)
1841 {
1842 /*
1843 * This entry is the base dn of the backend.
1844 * It is quite surprising that the operation result be NO_SUCH_OBJECT.
1845 * There is nothing more we can do except TODO log a
1846 * message for the repair tool to look at this problem.
1847 */
1848 return true;
1849 }
1850 DN parentDn = findEntryDN(parentUniqueId);
1851 if (parentDn == null)
1852 {
1853 /*
1854 * The parent has been deleted
1855 * rename the entry as a conflicting entry.
1856 * The action taken here must be consistent with the actions
1857 * done when in the solveNamingConflict(DeleteOperation) method
1858 * when we are deleting an entry that have some child entries.
1859 */
1860 addConflict(msg);
1861
1862 msg.setDn(generateConflictRDN(entryUid,
1863 op.getEntryDN().getRDN().toString()) + ","
1864 + baseDN);
1865 // reset the parent uid so that the check done is the handleConflict
1866 // phase does not fail.
1867 msg.setParentUid(null);
1868 numUnresolvedNamingConflicts.incrementAndGet();
1869 return false;
1870 }
1871 else
1872 {
1873 RDN entryRdn = DN.decode(msg.getDn()).getRDN();
1874 msg.setDn(entryRdn + "," + parentDn);
1875 numResolvedNamingConflicts.incrementAndGet();
1876 return false;
1877 }
1878 }
1879 else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
1880 {
1881 /*
1882 * This can happen if
1883 * - two adds are done on different servers but with the
1884 * same target DN.
1885 * - the same ADD is being replayed for the second time on this server.
1886 * if the nsunique ID already exist, assume this is a replay and
1887 * don't do anything
1888 * if the entry unique id do not exist, generate conflict.
1889 */
1890 if (findEntryDN(entryUid) != null)
1891 {
1892 // entry already exist : this is a replay
1893 return true;
1894 }
1895 else
1896 {
1897 addConflict(msg);
1898 msg.setDn(generateConflictRDN(entryUid, msg.getDn()));
1899 numUnresolvedNamingConflicts.incrementAndGet();
1900 return false;
1901 }
1902 }
1903 else
1904 {
1905 // The other type of errors can not be caused by naming conflicts.
1906 // log a message for the repair tool.
1907 Message message = ERR_ERROR_REPLAYING_OPERATION.get(
1908 op.toString(), ctx.getChangeNumber().toString(),
1909 result.toString(), op.getErrorMessage().toString());
1910 logError(message);
1911 return true;
1912 }
1913 }
1914
1915 /**
1916 * Find all the entries below the provided DN and rename them
1917 * so that they stay below the baseDn of this replicationDomain and
1918 * use the conflicting name and attribute.
1919 *
1920 * @param entryUid The unique ID of the entry whose child must be renamed.
1921 * @param entryDN The DN of the entry whose child must be renamed.
1922 * @param conflictOp The Operation that generated the conflict.
1923 */
1924 private void findAndRenameChild(
1925 String entryUid, DN entryDN, Operation conflictOp)
1926 {
1927 // Find an rename child entries.
1928 InternalClientConnection conn =
1929 InternalClientConnection.getRootConnection();
1930
1931 try
1932 {
1933 LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
1934 attrs.add(ENTRYUIDNAME);
1935
1936 SearchFilter ALLMATCH;
1937 ALLMATCH = SearchFilter.createFilterFromString("(objectClass=*)");
1938 InternalSearchOperation op =
1939 conn.processSearch(entryDN, SearchScope.SINGLE_LEVEL,
1940 DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, ALLMATCH,
1941 attrs);
1942
1943 if (op.getResultCode() == ResultCode.SUCCESS)
1944 {
1945 LinkedList<SearchResultEntry> entries = op.getSearchEntries();
1946 if (entries != null)
1947 {
1948 for (SearchResultEntry entry : entries)
1949 {
1950 markConflictEntry(conflictOp, entry.getDN(), entryDN);
1951 renameConflictEntry(conflictOp, entry.getDN(),
1952 Historical.getEntryUuid(entry));
1953 }
1954 }
1955 }
1956 else
1957 {
1958 // log error and information for the REPAIR tool.
1959 MessageBuilder mb = new MessageBuilder();
1960 mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
1961 mb.append(String.valueOf(entryDN));
1962 mb.append(" ");
1963 mb.append(String.valueOf(conflictOp));
1964 mb.append(" ");
1965 mb.append(String.valueOf(op.getResultCode()));
1966 logError(mb.toMessage());
1967 }
1968 } catch (DirectoryException e)
1969 {
1970 // log errror and information for the REPAIR tool.
1971 MessageBuilder mb = new MessageBuilder();
1972 mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get());
1973 mb.append(String.valueOf(entryDN));
1974 mb.append(" ");
1975 mb.append(String.valueOf(conflictOp));
1976 mb.append(" ");
1977 mb.append(e.getLocalizedMessage());
1978 logError(mb.toMessage());
1979 }
1980 }
1981
1982
1983 /**
1984 * Rename an entry that was conflicting so that it stays below the
1985 * baseDN of the replicationDomain.
1986 *
1987 * @param conflictOp The Operation that caused the conflict.
1988 * @param dn The DN of the entry to be renamed.
1989 * @param uid The uniqueID of the entry to be renamed.
1990 */
1991 private void renameConflictEntry(Operation conflictOp, DN dn, String uid)
1992 {
1993 InternalClientConnection conn =
1994 InternalClientConnection.getRootConnection();
1995
1996 ModifyDNOperation newOp = conn.processModifyDN(
1997 dn, generateDeleteConflictDn(uid, dn),false, baseDN);
1998
1999 if (newOp.getResultCode() != ResultCode.SUCCESS)
2000 {
2001 // log information for the repair tool.
2002 MessageBuilder mb = new MessageBuilder();
2003 mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
2004 mb.append(String.valueOf(dn));
2005 mb.append(" ");
2006 mb.append(String.valueOf(conflictOp));
2007 mb.append(" ");
2008 mb.append(String.valueOf(newOp.getResultCode()));
2009 logError(mb.toMessage());
2010 }
2011 }
2012
2013
2014 /**
2015 * Generate a modification to add the conflict attribute to an entry
2016 * whose Dn is now conflicting with another entry.
2017 *
2018 * @param op The operation causing the conflict.
2019 * @param currentDN The current DN of the operation to mark as conflicting.
2020 * @param conflictDN The newDn on which the conflict happened.
2021 */
2022 private void markConflictEntry(Operation op, DN currentDN, DN conflictDN)
2023 {
2024 // create new internal modify operation and run it.
2025 InternalClientConnection conn =
2026 InternalClientConnection.getRootConnection();
2027
2028 AttributeType attrType =
2029 DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
2030 LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
2031 values.add(new AttributeValue(attrType, conflictDN.toString()));
2032 Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values);
2033 List<Modification> mods = new ArrayList<Modification>();
2034 Modification mod = new Modification(ModificationType.REPLACE, attr);
2035 mods.add(mod);
2036 ModifyOperation newOp = conn.processModify(currentDN, mods);
2037 if (newOp.getResultCode() != ResultCode.SUCCESS)
2038 {
2039 // Log information for the repair tool.
2040 MessageBuilder mb = new MessageBuilder();
2041 mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
2042 mb.append(String.valueOf(op));
2043 mb.append(" ");
2044 mb.append(String.valueOf(newOp.getResultCode()));
2045 logError(mb.toMessage());
2046 }
2047
2048 // Generate an alert to let the administratot know that some
2049 // conflict could not be solved.
2050 Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString());
2051 DirectoryServer.sendAlertNotification(this,
2052 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2053 }
2054
2055 /**
2056 * Add the conflict attribute to an entry that could
2057 * not be added because it is conflicting with another entry.
2058 *
2059 * @param msg The conflicting Add Operation.
2060 *
2061 * @throws ASN1Exception When an encoding error happenned manipulating the
2062 * msg.
2063 */
2064 private void addConflict(AddMsg msg) throws ASN1Exception
2065 {
2066 // Generate an alert to let the administratot know that some
2067 // conflict could not be solved.
2068 Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
2069 DirectoryServer.sendAlertNotification(this,
2070 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2071
2072 // Add the conflict attribute
2073 msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn());
2074 }
2075
2076 /**
2077 * Generate the Dn to use for a conflicting entry.
2078 *
2079 * @param entryUid The unique identifier of the entry involved in the
2080 * conflict.
2081 * @param rdn Original rdn.
2082 * @return The generated RDN for a conflicting entry.
2083 */
2084 private String generateConflictRDN(String entryUid, String rdn)
2085 {
2086 return "entryuuid=" + entryUid + "+" + rdn;
2087 }
2088
2089 /**
2090 * Generate the RDN to use for a conflicting entry whose father was deleted.
2091 *
2092 * @param entryUid The unique identifier of the entry involved in the
2093 * conflict.
2094 * @param dn The original DN of the entry.
2095 *
2096 * @return The generated RDN for a conflicting entry.
2097 * @throws DirectoryException
2098 */
2099 private RDN generateDeleteConflictDn(String entryUid, DN dn)
2100 {
2101 String newRDN = "entryuuid=" + entryUid + "+" + dn.getRDN();
2102 RDN rdn = null;
2103 try
2104 {
2105 rdn = RDN.decode(newRDN);
2106 } catch (DirectoryException e)
2107 {
2108 // cannot happen
2109 }
2110 return rdn;
2111 }
2112
2113 /**
2114 * Check if an operation must be processed as an assured operation.
2115 *
2116 * @param op the operation to be checked.
2117 * @return true if the operations must be processed as an assured operation.
2118 */
2119 private boolean isAssured(PostOperationOperation op)
2120 {
2121 // TODO : should have a filtering mechanism for checking
2122 // operation that are assured and operations that are not.
2123 return false;
2124 }
2125
2126 /**
2127 * Get the maximum receive window size.
2128 *
2129 * @return The maximum receive window size.
2130 */
2131 public int getMaxRcvWindow()
2132 {
2133 if (broker != null)
2134 return broker.getMaxRcvWindow();
2135 else
2136 return 0;
2137 }
2138
2139 /**
2140 * Get the current receive window size.
2141 *
2142 * @return The current receive window size.
2143 */
2144 public int getCurrentRcvWindow()
2145 {
2146 if (broker != null)
2147 return broker.getCurrentRcvWindow();
2148 else
2149 return 0;
2150 }
2151
2152 /**
2153 * Get the maximum send window size.
2154 *
2155 * @return The maximum send window size.
2156 */
2157 public int getMaxSendWindow()
2158 {
2159 if (broker != null)
2160 return broker.getMaxSendWindow();
2161 else
2162 return 0;
2163 }
2164
2165 /**
2166 * Get the current send window size.
2167 *
2168 * @return The current send window size.
2169 */
2170 public int getCurrentSendWindow()
2171 {
2172 if (broker != null)
2173 return broker.getCurrentSendWindow();
2174 else
2175 return 0;
2176 }
2177
2178 /**
2179 * Get the number of times the replication connection was lost.
2180 * @return The number of times the replication connection was lost.
2181 */
2182 public int getNumLostConnections()
2183 {
2184 if (broker != null)
2185 return broker.getNumLostConnections();
2186 else
2187 return 0;
2188 }
2189
2190 /**
2191 * Get the number of modify conflicts successfully resolved.
2192 * @return The number of modify conflicts successfully resolved.
2193 */
2194 public int getNumResolvedModifyConflicts()
2195 {
2196 return numResolvedModifyConflicts.get();
2197 }
2198
2199 /**
2200 * Get the number of namign conflicts successfully resolved.
2201 * @return The number of naming conflicts successfully resolved.
2202 */
2203 public int getNumResolvedNamingConflicts()
2204 {
2205 return numResolvedNamingConflicts.get();
2206 }
2207
2208 /**
2209 * Get the number of unresolved conflicts.
2210 * @return The number of unresolved conflicts.
2211 */
2212 public int getNumUnresolvedNamingConflicts()
2213 {
2214 return numUnresolvedNamingConflicts.get();
2215 }
2216
2217 /**
2218 * Get the server ID.
2219 * @return The server ID.
2220 */
2221 public int getServerId()
2222 {
2223 return serverId;
2224 }
2225
2226 /**
2227 * Check if the domain solve conflicts.
2228 *
2229 * @return a boolean indicating if the domain should sove conflicts.
2230 */
2231 public boolean solveConflict()
2232 {
2233 return solveConflictFlag;
2234 }
2235
2236 /**
2237 * Disable the replication on this domain.
2238 * The session to the replication server will be stopped.
2239 * The domain will not be destroyed but call to the pre-operation
2240 * methods will result in failure.
2241 * The listener thread will be destroyed.
2242 * The monitor informations will still be accessible.
2243 */
2244 public void disable()
2245 {
2246 state.save();
2247 state.clearInMemory();
2248 disabled = true;
2249
2250 // Stop the listener thread
2251 listenerThread.shutdown();
2252
2253 broker.stop(); // This will cut the session and wake up the listener
2254
2255 // Wait for the listener thread to stop
2256 listenerThread.waitForShutdown();
2257 }
2258
2259 /**
2260 * Do what necessary when the data have changed : load state, load
2261 * generation Id.
2262 * @exception DirectoryException Thrown when an error occurs.
2263 */
2264 protected void loadDataState()
2265 throws DirectoryException
2266 {
2267 state.clearInMemory();
2268 state.loadState();
2269 generator.adjust(state.getMaxChangeNumber(serverId));
2270 // Retrieves the generation ID associated with the data imported
2271 generationId = loadGenerationId();
2272 }
2273
2274 /**
2275 * Enable back the domain after a previous disable.
2276 * The domain will connect back to a replication Server and
2277 * will recreate threads to listen for messages from the Sycnhronization
2278 * server.
2279 * The generationId will be retrieved or computed if necessary.
2280 * The ServerState will also be read again from the local database.
2281 */
2282 public void enable()
2283 {
2284 try
2285 {
2286 loadDataState();
2287 }
2288 catch (Exception e)
2289 {
2290 /* TODO should mark that replicationServer service is
2291 * not available, log an error and retry upon timeout
2292 * should we stop the modifications ?
2293 */
2294 logError(ERR_LOADING_GENERATION_ID.get(
2295 baseDN.toNormalizedString(), e.getLocalizedMessage()));
2296 return;
2297 }
2298
2299 // After an on-line import, the value of the generationId is new
2300 // and it is necessary for the broker to send this new value as part
2301 // of the serverStart message.
2302 broker.setGenerationId(generationId);
2303
2304 broker.start(replicationServers);
2305
2306 // Create the listener thread
2307 listenerThread = new ListenerThread(this, updateToReplayQueue);
2308 listenerThread.start();
2309
2310 disabled = false;
2311 }
2312
2313 /**
2314 * Compute the data generationId associated with the current data present
2315 * in the backend for this domain.
2316 * @return The computed generationId.
2317 * @throws DirectoryException When an error occurs.
2318 */
2319 public long computeGenerationId() throws DirectoryException
2320 {
2321 Backend backend = retrievesBackend(baseDN);
2322 long bec = backend.numSubordinates(baseDN, true) + 1;
2323 this.acquireIEContext();
2324 ieContext.checksumOutput = true;
2325 ieContext.entryCount = (bec<1000?bec:1000);
2326 ieContext.entryLeftCount = ieContext.entryCount;
2327 exportBackend();
2328 long genId = ieContext.checksumOutputValue;
2329
2330 if (debugEnabled())
2331 TRACER.debugInfo("Computed generationId: #entries=" + bec +
2332 " generationId=" + ieContext.checksumOutputValue);
2333 ieContext.checksumOutput = false;
2334 this.releaseIEContext();
2335 return genId;
2336 }
2337
2338 /**
2339 * Returns the generationId set for this domain.
2340 *
2341 * @return The generationId.
2342 */
2343 public long getGenerationId()
2344 {
2345 return generationId;
2346 }
2347
2348 /**
2349 * The attribute name used to store the state in the backend.
2350 */
2351 protected static final String REPLICATION_GENERATION_ID =
2352 "ds-sync-generation-id";
2353
2354 /**
2355 * Stores the value of the generationId.
2356 * @param generationId The value of the generationId.
2357 * @return a ResultCode indicating if the method was successfull.
2358 */
2359 public ResultCode saveGenerationId(long generationId)
2360 {
2361 // The generationId is stored in the root entry of the domain.
2362 ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
2363
2364 ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
2365 ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
2366 values.add(value);
2367
2368 LDAPAttribute attr =
2369 new LDAPAttribute(REPLICATION_GENERATION_ID, values);
2370 LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
2371 ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
2372 mods.add(mod);
2373
2374 ModifyOperationBasis op =
2375 new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
2376 InternalClientConnection.nextMessageID(),
2377 new ArrayList<Control>(0), asn1BaseDn,
2378 mods);
2379 op.setInternalOperation(true);
2380 op.setSynchronizationOperation(true);
2381 op.setDontSynchronize(true);
2382
2383 op.run();
2384
2385 ResultCode result = op.getResultCode();
2386 if (result != ResultCode.SUCCESS)
2387 {
2388 generationIdSavedStatus = false;
2389 if (result != ResultCode.NO_SUCH_OBJECT)
2390 {
2391 // The case where the backend is empty (NO_SUCH_OBJECT)
2392 // is not an error case.
2393 Message message = ERR_UPDATING_GENERATION_ID.get(
2394 op.getResultCode().getResultCodeName() + " " +
2395 op.getErrorMessage(),
2396 baseDN.toString());
2397 logError(message);
2398 }
2399 }
2400 else
2401 {
2402 generationIdSavedStatus = true;
2403 }
2404 return result;
2405 }
2406
2407
2408 /**
2409 * Load the GenerationId from the root entry of the domain
2410 * from the REPLICATION_GENERATION_ID attribute in database
2411 * to memory, or compute it if not found.
2412 *
2413 * @return generationId The retrieved value of generationId
2414 * @throws DirectoryException When an error occurs.
2415 */
2416 public long loadGenerationId()
2417 throws DirectoryException
2418 {
2419 long generationId=-1;
2420
2421 if (debugEnabled())
2422 TRACER.debugInfo(
2423 "Attempt to read generation ID from DB " + baseDN.toString());
2424
2425 ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
2426 boolean found = false;
2427 LDAPFilter filter;
2428 try
2429 {
2430 filter = LDAPFilter.decode("objectclass=*");
2431 }
2432 catch (LDAPException e)
2433 {
2434 // can not happen
2435 return -1;
2436 }
2437
2438 /*
2439 * Search the database entry that is used to periodically
2440 * save the ServerState
2441 */
2442 InternalSearchOperation search = null;
2443 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
2444 attributes.add(REPLICATION_GENERATION_ID);
2445 search = conn.processSearch(asn1BaseDn,
2446 SearchScope.BASE_OBJECT,
2447 DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
2448 filter,attributes);
2449 if (((search.getResultCode() != ResultCode.SUCCESS)) &&
2450 ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
2451 {
2452 Message message = ERR_SEARCHING_GENERATION_ID.get(
2453 search.getResultCode().getResultCodeName() + " " +
2454 search.getErrorMessage(),
2455 baseDN.toString());
2456 logError(message);
2457 }
2458
2459 SearchResultEntry resultEntry = null;
2460 if (search.getResultCode() == ResultCode.SUCCESS)
2461 {
2462 LinkedList<SearchResultEntry> result = search.getSearchEntries();
2463 resultEntry = result.getFirst();
2464 if (resultEntry != null)
2465 {
2466 AttributeType synchronizationGenIDType =
2467 DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
2468 List<Attribute> attrs =
2469 resultEntry.getAttribute(synchronizationGenIDType);
2470 if (attrs != null)
2471 {
2472 Attribute attr = attrs.get(0);
2473 LinkedHashSet<AttributeValue> values = attr.getValues();
2474 if (values.size()>1)
2475 {
2476 Message message = ERR_LOADING_GENERATION_ID.get(
2477 baseDN.toString(), "#Values=" + values.size() +
2478 " Must be exactly 1 in entry " +
2479 resultEntry.toLDIFString());
2480 logError(message);
2481 }
2482 else if (values.size() == 1)
2483 {
2484 found=true;
2485 try
2486 {
2487 generationId = Long.decode(values.iterator().next().
2488 getStringValue());
2489 }
2490 catch(Exception e)
2491 {
2492 Message message = ERR_LOADING_GENERATION_ID.get(
2493 baseDN.toString(), e.getLocalizedMessage());
2494 logError(message);
2495 }
2496 }
2497 }
2498 }
2499 }
2500
2501 if (!found)
2502 {
2503 generationId = computeGenerationId();
2504 saveGenerationId(generationId);
2505
2506 if (debugEnabled())
2507 TRACER.debugInfo("Generation ID created for domain base DN=" +
2508 baseDN.toString() +
2509 " generationId=" + generationId);
2510 }
2511 else
2512 {
2513 generationIdSavedStatus = true;
2514 if (debugEnabled())
2515 TRACER.debugInfo(
2516 "Generation ID successfully read from domain base DN=" + baseDN +
2517 " generationId=" + generationId);
2518 }
2519 return generationId;
2520 }
2521
2522 /**
2523 * Reset the generationId of this domain in the whole topology.
2524 * A message is sent to the Replication Servers for them to reset
2525 * their change dbs.
2526 *
2527 * @param generationIdNewValue The new value of the generation Id.
2528 */
2529 public void resetGenerationId(Long generationIdNewValue)
2530 {
2531 if (debugEnabled())
2532 TRACER.debugInfo(
2533 this.getName() + "resetGenerationId" + generationIdNewValue);
2534
2535 ResetGenerationId genIdMessage = null;
2536 if (generationIdNewValue == null)
2537 {
2538 genIdMessage = new ResetGenerationId(this.generationId);
2539 }
2540 else
2541 {
2542 genIdMessage = new ResetGenerationId(generationIdNewValue);
2543 }
2544 broker.publish(genIdMessage);
2545 }
2546
2547 /**
2548 * Do whatever is needed when a backup is started.
2549 * We need to make sure that the serverState is correclty save.
2550 */
2551 public void backupStart()
2552 {
2553 state.save();
2554 }
2555
2556 /**
2557 * Do whatever is needed when a backup is finished.
2558 */
2559 public void backupEnd()
2560 {
2561 // Nothing is needed at the moment
2562 }
2563
2564 /*
2565 * Total Update >>
2566 */
2567
2568 /**
2569 * Receives bytes related to an entry in the context of an import to
2570 * initialize the domain (called by ReplLDIFInputStream).
2571 *
2572 * @return The bytes. Null when the Done or Err message has been received
2573 */
2574 public byte[] receiveEntryBytes()
2575 {
2576 ReplicationMessage msg;
2577 while (true)
2578 {
2579 try
2580 {
2581 msg = broker.receive();
2582
2583 if (debugEnabled())
2584 TRACER.debugVerbose(
2585 " sid:" + this.serverId +
2586 " base DN:" + this.baseDN +
2587 " Import EntryBytes received " + msg);
2588 if (msg == null)
2589 {
2590 // The server is in the shutdown process
2591 return null;
2592 }
2593
2594 if (msg instanceof EntryMessage)
2595 {
2596 EntryMessage entryMsg = (EntryMessage)msg;
2597 byte[] entryBytes = entryMsg.getEntryBytes();
2598 ieContext.updateCounters();
2599 return entryBytes;
2600 }
2601 else if (msg instanceof DoneMessage)
2602 {
2603 // This is the normal termination of the import
2604 // No error is stored and the import is ended
2605 // by returning null
2606 return null;
2607 }
2608 else if (msg instanceof ErrorMessage)
2609 {
2610 // This is an error termination during the import
2611 // The error is stored and the import is ended
2612 // by returning null
2613 ErrorMessage errorMsg = (ErrorMessage)msg;
2614 ieContext.exception = new DirectoryException(
2615 ResultCode.OTHER,
2616 errorMsg.getDetails());
2617 return null;
2618 }
2619 else
2620 {
2621 // Other messages received during an import are trashed
2622 }
2623 }
2624 catch(Exception e)
2625 {
2626 // TODO: i18n
2627 ieContext.exception = new DirectoryException(ResultCode.OTHER,
2628 Message.raw("received an unexpected message type" +
2629 e.getLocalizedMessage()));
2630 }
2631 }
2632 }
2633
2634 /**
2635 * Processes an error message received while an import/export is
2636 * on going.
2637 * @param errorMsg The error message received.
2638 */
2639 protected void abandonImportExport(ErrorMessage errorMsg)
2640 {
2641 // FIXME TBD Treat the case where the error happens while entries
2642 // are being exported
2643
2644 if (debugEnabled())
2645 TRACER.debugVerbose(
2646 " abandonImportExport:" + this.serverId +
2647 " base DN:" + this.baseDN +
2648 " Error Msg received " + errorMsg);
2649
2650 if (ieContext != null)
2651 {
2652 ieContext.exception = new DirectoryException(ResultCode.OTHER,
2653 errorMsg.getDetails());
2654
2655 if (ieContext.initializeTask instanceof InitializeTask)
2656 {
2657 // Update the task that initiated the import
2658 ((InitializeTask)ieContext.initializeTask).
2659 updateTaskCompletionState(ieContext.exception);
2660
2661 releaseIEContext();
2662 }
2663 }
2664 }
2665
2666 /**
2667 * Clears all the entries from the JE backend determined by the
2668 * be id passed into the method.
2669 *
2670 * @param createBaseEntry Indicate whether to automatically create the base
2671 * entry and add it to the backend.
2672 * @param beID The be id to clear.
2673 * @param dn The suffix of the backend to create if the the createBaseEntry
2674 * boolean is true.
2675 * @throws Exception If an unexpected problem occurs.
2676 */
2677 public static void clearJEBackend(boolean createBaseEntry, String beID,
2678 String dn) throws Exception
2679 {
2680 BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
2681
2682 // FIXME Should setBackendEnabled be part of TaskUtils ?
2683 TaskUtils.disableBackend(beID);
2684
2685 try
2686 {
2687 String lockFile = LockFileManager.getBackendLockFileName(backend);
2688 StringBuilder failureReason = new StringBuilder();
2689
2690 if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
2691 {
2692 throw new RuntimeException(failureReason.toString());
2693 }
2694
2695 try
2696 {
2697 backend.clearBackend();
2698 }
2699 finally
2700 {
2701 LockFileManager.releaseLock(lockFile, failureReason);
2702 }
2703 }
2704 finally
2705 {
2706 TaskUtils.enableBackend(beID);
2707 }
2708
2709 if (createBaseEntry)
2710 {
2711 DN baseDN = DN.decode(dn);
2712 Entry e = createEntry(baseDN);
2713 backend = (BackendImpl)DirectoryServer.getBackend(beID);
2714 backend.addEntry(e, null);
2715 }
2716 }
2717
2718 /**
2719 * Export the entries from the backend.
2720 * The ieContext must have been set before calling.
2721 *
2722 * @throws DirectoryException when an error occurred
2723 */
2724 protected void exportBackend()
2725 throws DirectoryException
2726 {
2727 Backend backend = retrievesBackend(this.baseDN);
2728
2729 // Acquire a shared lock for the backend.
2730 try
2731 {
2732 String lockFile = LockFileManager.getBackendLockFileName(backend);
2733 StringBuilder failureReason = new StringBuilder();
2734 if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
2735 {
2736 Message message = ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
2737 backend.getBackendID(), String.valueOf(failureReason));
2738 logError(message);
2739 throw new DirectoryException(
2740 ResultCode.OTHER, message, null);
2741 }
2742 }
2743 catch (Exception e)
2744 {
2745 Message message =
2746 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
2747 backend.getBackendID(), e.getLocalizedMessage());
2748 logError(message);
2749 throw new DirectoryException(
2750 ResultCode.OTHER, message, null);
2751 }
2752
2753 OutputStream os;
2754 ReplLDIFOutputStream ros;
2755
2756 if (ieContext.checksumOutput)
2757 {
2758 ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
2759 os = new CheckedOutputStream(ros, new Adler32());
2760 try
2761 {
2762 os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)).
2763 getBytes());
2764 }
2765 catch(Exception e)
2766 {
2767 // Should never happen
2768 }
2769 }
2770 else
2771 {
2772 ros = new ReplLDIFOutputStream(this, (short)-1);
2773 os = ros;
2774 }
2775 LDIFExportConfig exportConfig = new LDIFExportConfig(os);
2776
2777 // baseDN branch is the only one included in the export
2778 List<DN> includeBranches = new ArrayList<DN>(1);
2779 includeBranches.add(this.baseDN);
2780 exportConfig.setIncludeBranches(includeBranches);
2781
2782 // For the checksum computing mode, only consider the 'stable' attributes
2783 if (ieContext.checksumOutput)
2784 {
2785 String includeAttributeStrings[] =
2786 {"objectclass", "sn", "cn", "entryuuid"};
2787 HashSet<AttributeType> includeAttributes;
2788 includeAttributes = new HashSet<AttributeType>();
2789 for (String attrName : includeAttributeStrings)
2790 {
2791 AttributeType attrType = DirectoryServer.getAttributeType(attrName);
2792 if (attrType == null)
2793 {
2794 attrType = DirectoryServer.getDefaultAttributeType(attrName);
2795 }
2796 includeAttributes.add(attrType);
2797 }
2798 exportConfig.setIncludeAttributes(includeAttributes);
2799 }
2800
2801 // Launch the export.
2802 try
2803 {
2804 backend.exportLDIF(exportConfig);
2805 }
2806 catch (DirectoryException de)
2807 {
2808 if ((ieContext != null) && (ieContext.checksumOutput) &&
2809 (ros.getNumExportedEntries() >= ieContext.entryCount))
2810 {
2811 // This is the normal end when computing the generationId
2812 // We can interrupt the export only by an IOException
2813 }
2814 else
2815 {
2816 Message message =
2817 ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
2818 logError(message);
2819 throw new DirectoryException(
2820 ResultCode.OTHER, message, null);
2821 }
2822 }
2823 catch (Exception e)
2824 {
2825 Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(
2826 stackTraceToSingleLineString(e));
2827 logError(message);
2828 throw new DirectoryException(
2829 ResultCode.OTHER, message, null);
2830 }
2831 finally
2832 {
2833
2834 if ((ieContext != null) && (ieContext.checksumOutput))
2835 {
2836 ieContext.checksumOutputValue =
2837 ((CheckedOutputStream)os).getChecksum().getValue();
2838 }
2839 else
2840 {
2841 // Clean up after the export by closing the export config.
2842 // Will also flush the export and export the remaining entries.
2843 // This is a real export where writer has been initialized.
2844 exportConfig.close();
2845 }
2846
2847 // Release the shared lock on the backend.
2848 try
2849 {
2850 String lockFile = LockFileManager.getBackendLockFileName(backend);
2851 StringBuilder failureReason = new StringBuilder();
2852 if (! LockFileManager.releaseLock(lockFile, failureReason))
2853 {
2854 Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
2855 backend.getBackendID(), String.valueOf(failureReason));
2856 logError(message);
2857 throw new DirectoryException(
2858 ResultCode.OTHER, message, null);
2859 }
2860 }
2861 catch (Exception e)
2862 {
2863 Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
2864 backend.getBackendID(), stackTraceToSingleLineString(e));
2865 logError(message);
2866 throw new DirectoryException(
2867 ResultCode.OTHER, message, null);
2868 }
2869 }
2870 }
2871
2872 /**
2873 * Retrieves the backend related to the domain.
2874 *
2875 * @return The backend of that domain.
2876 * @param baseDN The baseDN to retrieve the backend
2877 */
2878 protected static Backend retrievesBackend(DN baseDN)
2879 {
2880 // Retrieves the backend related to this domain
2881 return DirectoryServer.getBackend(baseDN);
2882 }
2883
2884 /**
2885 * Get the internal broker to perform some operations on it.
2886 *
2887 * @return The broker for this domain.
2888 */
2889 ReplicationBroker getBroker()
2890 {
2891 return broker;
2892 }
2893
2894 /**
2895 * Exports an entry in LDIF format.
2896 *
2897 * @param lDIFEntry The entry to be exported..
2898 *
2899 * @throws IOException when an error occurred.
2900 */
2901 public void exportLDIFEntry(String lDIFEntry) throws IOException
2902 {
2903 // If an error was raised - like receiving an ErrorMessage
2904 // we just let down the export.
2905 if (ieContext.exception != null)
2906 {
2907 IOException ioe = new IOException(ieContext.exception.getMessage());
2908 ieContext = null;
2909 throw ioe;
2910 }
2911
2912 if (ieContext.checksumOutput == false)
2913 {
2914 EntryMessage entryMessage = new EntryMessage(
2915 serverId, ieContext.exportTarget, lDIFEntry.getBytes());
2916 broker.publish(entryMessage);
2917 }
2918 try
2919 {
2920 ieContext.updateCounters();
2921 }
2922 catch (DirectoryException de)
2923 {
2924 throw new IOException(de.getMessage());
2925 }
2926 }
2927
2928 /**
2929 * Initializes this domain from another source server.
2930 *
2931 * @param source The source from which to initialize
2932 * @param initTask The task that launched the initialization
2933 * and should be updated of its progress.
2934 * @throws DirectoryException when an error occurs
2935 */
2936 public void initializeFromRemote(short source, Task initTask)
2937 throws DirectoryException
2938 {
2939 if (debugEnabled())
2940 TRACER.debugInfo("Entering initializeFromRemote");
2941
2942 acquireIEContext();
2943 ieContext.initializeTask = initTask;
2944
2945 InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
2946 baseDN, serverId, source);
2947
2948 // Publish Init request msg
2949 broker.publish(initializeMsg);
2950
2951 // .. we expect to receive entries or err after that
2952 }
2953
2954 /**
2955 * Verifies that the given string represents a valid source
2956 * from which this server can be initialized.
2957 * @param sourceString The string representing the source
2958 * @return The source as a short value
2959 * @throws DirectoryException if the string is not valid
2960 */
2961 public short decodeSource(String sourceString)
2962 throws DirectoryException
2963 {
2964 short source = 0;
2965 Throwable cause = null;
2966 try
2967 {
2968 source = Integer.decode(sourceString).shortValue();
2969 if ((source >= -1) && (source != serverId))
2970 {
2971 // TODO Verifies serverID is in the domain
2972 // We shold check here that this is a server implied
2973 // in the current domain.
2974 return source;
2975 }
2976 }
2977 catch(Exception e)
2978 {
2979 cause = e;
2980 }
2981
2982 ResultCode resultCode = ResultCode.OTHER;
2983 Message message = ERR_INVALID_IMPORT_SOURCE.get();
2984 if (cause != null)
2985 {
2986 throw new DirectoryException(
2987 resultCode, message, cause);
2988 }
2989 else
2990 {
2991 throw new DirectoryException(
2992 resultCode, message);
2993 }
2994 }
2995
2996 /**
2997 * Verifies that the given string represents a valid source
2998 * from which this server can be initialized.
2999 * @param targetString The string representing the source
3000 * @return The source as a short value
3001 * @throws DirectoryException if the string is not valid
3002 */
3003 public short decodeTarget(String targetString)
3004 throws DirectoryException
3005 {
3006 short target = 0;
3007 Throwable cause;
3008 if (targetString.equalsIgnoreCase("all"))
3009 {
3010 return RoutableMessage.ALL_SERVERS;
3011 }
3012
3013 // So should be a serverID
3014 try
3015 {
3016 target = Integer.decode(targetString).shortValue();
3017 if (target >= 0)
3018 {
3019 // FIXME Could we check now that it is a know server in the domain ?
3020 }
3021 return target;
3022 }
3023 catch(Exception e)
3024 {
3025 cause = e;
3026 }
3027 ResultCode resultCode = ResultCode.OTHER;
3028 Message message = ERR_INVALID_EXPORT_TARGET.get();
3029
3030 if (cause != null)
3031 throw new DirectoryException(
3032 resultCode, message, cause);
3033 else
3034 throw new DirectoryException(
3035 resultCode, message);
3036
3037 }
3038
3039 private synchronized void acquireIEContext()
3040 throws DirectoryException
3041 {
3042 if (ieContext != null)
3043 {
3044 // Rejects 2 simultaneous exports
3045 Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
3046 throw new DirectoryException(ResultCode.OTHER,
3047 message);
3048 }
3049
3050 ieContext = new IEContext();
3051 }
3052
3053 private synchronized void releaseIEContext()
3054 {
3055 ieContext = null;
3056 }
3057
3058 /**
3059 * Process the initialization of some other server or servers in the topology
3060 * specified by the target argument.
3061 * @param target The target that should be initialized
3062 * @param initTask The task that triggers this initialization and that should
3063 * be updated with its progress.
3064 *
3065 * @exception DirectoryException When an error occurs.
3066 */
3067 public void initializeRemote(short target, Task initTask)
3068 throws DirectoryException
3069 {
3070 initializeRemote(target, serverId, initTask);
3071 }
3072
3073 /**
3074 * Process the initialization of some other server or servers in the topology
3075 * specified by the target argument when this initialization specifying the
3076 * server that requests the initialization.
3077 *
3078 * @param target The target that should be initialized.
3079 * @param requestorID The server that initiated the export.
3080 * @param initTask The task that triggers this initialization and that should
3081 * be updated with its progress.
3082 *
3083 * @exception DirectoryException When an error occurs.
3084 */
3085 public void initializeRemote(short target, short requestorID, Task initTask)
3086 throws DirectoryException
3087 {
3088 try
3089 {
3090 Backend backend = retrievesBackend(this.baseDN);
3091
3092 if (!backend.supportsLDIFExport())
3093 {
3094 Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
3095 backend.getBackendID().toString());
3096 logError(message);
3097 throw new DirectoryException(ResultCode.OTHER, message);
3098 }
3099
3100 acquireIEContext();
3101
3102 // The number of entries to be exported is the number of entries under
3103 // the base DN entry and the base entry itself.
3104 long entryCount = backend.numSubordinates(baseDN, true) + 1;
3105 ieContext.exportTarget = target;
3106 if (initTask != null)
3107 {
3108 ieContext.initializeTask = initTask;
3109 }
3110 ieContext.setCounters(entryCount, entryCount);
3111
3112 // Send start message to the peer
3113 InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
3114 baseDN, serverId, ieContext.exportTarget, requestorID, entryCount);
3115
3116 broker.publish(initializeMessage);
3117
3118 exportBackend();
3119
3120 // Notify the peer of the success
3121 DoneMessage doneMsg = new DoneMessage(serverId,
3122 initializeMessage.getDestination());
3123 broker.publish(doneMsg);
3124
3125 releaseIEContext();
3126 }
3127 catch(DirectoryException de)
3128 {
3129 // Notify the peer of the failure
3130 ErrorMessage errorMsg =
3131 new ErrorMessage(target,
3132 de.getMessageObject());
3133 broker.publish(errorMsg);
3134
3135 releaseIEContext();
3136
3137 throw(de);
3138 }
3139 }
3140
3141 /**
3142 * Process backend before import.
3143 * @param backend The backend.
3144 * @throws Exception
3145 */
3146 private void preBackendImport(Backend backend)
3147 throws Exception
3148 {
3149 // Stop saving state
3150 stateSavingDisabled = true;
3151
3152 // FIXME setBackendEnabled should be part of TaskUtils ?
3153 TaskUtils.disableBackend(backend.getBackendID());
3154
3155 // Acquire an exclusive lock for the backend.
3156 String lockFile = LockFileManager.getBackendLockFileName(backend);
3157 StringBuilder failureReason = new StringBuilder();
3158 if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
3159 {
3160 Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
3161 backend.getBackendID(),
3162 String.valueOf(failureReason));
3163 logError(message);
3164 throw new DirectoryException(ResultCode.OTHER, message);
3165 }
3166 }
3167
3168 /**
3169 * Initializes the domain's backend with received entries.
3170 * @param initializeMessage The message that initiated the import.
3171 * @exception DirectoryException Thrown when an error occurs.
3172 */
3173 protected void initialize(InitializeTargetMessage initializeMessage)
3174 throws DirectoryException
3175 {
3176 LDIFImportConfig importConfig = null;
3177 DirectoryException de = null;
3178
3179 Backend backend = retrievesBackend(baseDN);
3180
3181 try
3182 {
3183 if (!backend.supportsLDIFImport())
3184 {
3185 Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
3186 backend.getBackendID().toString());
3187 logError(message);
3188 de = new DirectoryException(ResultCode.OTHER, message);
3189 }
3190 else
3191 {
3192 if (initializeMessage.getRequestorID() == serverId)
3193 {
3194 // The import responds to a request we did so the IEContext
3195 // is already acquired
3196 }
3197 else
3198 {
3199 acquireIEContext();
3200 }
3201
3202 ieContext.importSource = initializeMessage.getsenderID();
3203 ieContext.entryLeftCount = initializeMessage.getEntryCount();
3204 ieContext.setCounters(initializeMessage.getEntryCount(),
3205 initializeMessage.getEntryCount());
3206
3207 preBackendImport(backend);
3208
3209 ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
3210 importConfig =
3211 new LDIFImportConfig(ieContext.ldifImportInputStream);
3212 List<DN> includeBranches = new ArrayList<DN>();
3213 includeBranches.add(this.baseDN);
3214 importConfig.setIncludeBranches(includeBranches);
3215 importConfig.setAppendToExistingData(false);
3216
3217 // TODO How to deal with rejected entries during the import
3218 importConfig.writeRejectedEntries(
3219 getFileForPath("logs" + File.separator +
3220 "replInitRejectedEntries").getAbsolutePath(),
3221 ExistingFileBehavior.OVERWRITE);
3222
3223 // Process import
3224 backend.importLDIF(importConfig);
3225
3226 stateSavingDisabled = false;
3227 }
3228 }
3229 catch(Exception e)
3230 {
3231 de = new DirectoryException(ResultCode.OTHER,
3232 Message.raw(e.getLocalizedMessage()));
3233 }
3234 finally
3235 {
3236 if ((ieContext != null) && (ieContext.exception != null))
3237 de = ieContext.exception;
3238
3239 // Cleanup
3240 if (importConfig != null)
3241 {
3242 importConfig.close();
3243
3244 // Re-enable backend
3245 closeBackendImport(backend);
3246
3247 backend = retrievesBackend(baseDN);
3248 }
3249
3250 // Update the task that initiated the import
3251 if ((ieContext != null ) && (ieContext.initializeTask != null))
3252 {
3253 ((InitializeTask)ieContext.initializeTask).
3254 updateTaskCompletionState(de);
3255 }
3256 releaseIEContext();
3257 }
3258 // Sends up the root error.
3259 if (de != null)
3260 {
3261 throw de;
3262 }
3263 else
3264 {
3265 loadDataState();
3266
3267 if (debugEnabled())
3268 TRACER.debugInfo(
3269 "After import, the replication plugin restarts connections" +
3270 " to all RSs to provide new generation ID=" + generationId);
3271 broker.setGenerationId(generationId);
3272
3273 // Re-exchange generationID and state with RS
3274 broker.reStart();
3275 }
3276 }
3277
3278 /**
3279 * Make post import operations.
3280 * @param backend The backend implied in the import.
3281 * @exception DirectoryException Thrown when an error occurs.
3282 */
3283 protected void closeBackendImport(Backend backend)
3284 throws DirectoryException
3285 {
3286 String lockFile = LockFileManager.getBackendLockFileName(backend);
3287 StringBuilder failureReason = new StringBuilder();
3288
3289 // Release lock
3290 if (!LockFileManager.releaseLock(lockFile, failureReason))
3291 {
3292 Message message = WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(
3293 backend.getBackendID(), String.valueOf(failureReason));
3294 logError(message);
3295 throw new DirectoryException(ResultCode.OTHER, message);
3296 }
3297
3298 TaskUtils.enableBackend(backend.getBackendID());
3299 }
3300
3301 /**
3302 * Retrieves a replication domain based on the baseDN.
3303 *
3304 * @param baseDN The baseDN of the domain to retrieve
3305 * @return The domain retrieved
3306 * @throws DirectoryException When an error occurred or no domain
3307 * match the provided baseDN.
3308 */
3309 public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
3310 throws DirectoryException
3311 {
3312 ReplicationDomain replicationDomain = null;
3313
3314 // Retrieves the domain
3315 DirectoryServer.getSynchronizationProviders();
3316 for (SynchronizationProvider provider :
3317 DirectoryServer.getSynchronizationProviders())
3318 {
3319 if (!( provider instanceof MultimasterReplication))
3320 {
3321 Message message = ERR_INVALID_PROVIDER.get();
3322 throw new DirectoryException(ResultCode.OTHER,
3323 message);
3324 }
3325
3326 // From the domainDN retrieves the replication domain
3327 ReplicationDomain sdomain =
3328 MultimasterReplication.findDomain(baseDN, null);
3329 if (sdomain == null)
3330 {
3331 break;
3332 }
3333 if (replicationDomain != null)
3334 {
3335 // Should never happen
3336 Message message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
3337 throw new DirectoryException(ResultCode.OTHER,
3338 message);
3339 }
3340 replicationDomain = sdomain;
3341 }
3342
3343 if (replicationDomain == null)
3344 {
3345 MessageBuilder mb = new MessageBuilder(ERR_NO_MATCHING_DOMAIN.get());
3346 mb.append(" ");
3347 mb.append(String.valueOf(baseDN));
3348 throw new DirectoryException(ResultCode.OTHER,
3349 mb.toMessage());
3350 }
3351 return replicationDomain;
3352 }
3353
3354 /**
3355 * Returns the backend associated to this domain.
3356 * @return The associated backend.
3357 */
3358 public Backend getBackend()
3359 {
3360 return retrievesBackend(baseDN);
3361 }
3362
3363 /**
3364 * Returns a boolean indiciating if an import or export is currently
3365 * processed.
3366 * @return The status
3367 */
3368 public boolean ieRunning()
3369 {
3370 return (ieContext != null);
3371 }
3372 /*
3373 * <<Total Update
3374 */
3375
3376
3377 /**
3378 * Push the modifications contain the in given parameter has
3379 * a modification that would happen on a local server.
3380 * The modifications are not applied to the local database,
3381 * historical information is not updated but a ChangeNumber
3382 * is generated and the ServerState associated to this domain is
3383 * updated.
3384 * @param modifications The modification to push
3385 */
3386 public void synchronizeModifications(List<Modification> modifications)
3387 {
3388 ModifyOperation opBasis =
3389 new ModifyOperationBasis(InternalClientConnection.getRootConnection(),
3390 InternalClientConnection.nextOperationID(),
3391 InternalClientConnection.nextMessageID(),
3392 null, DirectoryServer.getSchemaDN(),
3393 modifications);
3394 LocalBackendModifyOperation op = new LocalBackendModifyOperation(opBasis);
3395
3396 ChangeNumber cn = generateChangeNumber(op);
3397 OperationContext ctx = new ModifyContext(cn, "schema");
3398 op.setAttachment(SYNCHROCONTEXT, ctx);
3399 op.setResultCode(ResultCode.SUCCESS);
3400 synchronize(op);
3401 }
3402
3403 /**
3404 * Check if the provided configuration is acceptable for add.
3405 *
3406 * @param configuration The configuration to check.
3407 * @param unacceptableReasons When the configuration is not acceptable, this
3408 * table is use to return the reasons why this
3409 * configuration is not acceptbale.
3410 *
3411 * @return true if the configuration is acceptable, false other wise.
3412 */
3413 public static boolean isConfigurationAcceptable(
3414 ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
3415 {
3416 // Check that there is not already a domain with the same DN
3417 DN dn = configuration.getBaseDN();
3418 if (MultimasterReplication.findDomain(dn,null) != null)
3419 {
3420 Message message = ERR_SYNC_INVALID_DN.get();
3421 unacceptableReasons.add(message);
3422 return false;
3423 }
3424
3425 // Check that the base DN is configured as a base-dn of the directory server
3426 if (retrievesBackend(dn) == null)
3427 {
3428 Message message = ERR_UNKNOWN_DN.get(dn.toString());
3429 unacceptableReasons.add(message);
3430 return false;
3431 }
3432 return true;
3433 }
3434
3435 /**
3436 * {@inheritDoc}
3437 */
3438 public ConfigChangeResult applyConfigurationChange(
3439 ReplicationDomainCfg configuration)
3440 {
3441 // server id and base dn are readonly.
3442 // isolationPolicy can be set immediately and will apply
3443 // to the next updates.
3444 // The other parameters needs to be renegociated with the ReplicationServer.
3445 // so that requires restarting the session with the ReplicationServer.
3446 replicationServers = configuration.getReplicationServer();
3447 window = configuration.getWindowSize();
3448 heartbeatInterval = configuration.getHeartbeatInterval();
3449 broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
3450 maxSendQueue, maxSendDelay, window, heartbeatInterval);
3451 isolationpolicy = configuration.getIsolationPolicy();
3452
3453 return new ConfigChangeResult(ResultCode.SUCCESS, false);
3454 }
3455
3456 /**
3457 * {@inheritDoc}
3458 */
3459 public boolean isConfigurationChangeAcceptable(
3460 ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
3461 {
3462 return true;
3463 }
3464
3465 /**
3466 * {@inheritDoc}
3467 */
3468 public LinkedHashMap<String, String> getAlerts()
3469 {
3470 LinkedHashMap<String,String> alerts = new LinkedHashMap<String,String>();
3471
3472 alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
3473 ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
3474 return alerts;
3475 }
3476
3477 /**
3478 * {@inheritDoc}
3479 */
3480 public String getClassName()
3481 {
3482 return CLASS_NAME;
3483
3484 }
3485
3486 /**
3487 * {@inheritDoc}
3488 */
3489 public DN getComponentEntryDN()
3490 {
3491 return configDn;
3492 }
3493
3494 /**
3495 * Check if the domain is connected to a ReplicationServer.
3496 *
3497 * @return true if the server is connected, false if not.
3498 */
3499 public boolean isConnected()
3500 {
3501 if (broker != null)
3502 return broker.isConnected();
3503 else
3504 return false;
3505 }
3506
3507 /**
3508 * Determine whether the connection to the replication server is encrypted.
3509 * @return true if the connection is encrypted, false otherwise.
3510 */
3511 public boolean isSessionEncrypted()
3512 {
3513 if (broker != null)
3514 return broker.isSessionEncrypted();
3515 else
3516 return false;
3517 }
3518 }