View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.Serializable;
20  import java.util.Locale;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.logging.log4j.core.Appender;
24  import org.apache.logging.log4j.core.Filter;
25  import org.apache.logging.log4j.core.Layout;
26  import org.apache.logging.log4j.core.LogEvent;
27  import org.apache.logging.log4j.core.appender.AbstractAppender;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.config.plugins.Plugin;
30  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
31  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
32  import org.apache.logging.log4j.core.config.plugins.PluginElement;
33  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
34  import org.apache.logging.log4j.core.layout.Rfc5424Layout;
35  import org.apache.logging.log4j.core.net.Facility;
36  import org.apache.logging.log4j.core.util.Booleans;
37  import org.apache.logging.log4j.core.util.Integers;
38  import org.apache.logging.log4j.util.Timer;
39  
40  /**
41   * An Appender that uses the Avro protocol to route events to Flume.
42   */
43  @Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
44  public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
45  
46      private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
47      private static final int DEFAULT_MAX_DELAY = 60000;
48  
49      private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
50  
51      private final AbstractFlumeManager manager;
52  
53      private final String mdcIncludes;
54      private final String mdcExcludes;
55      private final String mdcRequired;
56  
57      private final String eventPrefix;
58  
59      private final String mdcPrefix;
60  
61      private final boolean compressBody;
62  
63      private final FlumeEventFactory factory;
64  
65      private Timer timer = new Timer("FlumeEvent", 5000);
66      private volatile long count = 0;
67  
68      /**
69       * Which Manager will be used by the appender instance.
70       */
71      private enum ManagerType {
72          AVRO, EMBEDDED, PERSISTENT;
73  
74          public static ManagerType getType(final String type) {
75              return valueOf(type.toUpperCase(Locale.US));
76          }
77      }
78  
79      private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
80              final boolean ignoreExceptions, final String includes, final String excludes, final String required,
81              final String mdcPrefix, final String eventPrefix, final boolean compress, final FlumeEventFactory factory,
82              final Property[] properties, final AbstractFlumeManager manager) {
83          super(name, filter, layout, ignoreExceptions, properties);
84          this.manager = manager;
85          this.mdcIncludes = includes;
86          this.mdcExcludes = excludes;
87          this.mdcRequired = required;
88          this.eventPrefix = eventPrefix;
89          this.mdcPrefix = mdcPrefix;
90          this.compressBody = compress;
91          this.factory = factory == null ? this : factory;
92      }
93  
94      /**
95       * Publish the event.
96       * @param event The LogEvent.
97       */
98      @Override
99      public void append(final LogEvent event) {
100         final String name = event.getLoggerName();
101         if (name != null) {
102             for (final String pkg : EXCLUDED_PACKAGES) {
103                 if (name.startsWith(pkg)) {
104                     return;
105                 }
106             }
107         }
108         timer.startOrResume();
109         final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
110             eventPrefix, compressBody);
111         flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
112         if (update()) {
113             String msg = timer.stop();
114             LOGGER.debug(msg);
115         } else {
116             timer.pause();
117         }
118         manager.send(flumeEvent);
119     }
120 
121     private synchronized boolean update() {
122         if (++count == 5000) {
123             count = 0;
124             return true;
125         }
126         return false;
127     }
128 
129     @Override
130     public boolean stop(final long timeout, final TimeUnit timeUnit) {
131         setStopping();
132         boolean stopped = super.stop(timeout, timeUnit, false);
133         stopped &= manager.stop(timeout, timeUnit);
134         setStopped();
135         return stopped;
136     }
137 
138     /**
139      * Create a Flume event.
140      * @param event The Log4j LogEvent.
141      * @param includes comma separated list of mdc elements to include.
142      * @param excludes comma separated list of mdc elements to exclude.
143      * @param required comma separated list of mdc elements that must be present with a value.
144      * @param mdcPrefix The prefix to add to MDC key names.
145      * @param eventPrefix The prefix to add to event fields.
146      * @param compress If true the body will be compressed.
147      * @return A Flume Event.
148      */
149     @Override
150     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
151                                   final String required, final String mdcPrefix, final String eventPrefix,
152                                   final boolean compress) {
153         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
154             eventPrefix, compressBody);
155     }
156 
157     /**
158      * Create a Flume Avro Appender.
159      * @param agents An array of Agents.
160      * @param properties Properties to pass to the embedded agent.
161      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
162      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
163      * @param type Avro (default), Embedded, or Persistent.
164      * @param dataDir The directory where the Flume FileChannel should write its data.
165      * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is
166      *                          1000.
167      * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
168      * @param agentRetries The number of times to retry an agent before failing to the next agent.
169      * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch.
170      * @param name The name of the Appender.
171      * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
172      *               they are propagated to the caller.
173      * @param excludes A comma separated list of MDC elements to exclude.
174      * @param includes A comma separated list of MDC elements to include.
175      * @param required A comma separated list of MDC elements that are required.
176      * @param mdcPrefix The prefix to add to MDC key names.
177      * @param eventPrefix The prefix to add to event key names.
178      * @param compressBody If true the event body will be compressed.
179      * @param batchSize Number of events to include in a batch. Defaults to 1.
180      * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
181      * @param factory The factory to use to create Flume events.
182      * @param layout The layout to format the event.
183      * @param filter A Filter to filter events.
184      *
185      * @return A Flume Avro Appender.
186      */
187     @PluginFactory
188     public static FlumeAppender createAppender(@PluginElement("Agents") final Agent[] agents,
189                                                @PluginElement("Properties") final Property[] properties,
190                                                @PluginAttribute("hosts") final String hosts,
191                                                @PluginAttribute("embedded") final String embedded,
192                                                @PluginAttribute("type") final String type,
193                                                @PluginAttribute("dataDir") final String dataDir,
194                                                @PluginAliases("connectTimeout")
195                                                @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
196                                                @PluginAliases("requestTimeout")
197                                                @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
198                                                @PluginAttribute("agentRetries") final String agentRetries,
199                                                @PluginAliases("maxDelay") // deprecated
200                                                @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
201                                                @PluginAttribute("name") final String name,
202                                                @PluginAttribute("ignoreExceptions") final String ignore,
203                                                @PluginAttribute("mdcExcludes") final String excludes,
204                                                @PluginAttribute("mdcIncludes") final String includes,
205                                                @PluginAttribute("mdcRequired") final String required,
206                                                @PluginAttribute("mdcPrefix") final String mdcPrefix,
207                                                @PluginAttribute("eventPrefix") final String eventPrefix,
208                                                @PluginAttribute("compress") final String compressBody,
209                                                @PluginAttribute("batchSize") final String batchSize,
210                                                @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
211                                                @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
212                                                @PluginElement("Layout") Layout<? extends Serializable> layout,
213                                                @PluginElement("Filter") final Filter filter) {
214 
215         final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
216             (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0;
217         final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
218         final boolean compress = Booleans.parseBoolean(compressBody, true);
219         ManagerType managerType;
220         if (type != null) {
221             if (embed && embedded != null) {
222                 try {
223                     managerType = ManagerType.getType(type);
224                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
225                 } catch (final Exception ex) {
226                     LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
227                         " is invalid.");
228                     managerType = ManagerType.EMBEDDED;
229                 }
230             } else {
231                 try {
232                     managerType = ManagerType.getType(type);
233                 } catch (final Exception ex) {
234                     LOGGER.warn("Type " + type + " is invalid.");
235                     managerType = ManagerType.EMBEDDED;
236                 }
237             }
238         }  else if (embed) {
239            managerType = ManagerType.EMBEDDED;
240         }  else {
241            managerType = ManagerType.AVRO;
242         }
243 
244         final int batchCount = Integers.parseInt(batchSize, 1);
245         final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
246         final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
247         final int retries = Integers.parseInt(agentRetries, 0);
248         final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
249         final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
250 
251         if (layout == null) {
252             final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
253             layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
254                     mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
255                     null);
256         }
257 
258         if (name == null) {
259             LOGGER.error("No name provided for Appender");
260             return null;
261         }
262 
263         AbstractFlumeManager manager;
264 
265         switch (managerType) {
266             case EMBEDDED:
267                 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
268                 break;
269             case AVRO:
270                 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
271                 break;
272             case PERSISTENT:
273                 manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries,
274                     connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
275                 break;
276             default:
277                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
278                 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
279         }
280 
281         if (manager == null) {
282             return null;
283         }
284 
285         return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
286             excludes, required, mdcPrefix, eventPrefix, compress, factory, Property.EMPTY_ARRAY, manager);
287     }
288 
289     private static Agent[] getAgents(Agent[] agents, final String hosts) {
290         if (agents == null || agents.length == 0) {
291             if (hosts != null && !hosts.isEmpty()) {
292                 LOGGER.debug("Parsing agents from hosts parameter");
293                 final String[] hostports = hosts.split(",");
294                 agents = new Agent[hostports.length];
295                 for(int i = 0; i < hostports.length; ++i) {
296                     final String[] h = hostports[i].split(":");
297                     agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null);
298                 }
299             } else {
300                 LOGGER.debug("No agents provided, using defaults");
301                 agents = new Agent[] {Agent.createAgent(null, null)};
302             }
303         }
304 
305         LOGGER.debug("Using agents {}", agents);
306         return agents;
307     }
308 }