001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.plugin;
028
029 import java.util.ArrayList;
030 import static org.opends.server.replication.plugin.
031 ReplicationRepairRequestControl.*;
032
033 import java.util.HashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.concurrent.LinkedBlockingQueue;
037
038 import org.opends.messages.Message;
039 import org.opends.server.admin.server.ConfigurationAddListener;
040 import org.opends.server.admin.server.ConfigurationChangeListener;
041 import org.opends.server.admin.server.ConfigurationDeleteListener;
042 import org.opends.server.admin.std.server.ReplicationDomainCfg;
043 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
044 import org.opends.server.api.Backend;
045 import org.opends.server.api.BackupTaskListener;
046 import org.opends.server.api.ExportTaskListener;
047 import org.opends.server.api.ImportTaskListener;
048 import org.opends.server.api.RestoreTaskListener;
049 import org.opends.server.api.SynchronizationProvider;
050 import org.opends.server.config.ConfigException;
051 import org.opends.server.core.DirectoryServer;
052 import org.opends.server.types.BackupConfig;
053 import org.opends.server.types.ConfigChangeResult;
054 import org.opends.server.types.Control;
055 import org.opends.server.types.DN;
056 import org.opends.server.types.DirectoryException;
057 import org.opends.server.types.Entry;
058 import org.opends.server.types.LDIFExportConfig;
059 import org.opends.server.types.LDIFImportConfig;
060 import org.opends.server.types.Modification;
061 import org.opends.server.types.Operation;
062 import org.opends.server.types.RestoreConfig;
063 import org.opends.server.types.ResultCode;
064 import org.opends.server.types.SynchronizationProviderResult;
065 import org.opends.server.types.operation.PluginOperation;
066 import org.opends.server.types.operation.PostOperationAddOperation;
067 import org.opends.server.types.operation.PostOperationDeleteOperation;
068 import org.opends.server.types.operation.PostOperationModifyDNOperation;
069 import org.opends.server.types.operation.PostOperationModifyOperation;
070 import org.opends.server.types.operation.PostOperationOperation;
071 import org.opends.server.types.operation.PreOperationAddOperation;
072 import org.opends.server.types.operation.PreOperationDeleteOperation;
073 import org.opends.server.types.operation.PreOperationModifyDNOperation;
074 import org.opends.server.types.operation.PreOperationModifyOperation;
075
076 /**
077 * This class is used to load the Replication code inside the JVM
078 * and to trigger initialization of the replication.
079 *
080 * It also extends the SynchronizationProvider class in order to have some
081 * replication code running during the operation process
082 * as pre-op, conflictRsolution, and post-op.
083 */
084 public class MultimasterReplication
085 extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
086 implements ConfigurationAddListener<ReplicationDomainCfg>,
087 ConfigurationDeleteListener<ReplicationDomainCfg>,
088 ConfigurationChangeListener
089 <ReplicationSynchronizationProviderCfg>,
090 BackupTaskListener, RestoreTaskListener, ImportTaskListener,
091 ExportTaskListener
092 {
093 private ReplicationServerListener replicationServerListener = null;
094 private static Map<DN, ReplicationDomain> domains =
095 new HashMap<DN, ReplicationDomain>() ;
096
097 /**
098 * The queue of received update messages, to be treated by the ReplayThread
099 * threads.
100 */
101 private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
102 new LinkedBlockingQueue<UpdateToReplay>();
103
104 /**
105 * The list of ReplayThread threads.
106 */
107 private static List<ReplayThread> replayThreads =
108 new ArrayList<ReplayThread>();
109
110 /**
111 * The configurable number of replay threads.
112 */
113 private static int replayThreadNumber = 10;
114
115 private boolean isRegistered = false;
116
117 /**
118 * Finds the domain for a given DN.
119 *
120 * @param dn The DN for which the domain must be returned.
121 * @param pluginOp An optional operation for which the check is done.
122 * Can be null is the request has no associated operation.
123 * @return The domain for this DN.
124 */
125 public static ReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
126 {
127 /*
128 * Don't run the special replication code on Operation that are
129 * specifically marked as don't synchronize.
130 */
131 if ((pluginOp != null) && (pluginOp instanceof Operation))
132 {
133 Operation op = ((Operation) pluginOp);
134
135 if (op.dontSynchronize())
136 return null;
137
138 /*
139 * Check if the provided operation is a repair operation and set
140 * the synchronization flags if necessary.
141 * The repair operations are tagged as synchronization operations
142 * so that the core server let the operation modify the entryuuid
143 * and ds-sync-hist attributes.
144 * They are also tagged as dontSynchronize so that the replication
145 * code running later do not generate ChnageNumber, solve conflicts
146 * and forward the operation to the replication server.
147 */
148 for (Control c : op.getRequestControls())
149 {
150 if (c.getOID().equals(OID_REPLICATION_REPAIR_CONTROL))
151 {
152 op.setSynchronizationOperation(true);
153 op.setDontSynchronize(true);
154 // remove this control from the list of controls since
155 // it has now been processed and the local backend will
156 // fail if it finds a control that it does not know about and
157 // that is marked as critical.
158 List<Control> controls = op.getRequestControls();
159 controls.remove(c);
160 return null;
161 }
162 }
163 }
164
165
166 ReplicationDomain domain = null;
167 DN temp = dn;
168 do
169 {
170 domain = domains.get(temp);
171 temp = temp.getParentDNInSuffix();
172 if (temp == null)
173 {
174 break;
175 }
176 } while (domain == null);
177
178 return domain;
179 }
180
181 /**
182 * Creates a new domain from its configEntry, do the
183 * necessary initialization and starts it so that it is
184 * fully operational when this method returns.
185 * @param configuration The entry whith the configuration of this domain.
186 * @return The domain created.
187 * @throws ConfigException When the configuration is not valid.
188 */
189 public static ReplicationDomain createNewDomain(
190 ReplicationDomainCfg configuration)
191 throws ConfigException
192 {
193 ReplicationDomain domain;
194 domain = new ReplicationDomain(configuration, updateToReplayQueue);
195
196 if (domains.size() == 0)
197 {
198 /*
199 * Create the threads that will process incoming update messages
200 */
201 createReplayThreads();
202 }
203
204 domains.put(domain.getBaseDN(), domain);
205 return domain;
206 }
207
208 /**
209 * Deletes a domain.
210 * @param dn : the base DN of the domain to delete.
211 */
212 public static void deleteDomain(DN dn)
213 {
214 ReplicationDomain domain = domains.remove(dn);
215
216 if (domain != null)
217 domain.shutdown();
218
219 // No replay threads running if no replication need
220 if (domains.size() == 0) {
221 stopReplayThreads();
222 }
223 }
224
225 /**
226 * {@inheritDoc}
227 */
228 @Override
229 public void initializeSynchronizationProvider(
230 ReplicationSynchronizationProviderCfg configuration)
231 throws ConfigException
232 {
233 domains.clear();
234 replicationServerListener = new ReplicationServerListener(configuration);
235
236 // Register as an add and delete listener with the root configuration so we
237 // can be notified if Multimaster domain entries are added or removed.
238 configuration.addReplicationDomainAddListener(this);
239 configuration.addReplicationDomainDeleteListener(this);
240
241 // Register as a root configuration listener so that we can be notified if
242 // number of replay threads is changed and apply changes.
243 configuration.addReplicationChangeListener(this);
244
245 replayThreadNumber = configuration.getNumUpdateReplayThreads();
246
247 // Create the list of domains that are already defined.
248 for (String name : configuration.listReplicationDomains())
249 {
250 ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
251 createNewDomain(domain);
252 }
253
254 /*
255 * If any schema changes were made with the server offline, then handle them
256 * now.
257 */
258 List<Modification> offlineSchemaChanges =
259 DirectoryServer.getOfflineSchemaChanges();
260 if ((offlineSchemaChanges != null) && (! offlineSchemaChanges.isEmpty()))
261 {
262 processSchemaChange(offlineSchemaChanges);
263 }
264
265 DirectoryServer.registerBackupTaskListener(this);
266 DirectoryServer.registerRestoreTaskListener(this);
267 DirectoryServer.registerExportTaskListener(this);
268 DirectoryServer.registerImportTaskListener(this);
269
270 DirectoryServer.registerSupportedControl(
271 ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
272 }
273
274 /**
275 * Create the threads that will wait for incoming update messages.
276 */
277 private synchronized static void createReplayThreads()
278 {
279 replayThreads.clear();
280
281 for (int i = 0; i < replayThreadNumber; i++)
282 {
283 ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
284 replayThread.start();
285 replayThreads.add(replayThread);
286 }
287 }
288
289 /**
290 * Stope the threads that are waiting for incoming update messages.
291 */
292 private synchronized static void stopReplayThreads()
293 {
294 // stop the replay threads
295 for (ReplayThread replayThread : replayThreads)
296 {
297 replayThread.shutdown();
298 }
299
300 for (ReplayThread replayThread : replayThreads)
301 {
302 replayThread.waitForShutdown();
303 }
304 replayThreads.clear();
305 }
306
307 /**
308 * {@inheritDoc}
309 */
310 public boolean isConfigurationAddAcceptable(
311 ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
312 {
313 return ReplicationDomain.isConfigurationAcceptable(
314 configuration, unacceptableReasons);
315 }
316
317 /**
318 * {@inheritDoc}
319 */
320 public ConfigChangeResult applyConfigurationAdd(
321 ReplicationDomainCfg configuration)
322 {
323 try
324 {
325 ReplicationDomain rd = createNewDomain(configuration);
326 if (isRegistered)
327 {
328 rd.start();
329 }
330 return new ConfigChangeResult(ResultCode.SUCCESS, false);
331 } catch (ConfigException e)
332 {
333 // we should never get to this point because the configEntry has
334 // already been validated in configAddisAcceptable
335 return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false);
336 }
337 }
338
339 /**
340 * {@inheritDoc}
341 */
342 @Override
343 public void doPostOperation(PostOperationAddOperation addOperation)
344 {
345 DN dn = addOperation.getEntryDN();
346 genericPostOperation(addOperation, dn);
347 }
348
349
350 /**
351 * {@inheritDoc}
352 */
353 @Override
354 public void doPostOperation(PostOperationDeleteOperation deleteOperation)
355 {
356 DN dn = deleteOperation.getEntryDN();
357 genericPostOperation(deleteOperation, dn);
358 }
359
360 /**
361 * {@inheritDoc}
362 */
363 @Override
364 public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
365 {
366 DN dn = modifyDNOperation.getEntryDN();
367 genericPostOperation(modifyDNOperation, dn);
368 }
369
370 /**
371 * {@inheritDoc}
372 */
373 @Override
374 public void doPostOperation(PostOperationModifyOperation modifyOperation)
375 {
376 DN dn = modifyOperation.getEntryDN();
377 genericPostOperation(modifyOperation, dn);
378 }
379
380 /**
381 * {@inheritDoc}
382 */
383 @Override
384 public SynchronizationProviderResult handleConflictResolution(
385 PreOperationModifyOperation modifyOperation)
386 {
387 ReplicationDomain domain =
388 findDomain(modifyOperation.getEntryDN(), modifyOperation);
389 if (domain == null)
390 return new SynchronizationProviderResult.ContinueProcessing();
391
392 return domain.handleConflictResolution(modifyOperation);
393 }
394
395 /**
396 * {@inheritDoc}
397 */
398 @Override
399 public SynchronizationProviderResult handleConflictResolution(
400 PreOperationAddOperation addOperation) throws DirectoryException
401 {
402 ReplicationDomain domain =
403 findDomain(addOperation.getEntryDN(), addOperation);
404 if (domain == null)
405 return new SynchronizationProviderResult.ContinueProcessing();
406
407 return domain.handleConflictResolution(addOperation);
408 }
409
410 /**
411 * {@inheritDoc}
412 */
413 @Override
414 public SynchronizationProviderResult handleConflictResolution(
415 PreOperationDeleteOperation deleteOperation) throws DirectoryException
416 {
417 ReplicationDomain domain =
418 findDomain(deleteOperation.getEntryDN(), deleteOperation);
419 if (domain == null)
420 return new SynchronizationProviderResult.ContinueProcessing();
421
422 return domain.handleConflictResolution(deleteOperation);
423 }
424
425 /**
426 * {@inheritDoc}
427 */
428 @Override
429 public SynchronizationProviderResult handleConflictResolution(
430 PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
431 {
432 ReplicationDomain domain =
433 findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
434 if (domain == null)
435 return new SynchronizationProviderResult.ContinueProcessing();
436
437 return domain.handleConflictResolution(modifyDNOperation);
438 }
439
440 /**
441 * {@inheritDoc}
442 */
443 @Override
444 public SynchronizationProviderResult
445 doPreOperation(PreOperationModifyOperation modifyOperation)
446 {
447 DN operationDN = modifyOperation.getEntryDN();
448 ReplicationDomain domain = findDomain(operationDN, modifyOperation);
449
450 if ((domain == null) || (!domain.solveConflict()))
451 return new SynchronizationProviderResult.ContinueProcessing();
452
453 Historical historicalInformation = (Historical)
454 modifyOperation.getAttachment(
455 Historical.HISTORICAL);
456 if (historicalInformation == null)
457 {
458 Entry entry = modifyOperation.getModifiedEntry();
459 historicalInformation = Historical.load(entry);
460 modifyOperation.setAttachment(Historical.HISTORICAL,
461 historicalInformation);
462 }
463
464 historicalInformation.generateState(modifyOperation);
465
466 return new SynchronizationProviderResult.ContinueProcessing();
467 }
468
469 /**
470 * {@inheritDoc}
471 */
472 @Override
473 public SynchronizationProviderResult doPreOperation(
474 PreOperationDeleteOperation deleteOperation) throws DirectoryException
475 {
476 return new SynchronizationProviderResult.ContinueProcessing();
477 }
478
479 /**
480 * {@inheritDoc}
481 */
482 @Override
483 public SynchronizationProviderResult doPreOperation(
484 PreOperationModifyDNOperation modifyDNOperation)
485 throws DirectoryException
486 {
487 return new SynchronizationProviderResult.ContinueProcessing();
488 }
489
490 /**
491 * {@inheritDoc}
492 */
493 @Override
494 public SynchronizationProviderResult doPreOperation(
495 PreOperationAddOperation addOperation)
496 {
497 ReplicationDomain domain =
498 findDomain(addOperation.getEntryDN(), addOperation);
499 if (domain == null)
500 return new SynchronizationProviderResult.ContinueProcessing();
501
502 if (!addOperation.isSynchronizationOperation())
503 domain.doPreOperation(addOperation);
504
505 return new SynchronizationProviderResult.ContinueProcessing();
506 }
507
508
509 /**
510 * {@inheritDoc}
511 */
512 @Override
513 public void finalizeSynchronizationProvider()
514 {
515 isRegistered = false;
516
517 // shutdown all the domains
518 for (ReplicationDomain domain : domains.values())
519 {
520 domain.shutdown();
521 }
522 domains.clear();
523
524 // Stop replay threads
525 stopReplayThreads();
526
527 // shutdown the ReplicationServer Service if necessary
528 if (replicationServerListener != null)
529 replicationServerListener.shutdown();
530
531 DirectoryServer.deregisterBackupTaskListener(this);
532 DirectoryServer.deregisterRestoreTaskListener(this);
533 DirectoryServer.deregisterExportTaskListener(this);
534 DirectoryServer.deregisterImportTaskListener(this);
535 }
536
537 /**
538 * This method is called whenever the server detects a modification
539 * of the schema done by directly modifying the backing files
540 * of the schema backend.
541 * Call the schema Domain if it exists.
542 *
543 * @param modifications The list of modifications that was
544 * applied to the schema.
545 *
546 */
547 @Override
548 public void processSchemaChange(List<Modification> modifications)
549 {
550 ReplicationDomain domain =
551 findDomain(DirectoryServer.getSchemaDN(), null);
552 if (domain != null)
553 domain.synchronizeModifications(modifications);
554 }
555
556 /**
557 * {@inheritDoc}
558 */
559 public void processBackupBegin(Backend backend, BackupConfig config)
560 {
561 for (DN dn : backend.getBaseDNs())
562 {
563 ReplicationDomain domain = findDomain(dn, null);
564 if (domain != null)
565 domain.backupStart();
566 }
567 }
568
569 /**
570 * {@inheritDoc}
571 */
572 public void processBackupEnd(Backend backend, BackupConfig config,
573 boolean successful)
574 {
575 for (DN dn : backend.getBaseDNs())
576 {
577 ReplicationDomain domain = findDomain(dn, null);
578 if (domain != null)
579 domain.backupEnd();
580 }
581 }
582
583 /**
584 * {@inheritDoc}
585 */
586 public void processRestoreBegin(Backend backend, RestoreConfig config)
587 {
588 for (DN dn : backend.getBaseDNs())
589 {
590 ReplicationDomain domain = findDomain(dn, null);
591 if (domain != null)
592 domain.disable();
593 }
594 }
595
596 /**
597 * {@inheritDoc}
598 */
599 public void processRestoreEnd(Backend backend, RestoreConfig config,
600 boolean successful)
601 {
602 for (DN dn : backend.getBaseDNs())
603 {
604 ReplicationDomain domain = findDomain(dn, null);
605 if (domain != null)
606 domain.enable();
607 }
608 }
609
610 /**
611 * {@inheritDoc}
612 */
613 public void processImportBegin(Backend backend, LDIFImportConfig config)
614 {
615 for (DN dn : backend.getBaseDNs())
616 {
617 ReplicationDomain domain = findDomain(dn, null);
618 if (domain != null)
619 domain.disable();
620 }
621 }
622
623 /**
624 * {@inheritDoc}
625 */
626 public void processImportEnd(Backend backend, LDIFImportConfig config,
627 boolean successful)
628 {
629 for (DN dn : backend.getBaseDNs())
630 {
631 ReplicationDomain domain = findDomain(dn, null);
632 if (domain != null)
633 domain.enable();
634 }
635 }
636
637 /**
638 * {@inheritDoc}
639 */
640 public void processExportBegin(Backend backend, LDIFExportConfig config)
641 {
642 for (DN dn : backend.getBaseDNs())
643 {
644 ReplicationDomain domain = findDomain(dn, null);
645 if (domain != null)
646 domain.backupStart();
647 }
648 }
649
650 /**
651 * {@inheritDoc}
652 */
653 public void processExportEnd(Backend backend, LDIFExportConfig config,
654 boolean successful)
655 {
656 for (DN dn : backend.getBaseDNs())
657 {
658 ReplicationDomain domain = findDomain(dn, null);
659 if (domain != null)
660 domain.backupEnd();
661 }
662 }
663
664 /**
665 * {@inheritDoc}
666 */
667 public ConfigChangeResult applyConfigurationDelete(
668 ReplicationDomainCfg configuration)
669 {
670 deleteDomain(configuration.getBaseDN());
671
672 return new ConfigChangeResult(ResultCode.SUCCESS, false);
673 }
674
675 /**
676 * {@inheritDoc}
677 */
678 public boolean isConfigurationDeleteAcceptable(
679 ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
680 {
681 return true;
682 }
683
684 /**
685 * Generic code for all the postOperation entry point.
686 *
687 * @param operation The Operation for which the post-operation is called.
688 * @param dn The Dn for which the post-operation is called.
689 */
690 private void genericPostOperation(PostOperationOperation operation, DN dn)
691 {
692 ReplicationDomain domain = findDomain(dn, operation);
693 if (domain == null)
694 return;
695
696 domain.synchronize(operation);
697
698 return;
699 }
700
701 /**
702 * Returns the replication server listener associated to that Multimaster
703 * Replication.
704 * @return the listener.
705 */
706 public ReplicationServerListener getReplicationServerListener()
707 {
708 return replicationServerListener;
709 }
710
711 /**
712 * {@inheritDoc}
713 */
714 public boolean
715 isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
716 configuration,
717 List<Message> unacceptableReasons)
718 {
719 return true;
720 }
721
722 /**
723 * {@inheritDoc}
724 */
725 public ConfigChangeResult
726 applyConfigurationChange
727 (ReplicationSynchronizationProviderCfg configuration)
728 {
729 int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
730
731 // Stop threads then restart new number of threads
732 stopReplayThreads();
733 replayThreadNumber = numUpdateRepayThread;
734 if (domains.size() > 0)
735 {
736 createReplayThreads();
737 }
738
739 return new ConfigChangeResult(ResultCode.SUCCESS, false);
740 }
741
742 /**
743 * {@inheritDoc}
744 */
745 public void completeSynchronizationProvider()
746 {
747 isRegistered = true;
748
749 // start all the domains
750 for (ReplicationDomain domain : domains.values())
751 {
752 domain.start();
753 }
754 }
755 }