001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2006-2008 Sun Microsystems, Inc.
026 */
027 package org.opends.server.replication.server;
028 import org.opends.messages.*;
029
030 import static org.opends.server.loggers.ErrorLogger.logError;
031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033 import org.opends.server.loggers.debug.DebugTracer;
034 import static org.opends.messages.ReplicationMessages.*;
035 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
036
037 import java.io.File;
038 import java.io.UnsupportedEncodingException;
039
040 import org.opends.server.types.DN;
041 import org.opends.server.types.DirectoryException;
042
043 import com.sleepycat.je.Cursor;
044 import com.sleepycat.je.Database;
045 import com.sleepycat.je.DatabaseConfig;
046 import com.sleepycat.je.DatabaseEntry;
047 import com.sleepycat.je.DatabaseException;
048 import com.sleepycat.je.Environment;
049 import com.sleepycat.je.EnvironmentConfig;
050 import com.sleepycat.je.LockMode;
051 import com.sleepycat.je.OperationStatus;
052 import com.sleepycat.je.Transaction;
053
054 /**
055 * This class is used to represent a Db environment that can be used
056 * to create ReplicationDB.
057 */
058 public class ReplicationDbEnv
059 {
060 private Environment dbEnvironment = null;
061 private Database stateDb = null;
062 private ReplicationServer replicationServer = null;
063 private static final String GENERATION_ID_TAG = "GENID";
064 private static final String FIELD_SEPARATOR = " ";
065 /**
066 * The tracer object for the debug logger.
067 */
068 private static final DebugTracer TRACER = getTracer();
069
070 /**
071 * Initialize this class.
072 * Creates Db environment that will be used to create databases.
073 * It also reads the currently known databases from the "changelogstate"
074 * database.
075 * @param path Path where the backing files must be created.
076 * @param replicationServer the ReplicationServer that creates this
077 * ReplicationDbEnv.
078 * @throws DatabaseException If a DatabaseException occurred that prevented
079 * the initialization to happen.
080 * @throws ReplicationDBException If a replicationServer internal error caused
081 * a failure of the replicationServer processing.
082 */
083 public ReplicationDbEnv(String path, ReplicationServer replicationServer)
084 throws DatabaseException, ReplicationDBException
085 {
086 this.replicationServer = replicationServer;
087 EnvironmentConfig envConfig = new EnvironmentConfig();
088
089 /* Create the DB Environment that will be used for all
090 * the ReplicationServer activities related to the db
091 */
092 envConfig.setAllowCreate(true);
093 envConfig.setTransactional(true);
094 envConfig.setConfigParam("je.cleaner.expunge", "true");
095 // TODO : the DB cache size should be configurable
096 // For now set 5M is OK for being efficient in 64M total for the JVM
097 envConfig.setConfigParam("je.maxMemory", "5000000");
098 dbEnvironment = new Environment(new File(path), envConfig);
099
100 /*
101 * One database is created to store the update from each LDAP
102 * server in the topology.
103 * The database "changelogstate" is used to store the list of all
104 * the servers that have been seen in the past.
105 */
106 DatabaseConfig dbConfig = new DatabaseConfig();
107 dbConfig.setAllowCreate(true);
108 dbConfig.setTransactional(true);
109
110 stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
111 start();
112
113 }
114
115 /**
116 * Read the list of known servers from the database and start dbHandler
117 * for each of them.
118 *
119 * @throws DatabaseException in case of underlying DatabaseException
120 * @throws ReplicationDBException when the information from the database
121 * cannot be decoded correctly.
122 */
123 private void start() throws DatabaseException, ReplicationDBException
124 {
125 Cursor cursor = stateDb.openCursor(null, null);
126 DatabaseEntry key = new DatabaseEntry();
127 DatabaseEntry data = new DatabaseEntry();
128
129 try
130 {
131 /*
132 * Get the domain base DN/ generationIDs records
133 */
134 OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
135 while (status == OperationStatus.SUCCESS)
136 {
137 try
138 {
139 String stringData = new String(data.getData(), "UTF-8");
140
141 if (debugEnabled())
142 TRACER.debugInfo(
143 "In " + this.replicationServer.getMonitorInstanceName() +
144 " Read tag baseDn generationId=" + stringData);
145
146 String[] str = stringData.split(FIELD_SEPARATOR, 3);
147 if (str[0].equals(GENERATION_ID_TAG))
148 {
149 long generationId=-1;
150
151 DN baseDn;
152
153 try
154 {
155 // <generationId>
156 generationId = new Long(str[1]);
157 }
158 catch (NumberFormatException e)
159 {
160 // should never happen
161 // TODO: i18n
162 throw new ReplicationDBException(Message.raw(
163 "replicationServer state database has a wrong format: " +
164 e.getLocalizedMessage()
165 + "<" + str[1] + ">"));
166 }
167
168 // <baseDn>
169 baseDn = null;
170 try
171 {
172 baseDn = DN.decode(str[2]);
173 } catch (DirectoryException e)
174 {
175 Message message =
176 ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
177 logError(message);
178
179 }
180
181 if (debugEnabled())
182 TRACER.debugInfo(
183 "In " + this.replicationServer.getMonitorInstanceName() +
184 " Has read baseDn=" + baseDn
185 + " generationId=" + generationId);
186
187 replicationServer.getReplicationServerDomain(baseDn, true).
188 setGenerationId(generationId, true);
189 }
190 }
191 catch (UnsupportedEncodingException e)
192 {
193 // should never happens
194 // TODO: i18n
195 throw new ReplicationDBException(Message.raw("need UTF-8 support"));
196 }
197 status = cursor.getNext(key, data, LockMode.DEFAULT);
198 }
199
200 /*
201 * Get the server Id / domain base DN records
202 */
203 status = cursor.getFirst(key, data, LockMode.DEFAULT);
204 while (status == OperationStatus.SUCCESS)
205 {
206 String stringData = null;
207 try
208 {
209 stringData = new String(data.getData(), "UTF-8");
210 }
211 catch (UnsupportedEncodingException e)
212 {
213 // should never happens
214 // TODO: i18n
215 throw new ReplicationDBException(Message.raw(
216 "need UTF-8 support"));
217 }
218
219 if (debugEnabled())
220 TRACER.debugInfo(
221 "In " + this.replicationServer.getMonitorInstanceName() +
222 " Read serverId BaseDN=" + stringData);
223
224 String[] str = stringData.split(FIELD_SEPARATOR, 2);
225 if (!str[0].equals(GENERATION_ID_TAG))
226 {
227 short serverId = -1;
228 try
229 {
230 // <serverId>
231 serverId = new Short(str[0]);
232 } catch (NumberFormatException e)
233 {
234 // should never happen
235 // TODO: i18n
236 throw new ReplicationDBException(Message.raw(
237 "replicationServer state database has a wrong format: " +
238 e.getLocalizedMessage()
239 + "<" + str[0] + ">"));
240 }
241 // <baseDn>
242 DN baseDn = null;
243 try
244 {
245 baseDn = DN.decode(str[1]);
246 } catch (DirectoryException e)
247 {
248 Message message =
249 ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
250 logError(message);
251 }
252
253 if (debugEnabled())
254 TRACER.debugInfo(
255 "In " + this.replicationServer.getMonitorInstanceName() +
256 " Has read: baseDn=" + baseDn
257 + " serverId=" + serverId);
258
259 DbHandler dbHandler =
260 new DbHandler(serverId, baseDn, replicationServer, this);
261
262 replicationServer.getReplicationServerDomain(baseDn, true).
263 setDbHandler(serverId, dbHandler);
264 }
265
266 status = cursor.getNext(key, data, LockMode.DEFAULT);
267 }
268 cursor.close();
269
270 }
271 catch (DatabaseException dbe)
272 {
273 cursor.close();
274 throw dbe;
275 }
276 }
277
278 /**
279 * Finds or creates the database used to store changes from the server
280 * with the given serverId and the given baseDn.
281 *
282 * @param serverId The server id that identifies the server.
283 * @param baseDn The baseDn that identifies the domain.
284 * @param generationId The generationId associated to this domain.
285 * @return the Database.
286 * @throws DatabaseException in case of underlying Exception.
287 */
288 public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
289 throws DatabaseException
290 {
291 if (debugEnabled())
292 TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
293 serverId + " " + baseDn + " " + generationId);
294 try
295 {
296 String stringId = serverId.toString() + FIELD_SEPARATOR
297 + baseDn.toNormalizedString();
298
299 // Opens the database for the changes received from this server
300 // on this domain. Create it if it does not already exist.
301 DatabaseConfig dbConfig = new DatabaseConfig();
302 dbConfig.setAllowCreate(true);
303 dbConfig.setTransactional(true);
304 Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
305
306 // Creates the record serverId/domain base Dn in the stateDb
307 // if it does not already exist.
308 byte[] byteId;
309 byteId = stringId.getBytes("UTF-8");
310 DatabaseEntry key = new DatabaseEntry();
311 key.setData(byteId);
312 DatabaseEntry data = new DatabaseEntry();
313 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
314 if (status == OperationStatus.NOTFOUND)
315 {
316 Transaction txn = dbEnvironment.beginTransaction(null, null);
317 try {
318 data.setData(byteId);
319 if (debugEnabled())
320 TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
321 " serverId/Domain=<"+stringId+">");
322 stateDb.put(txn, key, data);
323 txn.commitWriteNoSync();
324 } catch (DatabaseException dbe)
325 {
326 // Abort the txn and propagate the Exception to the caller
327 txn.abort();
328 throw dbe;
329 }
330 }
331
332 // Creates the record domain base Dn/ generationId in the stateDb
333 // if it does not already exist.
334 stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
335 baseDn.toNormalizedString();
336 String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
337 generationId.toString() + FIELD_SEPARATOR +
338 baseDn.toNormalizedString();
339 byteId = stringId.getBytes("UTF-8");
340 byte[] dataByteId;
341 dataByteId = dataStringId.getBytes("UTF-8");
342 key = new DatabaseEntry();
343 key.setData(byteId);
344 data = new DatabaseEntry();
345 status = stateDb.get(null, key, data, LockMode.DEFAULT);
346 if (status == OperationStatus.NOTFOUND)
347 {
348 Transaction txn = dbEnvironment.beginTransaction(null, null);
349 try {
350 data.setData(dataByteId);
351 if (debugEnabled())
352 TRACER.debugInfo(
353 "Created in the state Db record Tag/Domain/GenId key=" +
354 stringId + " value=" + dataStringId);
355 stateDb.put(txn, key, data);
356 txn.commitWriteNoSync();
357 } catch (DatabaseException dbe)
358 {
359 // Abort the txn and propagate the Exception to the caller
360 txn.abort();
361 throw dbe;
362 }
363 }
364 return db;
365 }
366 catch (UnsupportedEncodingException e)
367 {
368 // can't happen
369 return null;
370 }
371 }
372
373 /**
374 * Creates a new transaction.
375 *
376 * @return the transaction.
377 * @throws DatabaseException in case of underlying database Exception.
378 */
379 public Transaction beginTransaction() throws DatabaseException
380 {
381 return dbEnvironment.beginTransaction(null, null);
382 }
383
384 /**
385 * Shutdown the Db environment.
386 */
387 public void shutdown()
388 {
389 try
390 {
391 stateDb.close();
392 dbEnvironment.close();
393 } catch (DatabaseException e)
394 {
395 MessageBuilder mb = new MessageBuilder();
396 mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
397 mb.append(stackTraceToSingleLineString(e));
398 logError(mb.toMessage());
399 }
400 }
401
402 /**
403 * Clears the provided generationId associated to the provided baseDn
404 * from the state Db.
405 *
406 * @param baseDn The baseDn for which the generationID must be cleared.
407 *
408 */
409 public void clearGenerationId(DN baseDn)
410 {
411 if (debugEnabled())
412 TRACER.debugInfo(
413 "In " + this.replicationServer.getMonitorInstanceName() +
414 " clearGenerationId " + baseDn);
415 try
416 {
417 // Deletes the record domain base Dn/ generationId in the stateDb
418 String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
419 baseDn.toNormalizedString();
420 byte[] byteId = stringId.getBytes("UTF-8");
421 DatabaseEntry key = new DatabaseEntry();
422 key.setData(byteId);
423 DatabaseEntry data = new DatabaseEntry();
424 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
425 if ((status == OperationStatus.SUCCESS) ||
426 (status == OperationStatus.KEYEXIST))
427 {
428 Transaction txn = dbEnvironment.beginTransaction(null, null);
429 try
430 {
431 stateDb.delete(txn, key);
432 txn.commitWriteNoSync();
433 if (debugEnabled())
434 TRACER.debugInfo(
435 "In " + this.replicationServer.getMonitorInstanceName() +
436 " clearGenerationId (" +
437 baseDn +") succeeded.");
438 }
439 catch (DatabaseException dbe)
440 {
441 // Abort the txn and propagate the Exception to the caller
442 txn.abort();
443 throw dbe;
444 }
445 }
446 else
447 {
448 // TODO : should have a better error logging
449 if (debugEnabled())
450 TRACER.debugInfo(
451 "In " + this.replicationServer.getMonitorInstanceName() +
452 " clearGenerationId ("+ baseDn + " failed" + status.toString());
453 }
454 }
455 catch (UnsupportedEncodingException e)
456 {
457 // can't happen
458 }
459 catch (DatabaseException dbe)
460 {
461 // can't happen
462 }
463 }
464
465 /**
466 * Clears the provided serverId associated to the provided baseDn
467 * from the state Db.
468 *
469 * @param baseDn The baseDn for which the generationID must be cleared.
470 * @param serverId The serverId to remove from the Db.
471 *
472 */
473 public void clearServerId(DN baseDn, Short serverId)
474 {
475 if (debugEnabled())
476 TRACER.debugInfo(
477 "In " + this.replicationServer.getMonitorInstanceName() +
478 "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
479 try
480 {
481 String stringId = serverId.toString() + FIELD_SEPARATOR
482 + baseDn.toNormalizedString();
483
484 // Deletes the record serverId/domain base Dn in the stateDb
485 byte[] byteId;
486 byteId = stringId.getBytes("UTF-8");
487 DatabaseEntry key = new DatabaseEntry();
488 key.setData(byteId);
489 DatabaseEntry data = new DatabaseEntry();
490 OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
491 if (status != OperationStatus.NOTFOUND)
492 {
493 Transaction txn = dbEnvironment.beginTransaction(null, null);
494 try {
495 data.setData(byteId);
496 stateDb.delete(txn, key);
497 txn.commitWriteNoSync();
498 if (debugEnabled())
499 TRACER.debugInfo(
500 " In " + this.replicationServer.getMonitorInstanceName() +
501 " clearServerId() succeeded " + baseDn + " " +
502 serverId);
503 }
504 catch (DatabaseException dbe)
505 {
506 // Abort the txn and propagate the Exception to the caller
507 txn.abort();
508 throw dbe;
509 }
510 }
511 }
512 catch (UnsupportedEncodingException e)
513 {
514 // can't happen
515 }
516 catch (DatabaseException dbe)
517 {
518 // can't happen
519 }
520 }
521
522 /**
523 * Clears the database.
524 *
525 * @param databaseName The name of the database to clear.
526 */
527 public final void clearDb(String databaseName)
528 {
529 Transaction txn = null;
530 try
531 {
532 txn = dbEnvironment.beginTransaction(null, null);
533 dbEnvironment.truncateDatabase(txn, databaseName, false);
534 txn.commitWriteNoSync();
535 txn = null;
536 }
537 catch (DatabaseException e)
538 {
539 MessageBuilder mb = new MessageBuilder();
540 mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
541 e.getMessage() + " " +
542 stackTraceToSingleLineString(e)));
543 logError(mb.toMessage());
544 }
545 finally
546 {
547 try
548 {
549 if (txn != null)
550 txn.abort();
551 }
552 catch(Exception e)
553 {}
554 }
555 }
556 }