001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import static org.opends.messages.ReplicationMessages.*;
029 import static org.opends.server.loggers.ErrorLogger.logError;
030 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
031 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
032 import static org.opends.server.util.ServerConstants.EOL;
033 import static org.opends.server.util.StaticUtils.getFileForPath;
034
035 import java.io.File;
036 import java.io.IOException;
037 import java.io.StringReader;
038 import java.net.InetAddress;
039 import java.net.InetSocketAddress;
040 import java.net.ServerSocket;
041 import java.net.Socket;
042 import java.net.UnknownHostException;
043 import java.util.ArrayList;
044 import java.util.Collection;
045 import java.util.concurrent.ConcurrentHashMap;
046 import java.util.Iterator;
047 import java.util.LinkedHashSet;
048 import java.util.List;
049 import java.util.Set;
050
051 import org.opends.messages.Message;
052 import org.opends.messages.MessageBuilder;
053 import org.opends.server.admin.server.ConfigurationChangeListener;
054 import org.opends.server.admin.std.server.MonitorProviderCfg;
055 import org.opends.server.admin.std.server.ReplicationServerCfg;
056 import org.opends.server.api.Backend;
057 import org.opends.server.api.BackupTaskListener;
058 import org.opends.server.api.ExportTaskListener;
059 import org.opends.server.api.ImportTaskListener;
060 import org.opends.server.api.MonitorProvider;
061 import org.opends.server.api.RestoreTaskListener;
062 import org.opends.server.config.ConfigException;
063 import org.opends.server.core.DirectoryServer;
064 import org.opends.server.loggers.LogLevel;
065 import org.opends.server.loggers.debug.DebugTracer;
066 import org.opends.server.replication.protocol.ProtocolSession;
067 import org.opends.server.replication.protocol.ReplSessionSecurity;
068 import org.opends.server.types.Attribute;
069 import org.opends.server.types.AttributeType;
070 import org.opends.server.types.AttributeValue;
071 import org.opends.server.types.BackupConfig;
072 import org.opends.server.types.ConfigChangeResult;
073 import org.opends.server.types.DN;
074 import org.opends.server.types.Entry;
075 import org.opends.server.types.LDIFExportConfig;
076 import org.opends.server.types.LDIFImportConfig;
077 import org.opends.server.types.RestoreConfig;
078 import org.opends.server.types.ResultCode;
079 import org.opends.server.util.LDIFReader;
080
081 import com.sleepycat.je.DatabaseException;
082
/**
 * ReplicationServer Listener.
 *
 * This singleton is the main object of the replication server.
 * It waits for incoming connections and creates listener
 * and publisher objects for
 * connections with LDAP servers and with replication servers.
 *
 * It is responsible for creating the replication server
 * replicationServerDomain and managing it.
 */
094 public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
095 implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
096 BackupTaskListener, RestoreTaskListener, ImportTaskListener,
097 ExportTaskListener
098 {
099 private short serverId;
100 private String serverURL;
101
102 private ServerSocket listenSocket;
103 private Thread listenThread;
104 private Thread connectThread;
105
106 /* The list of replication servers configured by the administrator */
107 private Collection<String> replicationServers;
108
109 /* This table is used to store the list of dn for which we are currently
110 * handling servers.
111 */
112 private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs =
113 new ConcurrentHashMap<DN, ReplicationServerDomain>();
114
115 private String localURL = "null";
116 private boolean shutdown = false;
117 private short replicationServerId;
118 private ReplicationDbEnv dbEnv;
119 private int rcvWindow;
120 private int queueSize;
121 private String dbDirname = null;
122
123 // The delay (in sec) after which the changes must
124 // be deleted from the persistent storage.
125 private long purgeDelay;
126
127 private int replicationPort;
128 private boolean stopListen = false;
129 private ReplSessionSecurity replSessionSecurity;
130
131 // For the backend associated to this replication server,
132 // DN of the config entry of the backend
133 private DN backendConfigEntryDN;
134 // ID of the backend
135 private static final String backendId = "replicationChanges";
136
137 // At startup, the listen thread wait on this flag for the connet
138 // thread to look for other servers in the topology.
139 private boolean connectedInTopology = false;
140 private final Object connectedInTopologyLock = new Object();
141
142 /**
143 * The tracer object for the debug logger.
144 */
145 private static final DebugTracer TRACER = getTracer();
146
147 /**
148 * Creates a new Replication server using the provided configuration entry.
149 *
150 * @param configuration The configuration of this replication server.
151 * @throws ConfigException When Configuration is invalid.
152 */
153 public ReplicationServer(ReplicationServerCfg configuration)
154 throws ConfigException
155 {
156 super("Replication Server" + configuration.getReplicationPort());
157
158 replicationPort = configuration.getReplicationPort();
159 replicationServerId = (short) configuration.getReplicationServerId();
160 replicationServers = configuration.getReplicationServer();
161 if (replicationServers == null)
162 replicationServers = new ArrayList<String>();
163 queueSize = configuration.getQueueSize();
164 purgeDelay = configuration.getReplicationPurgeDelay();
165 dbDirname = configuration.getReplicationDBDirectory();
166 rcvWindow = configuration.getWindowSize();
167 if (dbDirname == null)
168 {
169 dbDirname = "changelogDb";
170 }
171 // Check that this path exists or create it.
172 File f = getFileForPath(dbDirname);
173 try
174 {
175 if (!f.exists())
176 {
177 f.mkdir();
178 }
179 }
180 catch (Exception e)
181 {
182
183 MessageBuilder mb = new MessageBuilder();
184 mb.append(e.getLocalizedMessage());
185 mb.append(" ");
186 mb.append(String.valueOf(getFileForPath(dbDirname)));
187 Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
188 throw new ConfigException(msg, e);
189 }
190
191 replSessionSecurity = new ReplSessionSecurity(configuration);
192 initialize(replicationServerId, replicationPort);
193 configuration.addChangeListener(this);
194 DirectoryServer.registerMonitorProvider(this);
195
196 try
197 {
198 backendConfigEntryDN = DN.decode(
199 "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
200 } catch (Exception e) {}
201
202 // Creates the backend associated to this ReplicationServer
203 // if it does not exist.
204 createBackend();
205
206 DirectoryServer.registerBackupTaskListener(this);
207 DirectoryServer.registerRestoreTaskListener(this);
208 DirectoryServer.registerExportTaskListener(this);
209 DirectoryServer.registerImportTaskListener(this);
210 }
211
212
213 /**
214 * The run method for the Listen thread.
215 * This thread accept incoming connections on the replication server
216 * ports from other replication servers or from LDAP servers
217 * and spawn further thread responsible for handling those connections
218 */
219
220 void runListen()
221 {
222 Socket newSocket;
223
224 // wait for the connect thread to find other replication
225 // servers in the topology before starting to accept connections
226 // from the ldap servers.
227 synchronized (connectedInTopologyLock)
228 {
229 if (connectedInTopology == false)
230 {
231 try
232 {
233 connectedInTopologyLock.wait(1000);
234 } catch (InterruptedException e)
235 {
236 }
237 }
238 }
239
240 while ((shutdown == false) && (stopListen == false))
241 {
242 // Wait on the replicationServer port.
243 // Read incoming messages and create LDAP or ReplicationServer listener
244 // and Publisher.
245
246 try
247 {
248 newSocket = listenSocket.accept();
249 newSocket.setReceiveBufferSize(1000000);
250 newSocket.setTcpNoDelay(true);
251 newSocket.setKeepAlive(true);
252 ProtocolSession session =
253 replSessionSecurity.createServerSession(newSocket);
254 if (session == null) // Error, go back to accept
255 continue;
256 ServerHandler handler = new ServerHandler(session, queueSize);
257 handler.start(null, serverId, serverURL, rcvWindow,
258 false, this);
259 }
260 catch (Exception e)
261 {
262 // The socket has probably been closed as part of the
263 // shutdown or changing the port number process.
264 // just log debug information and loop.
265 Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
266 logError(message);
267 }
268 }
269 }
270
271 /**
272 * This method manages the connection with the other replication servers.
273 * It periodically checks that this replication server is indeed connected
274 * to all the other replication servers and if not attempts to
275 * make the connection.
276 */
277 void runConnect()
278 {
279 while (shutdown == false)
280 {
281 /*
282 * periodically check that we are connected to all other
283 * replication servers and if not establish the connection
284 */
285 for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
286 {
287 Set<String> connectedReplServers =
288 replicationServerDomain.getChangelogs();
289 /*
290 * check that all replication server in the config are in the connected
291 * Set. If not create the connection
292 */
293 for (String serverURL : replicationServers)
294 {
295 int separator = serverURL.lastIndexOf(':');
296 String port = serverURL.substring(separator + 1);
297 String hostname = serverURL.substring(0, separator);
298
299 try
300 {
301 InetAddress inetAddress = InetAddress.getByName(hostname);
302 String serverAddress = inetAddress.getHostAddress() + ":" + port;
303
304 if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0)
305 && (serverAddress.compareTo(this.localURL) != 0)
306 && (!connectedReplServers.contains(serverAddress)))
307 {
308 this.connect(serverURL, replicationServerDomain.getBaseDn());
309 }
310 }
311 catch (IOException e)
312 {
313 Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
314 logError(message);
315 }
316 }
317 }
318 synchronized (connectedInTopologyLock)
319 {
320 // wake up the listen thread if necessary.
321 if (connectedInTopology == false)
322 {
323 connectedInTopologyLock.notify();
324 connectedInTopology = true;
325 }
326 }
327 try
328 {
329 synchronized (this)
330 {
331 /* check if we are connected every second */
332 int randomizer = (int) Math.random()*100;
333 wait(1000 + randomizer);
334 }
335 } catch (InterruptedException e)
336 {
337 // ignore error, will try to connect again or shutdown
338 }
339 }
340 }
341
342 /**
343 * Establish a connection to the server with the address and port.
344 *
345 * @param serverURL The address and port for the server, separated by a
346 * colon.
347 * @param baseDn The baseDn of the connection
348 */
349 private void connect(String serverURL, DN baseDn)
350 {
351 int separator = serverURL.lastIndexOf(':');
352 String port = serverURL.substring(separator + 1);
353 String hostname = serverURL.substring(0, separator);
354 boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
355
356 if (debugEnabled())
357 TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
358 " connects to " + serverURL);
359
360 try
361 {
362 InetSocketAddress ServerAddr = new InetSocketAddress(
363 InetAddress.getByName(hostname), Integer.parseInt(port));
364 Socket socket = new Socket();
365 socket.setReceiveBufferSize(1000000);
366 socket.setTcpNoDelay(true);
367 socket.connect(ServerAddr, 500);
368
369 ServerHandler handler = new ServerHandler(
370 replSessionSecurity.createClientSession(serverURL, socket),
371 queueSize);
372 handler.start(baseDn, serverId, this.serverURL, rcvWindow,
373 sslEncryption, this);
374 }
375 catch (Exception e)
376 {
377 // ignore
378 }
379
380 }
381
382 /**
383 * initialization function for the replicationServer.
384 *
385 * @param changelogId The unique identifier for this replicationServer.
386 * @param changelogPort The port on which the replicationServer should
387 * listen.
388 *
389 */
390 private void initialize(short changelogId, int changelogPort)
391 {
392 shutdown = false;
393
394 try
395 {
396 /*
397 * Initialize the replicationServer database.
398 */
399 dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
400 this);
401
402 /*
403 * create replicationServer replicationServerDomain
404 */
405 serverId = changelogId;
406
407 /*
408 * Open replicationServer socket
409 */
410 String localhostname = InetAddress.getLocalHost().getHostName();
411 String localAdddress = InetAddress.getLocalHost().getHostAddress();
412 serverURL = localhostname + ":" + String.valueOf(changelogPort);
413 localURL = localAdddress + ":" + String.valueOf(changelogPort);
414 listenSocket = new ServerSocket();
415 listenSocket.setReceiveBufferSize(1000000);
416 listenSocket.bind(new InetSocketAddress(changelogPort));
417
418 /*
419 * creates working threads
420 * We must first connect, then start to listen.
421 */
422 if (debugEnabled())
423 TRACER.debugInfo("RS " +getMonitorInstanceName()+
424 " creates connect threads");
425 connectThread =
426 new ReplicationServerConnectThread("Replication Server Connect", this);
427 connectThread.start();
428
429 // FIXME : Is it better to have the time to receive the ReplServerInfo
430 // from all the other replication servers since this info is necessary
431 // to route an early received total update request.
432 try { Thread.sleep(300);} catch(Exception e) {}
433 if (debugEnabled())
434 TRACER.debugInfo("RS " +getMonitorInstanceName()+
435 " creates listen threads");
436
437 listenThread =
438 new ReplicationServerListenThread("Replication Server Listener", this);
439 listenThread.start();
440
441 if (debugEnabled())
442 TRACER.debugInfo("RS " +getMonitorInstanceName()+
443 " successfully initialized");
444
445 } catch (DatabaseException e)
446 {
447 Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
448 getFileForPath(dbDirname).getAbsolutePath());
449 logError(message);
450 } catch (ReplicationDBException e)
451 {
452 Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
453 e.getLocalizedMessage());
454 logError(message);
455 } catch (UnknownHostException e)
456 {
457 Message message = ERR_UNKNOWN_HOSTNAME.get();
458 logError(message);
459 } catch (IOException e)
460 {
461 Message message =
462 ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
463 logError(message);
464 }
465 }
466
467 /**
468 * Get the ReplicationServerDomain associated to the base DN given in
469 * parameter.
470 *
471 * @param baseDn The base Dn for which the ReplicationServerDomain must be
472 * returned.
473 * @param create Specifies whether to create the ReplicationServerDomain if
474 * it does not already exist.
475 * @return The ReplicationServerDomain associated to the base DN given in
476 * parameter.
477 */
478 public ReplicationServerDomain getReplicationServerDomain(DN baseDn,
479 boolean create)
480 {
481 ReplicationServerDomain replicationServerDomain;
482
483 synchronized (baseDNs)
484 {
485 replicationServerDomain = baseDNs.get(baseDn);
486 if ((replicationServerDomain == null) && (create))
487 {
488 replicationServerDomain = new ReplicationServerDomain(baseDn, this);
489 baseDNs.put(baseDn, replicationServerDomain);
490 }
491 }
492
493 return replicationServerDomain;
494 }
495
496 /**
497 * Shutdown the Replication Server service and all its connections.
498 */
499 public void shutdown()
500 {
501 if (shutdown)
502 return;
503
504 shutdown = true;
505
506 // shutdown the connect thread
507 if (connectThread != null)
508 {
509 connectThread.interrupt();
510 }
511
512 // shutdown the listener thread
513 try
514 {
515 if (listenSocket != null)
516 {
517 listenSocket.close();
518 }
519 } catch (IOException e)
520 {
521 // replication Server service is closing anyway.
522 }
523
524 // shutdown the listen thread
525 if (listenThread != null)
526 {
527 listenThread.interrupt();
528 }
529
530 // shutdown all the ChangelogCaches
531 for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
532 {
533 replicationServerDomain.shutdown();
534 }
535
536 if (dbEnv != null)
537 {
538 dbEnv.shutdown();
539 }
540 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
541 }
542
543
544 /**
545 * Creates a new DB handler for this ReplicationServer and the serverId and
546 * DN given in parameter.
547 *
548 * @param id The serverId for which the dbHandler must be created.
549 * @param baseDn The DN for which the dbHandler muste be created.
550 * @return The new DB handler for this ReplicationServer and the serverId and
551 * DN given in parameter.
552 * @throws DatabaseException in case of underlying database problem.
553 */
554 public DbHandler newDbHandler(short id, DN baseDn)
555 throws DatabaseException
556 {
557 return new DbHandler(id, baseDn, this, dbEnv);
558 }
559
560 /**
561 * Clears the generationId for the replicationServerDomain related to the
562 * provided baseDn.
563 * @param baseDn The baseDn for which to delete the generationId.
564 * @throws DatabaseException When it occurs.
565 */
566 public void clearGenerationId(DN baseDn)
567 throws DatabaseException
568 {
569 try
570 {
571 dbEnv.clearGenerationId(baseDn);
572 }
573 catch(Exception e)
574 {
575 TRACER.debugCaught(LogLevel.ALL, e);
576 }
577 }
578
579 /**
580 * Retrieves the time after which changes must be deleted from the
581 * persistent storage (in milliseconds).
582 *
583 * @return The time after which changes must be deleted from the
584 * persistent storage (in milliseconds).
585 */
586 long getTrimage()
587 {
588 return purgeDelay * 1000;
589 }
590
591 /**
592 * Check if the provided configuration is acceptable for add.
593 *
594 * @param configuration The configuration to check.
595 * @param unacceptableReasons When the configuration is not acceptable, this
596 * table is use to return the reasons why this
597 * configuration is not acceptbale.
598 *
599 * @return true if the configuration is acceptable, false other wise.
600 */
601 public static boolean isConfigurationAcceptable(
602 ReplicationServerCfg configuration, List<Message> unacceptableReasons)
603 {
604 int port = configuration.getReplicationPort();
605
606 try
607 {
608 ServerSocket tmpSocket = new ServerSocket();
609 tmpSocket.bind(new InetSocketAddress(port));
610 tmpSocket.close();
611 }
612 catch (Exception e)
613 {
614 Message message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage());
615 unacceptableReasons.add(message);
616 return false;
617 }
618
619 return true;
620 }
621
622 /**
623 * {@inheritDoc}
624 */
625 public ConfigChangeResult applyConfigurationChange(
626 ReplicationServerCfg configuration)
627 {
628 // Changing those properties don't need specific code.
629 // They will be applied for next connections.
630 replicationServers = configuration.getReplicationServer();
631 if (replicationServers == null)
632 replicationServers = new ArrayList<String>();
633 queueSize = configuration.getQueueSize();
634 long newPurgeDelay = configuration.getReplicationPurgeDelay();
635 if (newPurgeDelay != purgeDelay)
636 {
637 purgeDelay = newPurgeDelay;
638 // propagate
639 for (ReplicationServerDomain domain : baseDNs.values())
640 {
641 domain.setPurgeDelay(purgeDelay);
642 }
643 }
644
645 rcvWindow = configuration.getWindowSize();
646
647 // changing the listen port requires to stop the listen thread
648 // and restart it.
649 int newPort = configuration.getReplicationPort();
650 if (newPort != replicationPort)
651 {
652 stopListen = true;
653 try
654 {
655 listenSocket.close();
656 listenThread.join();
657 stopListen = false;
658
659 replicationPort = newPort;
660 String localhostname = InetAddress.getLocalHost().getHostName();
661 String localAdddress = InetAddress.getLocalHost().getHostAddress();
662 serverURL = localhostname + ":" + String.valueOf(replicationPort);
663 localURL = localAdddress + ":" + String.valueOf(replicationPort);
664 listenSocket = new ServerSocket();
665 listenSocket.setReceiveBufferSize(1000000);
666 listenSocket.bind(new InetSocketAddress(replicationPort));
667
668 listenThread =
669 new ReplicationServerListenThread(
670 "Replication Server Listener", this);
671 listenThread.start();
672 }
673 catch (IOException e)
674 {
675 Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString());
676 logError(message);
677 new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(),
678 false);
679 }
680 catch (InterruptedException e)
681 {
682 Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString());
683 logError(message);
684 new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(),
685 false);
686 }
687 }
688
689 if ((configuration.getReplicationDBDirectory() != null) &&
690 (!dbDirname.equals(configuration.getReplicationDBDirectory())))
691 {
692 return new ConfigChangeResult(ResultCode.SUCCESS, true);
693 }
694
695 return new ConfigChangeResult(ResultCode.SUCCESS, false);
696 }
697
698 /**
699 * {@inheritDoc}
700 */
701 public boolean isConfigurationChangeAcceptable(
702 ReplicationServerCfg configuration, List<Message> unacceptableReasons)
703 {
704 return true;
705 }
706
707 /**
708 * {@inheritDoc}
709 */
710 @Override
711 public void initializeMonitorProvider(MonitorProviderCfg configuraiton)
712 {
713 // Nothing to do for now
714 }
715
716 /**
717 * {@inheritDoc}
718 */
719 @Override
720 public String getMonitorInstanceName()
721 {
722 return "Replication Server " + this.replicationPort + " "
723 + replicationServerId;
724 }
725
726 /**
727 * {@inheritDoc}
728 */
729 @Override
730 public long getUpdateInterval()
731 {
732 /* we don't wont to do polling on this monitor */
733 return 0;
734 }
735
736 /**
737 * {@inheritDoc}
738 */
739 @Override
740 public void updateMonitorData()
741 {
742 // As long as getUpdateInterval() returns 0, this will never get called
743
744 }
745
746 /**
747 * {@inheritDoc}
748 */
749 @Override
750 public ArrayList<Attribute> getMonitorData()
751 {
752 /*
753 * publish the server id and the port number.
754 */
755 ArrayList<Attribute> attributes = new ArrayList<Attribute>();
756 attributes.add(new Attribute("replication server id",
757 String.valueOf(serverId)));
758 attributes.add(new Attribute("replication server port",
759 String.valueOf(replicationPort)));
760
761 /*
762 * Add all the base DNs that are known by this replication server.
763 */
764 AttributeType baseType=
765 DirectoryServer.getAttributeType("base-dn", true);
766 LinkedHashSet<AttributeValue> baseValues =
767 new LinkedHashSet<AttributeValue>();
768 for (DN base : baseDNs.keySet())
769 {
770 baseValues.add(new AttributeValue(baseType, base. toString()));
771 }
772
773 Attribute bases = new Attribute(baseType, "base-dn", baseValues);
774 attributes.add(bases);
775
776 // Publish to monitor the generation ID by replicationServerDomain
777 AttributeType generationIdType=
778 DirectoryServer.getAttributeType("base-dn-generation-id", true);
779 LinkedHashSet<AttributeValue> generationIdValues =
780 new LinkedHashSet<AttributeValue>();
781 for (DN base : baseDNs.keySet())
782 {
783 long generationId=-1;
784 ReplicationServerDomain replicationServerDomain =
785 getReplicationServerDomain(base, false);
786 if (replicationServerDomain != null)
787 generationId = replicationServerDomain.getGenerationId();
788 generationIdValues.add(new AttributeValue(generationIdType,
789 base.toString() + " " + generationId));
790 }
791 Attribute generationIds = new Attribute(generationIdType, "generation-id",
792 generationIdValues);
793 attributes.add(generationIds);
794
795 return attributes;
796 }
797
798 /**
799 * Get the value of generationId for the replication replicationServerDomain
800 * associated with the provided baseDN.
801 *
802 * @param baseDN The baseDN of the replicationServerDomain.
803 * @return The value of the generationID.
804 */
805 public long getGenerationId(DN baseDN)
806 {
807 ReplicationServerDomain rsd =
808 this.getReplicationServerDomain(baseDN, false);
809 if (rsd!=null)
810 return rsd.getGenerationId();
811 return -1;
812 }
813
814 /**
815 * Get the serverId for this replication server.
816 *
817 * @return The value of the serverId.
818 *
819 */
820 public short getServerId()
821 {
822 return serverId;
823 }
824
825 /**
826 * Creates the backend associated to this replication server.
827 * @throws ConfigException
828 */
829 private void createBackend()
830 throws ConfigException
831 {
832 try
833 {
834 String ldif = makeLdif(
835 "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config",
836 "objectClass: top",
837 "objectClass: ds-cfg-backend",
838 "ds-cfg-base-dn: dc="+backendId,
839 "ds-cfg-enabled: true",
840 "ds-cfg-writability-mode: enabled",
841 "ds-cfg-java-class: " +
842 "org.opends.server.replication.server.ReplicationBackend",
843 "ds-cfg-backend-id: " + backendId);
844
845 LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
846 new StringReader(ldif));
847 LDIFReader reader = new LDIFReader(ldifImportConfig);
848 Entry backendConfigEntry = reader.readEntry();
849 if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
850 {
851 // Add the replication backend
852 DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
853 }
854 ldifImportConfig.close();
855 }
856 catch(Exception e)
857 {
858 MessageBuilder mb = new MessageBuilder();
859 mb.append(e.getLocalizedMessage());
860 Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString());
861 throw new ConfigException(msg, e);
862
863 }
864 }
865
866 private static String makeLdif(String... lines)
867 {
868 StringBuilder buffer = new StringBuilder();
869 for (String line : lines) {
870 buffer.append(line).append(EOL);
871 }
872 // Append an extra line so we can append LDIF Strings.
873 buffer.append(EOL);
874 return buffer.toString();
875 }
876
877 /**
878 * Do what needed when the config object related to this replication server
879 * is deleted from the server configuration.
880 */
881 public void remove()
882 {
883 if (debugEnabled())
884 TRACER.debugInfo("RS " +getMonitorInstanceName()+
885 " starts removing");
886
887 shutdown();
888 removeBackend();
889
890 DirectoryServer.deregisterBackupTaskListener(this);
891 DirectoryServer.deregisterRestoreTaskListener(this);
892 DirectoryServer.deregisterExportTaskListener(this);
893 DirectoryServer.deregisterImportTaskListener(this);
894 }
895
896 /**
897 * Removes the backend associated to this Replication Server that has been
898 * created when this replication server was created.
899 */
900 protected void removeBackend()
901 {
902 try
903 {
904 if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
905 {
906 // Delete the replication backend
907 DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
908 null);
909 }
910 }
911 catch(Exception e)
912 {
913 MessageBuilder mb = new MessageBuilder();
914 mb.append(e.getLocalizedMessage());
915 Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString());
916 logError(msg);
917 }
918 }
919 /**
920 * {@inheritDoc}
921 */
922 public void processBackupBegin(Backend backend, BackupConfig config)
923 {
924 // Nothing is needed at the moment
925 }
926
927 /**
928 * {@inheritDoc}
929 */
930 public void processBackupEnd(Backend backend, BackupConfig config,
931 boolean successful)
932 {
933 // Nothing is needed at the moment
934 }
935
936 /**
937 * {@inheritDoc}
938 */
939 public void processRestoreBegin(Backend backend, RestoreConfig config)
940 {
941 if (backend.getBackendID().equals(backendId))
942 shutdown();
943 }
944
945 /**
946 * {@inheritDoc}
947 */
948 public void processRestoreEnd(Backend backend, RestoreConfig config,
949 boolean successful)
950 {
951 if (backend.getBackendID().equals(backendId))
952 initialize(this.replicationServerId, this.replicationPort);
953 }
954
955 /**
956 * {@inheritDoc}
957 */
958 public void processImportBegin(Backend backend, LDIFImportConfig config)
959 {
960 // Nothing is needed at the moment
961 }
962
963 /**
964 * {@inheritDoc}
965 */
966 public void processImportEnd(Backend backend, LDIFImportConfig config,
967 boolean successful)
968 {
969 // Nothing is needed at the moment
970 }
971
972 /**
973 * {@inheritDoc}
974 */
975 public void processExportBegin(Backend backend, LDIFExportConfig config)
976 {
977 if (debugEnabled())
978 TRACER.debugInfo("RS " +getMonitorInstanceName()+
979 " Export starts");
980 if (backend.getBackendID().equals(backendId))
981 {
982 // Retrieves the backend related to this replicationServerDomain
983 // backend =
984 ReplicationBackend b =
985 (ReplicationBackend)DirectoryServer.getBackend(backendId);
986 b.setServer(this);
987 }
988 }
989
990 /**
991 * {@inheritDoc}
992 */
993 public void processExportEnd(Backend backend, LDIFExportConfig config,
994 boolean successful)
995 {
996 // Nothing is needed at the moment
997 }
998
999 /**
1000 * Returns an iterator on the list of replicationServerDomain.
1001 * Returns null if none.
1002 * @return the iterator.
1003 */
1004 public Iterator<ReplicationServerDomain> getCacheIterator()
1005 {
1006 if (!baseDNs.isEmpty())
1007 return baseDNs.values().iterator();
1008 else
1009 return null;
1010 }
1011
1012 /**
1013 * Clears the Db associated with that server.
1014 */
1015 public void clearDb()
1016 {
1017 Iterator<ReplicationServerDomain> rcachei = getCacheIterator();
1018 if (rcachei != null)
1019 {
1020 while (rcachei.hasNext())
1021 {
1022 ReplicationServerDomain rsd = rcachei.next();
1023 rsd.clearDbs();
1024 }
1025 }
1026 }
1027 }