1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import javax.crypto.Cipher;
37  import javax.crypto.SecretKey;
38  
39  import org.apache.flume.Event;
40  import org.apache.flume.event.SimpleEvent;
41  import org.apache.logging.log4j.LoggingException;
42  import org.apache.logging.log4j.core.appender.ManagerFactory;
43  import org.apache.logging.log4j.core.config.Property;
44  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46  import org.apache.logging.log4j.core.util.FileUtils;
47  import org.apache.logging.log4j.core.util.SecretKeyProvider;
48  import org.apache.logging.log4j.util.Strings;
49  
50  import com.sleepycat.je.Cursor;
51  import com.sleepycat.je.CursorConfig;
52  import com.sleepycat.je.Database;
53  import com.sleepycat.je.DatabaseConfig;
54  import com.sleepycat.je.DatabaseEntry;
55  import com.sleepycat.je.Environment;
56  import com.sleepycat.je.EnvironmentConfig;
57  import com.sleepycat.je.LockConflictException;
58  import com.sleepycat.je.LockMode;
59  import com.sleepycat.je.OperationStatus;
60  import com.sleepycat.je.StatsConfig;
61  import com.sleepycat.je.Transaction;
62  
63  /**
64   * Manager that persists data to Berkeley DB before passing it on to Flume.
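     * <p>
     * Events passed to {@code send(Event)} are first written to a local Berkeley DB JE store; a background
     * writer thread then forwards them to the Flume agents in batches and deletes them from the store once
     * delivery succeeds, so events survive a crash between logging and delivery.
     * </p>
     * <p>
     * A minimal configuration sketch (attribute and element names follow the FlumeAppender documentation
     * and may need adjusting for your deployment):
     * </p>
     * <pre>{@code
     * <Flume name="eventLogger" type="persistent" dataDir="./flumeData">
     *   <Agent host="192.168.10.101" port="8800"/>
     *   <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
     * </Flume>
     * }</pre>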
65   */
66  public class FlumePersistentManager extends FlumeAvroManager {
67  
68      /** Attribute name for the key provider. */
69      public static final String KEY_PROVIDER = "keyProvider";
70  
71      private static final Charset UTF8 = Charset.forName("UTF-8");
72  
73      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74  
75      private static final int SHUTDOWN_WAIT = 60;
76  
77      private static final int MILLIS_PER_SECOND = 1000;
78  
79      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80  
81      private static BDBManagerFactory factory = new BDBManagerFactory();
82  
83      private final Database database;
84  
85      private final Environment environment;
86  
87      private final WriterThread worker;
88  
89      private final Gate gate = new Gate();
90  
91      private final SecretKey secretKey;
92  
93      private final int delayMillis;
94  
95      private final int lockTimeoutRetryCount;
96  
97      private final ExecutorService threadPool;
98  
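        /** Approximate number of events currently stored in the database; used to decide when a batch is ready. */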
99      private final AtomicLong dbCount = new AtomicLong();
100 
101     /**
102      * Constructor
103      * @param name The unique name of this manager.
104      * @param shortName Original name for the Manager.
105      * @param agents An array of Agents.
106      * @param batchSize The number of events to include in a batch.
107      * @param retries The number of times to retry connecting before giving up.
108      * @param connectionTimeout The amount of time to wait for a connection to be established.
109      * @param requestTimeout The amount of time to wait for a response to a request.
110      * @param delay The amount of time to delay before delivering a batch.
111      * @param database The database to write to.
112      * @param environment The database environment.
113      * @param secretKey The SecretKey to use for encryption.
114      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115      */
116     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                      final int batchSize, final int retries, final int connectionTimeout,
118                                      final int requestTimeout, final int delay, final Database database,
119                                      final Environment environment, final SecretKey secretKey,
120                                      final int lockTimeoutRetryCount) {
121         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
122         this.delayMillis = delay;
123         this.database = database;
124         this.environment = environment;
125         dbCount.set(database.count());
126         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
127             lockTimeoutRetryCount);
128         this.worker.start();
129         this.secretKey = secretKey;
130         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
131         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
132     }
133 
134 
135     /**
136      * Returns a FlumePersistentManager.
137      * @param name The name of the manager.
138      * @param agents The agents to use.
139      * @param properties Properties to pass to the Manager.
140      * @param batchSize The number of events to include in a batch.
141      * @param retries The number of times to retry connecting before giving up.
142      * @param connectionTimeout The amount of time to wait to establish a connection.
143      * @param requestTimeout The amount of time to wait for a response to a request.
144      * @param delayMillis Amount of time to delay before delivering a batch.
145      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
146      * @param dataDir The location of the Berkeley database.
147      * @return A FlumePersistentManager.
148      */
149     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150                                                     final Property[] properties, int batchSize, final int retries,
151                                                     final int connectionTimeout, final int requestTimeout,
152                                                     final int delayMillis, final int lockTimeoutRetryCount,
153                                                     final String dataDir) {
154         if (agents == null || agents.length == 0) {
155             throw new IllegalArgumentException("At least one agent is required");
156         }
157 
158         if (batchSize <= 0) {
159             batchSize = 1;
160         }
161         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162 
163         final StringBuilder sb = new StringBuilder("FlumePersistent[");
164         boolean first = true;
165         for (final Agent agent : agents) {
166             if (!first) {
167                 sb.append(',');
168             }
169             sb.append(agent.getHost()).append(':').append(agent.getPort());
170             first = false;
171         }
172         sb.append(']');
173         sb.append(' ').append(dataDirectory);
174         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175             connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
176     }
177 
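        /**
         * Persists the event before it is forwarded to Flume. The event is serialized as the body length and
         * body bytes followed by the header count and each header key/value pair, optionally encrypted with
         * AES when a SecretKey is configured, and then written to Berkeley DB by a task on the daemon thread
         * pool so that an interrupt cannot close the database mid-write. The call blocks until the write
         * completes.
         * @param event The Event to persist.
         * @throws LoggingException if the writer thread has shut down or the event cannot be recorded.
         */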
178     @Override
179     public void send(final Event event)  {
180         if (worker.isShutdown()) {
181             throw new LoggingException("Unable to record event");
182         }
183 
184         final Map<String, String> headers = event.getHeaders();
185         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186         try {
187             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188             final DataOutputStream daos = new DataOutputStream(baos);
189             daos.writeInt(event.getBody().length);
190             daos.write(event.getBody(), 0, event.getBody().length);
191             daos.writeInt(event.getHeaders().size());
192             for (final Map.Entry<String, String> entry : headers.entrySet()) {
193                 daos.writeUTF(entry.getKey());
194                 daos.writeUTF(entry.getValue());
195             }
196             byte[] eventData = baos.toByteArray();
197             if (secretKey != null) {
198                 final Cipher cipher = Cipher.getInstance("AES");
199                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200                 eventData = cipher.doFinal(eventData);
201             }
202             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204             boolean interrupted = false;
205             int count = 0;
206             do {
207                 try {
208                     future.get();
209                 } catch (final InterruptedException ie) {
210                     interrupted = true;
211                     ++count;
212                 }
213             } while (interrupted && count <= 1);
214 
215         } catch (final Exception ex) {
216             throw new LoggingException("Exception occurred writing log event", ex);
217         }
218     }
219 
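        /**
         * Shuts the manager down: signals the writer thread and waits for it, shuts down the daemon thread
         * pool, and finally closes the Berkeley DB database and environment.
         */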
220     @Override
221     protected void releaseSub() {
222         LOGGER.debug("Shutting down FlumePersistentManager");
223         worker.shutdown();
224         try {
225             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226         } catch (final InterruptedException ie) {
227             // Ignore the exception and shut down.
228         }
229         threadPool.shutdown();
230         try {
231             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232         } catch (final InterruptedException ie) {
233             LOGGER.warn("PersistentManager Thread pool failed to shut down");
234         }
235         try {
236             worker.join();
237         } catch (final InterruptedException ex) {
238             LOGGER.debug("Interrupted while waiting for worker to complete");
239         }
240         try {
241             LOGGER.debug("FlumePersistentManager database status: {}", database.getStats(new StatsConfig()));
242             database.close();
243         } catch (final Exception ex) {
244             LOGGER.warn("Failed to close database", ex);
245         }
246         try {
247             environment.cleanLog();
248             environment.close();
249         } catch (final Exception ex) {
250             LOGGER.warn("Failed to close environment", ex);
251         }
252         super.releaseSub();
253     }
254 
255     private void doSend(final SimpleEvent event) {
256         LOGGER.debug("Sending event to Flume");
257         super.send(event);
258     }
259 
260     /**
261      * Thread for writing to Berkeley DB to avoid having interrupts close the database.
262      */
263     private static class BDBWriter implements Callable<Integer> {
264         private final byte[] eventData;
265         private final byte[] keyData;
266         private final Environment environment;
267         private final Database database;
268         private final Gate gate;
269         private final AtomicLong dbCount;
270         private final long batchSize;
271         private final int lockTimeoutRetryCount;
272 
273         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275                          final int lockTimeoutRetryCount) {
276             this.keyData = keyData;
277             this.eventData = eventData;
278             this.environment = environment;
279             this.database = database;
280             this.gate = gate;
281             this.dbCount = dbCount;
282             this.batchSize = batchSize;
283             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284         }
285 
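            /**
             * Stores the key/data pair in its own transaction, retrying on LockConflictException up to
             * lockTimeoutRetryCount times with a short sleep between attempts. Once the number of persisted
             * events reaches the batch size the gate is opened so the writer thread can deliver a batch.
             * @return the length of the stored event data.
             */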
286         @Override
287         public Integer call() throws Exception {
288             final DatabaseEntry key = new DatabaseEntry(keyData);
289             final DatabaseEntry data = new DatabaseEntry(eventData);
290             Exception exception = null;
291             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292                 Transaction txn = null;
293                 try {
294                     txn = environment.beginTransaction(null, null);
295                     try {
296                         database.put(txn, key, data);
297                         txn.commit();
298                         txn = null;
299                         if (dbCount.incrementAndGet() >= batchSize) {
300                             gate.open();
301                         }
302                         exception = null;
303                         break;
304                     } catch (final LockConflictException lce) {
305                         exception = lce;
306                         // Fall through and retry.
307                     } catch (final Exception ex) {
308                         if (txn != null) {
309                             txn.abort();
310                         }
311                         throw ex;
312                     } finally {
313                         if (txn != null) {
314                             txn.abort();
315                             txn = null;
316                         }
317                     }
318                 } catch (final LockConflictException lce) {
319                     exception = lce;
320                     if (txn != null) {
321                         try {
322                             txn.abort();
323                             txn = null;
324                         } catch (final Exception ex) {
325                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326                         }
327                     }
328 
329                 }
330                 try {
331                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332                 } catch (final InterruptedException ie) {
333                     // Ignore the error
334                 }
335             }
336             if (exception != null) {
337                 throw exception;
338             }
339             return eventData.length;
340         }
341     }
342 
343     /**
344      * Factory data.
345      */
346     private static class FactoryData {
347         private final String name;
348         private final Agent[] agents;
349         private final int batchSize;
350         private final String dataDir;
351         private final int retries;
352         private final int connectionTimeout;
353         private final int requestTimeout;
354         private final int delayMillis;
355         private final int lockTimeoutRetryCount;
356         private final Property[] properties;
357 
358         /**
359          * Constructor.
360          * @param name The name of the Appender.
361          * @param agents The agents.
362          * @param batchSize The number of events to include in a batch.
             * @param retries The number of times to retry connecting before giving up.
             * @param connectionTimeout The amount of time to wait to establish a connection.
             * @param requestTimeout The amount of time to wait for a response to a request.
             * @param delayMillis Amount of time to delay before delivering a batch.
             * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
363          * @param dataDir The directory for data.
             * @param properties Properties to pass to the Manager.
364          */
365         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366                            final int connectionTimeout, final int requestTimeout, final int delayMillis,
367                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368             this.name = name;
369             this.agents = agents;
370             this.batchSize = batchSize;
371             this.dataDir = dataDir;
372             this.retries = retries;
373             this.connectionTimeout = connectionTimeout;
374             this.requestTimeout = requestTimeout;
375             this.delayMillis = delayMillis;
376             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377             this.properties = properties;
378         }
379     }
380 
381     /**
382      * Avro Manager Factory.
383      */
384     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385 
386         /**
387          * Create the FlumePersistentManager.
388          * @param name The name of the entity to manage.
389          * @param data The data required to create the entity.
390          * @return The FlumePersistentManager.
391          */
392         @Override
393         public FlumePersistentManager createManager(final String name, final FactoryData data) {
394             SecretKey secretKey = null;
395             Database database = null;
396             Environment environment = null;
397 
398             final Map<String, String> properties = new HashMap<String, String>();
399             if (data.properties != null) {
400                 for (final Property property : data.properties) {
401                     properties.put(property.getName(), property.getValue());
402                 }
403             }
404 
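                // Create the data directory if necessary, then open a transactional Berkeley DB JE
                // environment and database with a 5 second lock timeout.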
405             try {
406                 final File dir = new File(data.dataDir);
407                 FileUtils.mkdir(dir, true);
408                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409                 dbEnvConfig.setTransactional(true);
410                 dbEnvConfig.setAllowCreate(true);
411                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412                 environment = new Environment(dir, dbEnvConfig);
413                 final DatabaseConfig dbConfig = new DatabaseConfig();
414                 dbConfig.setTransactional(true);
415                 dbConfig.setAllowCreate(true);
416                 database = environment.openDatabase(null, name, dbConfig);
417             } catch (final Exception ex) {
418                 LOGGER.error("Could not create FlumePersistentManager", ex);
419                 // For consistency, close the database as well as the environment, even though the database should
420                 // never be open here since opening it is the last statement in the block above. This guards against
421                 // a future statement (such as debug logging) being added after it that could fail.
422                 if (database != null) {
423                     database.close();
424                     database = null;
425                 }
426                 if (environment != null) {
427                     environment.close();
428                     environment = null;
429                 }
430                 return null;
431             }
432 
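                // If a "keyProvider" property was supplied, locate the matching SecretKeyProvider plugin and
                // use it to obtain the AES key; otherwise events are stored unencrypted.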
433             try {
434                 String key = null;
435                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
436                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437                         key = entry.getValue();
438                         break;
439                     }
440                 }
441                 if (key != null) {
442                     final PluginManager manager = new PluginManager("KeyProvider");
443                     manager.collectPlugins();
444                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
445                     if (plugins != null) {
446                         boolean found = false;
447                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448                             if (entry.getKey().equalsIgnoreCase(key)) {
449                                 found = true;
450                                 final Class<?> cl = entry.getValue().getPluginClass();
451                                 try {
452                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453                                     secretKey = provider.getSecretKey();
454                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455                                 } catch (final Exception ex) {
456                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457                                         cl.getName());
458                                 }
459                                 break;
460                             }
461                         }
462                         if (!found) {
463                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                         }
465                     } else {
466                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467                     }
468                 }
469             } catch (final Exception ex) {
470                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471             }
472             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473                 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
474                 data.lockTimeoutRetryCount);
475         }
476     }
477 
478     /**
479      * Thread that sends data to Flume and pulls it from Berkeley DB.
480      */
481     private static class WriterThread extends Thread  {
482         private volatile boolean shutdown = false;
483         private final Database database;
484         private final Environment environment;
485         private final FlumePersistentManager manager;
486         private final Gate gate;
487         private final SecretKey secretKey;
488         private final int batchSize;
489         private final AtomicLong dbCounter;
490         private final int lockTimeoutRetryCount;
491 
492         public WriterThread(final Database database, final Environment environment,
493                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
494                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
495             this.database = database;
496             this.environment = environment;
497             this.manager = manager;
498             this.gate = gate;
499             this.batchSize = batchsize;
500             this.secretKey = secretKey;
501             this.setDaemon(true);
502             this.dbCounter = dbCount;
503             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
504         }
505 
506         public void shutdown() {
507             LOGGER.debug("Writer thread shutting down");
508             this.shutdown = true;
509             gate.open();
510         }
511 
512         public boolean isShutdown() {
513             return shutdown;
514         }
515 
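            /**
             * Polls the database and delivers events to Flume. A batch is sent when at least batchSize events
             * have been persisted, or when any events are present and delayMillis has elapsed since the last
             * batch; otherwise the thread waits on the gate. Successfully delivered events are deleted from
             * the database.
             */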
516         @Override
517         public void run() {
518             LOGGER.trace("WriterThread started - batch size = {}, delayMillis = {}", batchSize, manager.delayMillis);
519             long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis;
520             while (!shutdown) {
521                 final long nowMillis = System.currentTimeMillis();
522                 final long dbCount = database.count();
523                 dbCounter.set(dbCount);
524                 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
525                     nextBatchMillis = nowMillis + manager.delayMillis;
526                     try {
527                         boolean errors = false;
528                         final DatabaseEntry key = new DatabaseEntry();
529                         final DatabaseEntry data = new DatabaseEntry();
530 
531                         gate.close();
532                         OperationStatus status;
533                         if (batchSize > 1) {
534                             try {
535                                 errors = sendBatch(key, data);
536                             } catch (final Exception ex) {
537                                 break;
538                             }
539                         } else {
540                             Exception exception = null;
541                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542                                 exception = null;
543                                 Transaction txn = null;
544                                 Cursor cursor = null;
545                                 try {
546                                     txn = environment.beginTransaction(null, null);
547                                     cursor = database.openCursor(txn, null);
548                                     try {
549                                         status = cursor.getFirst(key, data, LockMode.RMW);
550                                         while (status == OperationStatus.SUCCESS) {
551                                             final SimpleEvent event = createEvent(data);
552                                             if (event != null) {
553                                                 try {
554                                                     manager.doSend(event);
555                                                 } catch (final Exception ioe) {
556                                                     errors = true;
557                                                     LOGGER.error("Error sending event", ioe);
558                                                     break;
559                                                 }
560                                                 try {
561                                                     cursor.delete();
562                                                 } catch (final Exception ex) {
563                                                     LOGGER.error("Unable to delete event", ex);
564                                                 }
565                                             }
566                                             status = cursor.getNext(key, data, LockMode.RMW);
567                                         }
568                                         if (cursor != null) {
569                                             cursor.close();
570                                             cursor = null;
571                                         }
572                                         txn.commit();
573                                         txn = null;
574                                         dbCounter.decrementAndGet();
575                                         exception = null;
576                                         break;
577                                     } catch (final LockConflictException lce) {
578                                         exception = lce;
579                                         // Fall through and retry.
580                                     } catch (final Exception ex) {
581                                         LOGGER.error("Error reading or writing to database", ex);
582                                         shutdown = true;
583                                         break;
584                                     } finally {
585                                         if (cursor != null) {
586                                             cursor.close();
587                                             cursor = null;
588                                         }
589                                         if (txn != null) {
590                                             txn.abort();
591                                             txn = null;
592                                         }
593                                     }
594                                 } catch (final LockConflictException lce) {
595                                     exception = lce;
596                                     if (cursor != null) {
597                                         try {
598                                             cursor.close();
599                                             cursor = null;
600                                         } catch (final Exception ex) {
601                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602                                         }
603                                     }
604                                     if (txn != null) {
605                                         try {
606                                             txn.abort();
607                                             txn = null;
608                                         } catch (final Exception ex) {
609                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610                                         }
611                                     }
612                                 }
613                                 try {
614                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615                                 } catch (final InterruptedException ie) {
616                                     // Ignore the error
617                                 }
618                             }
619                             if (exception != null) {
620                                 LOGGER.error("Unable to read or update database", exception);
621                             }
622                         }
623                         if (errors) {
624                             Thread.sleep(manager.delayMillis);
625                             continue;
626                         }
627                     } catch (final Exception ex) {
628                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629                     }
630                 } else {
631                     if (nextBatchMillis <= nowMillis) {
632                         nextBatchMillis = nowMillis + manager.delayMillis;
633                     }
634                     try {
635                         final long interval = nextBatchMillis - nowMillis;
636                         gate.waitForOpen(interval);
637                     } catch (final InterruptedException ie) {
638                         LOGGER.warn("WriterThread interrupted, continuing");
639                     } catch (final Exception ex) {
640                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641                         break;
642                     }
643                 }
644             }
645 
646             if (batchSize > 1 && database.count() > 0) {
647                 final DatabaseEntry key = new DatabaseEntry();
648                 final DatabaseEntry data = new DatabaseEntry();
649                 try {
650                     sendBatch(key, data);
651                 } catch (final Exception ex) {
652                     LOGGER.warn("Unable to write final batch", ex);
653                 }
654             }
655             LOGGER.trace("WriterThread exiting");
656         }
657 
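            /**
             * Reads up to batchSize events from the database, sends them to Flume as a single batch and, if
             * the send succeeds, deletes them from the database in a transaction. Cursor and transaction
             * operations are retried on lock conflicts.
             * @return true if an error occurred sending the batch.
             */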
658         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
659             boolean errors = false;
660             OperationStatus status;
661             Cursor cursor = null;
662             try {
663                 final BatchEvent batch = new BatchEvent();
664                 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
665                     try {
666                         cursor = database.openCursor(null, CursorConfig.DEFAULT);
667                         status = cursor.getFirst(key, data, null);
668 
669                         for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
670                             final SimpleEvent event = createEvent(data);
671                             if (event != null) {
672                                 batch.addEvent(event);
673                             }
674                             status = cursor.getNext(key, data, null);
675                         }
676                         break;
677                     } catch (final LockConflictException lce) {
678                         if (cursor != null) {
679                             try {
680                                 cursor.close();
681                                 cursor = null;
682                             } catch (final Exception ex) {
683                                 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
684                             }
685                         }
686                     }
687                 }
688 
689                 try {
690                     manager.send(batch);
691                 } catch (final Exception ioe) {
692                     LOGGER.error("Error sending events", ioe);
693                     errors = true;
694                 }
695                 if (!errors) {
696                     if (cursor != null) {
697                         cursor.close();
698                         cursor = null;
699                     }
700                     Transaction txn = null;
701                     Exception exception = null;
702                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
703                         try {
704                             txn = environment.beginTransaction(null, null);
705                             try {
706                                 for (final Event event : batch.getEvents()) {
707                                     try {
708                                         final Map<String, String> headers = event.getHeaders();
709                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
710                                         database.delete(txn, key);
711                                     } catch (final Exception ex) {
712                                         LOGGER.error("Error deleting key from database", ex);
713                                     }
714                                 }
715                                 txn.commit();
716                                 long count = dbCounter.get();
717                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
718                                     count = dbCounter.get();
719                                 }
720                                 exception = null;
721                                 break;
722                             } catch (final LockConflictException lce) {
723                                 exception = lce;
724                                 if (cursor != null) {
725                                     try {
726                                         cursor.close();
727                                         cursor = null;
728                                     } catch (final Exception ex) {
729                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
730                                     }
731                                 }
732                                 if (txn != null) {
733                                     try {
734                                         txn.abort();
735                                         txn = null;
736                                     } catch (final Exception ex) {
737                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
738                                     }
739                                 }
740                             } catch (final Exception ex) {
741                                 LOGGER.error("Unable to commit transaction", ex);
742                                 if (txn != null) {
743                                     txn.abort();
744                                 }
745                             }
746                         } catch (final LockConflictException lce) {
747                             exception = lce;
748                             if (cursor != null) {
749                                 try {
750                                     cursor.close();
751                                     cursor = null;
752                                 } catch (final Exception ex) {
753                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
754                                 }
755                             }
756                             if (txn != null) {
757                                 try {
758                                     txn.abort();
759                                     txn = null;
760                                 } catch (final Exception ex) {
761                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
762                                 }
763                             }
764                         } finally {
765                             if (cursor != null) {
766                                 cursor.close();
767                                 cursor = null;
768                             }
769                             if (txn != null) {
770                                 txn.abort();
771                                 txn = null;
772                             }
773                         }
774                         try {
775                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
776                         } catch (final InterruptedException ie) {
777                             // Ignore the error
778                         }
779                     }
780                     if (exception != null) {
781                         LOGGER.error("Unable to delete events from database", exception);
782                     }
783                 }
784             } catch (final Exception ex) {
785                 LOGGER.error("Error reading database", ex);
786                 shutdown = true;
787                 throw ex;
788             } finally {
789                 if (cursor != null) {
790                     cursor.close();
791                 }
792             }
793 
794             return errors;
795         }
796 
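            /**
             * Rebuilds a Flume event from a stored record: decrypts it with AES when a SecretKey is
             * configured, then reads the body length and body followed by the header count and header
             * key/value pairs, mirroring the format written by send().
             * @return the reconstructed event, or null if the record could not be read.
             */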
797         private SimpleEvent createEvent(final DatabaseEntry data) {
798             final SimpleEvent event = new SimpleEvent();
799             try {
800                 byte[] eventData = data.getData();
801                 if (secretKey != null) {
802                     final Cipher cipher = Cipher.getInstance("AES");
803                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
804                     eventData = cipher.doFinal(eventData);
805                 }
806                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
807                 final DataInputStream dais = new DataInputStream(bais);
808                 int length = dais.readInt();
809                 final byte[] bytes = new byte[length];
810                 dais.read(bytes, 0, length);
811                 event.setBody(bytes);
812                 length = dais.readInt();
813                 final Map<String, String> map = new HashMap<String, String>(length);
814                 for (int i = 0; i < length; ++i) {
815                     final String headerKey = dais.readUTF();
816                     final String value = dais.readUTF();
817                     map.put(headerKey, value);
818                 }
819                 event.setHeaders(map);
820                 return event;
821             } catch (final Exception ex) {
822                 LOGGER.error("Error retrieving event", ex);
823                 return null;
824             }
825         }
826 
827     }
828 
829     /**
830      * Factory that creates Daemon threads that can be properly shut down.
831      */
832     private static class DaemonThreadFactory implements ThreadFactory {
833         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
834         private final ThreadGroup group;
835         private final AtomicInteger threadNumber = new AtomicInteger(1);
836         private final String namePrefix;
837 
838         public DaemonThreadFactory() {
839             final SecurityManager securityManager = System.getSecurityManager();
840             group = securityManager != null ? securityManager.getThreadGroup() :
841                 Thread.currentThread().getThreadGroup();
842             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
843         }
844 
845         @Override
846         public Thread newThread(final Runnable r) {
847             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
848             thread.setDaemon(true);
849             if (thread.getPriority() != Thread.NORM_PRIORITY) {
850                 thread.setPriority(Thread.NORM_PRIORITY);
851             }
852             return thread;
853         }
854     }
855 
856     /**
857      * Gate on which the writer thread waits. It is opened when enough events have been persisted to
     * fill a batch or when the manager is shutting down, and closed again before each batch is processed.
858      */
859     private static class Gate {
860 
861         private boolean isOpen = false;
862 
863         public boolean isOpen() {
864             return isOpen;
865         }
866 
867         public synchronized void open() {
868             isOpen = true;
869             notifyAll();
870         }
871 
872         public synchronized void close() {
873             isOpen = false;
874         }
875 
876         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
877             wait(timeout);
878         }
879     }
880 }