1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  import javax.crypto.Cipher;
36  import javax.crypto.SecretKey;
37  
38  import org.apache.flume.Event;
39  import org.apache.flume.event.SimpleEvent;
40  import org.apache.logging.log4j.LoggingException;
41  import org.apache.logging.log4j.core.appender.ManagerFactory;
42  import org.apache.logging.log4j.core.config.Property;
43  import org.apache.logging.log4j.core.config.plugins.PluginManager;
44  import org.apache.logging.log4j.core.config.plugins.PluginType;
45  import org.apache.logging.log4j.core.helpers.FileUtils;
46  import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
47  import org.apache.logging.log4j.core.helpers.Strings;
48  
49  import com.sleepycat.je.Cursor;
50  import com.sleepycat.je.CursorConfig;
51  import com.sleepycat.je.Database;
52  import com.sleepycat.je.DatabaseConfig;
53  import com.sleepycat.je.DatabaseEntry;
54  import com.sleepycat.je.Environment;
55  import com.sleepycat.je.EnvironmentConfig;
56  import com.sleepycat.je.LockConflictException;
57  import com.sleepycat.je.LockMode;
58  import com.sleepycat.je.OperationStatus;
59  import com.sleepycat.je.StatsConfig;
60  import com.sleepycat.je.Transaction;
61  
62  /**
63   * Manager that persists data to Berkeley DB before passing it on to Flume.
64   */
65  public class FlumePersistentManager extends FlumeAvroManager {
66  
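        // Design overview: send() persists each event to a local Berkeley DB JE database (optionally
        // AES-encrypted when a SecretKeyProvider is configured via the "keyProvider" property). The
        // background WriterThread then reads the persisted events, forwards them to Flume in batches,
        // and deletes them from the database once they have been sent. The Gate wakes the writer as
        // soon as batchSize events have accumulated.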
67      /** Attribute name for the key provider. */
68      public static final String KEY_PROVIDER = "keyProvider";
69  
70      private static final Charset UTF8 = Charset.forName("UTF-8");
71  
72      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
73  
74      private static final int SHUTDOWN_WAIT = 60;
75  
76      private static final int MILLIS_PER_SECOND = 1000;
77  
78      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
79  
80      private static BDBManagerFactory factory = new BDBManagerFactory();
81  
82      private final Database database;
83  
84      private final Environment environment;
85  
86      private final WriterThread worker;
87  
88      private final Gate gate = new Gate();
89  
90      private final SecretKey secretKey;
91  
92      private final int delay;
93  
94      private final int lockTimeoutRetryCount;
95  
96      private final ExecutorService threadPool;
97  
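        // Approximate number of events currently persisted; BDBWriter increments it on each put and
        // opens the gate once it reaches batchSize, while WriterThread refreshes it from database.count()
        // and decrements it as events are sent.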
98      private AtomicLong dbCount = new AtomicLong();
99  
100     /**
101      * Constructor
102      * @param name The unique name of this manager.
103      * @param shortName Original name for the Manager.
104      * @param agents An array of Agents.
105      * @param batchSize The number of events to include in a batch.
106      * @param retries The number of times to retry connecting before giving up.
107      * @param connectionTimeout The amount of time to wait for a connection to be established.
108      * @param requestTimeout The amount of time to wait for a response to a request.
109      * @param delay The maximum amount of time to wait before delivering events if the batch size has not been reached.
110      * @param database The database to write to.
111      * @param environment The database environment.
112      * @param secretKey The SecretKey to use for encryption.
113      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
114      */
115     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
116                                      final int batchSize, final int retries, final int connectionTimeout,
117                                      final int requestTimeout, final int delay, final Database database,
118                                      final Environment environment, final SecretKey secretKey,
119                                      final int lockTimeoutRetryCount) {
120         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
121         this.delay = delay;
122         this.database = database;
123         this.environment = environment;
124         dbCount.set(database.count());
125         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126             lockTimeoutRetryCount);
127         this.worker.start();
128         this.secretKey = secretKey;
129         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131     }
132 
133 
134     /**
135      * Returns a FlumePersistentManager.
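         * <p>
         * A minimal usage sketch; the host, port, timeout and directory values below are illustrative
         * only, and the Agent construction shown assumes the Agent plugin factory method:
         * </p>
         * <pre>{@code
         * Agent[] agents = new Agent[] {Agent.createAgent("localhost", "8800")};
         * FlumePersistentManager manager = FlumePersistentManager.getManager("MyApp", agents, null,
         *     100, 3, 30000, 30000, 5000, 5, ".log4j/flumeData");
         * }</pre>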
136      * @param name The name of the manager.
137      * @param agents The agents to use.
138      * @param properties Properties to pass to the Manager.
139      * @param batchSize The number of events to include in a batch.
140      * @param retries The number of times to retry connecting before giving up.
141      * @param connectionTimeout The amount of time to wait to establish a connection.
142      * @param requestTimeout The amount of time to wait for a response to a request.
143      * @param delay Amount of time to delay before delivering a batch.
144      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
145      * @param dataDir The location of the Berkeley database.
146      * @return A FlumePersistentManager.
147      */
148     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149                                                     final Property[] properties, int batchSize, final int retries,
150                                                     final int connectionTimeout, final int requestTimeout,
151                                                     final int delay, final int lockTimeoutRetryCount,
152                                                     final String dataDir) {
153         if (agents == null || agents.length == 0) {
154             throw new IllegalArgumentException("At least one agent is required");
155         }
156 
157         if (batchSize <= 0) {
158             batchSize = 1;
159         }
160         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161 
162         final StringBuilder sb = new StringBuilder("FlumePersistent[");
163         boolean first = true;
164         for (final Agent agent : agents) {
165             if (!first) {
166                 sb.append(",");
167             }
168             sb.append(agent.getHost()).append(":").append(agent.getPort());
169             first = false;
170         }
171         sb.append("]");
172         sb.append(" ").append(dataDirectory);
173         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174             connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDirectory, properties));
175     }
176 
177     @Override
178     public void send(final Event event)  {
179         if (worker.isShutdown()) {
180             throw new LoggingException("Unable to record event");
181         }
182 
183         final Map<String, String> headers = event.getHeaders();
184         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
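            // Serialize the event as: body length, body bytes, header count, then each header key and
            // value via writeUTF. If a secret key is configured, the serialized record is AES-encrypted
            // before being persisted, keyed by the event's GUID header.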
185         try {
186             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187             final DataOutputStream daos = new DataOutputStream(baos);
188             daos.writeInt(event.getBody().length);
189             daos.write(event.getBody(), 0, event.getBody().length);
190             daos.writeInt(event.getHeaders().size());
191             for (final Map.Entry<String, String> entry : headers.entrySet()) {
192                 daos.writeUTF(entry.getKey());
193                 daos.writeUTF(entry.getValue());
194             }
195             byte[] eventData = baos.toByteArray();
196             if (secretKey != null) {
197                 final Cipher cipher = Cipher.getInstance("AES");
198                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199                 eventData = cipher.doFinal(eventData);
200             }
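                // Hand the write off to a daemon pool thread so that an interrupt of the logging thread
                // cannot close the Berkeley DB handles mid-write, then wait for it to complete, retrying
                // the wait once if it is interrupted.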
201             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203             boolean interrupted = false;
204             int count = 0;
205             do {
                    interrupted = false; // reset so a completed retry can exit the loop
206                 try {
207                     future.get();
208                 } catch (final InterruptedException ie) {
209                     interrupted = true;
210                     ++count;
211                 }
212             } while (interrupted && count <= 1);
213 
214         } catch (final Exception ex) {
215             throw new LoggingException("Exception occurred writing log event", ex);
216         }
217     }
218 
219     @Override
220     protected void releaseSub() {
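            // Shutdown sequence: signal the writer thread and wait for it to drain, stop the write
            // thread pool, then close the Berkeley DB database and finally its environment.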
221         LOGGER.debug("Shutting down FlumePersistentManager");
222         worker.shutdown();
223         try {
224             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
225         } catch (InterruptedException ie) {
226             // Ignore the exception and shutdown.
227         }
228         threadPool.shutdown();
229         try {
230             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
231         } catch (final InterruptedException ie) {
232             LOGGER.warn("PersistentManager Thread pool failed to shut down");
233         }
234         try {
235             worker.join();
236         } catch (final InterruptedException ex) {
237             LOGGER.debug("Interrupted while waiting for worker to complete");
238         }
239         try {
240             LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
241             database.close();
242         } catch (final Exception ex) {
243             LOGGER.warn("Failed to close database", ex);
244         }
245         try {
246             environment.cleanLog();
247             environment.close();
248         } catch (final Exception ex) {
249             LOGGER.warn("Failed to close environment", ex);
250         }
251         super.releaseSub();
252     }
253 
254     private void doSend(final SimpleEvent event) {
255         LOGGER.debug("Sending event to Flume");
256         super.send(event);
257     }
258 
259     /**
260      * Thread for writing to Berkeley DB to avoid having interrupts close the database.
261      */
262     private static class BDBWriter implements Callable<Integer> {
263         private final byte[] eventData;
264         private final byte[] keyData;
265         private final Environment environment;
266         private final Database database;
267         private final Gate gate;
268         private final AtomicLong dbCount;
269         private final long batchSize;
270         private final int lockTimeoutRetryCount;
271 
272         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274                          final int lockTimeoutRetryCount) {
275             this.keyData = keyData;
276             this.eventData = eventData;
277             this.environment = environment;
278             this.database = database;
279             this.gate = gate;
280             this.dbCount = dbCount;
281             this.batchSize = batchSize;
282             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283         }
284 
285         @Override
286         public Integer call() throws Exception {
287             final DatabaseEntry key = new DatabaseEntry(keyData);
288             final DatabaseEntry data = new DatabaseEntry(eventData);
289             Exception exception = null;
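                // Each attempt uses its own transaction. On a lock conflict the transaction is aborted
                // and the put is retried after LOCK_TIMEOUT_SLEEP_MILLIS, up to lockTimeoutRetryCount
                // times; any other failure is rethrown. After a successful commit the gate is opened
                // once the persisted count reaches the batch size.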
290             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291                 Transaction txn = null;
292                 try {
293                     txn = environment.beginTransaction(null, null);
294                     try {
295                         database.put(txn, key, data);
296                         txn.commit();
297                         txn = null;
298                         if (dbCount.incrementAndGet() >= batchSize) {
299                             gate.open();
300                         }
301                         exception = null;
302                         break;
303                     } catch (final LockConflictException lce) {
304                         exception = lce;
305                         // Fall through and retry.
306                     } catch (final Exception ex) {
307                         if (txn != null) {
308                             txn.abort();
309                         }
310                         throw ex;
311                     } finally {
312                         if (txn != null) {
313                             txn.abort();
314                             txn = null;
315                         }
316                     }
317                 } catch (LockConflictException lce) {
318                     exception = lce;
319                     if (txn != null) {
320                         try {
321                             txn.abort();
322                             txn = null;
323                         } catch (Exception ex) {
324                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325                         }
326                     }
327 
328                 }
329                 try {
330                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331                 } catch (InterruptedException ie) {
332                     // Ignore the error
333                 }
334             }
335             if (exception != null) {
336                 throw exception;
337             }
338             return eventData.length;
339         }
340     }
341 
342     /**
343      * Factory data.
344      */
345     private static class FactoryData {
346         private final String name;
347         private final Agent[] agents;
348         private final int batchSize;
349         private final String dataDir;
350         private final int retries;
351         private final int connectionTimeout;
352         private final int requestTimeout;
353         private final int delay;
354         private final int lockTimeoutRetryCount;
355         private final Property[] properties;
356 
357         /**
358          * Constructor.
359          * @param name The name of the Appender.
360          * @param agents The agents.
361          * @param batchSize The number of events to include in a batch.
             * @param retries The number of times to retry connecting before giving up.
             * @param connectionTimeout The amount of time to wait to establish a connection.
             * @param requestTimeout The amount of time to wait for a response to a request.
             * @param delay The maximum amount of time to wait before delivering events if the batch size has not been reached.
             * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
362          * @param dataDir The directory for data.
             * @param properties Properties to pass to the Manager.
363          */
364         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365                            final int connectionTimeout, final int requestTimeout, final int delay,
366                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367             this.name = name;
368             this.agents = agents;
369             this.batchSize = batchSize;
370             this.dataDir = dataDir;
371             this.retries = retries;
372             this.connectionTimeout = connectionTimeout;
373             this.requestTimeout = requestTimeout;
374             this.delay = delay;
375             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376             this.properties = properties;
377         }
378     }
379 
380     /**
381      * Factory that creates FlumePersistentManagers backed by Berkeley DB.
382      */
383     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384 
385         /**
386          * Create the FlumePersistentManager.
387          * @param name The name of the entity to manage.
388          * @param data The data required to create the entity.
389          * @return The FlumePersistentManager, or null if it could not be created.
390          */
391         @Override
392         public FlumePersistentManager createManager(final String name, final FactoryData data) {
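                // Set up the local store: create the data directory, open a transactional Berkeley DB JE
                // environment with a 5 second lock timeout, and open a database named after the manager.
                // Then, if the "keyProvider" property names a registered SecretKeyProvider plugin, use it
                // to obtain the key for AES encryption; otherwise events are persisted unencrypted.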
393             SecretKey secretKey = null;
394             Database database = null;
395             Environment environment = null;
396 
397             final Map<String, String> properties = new HashMap<String, String>();
398             if (data.properties != null) {
399                 for (final Property property : data.properties) {
400                     properties.put(property.getName(), property.getValue());
401                 }
402             }
403 
404             try {
405                 final File dir = new File(data.dataDir);
406                 FileUtils.mkdir(dir, true);
407                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408                 dbEnvConfig.setTransactional(true);
409                 dbEnvConfig.setAllowCreate(true);
410                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411                 environment = new Environment(dir, dbEnvConfig);
412                 final DatabaseConfig dbConfig = new DatabaseConfig();
413                 dbConfig.setTransactional(true);
414                 dbConfig.setAllowCreate(true);
415                 database = environment.openDatabase(null, name, dbConfig);
416             } catch (final Exception ex) {
417                 LOGGER.error("Could not create FlumePersistentManager", ex);
418                 // For consistency, close the database as well as the environment, even though the database should
419                 // never be open here since it is the last thing created in the try block above. This guards against
420                 // a future line being inserted at the end that could fail (like some debug logging).
421                 if (database != null) {
422                     database.close();
423                     database = null;
424                 }
425                 if (environment != null) {
426                     environment.close();
427                     environment = null;
428                 }
429                 return null;
430             }
431 
432             try {
433                 String key = null;
434                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
435                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436                         key = entry.getValue();
437                         break;
438                     }
439                 }
440                 if (key != null) {
441                     final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
442                     manager.collectPlugins();
443                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
444                     if (plugins != null) {
445                         boolean found = false;
446                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447                             if (entry.getKey().equalsIgnoreCase(key)) {
448                                 found = true;
449                                 final Class<?> cl = entry.getValue().getPluginClass();
450                                 try {
451                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452                                     secretKey = provider.getSecretKey();
453                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454                                 } catch (final Exception ex) {
455                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456                                         cl.getName());
457                                 }
458                                 break;
459                             }
460                         }
461                         if (!found) {
462                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463                         }
464                     } else {
465                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466                     }
467                 }
468             } catch (final Exception ex) {
469                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470             }
471             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472                 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
473                 data.lockTimeoutRetryCount);
474         }
475     }
476 
477     /**
478      * Thread that sends data to Flume and pulls it from Berkeley DB.
479      */
480     private static class WriterThread extends Thread  {
481         private volatile boolean shutdown = false;
482         private final Database database;
483         private final Environment environment;
484         private final FlumePersistentManager manager;
485         private final Gate gate;
486         private final SecretKey secretKey;
487         private final int batchSize;
488         private final AtomicLong dbCounter;
489         private final int lockTimeoutRetryCount;
490 
491         public WriterThread(final Database database, final Environment environment,
492                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
493                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494             this.database = database;
495             this.environment = environment;
496             this.manager = manager;
497             this.gate = gate;
498             this.batchSize = batchsize;
499             this.secretKey = secretKey;
500             this.setDaemon(true);
501             this.dbCounter = dbCount;
502             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503         }
504 
505         public void shutdown() {
506             LOGGER.debug("Writer thread shutting down");
507             this.shutdown = true;
508             gate.open();
509         }
510 
511         public boolean isShutdown() {
512             return shutdown;
513         }
514 
515         @Override
516         public void run() {
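                // Main loop: refresh the pending-event count from the database, then send when either
                // batchSize events have accumulated or the delay has elapsed with at least one event
                // pending. Otherwise wait on the gate, which BDBWriter opens when a full batch is
                // available and shutdown() opens to stop the thread.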
517             LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
518             long nextBatch = System.currentTimeMillis() + manager.delay;
519             while (!shutdown) {
520                 long now = System.currentTimeMillis();
521                 long dbCount = database.count();
522                 dbCounter.set(dbCount);
523                 if (dbCount >= batchSize || (dbCount > 0 && nextBatch <= now)) {
524                     nextBatch = now + manager.delay;
525                     try {
526                         boolean errors = false;
527                         DatabaseEntry key = new DatabaseEntry();
528                         final DatabaseEntry data = new DatabaseEntry();
529 
530                         gate.close();
531                         OperationStatus status;
532                         if (batchSize > 1) {
533                             try {
534                                 errors = sendBatch(key, data);
535                             } catch (final Exception ex) {
536                                 break;
537                             }
538                         } else {
539                             Exception exception = null;
540                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
541                                 exception = null;
542                                 Transaction txn = null;
543                                 Cursor cursor = null;
544                                 try {
545                                     txn = environment.beginTransaction(null, null);
546                                     cursor = database.openCursor(txn, null);
547                                     try {
548                                         status = cursor.getFirst(key, data, LockMode.RMW);
549                                         while (status == OperationStatus.SUCCESS) {
550                                             final SimpleEvent event = createEvent(data);
551                                             if (event != null) {
552                                                 try {
553                                                     manager.doSend(event);
554                                                 } catch (final Exception ioe) {
555                                                     errors = true;
556                                                     LOGGER.error("Error sending event", ioe);
557                                                     break;
558                                                 }
559                                                 try {
560                                                     cursor.delete();
561                                                 } catch (final Exception ex) {
562                                                     LOGGER.error("Unable to delete event", ex);
563                                                 }
564                                             }
565                                             status = cursor.getNext(key, data, LockMode.RMW);
566                                         }
567                                         if (cursor != null) {
568                                             cursor.close();
569                                             cursor = null;
570                                         }
571                                         txn.commit();
572                                         txn = null;
573                                         dbCounter.decrementAndGet();
574                                         exception = null;
575                                         break;
576                                     } catch (final LockConflictException lce) {
577                                         exception = lce;
578                                         // Fall through and retry.
579                                     } catch (final Exception ex) {
580                                         LOGGER.error("Error reading or writing to database", ex);
581                                         shutdown = true;
582                                         break;
583                                     } finally {
584                                         if (cursor != null) {
585                                             cursor.close();
586                                             cursor = null;
587                                         }
588                                         if (txn != null) {
589                                             txn.abort();
590                                             txn = null;
591                                         }
592                                     }
593                                 } catch (LockConflictException lce) {
594                                     exception = lce;
595                                     if (cursor != null) {
596                                         try {
597                                             cursor.close();
598                                             cursor = null;
599                                         } catch (Exception ex) {
600                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
601                                         }
602                                     }
603                                     if (txn != null) {
604                                         try {
605                                             txn.abort();
606                                             txn = null;
607                                         } catch (Exception ex) {
608                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
609                                         }
610                                     }
611                                 }
612                                 try {
613                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
614                                 } catch (InterruptedException ie) {
615                                     // Ignore the error
616                                 }
617                             }
618                             if (exception != null) {
619                                 LOGGER.error("Unable to read or update the database", exception);
620                             }
621                         }
622                         if (errors) {
623                             Thread.sleep(manager.delay);
624                             continue;
625                         }
626                     } catch (final Exception ex) {
627                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
628                     }
629                 } else {
630                     if (nextBatch <= now) {
631                         nextBatch = now + manager.delay;
632                     }
633                     try {
634                         final long interval = nextBatch - now;
635                         gate.waitForOpen(interval);
636                     } catch (final InterruptedException ie) {
637                         LOGGER.warn("WriterThread interrupted, continuing");
638                     } catch (final Exception ex) {
639                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
640                         break;
641                     }
642                 }
643             }
644 
645             if (batchSize > 1 && database.count() > 0) {
646                 DatabaseEntry key = new DatabaseEntry();
647                 final DatabaseEntry data = new DatabaseEntry();
648                 try {
649                     sendBatch(key, data);
650                 } catch (final Exception ex) {
651                     LOGGER.warn("Unable to write final batch");
652                 }
653             }
654             LOGGER.trace("WriterThread exiting");
655         }
656 
657         private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
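                // Read up to batchSize events with a cursor and send them to Flume as a single BatchEvent.
                // Only if the send succeeds are the sent events deleted, inside a transaction that is
                // retried on lock conflicts. Returns true if the send failed so the caller can back off
                // and try again after the configured delay.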
658             boolean errors = false;
659             OperationStatus status;
660             Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
661             try {
662                 status = cursor.getFirst(key, data, null);
663 
664                 final BatchEvent batch = new BatchEvent();
665                 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
666                     final SimpleEvent event = createEvent(data);
667                     if (event != null) {
668                         batch.addEvent(event);
669                     }
670                     status = cursor.getNext(key, data, null);
671                 }
672                 try {
673                     manager.send(batch);
674                 } catch (final Exception ioe) {
675                     LOGGER.error("Error sending events", ioe);
676                     errors = true;
677                 }
678                 if (!errors) {
679                     cursor.close();
680                     cursor = null;
681                     Transaction txn = null;
682                     Exception exception = null;
683                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
684                         try {
685                             txn = environment.beginTransaction(null, null);
686                             try {
687                                 for (final Event event : batch.getEvents()) {
688                                     try {
689                                         final Map<String, String> headers = event.getHeaders();
690                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
691                                         database.delete(txn, key);
692                                     } catch (final Exception ex) {
693                                         LOGGER.error("Error deleting key from database", ex);
694                                     }
695                                 }
696                                 txn.commit();
697                                 long count = dbCounter.get();
698                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
699                                     count = dbCounter.get();
700                                 }
701                                 exception = null;
702                                 break;
703                             } catch (final LockConflictException lce) {
704                                 exception = lce;
705                                 if (cursor != null) {
706                                     try {
707                                         cursor.close();
708                                         cursor = null;
709                                     } catch (Exception ex) {
710                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
711                                     }
712                                 }
713                                 if (txn != null) {
714                                     try {
715                                         txn.abort();
716                                         txn = null;
717                                     } catch (Exception ex) {
718                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
719                                     }
720                                 }
721                             } catch (final Exception ex) {
722                                 LOGGER.error("Unable to commit transaction", ex);
723                                 if (txn != null) {
724                                     txn.abort();
725                                 }
726                             }
727                         } catch (LockConflictException lce) {
728                             exception = lce;
729                             if (cursor != null) {
730                                 try {
731                                     cursor.close();
732                                     cursor = null;
733                                 } catch (Exception ex) {
734                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
735                                 }
736                             }
737                             if (txn != null) {
738                                 try {
739                                     txn.abort();
740                                     txn = null;
741                                 } catch (Exception ex) {
742                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
743                                 }
744                             }
745                         } finally {
746                             if (cursor != null) {
747                                 cursor.close();
748                                 cursor = null;
749                             }
750                             if (txn != null) {
751                                 txn.abort();
752                                 txn = null;
753                             }
754                         }
755                         try {
756                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
757                         } catch (InterruptedException ie) {
758                             // Ignore the error
759                         }
760                     }
761                     if (exception != null) {
762                         LOGGER.error("Unable to delete events from the database", exception);
763                     }
764                 }
765             } catch (final Exception ex) {
766                 LOGGER.error("Error reading database", ex);
767                 shutdown = true;
768                 throw ex;
769             } finally {
770                 if (cursor != null) {
771                     cursor.close();
772                 }
773             }
774 
775             return errors;
776         }
777 
778         private SimpleEvent createEvent(final DatabaseEntry data) {
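                // Reverse of FlumePersistentManager.send(): decrypt the record if a secret key is
                // configured, then read the body length and bytes followed by the header count and
                // each header key/value pair.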
779             final SimpleEvent event = new SimpleEvent();
780             try {
781                 byte[] eventData = data.getData();
782                 if (secretKey != null) {
783                     final Cipher cipher = Cipher.getInstance("AES");
784                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
785                     eventData = cipher.doFinal(eventData);
786                 }
787                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
788                 final DataInputStream dais = new DataInputStream(bais);
789                 int length = dais.readInt();
790                 final byte[] bytes = new byte[length];
791                 dais.readFully(bytes); // readFully guarantees the whole body is read
792                 event.setBody(bytes);
793                 length = dais.readInt();
794                 final Map<String, String> map = new HashMap<String, String>(length);
795                 for (int i = 0; i < length; ++i) {
796                     final String headerKey = dais.readUTF();
797                     final String value = dais.readUTF();
798                     map.put(headerKey, value);
799                 }
800                 event.setHeaders(map);
801                 return event;
802             } catch (final Exception ex) {
803                 LOGGER.error("Error retrieving event", ex);
804                 return null;
805             }
806         }
807 
808     }
809 
810     /**
811      * Factory that creates Daemon threads that can be properly shut down.
812      */
813     private static class DaemonThreadFactory implements ThreadFactory {
814         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
815         private final ThreadGroup group;
816         private final AtomicInteger threadNumber = new AtomicInteger(1);
817         private final String namePrefix;
818 
819         public DaemonThreadFactory() {
820             final SecurityManager securityManager = System.getSecurityManager();
821             group = securityManager != null ? securityManager.getThreadGroup() :
822                 Thread.currentThread().getThreadGroup();
823             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
824         }
825 
826         @Override
827         public Thread newThread(final Runnable r) {
828             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
829             thread.setDaemon(true);
830             if (thread.getPriority() != Thread.NORM_PRIORITY) {
831                 thread.setPriority(Thread.NORM_PRIORITY);
832             }
833             return thread;
834         }
835     }
836 
837     /**
838      * Synchronization gate the writer thread waits on; it is opened when a full batch has been persisted or on shutdown.
839      */
840     private static class Gate {
841 
842         private boolean isOpen = false;
843 
844         public boolean isOpen() {
845             return isOpen;
846         }
847 
848         public synchronized void open() {
849             isOpen = true;
850             notifyAll();
851         }
852 
853         public synchronized void close() {
854             isOpen = false;
855         }
856 
857         public synchronized void waitForOpen(long timeout) throws InterruptedException {
858             wait(timeout);
859         }
860     }
861 }