001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2007-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import static org.opends.messages.BackendMessages.*;
029 import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_FINAL_STATUS;
030 import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_PROGRESS_REPORT;
031 import static org.opends.messages.ReplicationMessages.*;
032 import static org.opends.server.loggers.ErrorLogger.logError;
033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
035 import static org.opends.server.util.StaticUtils.*;
036
037 import java.io.BufferedReader;
038 import java.io.ByteArrayInputStream;
039 import java.io.ByteArrayOutputStream;
040 import java.io.File;
041 import java.io.IOException;
042 import java.io.StringReader;
043 import java.util.ArrayList;
044 import java.util.HashMap;
045 import java.util.HashSet;
046 import java.util.Iterator;
047 import java.util.LinkedHashSet;
048 import java.util.LinkedHashMap;
049 import java.util.LinkedList;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Timer;
053 import java.util.TimerTask;
054
055 import org.opends.messages.Message;
056 import org.opends.server.admin.Configuration;
057 import org.opends.server.admin.server.ServerManagementContext;
058 import org.opends.server.admin.std.server.BackendCfg;
059 import org.opends.server.admin.std.server.ReplicationServerCfg;
060 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
061 import org.opends.server.admin.std.server.RootCfg;
062 import org.opends.server.admin.std.server.SynchronizationProviderCfg;
063 import org.opends.server.api.Backend;
064 import org.opends.server.api.SynchronizationProvider;
065 import org.opends.server.backends.jeb.BackupManager;
066 import org.opends.server.config.ConfigException;
067 import org.opends.server.core.AddOperation;
068 import org.opends.server.core.DeleteOperation;
069 import org.opends.server.core.DirectoryServer;
070 import org.opends.server.core.ModifyDNOperation;
071 import org.opends.server.core.ModifyOperation;
072 import org.opends.server.core.SearchOperation;
073 import org.opends.server.loggers.debug.DebugTracer;
074 import org.opends.server.protocols.internal.InternalClientConnection;
075 import org.opends.server.replication.plugin.MultimasterReplication;
076 import org.opends.server.replication.plugin.ReplicationServerListener;
077 import org.opends.server.replication.protocol.AddMsg;
078 import org.opends.server.replication.protocol.DeleteMsg;
079 import org.opends.server.replication.protocol.ModifyDNMsg;
080 import org.opends.server.replication.protocol.ModifyMsg;
081 import org.opends.server.replication.protocol.UpdateMessage;
082 import org.opends.server.types.Attribute;
083 import org.opends.server.types.AttributeType;
084 import org.opends.server.types.AttributeValue;
085 import org.opends.server.types.BackupConfig;
086 import org.opends.server.types.BackupDirectory;
087 import org.opends.server.types.ConditionResult;
088 import org.opends.server.types.Control;
089 import org.opends.server.types.DN;
090 import org.opends.server.types.DebugLogLevel;
091 import org.opends.server.types.DirectoryException;
092 import org.opends.server.types.DereferencePolicy;
093 import org.opends.server.types.Entry;
094 import org.opends.server.types.IndexType;
095 import org.opends.server.types.InitializationException;
096 import org.opends.server.types.LDIFExportConfig;
097 import org.opends.server.types.LDIFImportConfig;
098 import org.opends.server.types.LDIFImportResult;
099 import org.opends.server.types.RawAttribute;
100 import org.opends.server.types.RestoreConfig;
101 import org.opends.server.types.ResultCode;
102 import org.opends.server.types.SearchFilter;
103 import org.opends.server.types.SearchScope;
104 import org.opends.server.types.SearchResultEntry;
105 import org.opends.server.types.ObjectClass;
106 import org.opends.server.util.AddChangeRecordEntry;
107 import org.opends.server.util.DeleteChangeRecordEntry;
108 import org.opends.server.util.LDIFReader;
109 import org.opends.server.util.LDIFWriter;
110 import org.opends.server.util.ModifyChangeRecordEntry;
111 import org.opends.server.util.ModifyDNChangeRecordEntry;
112 import org.opends.server.util.Validator;
113 import static org.opends.server.config.ConfigConstants.ATTR_OBJECTCLASSES_LC;
114 import org.opends.server.protocols.internal.InternalSearchOperation;
115 import static org.opends.server.util.ServerConstants.*;
116
117
118 /**
119 * This class defines a backend that stores its information in an
120 * associated replication server object.
121 * This is primarily intended to take advantage of the backup/restore/
122 * import/export of the backend API, and to provide an LDAP access
123 * to the replication server database.
124 * <BR><BR>
125 * Entries stored in this backend are held in the DB associated with
126 * the replication server.
127 * <BR><BR>
128 * Currently are only implemented the create and restore backup features.
129 *
130 */
131 public class ReplicationBackend
132 extends Backend
133 {
134 /**
135 * The tracer object for the debug logger.
136 */
137 private static final DebugTracer TRACER = getTracer();
138
139 private static final String BASE_DN = "dc=replicationchanges";
140
141 // The base DNs for this backend.
142 private DN[] baseDNs;
143
144 // The base DNs for this backend, in a hash set.
145 private HashSet<DN> baseDNSet;
146
147 // The set of supported controls for this backend.
148 private HashSet<String> supportedControls;
149
150 // The set of supported features for this backend.
151 private HashSet<String> supportedFeatures;
152
153 private ReplicationServer server;
154
155 /**
156 * The configuration of this backend.
157 */
158 private BackendCfg cfg;
159
160 /**
161 * The number of milliseconds between job progress reports.
162 */
163 private long progressInterval = 10000;
164
165 /**
166 * The current number of entries exported.
167 */
168 private long exportedCount = 0;
169
170 /**
171 * The current number of entries skipped.
172 */
173 private long skippedCount = 0;
174
175 //Objectclass for getEntry root entries.
176 private HashMap<ObjectClass,String> rootObjectclasses;
177
178 //Attributes used for getEntry root entries.
179 private LinkedHashMap<AttributeType,List<Attribute>> attributes;
180
181 //Operational attributes used for getEntry root entries.
182 private Map<AttributeType,List<Attribute>> operationalAttributes;
183
184
185 /**
186 * Creates a new backend with the provided information. All backend
187 * implementations must implement a default constructor that use
188 * <CODE>super()</CODE> to invoke this constructor.
189 */
190 public ReplicationBackend()
191 {
192 super();
193 // Perform all initialization in initializeBackend.
194 }
195
196
197 /**
198 * Set the base DNs for this backend. This is used by the unit tests
199 * to set the base DNs without having to provide a configuration
200 * object when initializing the backend.
201 * @param baseDNs The set of base DNs to be served by this memory backend.
202 */
203 public void setBaseDNs(DN[] baseDNs)
204 {
205 this.baseDNs = baseDNs;
206 }
207
208
209
210 /**
211 * {@inheritDoc}
212 */
213 @Override()
214 public void configureBackend(Configuration config) throws ConfigException
215 {
216 if (config != null)
217 {
218 Validator.ensureTrue(config instanceof BackendCfg);
219 cfg = (BackendCfg)config;
220 DN[] baseDNs = new DN[cfg.getBaseDN().size()];
221 cfg.getBaseDN().toArray(baseDNs);
222 setBaseDNs(baseDNs);
223 }
224 }
225
226
227
228 /**
229 * {@inheritDoc}
230 */
231 @Override()
232 public synchronized void initializeBackend()
233 throws ConfigException, InitializationException
234 {
235 if ((baseDNs == null) || (baseDNs.length != 1))
236 {
237 Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
238 throw new ConfigException(message);
239 }
240
241 baseDNSet = new HashSet<DN>();
242 for (DN dn : baseDNs)
243 {
244 baseDNSet.add(dn);
245 }
246
247 supportedControls = new HashSet<String>();
248 supportedFeatures = new HashSet<String>();
249
250 for (DN dn : baseDNs)
251 {
252 try
253 {
254 DirectoryServer.registerBaseDN(dn, this, true);
255 }
256 catch (Exception e)
257 {
258 if (debugEnabled())
259 {
260 TRACER.debugCaught(DebugLogLevel.ERROR, e);
261 }
262
263 Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
264 dn.toString(), getExceptionMessage(e));
265 throw new InitializationException(message, e);
266 }
267 }
268 rootObjectclasses = new LinkedHashMap<ObjectClass,String>(3);
269 rootObjectclasses.put(DirectoryServer.getTopObjectClass(), OC_TOP);
270 ObjectClass domainOC = DirectoryServer.getObjectClass("domain", true);
271 rootObjectclasses.put(domainOC, "domain");
272 ObjectClass objectclassOC =
273 DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
274 rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);
275 attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
276 AttributeType changeType =
277 DirectoryServer.getAttributeType("changetype", true);
278 LinkedHashSet<AttributeValue> valueSet =
279 new LinkedHashSet<AttributeValue>(1);
280 valueSet.add(new AttributeValue(changeType, "add"));
281 Attribute a = new Attribute(changeType, "changetype", valueSet);
282 ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
283 attrList.add(a);
284 attributes.put(changeType, attrList);
285 operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
286 }
287
288
289
290 /**
291 * {@inheritDoc}
292 */
293 @Override()
294 public synchronized void finalizeBackend()
295 {
296 for (DN dn : baseDNs)
297 {
298 try
299 {
300 DirectoryServer.deregisterBaseDN(dn);
301 }
302 catch (Exception e)
303 {
304 if (debugEnabled())
305 {
306 TRACER.debugCaught(DebugLogLevel.ERROR, e);
307 }
308 }
309 }
310 }
311
312
313
314 /**
315 * {@inheritDoc}
316 */
317 @Override()
318 public DN[] getBaseDNs()
319 {
320 return baseDNs;
321 }
322
323
324
325 /**
326 * {@inheritDoc}
327 */
328 @Override()
329 public synchronized long getEntryCount()
330 {
331 if (server==null)
332 {
333 try
334 {
335 server = getReplicationServer();
336 if (server == null)
337 {
338 return 0;
339 }
340 }
341 catch(Exception e)
342 {
343 return 0;
344 }
345 }
346
347 //This method only returns the number of actual change entries, the
348 //domain and any baseDN entries are not counted.
349 long retNum=0;
350 Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator();
351 if (rcachei != null)
352 {
353 while (rcachei.hasNext())
354 {
355 ReplicationServerDomain rsd = rcachei.next();
356 retNum += rsd.getChangesCount();
357 }
358 }
359 return retNum;
360
361 }
362
363
364
365 /**
366 * {@inheritDoc}
367 */
368 @Override()
369 public boolean isLocal()
370 {
371 return true;
372 }
373
374
375
376 /**
377 * {@inheritDoc}
378 */
379 @Override()
380 public boolean isIndexed(AttributeType attributeType, IndexType indexType)
381 {
382 return true;
383 }
384
385
386
387 /**
388 * {@inheritDoc}
389 */
390 @Override()
391 public synchronized Entry getEntry(DN entryDN)
392 {
393 Entry e = null;
394 try {
395 if(baseDNSet.contains(entryDN)) {
396 return new Entry(entryDN, rootObjectclasses, attributes,
397 operationalAttributes);
398 } else {
399 InternalClientConnection conn =
400 InternalClientConnection.getRootConnection();
401 SearchFilter filter=
402 SearchFilter.createFilterFromString("(changetype=*)");
403 InternalSearchOperation searchOperation =
404 new InternalSearchOperation(conn,
405 InternalClientConnection.nextOperationID(),
406 InternalClientConnection.nextMessageID(), null, entryDN,
407 SearchScope.BASE_OBJECT,
408 DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false,
409 filter, null, null);
410 search(searchOperation);
411 LinkedList<SearchResultEntry> resultEntries =
412 searchOperation.getSearchEntries();
413 if(resultEntries.size() != 0) {
414 e=resultEntries.getFirst();
415 }
416 }
417 } catch (DirectoryException ex) {
418 e=null;
419 }
420 return e;
421
422 }
423
424
425
426 /**
427 * {@inheritDoc}
428 */
429 @Override()
430 public synchronized boolean entryExists(DN entryDN)
431 {
432 return getEntry(entryDN) != null;
433 }
434
435
436
437 /**
438 * {@inheritDoc}
439 */
440 @Override()
441 public synchronized void addEntry(Entry entry, AddOperation addOperation)
442 throws DirectoryException
443 {
444 Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get();
445 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
446 }
447
448
449
450 /**
451 * {@inheritDoc}
452 */
453 @Override()
454 public synchronized void deleteEntry(DN entryDN,
455 DeleteOperation deleteOperation)
456 throws DirectoryException
457 {
458 Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get();
459 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
460 }
461
462
463
464 /**
465 * {@inheritDoc}
466 */
467 @Override()
468 public synchronized void replaceEntry(Entry entry,
469 ModifyOperation modifyOperation)
470 throws DirectoryException
471 {
472 Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get();
473 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
474 }
475
476
477
478 /**
479 * {@inheritDoc}
480 */
481 @Override()
482 public synchronized void renameEntry(DN currentDN, Entry entry,
483 ModifyDNOperation modifyDNOperation)
484 throws DirectoryException
485 {
486 Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get();
487 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
488 }
489
490
491
492 /**
493 * {@inheritDoc}
494 */
495 @Override()
496 public HashSet<String> getSupportedControls()
497 {
498 return supportedControls;
499 }
500
501
502
503 /**
504 * {@inheritDoc}
505 */
506 @Override()
507 public HashSet<String> getSupportedFeatures()
508 {
509 return supportedFeatures;
510 }
511
512
513
514 /**
515 * {@inheritDoc}
516 */
517 @Override()
518 public boolean supportsLDIFExport()
519 {
520 return true;
521 }
522
523
524
525 /**
526 * {@inheritDoc}
527 */
528 @Override()
529 public synchronized void exportLDIF(LDIFExportConfig exportConfig)
530 throws DirectoryException
531 {
532 List<DN> includeBranches = exportConfig.getIncludeBranches();
533 DN baseDN;
534 ArrayList<ReplicationServerDomain> exportContainers =
535 new ArrayList<ReplicationServerDomain>();
536 if(server == null) {
537 Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
538 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message);
539 }
540 Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
541 if (rsdi != null)
542 {
543 while (rsdi.hasNext())
544 {
545 ReplicationServerDomain rc = rsdi.next();
546
547 // Skip containers that are not covered by the include branches.
548 baseDN = DN.decode(rc.getBaseDn().toString() + "," + BASE_DN);
549
550 if (includeBranches == null || includeBranches.isEmpty())
551 {
552 exportContainers.add(rc);
553 }
554 else
555 {
556 for (DN includeBranch : includeBranches)
557 {
558 if (includeBranch.isDescendantOf(baseDN) ||
559 includeBranch.isAncestorOf(baseDN))
560 {
561 exportContainers.add(rc);
562 }
563 }
564 }
565 }
566 }
567
568 // Make a note of the time we started.
569 long startTime = System.currentTimeMillis();
570
571 // Start a timer for the progress report.
572 Timer timer = new Timer();
573 TimerTask progressTask = new ProgressTask();
574 timer.scheduleAtFixedRate(progressTask, progressInterval,
575 progressInterval);
576
577 // Create the LDIF writer.
578 LDIFWriter ldifWriter;
579 try
580 {
581 ldifWriter = new LDIFWriter(exportConfig);
582 }
583 catch (Exception e)
584 {
585 if (debugEnabled())
586 {
587 TRACER.debugCaught(DebugLogLevel.ERROR, e);
588 }
589
590 Message message =
591 ERR_BACKEND_CANNOT_CREATE_LDIF_WRITER.get(String.valueOf(e));
592 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
593 message, e);
594 }
595
596 exportRootChanges(exportContainers, exportConfig, ldifWriter);
597
598 // Iterate through the containers.
599 try
600 {
601 for (ReplicationServerDomain exportContainer : exportContainers)
602 {
603 if (exportConfig.isCancelled())
604 {
605 break;
606 }
607 processContainer(exportContainer, exportConfig, ldifWriter, null);
608 }
609 }
610 finally
611 {
612 timer.cancel();
613
614 // Close the LDIF writer
615 try
616 {
617 ldifWriter.close();
618 }
619 catch (Exception e)
620 {
621 if (debugEnabled())
622 {
623 TRACER.debugCaught(DebugLogLevel.ERROR, e);
624 }
625 }
626 }
627
628 long finishTime = System.currentTimeMillis();
629 long totalTime = (finishTime - startTime);
630
631 float rate = 0;
632 if (totalTime > 0)
633 {
634 rate = 1000f*exportedCount / totalTime;
635 }
636
637 Message message = NOTE_JEB_EXPORT_FINAL_STATUS.get(
638 exportedCount, skippedCount, totalTime/1000, rate);
639 logError(message);
640 }
641
642 /*
643 * Exports the root changes of the export, and one entry by domain.
644 */
645 private void exportRootChanges(List<ReplicationServerDomain> exportContainers,
646 LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
647 {
648 Map<AttributeType,List<Attribute>> attributes =
649 new HashMap<AttributeType,List<Attribute>>();
650 ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();
651
652 AttributeType ocType=
653 DirectoryServer.getAttributeType("objectclass", true);
654 LinkedHashSet<AttributeValue> ocValues =
655 new LinkedHashSet<AttributeValue>();
656 ocValues.add(new AttributeValue(ocType, "top"));
657 ocValues.add(new AttributeValue(ocType, "domain"));
658 Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues);
659 ldapAttrList.add(ocAttr);
660 attributes.put(ocType, ldapAttrList);
661
662 try
663 {
664 AddChangeRecordEntry changeRecord =
665 new AddChangeRecordEntry(DN.decode(BASE_DN),
666 attributes);
667 ldifWriter.writeChangeRecord(changeRecord);
668 }
669 catch (Exception e) {}
670
671 for (ReplicationServerDomain exportContainer : exportContainers)
672 {
673 if (exportConfig != null && exportConfig.isCancelled())
674 {
675 break;
676 }
677
678 attributes.clear();
679 ldapAttrList.clear();
680
681 ldapAttrList.add(ocAttr);
682
683 AttributeType stateType=
684 DirectoryServer.getAttributeType("state", true);
685 LinkedHashSet<AttributeValue> stateValues =
686 new LinkedHashSet<AttributeValue>();
687 stateValues.add(new AttributeValue(stateType,
688 exportContainer.getDbServerState().toString()));
689 TRACER.debugInfo("State=" +
690 exportContainer.getDbServerState().toString());
691 Attribute stateAttr = new Attribute(ocType, "state", stateValues);
692 ldapAttrList.add(stateAttr);
693
694 AttributeType genidType=
695 DirectoryServer.getAttributeType("generation-id", true);
696 LinkedHashSet<AttributeValue> genidValues =
697 new LinkedHashSet<AttributeValue>();
698 genidValues.add(new AttributeValue(genidType,
699 String.valueOf(exportContainer.getGenerationId())+
700 exportContainer.getBaseDn()));
701 Attribute genidAttr = new Attribute(ocType, "generation-id", genidValues);
702 ldapAttrList.add(genidAttr);
703 attributes.put(genidType, ldapAttrList);
704
705 try
706 {
707 AddChangeRecordEntry changeRecord =
708 new AddChangeRecordEntry(DN.decode(
709 exportContainer.getBaseDn() + "," + BASE_DN),
710 attributes);
711 ldifWriter.writeChangeRecord(changeRecord);
712 }
713 catch (Exception e)
714 {
715 if (debugEnabled())
716 {
717 TRACER.debugCaught(DebugLogLevel.ERROR, e);
718 }
719 Message message = ERR_BACKEND_EXPORT_ENTRY.get(
720 exportContainer.getBaseDn() + "," + BASE_DN,
721 String.valueOf(e));
722 logError(message);
723 }
724 }
725 }
726
727 /**
728 * Processes the changes for a given ReplicationServerDomain.
729 */
730 private void processContainer(ReplicationServerDomain rsd,
731 LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
732 SearchOperation searchOperation)
733 {
734 // Walk through the servers
735 for (Short serverId : rsd.getServers())
736 {
737 if (exportConfig != null && exportConfig.isCancelled())
738 {
739 break;
740 }
741
742 ReplicationIterator ri = rsd.getChangelogIterator(serverId,
743 null);
744
745 if (ri != null)
746 {
747 try
748 {
749 // Walk through the changes
750 while (ri.getChange() != null)
751 {
752 if (exportConfig != null && exportConfig.isCancelled())
753 {
754 break;
755 }
756 UpdateMessage msg = ri.getChange();
757 processChange(msg, exportConfig, ldifWriter, searchOperation);
758 if (!ri.next())
759 break;
760 }
761 }
762 finally
763 {
764 ri.releaseCursor();
765 }
766 }
767 }
768 }
769
770 /**
771 * Export one change.
772 */
773 private void processChange(UpdateMessage msg,
774 LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
775 SearchOperation searchOperation)
776 {
777 InternalClientConnection conn =
778 InternalClientConnection.getRootConnection();
779 Entry entry = null;
780 DN dn = null;
781
782 try
783 {
784 if (msg instanceof AddMsg)
785 {
786 AddMsg addMsg = (AddMsg)msg;
787 AddOperation addOperation = (AddOperation)msg.createOperation(conn);
788
789 dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
790 "changeNumber=" + msg.getChangeNumber().toString() + "," +
791 msg.getDn() +","+ BASE_DN);
792
793 Map<AttributeType,List<Attribute>> attributes =
794 new HashMap<AttributeType,List<Attribute>>();
795
796 for (RawAttribute a : addOperation.getRawAttributes())
797 {
798 Attribute attr = a.toAttribute();
799 AttributeType attrType = attr.getAttributeType();
800 List<Attribute> attrs = attributes.get(attrType);
801 if (attrs == null)
802 {
803 attrs = new ArrayList<Attribute>(1);
804 attrs.add(attr);
805 attributes.put(attrType, attrs);
806 }
807 else
808 {
809 attrs.add(attr);
810 }
811 }
812
813 AddChangeRecordEntry changeRecord =
814 new AddChangeRecordEntry(dn, attributes);
815 if (exportConfig != null)
816 {
817 ldifWriter.writeChangeRecord(changeRecord);
818 }
819 else
820 {
821 Writer writer = new Writer();
822 LDIFWriter ldifWriter2 = writer.getLDIFWriter();
823 ldifWriter2.writeChangeRecord(changeRecord);
824 LDIFReader reader = writer.getLDIFReader();
825 entry = reader.readEntry();
826 }
827 }
828 else if (msg instanceof DeleteMsg)
829 {
830 DeleteMsg delMsg = (DeleteMsg)msg;
831
832 dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
833 "changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
834 msg.getDn() +","+ BASE_DN);
835
836 DeleteChangeRecordEntry changeRecord =
837 new DeleteChangeRecordEntry(dn);
838 if (exportConfig != null)
839 {
840 ldifWriter.writeChangeRecord(changeRecord);
841 }
842 else
843 {
844 Writer writer = new Writer();
845 LDIFWriter ldifWriter2 = writer.getLDIFWriter();
846 ldifWriter2.writeChangeRecord(changeRecord);
847 LDIFReader reader = writer.getLDIFReader();
848 entry = reader.readEntry();
849 }
850 }
851 else if (msg instanceof ModifyMsg)
852 {
853 ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
854
855 dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
856 "changeNumber=" + msg.getChangeNumber().toString()+ "," +
857 msg.getDn() +","+ BASE_DN);
858 op.setInternalOperation(true);
859
860 ModifyChangeRecordEntry changeRecord =
861 new ModifyChangeRecordEntry(dn, op.getRawModifications());
862 if (exportConfig != null)
863 {
864 ldifWriter.writeChangeRecord(changeRecord);
865 }
866 else
867 {
868 Writer writer = new Writer();
869 LDIFWriter ldifWriter2 = writer.getLDIFWriter();
870 ldifWriter2.writeChangeRecord(changeRecord);
871 LDIFReader reader = writer.getLDIFReader();
872 entry = reader.readEntry();
873 }
874 }
875 else if (msg instanceof ModifyDNMsg)
876 {
877 ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
878
879 dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
880 "changeNumber=" + msg.getChangeNumber().toString()+ "," +
881 msg.getDn() +","+ BASE_DN);
882 op.setInternalOperation(true);
883
884 ModifyDNChangeRecordEntry changeRecord =
885 new ModifyDNChangeRecordEntry(dn, op.getNewRDN(), op.deleteOldRDN(),
886 op.getNewSuperior());
887
888 if (exportConfig != null)
889 {
890 ldifWriter.writeChangeRecord(changeRecord);
891 }
892 else
893 {
894 Writer writer = new Writer();
895 LDIFWriter ldifWriter2 = writer.getLDIFWriter();
896 ldifWriter2.writeChangeRecord(changeRecord);
897 LDIFReader reader = writer.getLDIFReader();
898 Entry modDNEntry = reader.readEntry();
899 entry = modDNEntry;
900 }
901 }
902
903 if (exportConfig != null)
904 {
905 this.exportedCount++;
906 }
907 else
908 {
909 // Get the base DN, scope, and filter for the search.
910 DN searchBaseDN = searchOperation.getBaseDN();
911 SearchScope scope = searchOperation.getScope();
912 SearchFilter filter = searchOperation.getFilter();
913
914 boolean ms = entry.matchesBaseAndScope(searchBaseDN, scope);
915 boolean mf = filter.matchesEntry(entry);
916 if ( ms && mf )
917 {
918 searchOperation.returnEntry(entry, new LinkedList<Control>());
919 }
920 }
921 }
922 catch (Exception e)
923 {
924 this.skippedCount++;
925 if (debugEnabled())
926 {
927 TRACER.debugCaught(DebugLogLevel.ERROR, e);
928 }
929 Message message = null;
930 if (exportConfig != null)
931 {
932 message = ERR_BACKEND_EXPORT_ENTRY.get(
933 dn.toNormalizedString(), String.valueOf(e));
934 }
935 else
936 {
937 message = ERR_BACKEND_SEARCH_ENTRY.get(
938 dn.toNormalizedString(), e.getLocalizedMessage());
939 }
940 logError(message);
941 }
942 }
943
944
945
946 /**
947 * {@inheritDoc}
948 */
949 @Override()
950 public boolean supportsLDIFImport()
951 {
952 return false;
953 }
954
955
956
957 /**
958 * {@inheritDoc}
959 */
960 @Override()
961 public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig)
962 throws DirectoryException
963 {
964 Message message = ERR_REPLICATONBACKEND_IMPORT_LDIF_NOT_SUPPORTED.get();
965 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
966 }
967
968
969
970 /**
971 * {@inheritDoc}
972 */
973 @Override()
974 public boolean supportsBackup()
975 {
976 // This backend does not provide a backup/restore mechanism.
977 return true;
978 }
979
980
981
982 /**
983 * {@inheritDoc}
984 */
985 @Override()
986 public boolean supportsBackup(BackupConfig backupConfig,
987 StringBuilder unsupportedReason)
988 {
989 return true;
990 }
991
992
993
994 /**
995 * {@inheritDoc}
996 */
997 @Override()
998 public void createBackup(BackupConfig backupConfig)
999 throws DirectoryException
1000 {
1001 BackupManager backupManager = new BackupManager(getBackendID());
1002 File backendDir = getFileForPath(getReplicationServerCfg()
1003 .getReplicationDBDirectory());
1004 backupManager.createBackup(backendDir, backupConfig);
1005 }
1006
1007
1008
1009 /**
1010 * {@inheritDoc}
1011 */
1012 @Override()
1013 public void removeBackup(BackupDirectory backupDirectory,
1014 String backupID)
1015 throws DirectoryException
1016 {
1017 BackupManager backupManager =
1018 new BackupManager(getBackendID());
1019 backupManager.removeBackup(backupDirectory, backupID);
1020 }
1021
1022
1023
1024 /**
1025 * {@inheritDoc}
1026 */
1027 @Override()
1028 public boolean supportsRestore()
1029 {
1030 return true;
1031 }
1032
1033
1034
1035 /**
1036 * {@inheritDoc}
1037 */
1038 @Override()
1039 public void restoreBackup(RestoreConfig restoreConfig)
1040 throws DirectoryException
1041 {
1042 BackupManager backupManager =
1043 new BackupManager(getBackendID());
1044 File backendDir = getFileForPath(getReplicationServerCfg()
1045 .getReplicationDBDirectory());
1046 backupManager.restoreBackup(backendDir, restoreConfig);
1047 }
1048
1049
1050
1051 /**
1052 * {@inheritDoc}
1053 */
1054 @Override()
1055 public long numSubordinates(DN entryDN, boolean subtree)
1056 throws DirectoryException
1057 {
1058 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
1059 ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
1060 }
1061
1062
1063
1064 /**
1065 * {@inheritDoc}
1066 */
1067 @Override()
1068 public ConditionResult hasSubordinates(DN entryDN)
1069 throws DirectoryException
1070 {
1071 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
1072 ERR_HAS_SUBORDINATES_NOT_SUPPORTED.get());
1073 }
1074
1075 /**
1076 * Set the replication server associated with this backend.
1077 * @param server The replication server.
1078 */
1079 public void setServer(ReplicationServer server)
1080 {
1081 this.server = server;
1082 }
1083
1084 /**
1085 * This class reports progress of the export job at fixed intervals.
1086 */
1087 private final class ProgressTask extends TimerTask
1088 {
1089 /**
1090 * The number of entries that had been exported at the time of the
1091 * previous progress report.
1092 */
1093 private long previousCount = 0;
1094
1095 /**
1096 * The time in milliseconds of the previous progress report.
1097 */
1098 private long previousTime;
1099
1100 /**
1101 * Create a new export progress task.
1102 */
1103 public ProgressTask()
1104 {
1105 previousTime = System.currentTimeMillis();
1106 }
1107
1108 /**
1109 * The action to be performed by this timer task.
1110 */
1111 public void run()
1112 {
1113 long latestCount = exportedCount;
1114 long deltaCount = (latestCount - previousCount);
1115 long latestTime = System.currentTimeMillis();
1116 long deltaTime = latestTime - previousTime;
1117
1118 if (deltaTime == 0)
1119 {
1120 return;
1121 }
1122
1123 float rate = 1000f*deltaCount / deltaTime;
1124
1125 Message message =
1126 NOTE_JEB_EXPORT_PROGRESS_REPORT.get(latestCount, skippedCount, rate);
1127 logError(message);
1128
1129 previousCount = latestCount;
1130 previousTime = latestTime;
1131 }
1132 };
1133
1134
1135
1136 /**
1137 * {@inheritDoc}
1138 */
1139 @Override()
1140 public synchronized void search(SearchOperation searchOperation)
1141 throws DirectoryException
1142 {
1143 // Get the base DN, scope, and filter for the search.
1144 DN searchBaseDN = searchOperation.getBaseDN();
1145 DN baseDN;
1146 ArrayList<ReplicationServerDomain> searchContainers =
1147 new ArrayList<ReplicationServerDomain>();
1148
1149 //This check is for GroupManager initialization. It currently doesn't
1150 //come into play because the replication server variable is null in
1151 //the check above. But if the order of initialization of the server variable
1152 //is ever changed, the following check will keep replication change entries
1153 //from being added to the groupmanager cache erroneously.
1154 List<Control> requestControls = searchOperation.getRequestControls();
1155 if (requestControls != null)
1156 {
1157 for (Control c : requestControls)
1158 {
1159 if (c.getOID().equals(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE))
1160 {
1161 return;
1162 }
1163 }
1164 }
1165 // Make sure the base entry exists if it's supposed to be in this backend.
1166 if (!handlesEntry(searchBaseDN))
1167 {
1168 DN matchedDN = searchBaseDN.getParentDNInSuffix();
1169 while (matchedDN != null)
1170 {
1171 if (handlesEntry(matchedDN))
1172 {
1173 break;
1174 }
1175 matchedDN = matchedDN.getParentDNInSuffix();
1176 }
1177
1178 Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST.
1179 get(String.valueOf(searchBaseDN));
1180 throw new DirectoryException(
1181 ResultCode.NO_SUCH_OBJECT, message, matchedDN, null);
1182 }
1183
1184 if (server==null)
1185 {
1186 server = getReplicationServer();
1187
1188 if (server == null)
1189 {
1190 if (baseDNSet.contains(searchBaseDN))
1191 {
1192 // Get the base DN, scope, and filter for the search.
1193 SearchScope scope = searchOperation.getScope();
1194 SearchFilter filter = searchOperation.getFilter();
1195 Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
1196 operationalAttributes);
1197
1198 if (re.matchesBaseAndScope(searchBaseDN, scope) &&
1199 filter.matchesEntry(re))
1200 {
1201 searchOperation.returnEntry(re, new LinkedList<Control>());
1202 }
1203 return;
1204 }
1205 else
1206 {
1207 Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST.
1208 get(String.valueOf(searchBaseDN));
1209 throw new DirectoryException(
1210 ResultCode.NO_SUCH_OBJECT, message, null, null);
1211 }
1212 }
1213 }
1214
1215 // Get the base DN, scope, and filter for the search.
1216 SearchScope scope = searchOperation.getScope();
1217 SearchFilter filter = searchOperation.getFilter();
1218 Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
1219 operationalAttributes);
1220
1221 if (re.matchesBaseAndScope(searchBaseDN, scope) &&
1222 filter.matchesEntry(re))
1223 {
1224 searchOperation.returnEntry(re, new LinkedList<Control>());
1225 }
1226
1227 // Walk through all entries and send the ones that match.
1228 Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
1229 if (rsdi != null)
1230 {
1231 while (rsdi.hasNext())
1232 {
1233 ReplicationServerDomain rsd = rsdi.next();
1234
1235 // Skip containers that are not covered by the include branches.
1236 baseDN = DN.decode(rsd.getBaseDn().toString() + "," + BASE_DN);
1237
1238 if (searchBaseDN.isDescendantOf(baseDN) ||
1239 searchBaseDN.isAncestorOf(baseDN))
1240 {
1241 searchContainers.add(rsd);
1242 }
1243 }
1244 }
1245
1246 for (ReplicationServerDomain exportContainer : searchContainers)
1247 {
1248 processContainer(exportContainer, null, null, searchOperation);
1249 }
1250 }
1251
1252
1253 /**
1254 * Retrieves the replication server associated to this backend.
1255 *
1256 * @return The server retrieved
1257 * @throws DirectoryException When it occurs.
1258 */
1259 private ReplicationServer getReplicationServer() throws DirectoryException
1260 {
1261 ReplicationServer replicationServer = null;
1262
1263 DirectoryServer.getSynchronizationProviders();
1264 for (SynchronizationProvider<?> provider :
1265 DirectoryServer.getSynchronizationProviders())
1266 {
1267 if (provider instanceof MultimasterReplication)
1268 {
1269 MultimasterReplication mmp = (MultimasterReplication)provider;
1270 ReplicationServerListener list = mmp.getReplicationServerListener();
1271 if (list != null)
1272 {
1273 replicationServer = list.getReplicationServer();
1274 break;
1275 }
1276 }
1277 }
1278 return replicationServer;
1279 }
1280
1281 // Find the replication server configuration associated with this
1282 // replication backend.
1283 private ReplicationServerCfg getReplicationServerCfg()
1284 throws DirectoryException {
1285 RootCfg root = ServerManagementContext.getInstance().getRootConfiguration();
1286
1287 for (String name : root.listSynchronizationProviders()) {
1288 SynchronizationProviderCfg cfg;
1289 try {
1290 cfg = root.getSynchronizationProvider(name);
1291 } catch (ConfigException e) {
1292 throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1293 ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e);
1294 }
1295 if (cfg instanceof ReplicationSynchronizationProviderCfg) {
1296 ReplicationSynchronizationProviderCfg scfg =
1297 (ReplicationSynchronizationProviderCfg) cfg;
1298 try {
1299 return scfg.getReplicationServer();
1300 } catch (ConfigException e) {
1301 throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1302 ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e);
1303 }
1304 }
1305 }
1306
1307 // No replication server found.
1308 throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1309 ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get());
1310 }
1311
1312 /**
1313 * Writer class to read/write from/to a bytearray.
1314 */
1315 private static final class Writer
1316 {
1317 // The underlying output stream.
1318 private final ByteArrayOutputStream stream;
1319
1320 // The underlying LDIF config.
1321 private final LDIFExportConfig config;
1322
1323 // The LDIF writer.
1324 private final LDIFWriter writer;
1325
1326 /**
1327 * Create a new string writer.
1328 */
1329 public Writer() {
1330 this.stream = new ByteArrayOutputStream();
1331 this.config = new LDIFExportConfig(stream);
1332 try {
1333 this.writer = new LDIFWriter(config);
1334 } catch (IOException e) {
1335 // Should not happen.
1336 throw new RuntimeException(e);
1337 }
1338 }
1339
1340 /**
1341 * Get the LDIF writer.
1342 *
1343 * @return Returns the LDIF writer.
1344 */
1345 public LDIFWriter getLDIFWriter() {
1346 return writer;
1347 }
1348
1349 /**
1350 * Close the writer and get a string reader for the LDIF content.
1351 *
1352 * @return Returns the string contents of the writer.
1353 * @throws Exception
1354 * If an error occurred closing the writer.
1355 */
1356 public BufferedReader getLDIFBufferedReader() throws Exception {
1357 writer.close();
1358 String ldif = stream.toString("UTF-8");
1359 StringReader reader = new StringReader(ldif);
1360 return new BufferedReader(reader);
1361 }
1362
1363 /**
1364 * Close the writer and get an LDIF reader for the LDIF content.
1365 *
1366 * @return Returns an LDIF Reader.
1367 * @throws Exception
1368 * If an error occurred closing the writer.
1369 */
1370 public LDIFReader getLDIFReader() throws Exception {
1371 writer.close();
1372 ByteArrayInputStream istream = new
1373 ByteArrayInputStream(stream.toByteArray());
1374 String ldif = stream.toString("UTF-8");
1375 ldif = ldif.replace("\n-\n", "\n");
1376 istream = new ByteArrayInputStream(ldif.getBytes());
1377 LDIFImportConfig config = new LDIFImportConfig(istream);
1378 return new LDIFReader(config);
1379 }
1380 }
1381
1382
1383
1384 /**
1385 * {@inheritDoc}
1386 */
1387 public void preloadEntryCache() throws UnsupportedOperationException {
1388 throw new UnsupportedOperationException("Operation not supported.");
1389 }
1390 }
1391