View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import javax.crypto.Cipher;
37  import javax.crypto.SecretKey;
38  
39  import org.apache.flume.Event;
40  import org.apache.flume.event.SimpleEvent;
41  import org.apache.logging.log4j.LoggingException;
42  import org.apache.logging.log4j.core.appender.ManagerFactory;
43  import org.apache.logging.log4j.core.config.Property;
44  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46  import org.apache.logging.log4j.core.util.FileUtils;
47  import org.apache.logging.log4j.core.util.SecretKeyProvider;
48  import org.apache.logging.log4j.util.Strings;
49  
50  import com.sleepycat.je.Cursor;
51  import com.sleepycat.je.CursorConfig;
52  import com.sleepycat.je.Database;
53  import com.sleepycat.je.DatabaseConfig;
54  import com.sleepycat.je.DatabaseEntry;
55  import com.sleepycat.je.Environment;
56  import com.sleepycat.je.EnvironmentConfig;
57  import com.sleepycat.je.LockConflictException;
58  import com.sleepycat.je.LockMode;
59  import com.sleepycat.je.OperationStatus;
60  import com.sleepycat.je.StatsConfig;
61  import com.sleepycat.je.Transaction;
62  
63  /**
64   * Manager that persists data to Berkeley DB before passing it on to Flume.
65   */
66  public class FlumePersistentManager extends FlumeAvroManager {
67  
68      /** Attribute name for the key provider. */
69      public static final String KEY_PROVIDER = "keyProvider";
70  
71      private static final Charset UTF8 = Charset.forName("UTF-8");
72  
73      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74  
75      private static final int SHUTDOWN_WAIT = 60;
76  
77      private static final int MILLIS_PER_SECOND = 1000;
78  
79      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80  
81      private static BDBManagerFactory factory = new BDBManagerFactory();
82  
83      private final Database database;
84  
85      private final Environment environment;
86  
87      private final WriterThread worker;
88  
89      private final Gate gate = new Gate();
90  
91      private final SecretKey secretKey;
92  
93      private final int delay;
94  
95      private final int lockTimeoutRetryCount;
96  
97      private final ExecutorService threadPool;
98  
99      private final AtomicLong dbCount = new AtomicLong();
100 
101     /**
102      * Constructor
103      * @param name The unique name of this manager.
104      * @param shortName Original name for the Manager.
105      * @param agents An array of Agents.
106      * @param batchSize The number of events to include in a batch.
107      * @param retries The number of times to retry connecting before giving up.
108      * @param connectionTimeout The amount of time to wait for a connection to be established.
109      * @param requestTimeout The amount of time to wair for a response to a request.
110      * @param delay The amount of time to wait between retries.
111      * @param database The database to write to.
112      * @param environment The database environment.
113      * @param secretKey The SecretKey to use for encryption.
114      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115      */
116     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                      final int batchSize, final int retries, final int connectionTimeout,
118                                      final int requestTimeout, final int delay, final Database database,
119                                      final Environment environment, final SecretKey secretKey,
120                                      final int lockTimeoutRetryCount) {
121         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
122         this.delay = delay;
123         this.database = database;
124         this.environment = environment;
125         dbCount.set(database.count());
126         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
127             lockTimeoutRetryCount);
128         this.worker.start();
129         this.secretKey = secretKey;
130         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
131         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
132     }
133 
134 
135     /**
136      * Returns a FlumeAvroManager.
137      * @param name The name of the manager.
138      * @param agents The agents to use.
139      * @param properties Properties to pass to the Manager.
140      * @param batchSize The number of events to include in a batch.
141      * @param retries The number of times to retry connecting before giving up.
142      * @param connectionTimeout The amount of time to wait to establish a connection.
143      * @param requestTimeout The amount of time to wait for a response to a request.
144      * @param delay Amount of time to delay before delivering a batch.
145      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
146      * @param dataDir The location of the Berkeley database.
147      * @return A FlumeAvroManager.
148      */
149     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150                                                     final Property[] properties, int batchSize, final int retries,
151                                                     final int connectionTimeout, final int requestTimeout,
152                                                     final int delay, final int lockTimeoutRetryCount,
153                                                     final String dataDir) {
154         if (agents == null || agents.length == 0) {
155             throw new IllegalArgumentException("At least one agent is required");
156         }
157 
158         if (batchSize <= 0) {
159             batchSize = 1;
160         }
161         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162 
163         final StringBuilder sb = new StringBuilder("FlumePersistent[");
164         boolean first = true;
165         for (final Agent agent : agents) {
166             if (!first) {
167                 sb.append(',');
168             }
169             sb.append(agent.getHost()).append(':').append(agent.getPort());
170             first = false;
171         }
172         sb.append(']');
173         sb.append(' ').append(dataDirectory);
174         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175             connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
176     }
177 
178     @Override
179     public void send(final Event event)  {
180         if (worker.isShutdown()) {
181             throw new LoggingException("Unable to record event");
182         }
183 
184         final Map<String, String> headers = event.getHeaders();
185         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186         try {
187             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188             final DataOutputStream daos = new DataOutputStream(baos);
189             daos.writeInt(event.getBody().length);
190             daos.write(event.getBody(), 0, event.getBody().length);
191             daos.writeInt(event.getHeaders().size());
192             for (final Map.Entry<String, String> entry : headers.entrySet()) {
193                 daos.writeUTF(entry.getKey());
194                 daos.writeUTF(entry.getValue());
195             }
196             byte[] eventData = baos.toByteArray();
197             if (secretKey != null) {
198                 final Cipher cipher = Cipher.getInstance("AES");
199                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200                 eventData = cipher.doFinal(eventData);
201             }
202             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204             boolean interrupted = false;
205             int count = 0;
206             do {
207                 try {
208                     future.get();
209                 } catch (final InterruptedException ie) {
210                     interrupted = true;
211                     ++count;
212                 }
213             } while (interrupted && count <= 1);
214 
215         } catch (final Exception ex) {
216             throw new LoggingException("Exception occurred writing log event", ex);
217         }
218     }
219 
220     @Override
221     protected void releaseSub() {
222         LOGGER.debug("Shutting down FlumePersistentManager");
223         worker.shutdown();
224         try {
225             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226         } catch (final InterruptedException ie) {
227             // Ignore the exception and shutdown.
228         }
229         threadPool.shutdown();
230         try {
231             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232         } catch (final InterruptedException ie) {
233             LOGGER.warn("PersistentManager Thread pool failed to shut down");
234         }
235         try {
236             worker.join();
237         } catch (final InterruptedException ex) {
238             LOGGER.debug("Interrupted while waiting for worker to complete");
239         }
240         try {
241             LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242             database.close();
243         } catch (final Exception ex) {
244             LOGGER.warn("Failed to close database", ex);
245         }
246         try {
247             environment.cleanLog();
248             environment.close();
249         } catch (final Exception ex) {
250             LOGGER.warn("Failed to close environment", ex);
251         }
252         super.releaseSub();
253     }
254 
255     private void doSend(final SimpleEvent event) {
256         LOGGER.debug("Sending event to Flume");
257         super.send(event);
258     }
259 
260     /**
261      * Thread for writing to Berkeley DB to avoid having interrupts close the database.
262      */
263     private static class BDBWriter implements Callable<Integer> {
264         private final byte[] eventData;
265         private final byte[] keyData;
266         private final Environment environment;
267         private final Database database;
268         private final Gate gate;
269         private final AtomicLong dbCount;
270         private final long batchSize;
271         private final int lockTimeoutRetryCount;
272 
273         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275                          final int lockTimeoutRetryCount) {
276             this.keyData = keyData;
277             this.eventData = eventData;
278             this.environment = environment;
279             this.database = database;
280             this.gate = gate;
281             this.dbCount = dbCount;
282             this.batchSize = batchSize;
283             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284         }
285 
286         @Override
287         public Integer call() throws Exception {
288             final DatabaseEntry key = new DatabaseEntry(keyData);
289             final DatabaseEntry data = new DatabaseEntry(eventData);
290             Exception exception = null;
291             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292                 Transaction txn = null;
293                 try {
294                     txn = environment.beginTransaction(null, null);
295                     try {
296                         database.put(txn, key, data);
297                         txn.commit();
298                         txn = null;
299                         if (dbCount.incrementAndGet() >= batchSize) {
300                             gate.open();
301                         }
302                         exception = null;
303                         break;
304                     } catch (final LockConflictException lce) {
305                         exception = lce;
306                         // Fall through and retry.
307                     } catch (final Exception ex) {
308                         if (txn != null) {
309                             txn.abort();
310                         }
311                         throw ex;
312                     } finally {
313                         if (txn != null) {
314                             txn.abort();
315                             txn = null;
316                         }
317                     }
318                 } catch (final LockConflictException lce) {
319                     exception = lce;
320                     if (txn != null) {
321                         try {
322                             txn.abort();
323                             txn = null;
324                         } catch (final Exception ex) {
325                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326                         }
327                     }
328 
329                 }
330                 try {
331                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332                 } catch (final InterruptedException ie) {
333                     // Ignore the error
334                 }
335             }
336             if (exception != null) {
337                 throw exception;
338             }
339             return eventData.length;
340         }
341     }
342 
343     /**
344      * Factory data.
345      */
346     private static class FactoryData {
347         private final String name;
348         private final Agent[] agents;
349         private final int batchSize;
350         private final String dataDir;
351         private final int retries;
352         private final int connectionTimeout;
353         private final int requestTimeout;
354         private final int delay;
355         private final int lockTimeoutRetryCount;
356         private final Property[] properties;
357 
358         /**
359          * Constructor.
360          * @param name The name of the Appender.
361          * @param agents The agents.
362          * @param batchSize The number of events to include in a batch.
363          * @param dataDir The directory for data.
364          */
365         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366                            final int connectionTimeout, final int requestTimeout, final int delay,
367                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368             this.name = name;
369             this.agents = agents;
370             this.batchSize = batchSize;
371             this.dataDir = dataDir;
372             this.retries = retries;
373             this.connectionTimeout = connectionTimeout;
374             this.requestTimeout = requestTimeout;
375             this.delay = delay;
376             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377             this.properties = properties;
378         }
379     }
380 
381     /**
382      * Avro Manager Factory.
383      */
384     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385 
386         /**
387          * Create the FlumeKratiManager.
388          * @param name The name of the entity to manage.
389          * @param data The data required to create the entity.
390          * @return The FlumeKratiManager.
391          */
392         @Override
393         public FlumePersistentManager createManager(final String name, final FactoryData data) {
394             SecretKey secretKey = null;
395             Database database = null;
396             Environment environment = null;
397 
398             final Map<String, String> properties = new HashMap<String, String>();
399             if (data.properties != null) {
400                 for (final Property property : data.properties) {
401                     properties.put(property.getName(), property.getValue());
402                 }
403             }
404 
405             try {
406                 final File dir = new File(data.dataDir);
407                 FileUtils.mkdir(dir, true);
408                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409                 dbEnvConfig.setTransactional(true);
410                 dbEnvConfig.setAllowCreate(true);
411                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412                 environment = new Environment(dir, dbEnvConfig);
413                 final DatabaseConfig dbConfig = new DatabaseConfig();
414                 dbConfig.setTransactional(true);
415                 dbConfig.setAllowCreate(true);
416                 database = environment.openDatabase(null, name, dbConfig);
417             } catch (final Exception ex) {
418                 LOGGER.error("Could not create FlumePersistentManager", ex);
419                 // For consistency, close database as well as environment even though it should never happen since the
420                 // database is that last thing in the block above, but this does guard against a future line being
421                 // inserted at the end that would bomb (like some debug logging).
422                 if (database != null) {
423                     database.close();
424                     database = null;
425                 }
426                 if (environment != null) {
427                     environment.close();
428                     environment = null;
429                 }
430                 return null;
431             }
432 
433             try {
434                 String key = null;
435                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
436                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437                         key = entry.getValue();
438                         break;
439                     }
440                 }
441                 if (key != null) {
442                     final PluginManager manager = new PluginManager("KeyProvider");
443                     manager.collectPlugins();
444                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
445                     if (plugins != null) {
446                         boolean found = false;
447                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448                             if (entry.getKey().equalsIgnoreCase(key)) {
449                                 found = true;
450                                 final Class<?> cl = entry.getValue().getPluginClass();
451                                 try {
452                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453                                     secretKey = provider.getSecretKey();
454                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455                                 } catch (final Exception ex) {
456                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457                                         cl.getName());
458                                 }
459                                 break;
460                             }
461                         }
462                         if (!found) {
463                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                         }
465                     } else {
466                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467                     }
468                 }
469             } catch (final Exception ex) {
470                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471             }
472             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473                 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
474                 data.lockTimeoutRetryCount);
475         }
476     }
477 
478     /**
479      * Thread that sends data to Flume and pulls it from Berkeley DB.
480      */
481     private static class WriterThread extends Thread  {
482         private volatile boolean shutdown = false;
483         private final Database database;
484         private final Environment environment;
485         private final FlumePersistentManager manager;
486         private final Gate gate;
487         private final SecretKey secretKey;
488         private final int batchSize;
489         private final AtomicLong dbCounter;
490         private final int lockTimeoutRetryCount;
491 
492         public WriterThread(final Database database, final Environment environment,
493                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
494                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
495             this.database = database;
496             this.environment = environment;
497             this.manager = manager;
498             this.gate = gate;
499             this.batchSize = batchsize;
500             this.secretKey = secretKey;
501             this.setDaemon(true);
502             this.dbCounter = dbCount;
503             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
504         }
505 
506         public void shutdown() {
507             LOGGER.debug("Writer thread shutting down");
508             this.shutdown = true;
509             gate.open();
510         }
511 
512         public boolean isShutdown() {
513             return shutdown;
514         }
515 
516         @Override
517         public void run() {
518             LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
519             long nextBatch = System.currentTimeMillis() + manager.delay;
520             while (!shutdown) {
521                 final long now = System.currentTimeMillis();
522                 final long dbCount = database.count();
523                 dbCounter.set(dbCount);
524                 if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
525                     nextBatch = now + manager.delay;
526                     try {
527                         boolean errors = false;
528                         final DatabaseEntry key = new DatabaseEntry();
529                         final DatabaseEntry data = new DatabaseEntry();
530 
531                         gate.close();
532                         OperationStatus status;
533                         if (batchSize > 1) {
534                             try {
535                                 errors = sendBatch(key, data);
536                             } catch (final Exception ex) {
537                                 break;
538                             }
539                         } else {
540                             Exception exception = null;
541                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542                                 exception = null;
543                                 Transaction txn = null;
544                                 Cursor cursor = null;
545                                 try {
546                                     txn = environment.beginTransaction(null, null);
547                                     cursor = database.openCursor(txn, null);
548                                     try {
549                                         status = cursor.getFirst(key, data, LockMode.RMW);
550                                         while (status == OperationStatus.SUCCESS) {
551                                             final SimpleEvent event = createEvent(data);
552                                             if (event != null) {
553                                                 try {
554                                                     manager.doSend(event);
555                                                 } catch (final Exception ioe) {
556                                                     errors = true;
557                                                     LOGGER.error("Error sending event", ioe);
558                                                     break;
559                                                 }
560                                                 try {
561                                                     cursor.delete();
562                                                 } catch (final Exception ex) {
563                                                     LOGGER.error("Unable to delete event", ex);
564                                                 }
565                                             }
566                                             status = cursor.getNext(key, data, LockMode.RMW);
567                                         }
568                                         if (cursor != null) {
569                                             cursor.close();
570                                             cursor = null;
571                                         }
572                                         txn.commit();
573                                         txn = null;
574                                         dbCounter.decrementAndGet();
575                                         exception = null;
576                                         break;
577                                     } catch (final LockConflictException lce) {
578                                         exception = lce;
579                                         // Fall through and retry.
580                                     } catch (final Exception ex) {
581                                         LOGGER.error("Error reading or writing to database", ex);
582                                         shutdown = true;
583                                         break;
584                                     } finally {
585                                         if (cursor != null) {
586                                             cursor.close();
587                                             cursor = null;
588                                         }
589                                         if (txn != null) {
590                                             txn.abort();
591                                             txn = null;
592                                         }
593                                     }
594                                 } catch (final LockConflictException lce) {
595                                     exception = lce;
596                                     if (cursor != null) {
597                                         try {
598                                             cursor.close();
599                                             cursor = null;
600                                         } catch (final Exception ex) {
601                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602                                         }
603                                     }
604                                     if (txn != null) {
605                                         try {
606                                             txn.abort();
607                                             txn = null;
608                                         } catch (final Exception ex) {
609                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610                                         }
611                                     }
612                                 }
613                                 try {
614                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615                                 } catch (final InterruptedException ie) {
616                                     // Ignore the error
617                                 }
618                             }
619                             if (exception != null) {
620                                 LOGGER.error("Unable to read or update data base", exception);
621                             }
622                         }
623                         if (errors) {
624                             Thread.sleep(manager.delay);
625                             continue;
626                         }
627                     } catch (final Exception ex) {
628                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629                     }
630                 } else {
631                     if (nextBatch <= now) {
632                         nextBatch = now + manager.delay;
633                     }
634                     try {
635                         final long interval = nextBatch - now;
636                         gate.waitForOpen(interval);
637                     } catch (final InterruptedException ie) {
638                         LOGGER.warn("WriterThread interrupted, continuing");
639                     } catch (final Exception ex) {
640                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641                         break;
642                     }
643                 }
644             }
645 
646             if (batchSize > 1 && database.count() > 0) {
647                 final DatabaseEntry key = new DatabaseEntry();
648                 final DatabaseEntry data = new DatabaseEntry();
649                 try {
650                     sendBatch(key, data);
651                 } catch (final Exception ex) {
652                     LOGGER.warn("Unable to write final batch");
653                 }
654             }
655             LOGGER.trace("WriterThread exiting");
656         }
657 
658         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
659             boolean errors = false;
660             OperationStatus status;
661             Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
662             try {
663                 status = cursor.getFirst(key, data, null);
664 
665                 final BatchEvent batch = new BatchEvent();
666                 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667                     final SimpleEvent event = createEvent(data);
668                     if (event != null) {
669                         batch.addEvent(event);
670                     }
671                     status = cursor.getNext(key, data, null);
672                 }
673                 try {
674                     manager.send(batch);
675                 } catch (final Exception ioe) {
676                     LOGGER.error("Error sending events", ioe);
677                     errors = true;
678                 }
679                 if (!errors) {
680                     cursor.close();
681                     cursor = null;
682                     Transaction txn = null;
683                     Exception exception = null;
684                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
685                         try {
686                             txn = environment.beginTransaction(null, null);
687                             try {
688                                 for (final Event event : batch.getEvents()) {
689                                     try {
690                                         final Map<String, String> headers = event.getHeaders();
691                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
692                                         database.delete(txn, key);
693                                     } catch (final Exception ex) {
694                                         LOGGER.error("Error deleting key from database", ex);
695                                     }
696                                 }
697                                 txn.commit();
698                                 long count = dbCounter.get();
699                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
700                                     count = dbCounter.get();
701                                 }
702                                 exception = null;
703                                 break;
704                             } catch (final LockConflictException lce) {
705                                 exception = lce;
706                                 if (cursor != null) {
707                                     try {
708                                         cursor.close();
709                                         cursor = null;
710                                     } catch (final Exception ex) {
711                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
712                                     }
713                                 }
714                                 if (txn != null) {
715                                     try {
716                                         txn.abort();
717                                         txn = null;
718                                     } catch (final Exception ex) {
719                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
720                                     }
721                                 }
722                             } catch (final Exception ex) {
723                                 LOGGER.error("Unable to commit transaction", ex);
724                                 if (txn != null) {
725                                     txn.abort();
726                                 }
727                             }
728                         } catch (final LockConflictException lce) {
729                             exception = lce;
730                             if (cursor != null) {
731                                 try {
732                                     cursor.close();
733                                     cursor = null;
734                                 } catch (final Exception ex) {
735                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
736                                 }
737                             }
738                             if (txn != null) {
739                                 try {
740                                     txn.abort();
741                                     txn = null;
742                                 } catch (final Exception ex) {
743                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
744                                 }
745                             }
746                         } finally {
747                             if (cursor != null) {
748                                 cursor.close();
749                                 cursor = null;
750                             }
751                             if (txn != null) {
752                                 txn.abort();
753                                 txn = null;
754                             }
755                         }
756                         try {
757                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
758                         } catch (final InterruptedException ie) {
759                             // Ignore the error
760                         }
761                     }
762                     if (exception != null) {
763                         LOGGER.error("Unable to delete events from data base", exception);
764                     }
765                 }
766             } catch (final Exception ex) {
767                 LOGGER.error("Error reading database", ex);
768                 shutdown = true;
769                 throw ex;
770             } finally {
771                 if (cursor != null) {
772                     cursor.close();
773                 }
774             }
775 
776             return errors;
777         }
778 
779         private SimpleEvent createEvent(final DatabaseEntry data) {
780             final SimpleEvent event = new SimpleEvent();
781             try {
782                 byte[] eventData = data.getData();
783                 if (secretKey != null) {
784                     final Cipher cipher = Cipher.getInstance("AES");
785                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
786                     eventData = cipher.doFinal(eventData);
787                 }
788                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
789                 final DataInputStream dais = new DataInputStream(bais);
790                 int length = dais.readInt();
791                 final byte[] bytes = new byte[length];
792                 dais.read(bytes, 0, length);
793                 event.setBody(bytes);
794                 length = dais.readInt();
795                 final Map<String, String> map = new HashMap<String, String>(length);
796                 for (int i = 0; i < length; ++i) {
797                     final String headerKey = dais.readUTF();
798                     final String value = dais.readUTF();
799                     map.put(headerKey, value);
800                 }
801                 event.setHeaders(map);
802                 return event;
803             } catch (final Exception ex) {
804                 LOGGER.error("Error retrieving event", ex);
805                 return null;
806             }
807         }
808 
809     }
810 
811     /**
812      * Factory that creates Daemon threads that can be properly shut down.
813      */
814     private static class DaemonThreadFactory implements ThreadFactory {
815         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
816         private final ThreadGroup group;
817         private final AtomicInteger threadNumber = new AtomicInteger(1);
818         private final String namePrefix;
819 
820         public DaemonThreadFactory() {
821             final SecurityManager securityManager = System.getSecurityManager();
822             group = securityManager != null ? securityManager.getThreadGroup() :
823                 Thread.currentThread().getThreadGroup();
824             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
825         }
826 
827         @Override
828         public Thread newThread(final Runnable r) {
829             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
830             thread.setDaemon(true);
831             if (thread.getPriority() != Thread.NORM_PRIORITY) {
832                 thread.setPriority(Thread.NORM_PRIORITY);
833             }
834             return thread;
835         }
836     }
837 
838     /**
839      * An internal class.
840      */
841     private static class Gate {
842 
843         private boolean isOpen = false;
844 
845         public boolean isOpen() {
846             return isOpen;
847         }
848 
849         public synchronized void open() {
850             isOpen = true;
851             notifyAll();
852         }
853 
854         public synchronized void close() {
855             isOpen = false;
856         }
857 
858         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
859             wait(timeout);
860         }
861     }
862 }