001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.Serializable;
020import java.util.Locale;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.logging.log4j.core.Appender;
024import org.apache.logging.log4j.core.Filter;
025import org.apache.logging.log4j.core.Layout;
026import org.apache.logging.log4j.core.LogEvent;
027import org.apache.logging.log4j.core.appender.AbstractAppender;
028import org.apache.logging.log4j.core.config.Property;
029import org.apache.logging.log4j.core.config.plugins.Plugin;
030import org.apache.logging.log4j.core.config.plugins.PluginAliases;
031import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
032import org.apache.logging.log4j.core.config.plugins.PluginElement;
033import org.apache.logging.log4j.core.config.plugins.PluginFactory;
034import org.apache.logging.log4j.core.layout.Rfc5424Layout;
035import org.apache.logging.log4j.core.net.Facility;
036import org.apache.logging.log4j.core.util.Booleans;
037import org.apache.logging.log4j.core.util.Integers;
038import org.apache.logging.log4j.util.Timer;
039
040/**
041 * An Appender that uses the Avro protocol to route events to Flume.
042 */
043@Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
044public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
045
046    private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
047    private static final int DEFAULT_MAX_DELAY = 60000;
048
049    private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
050
051    private final AbstractFlumeManager manager;
052
053    private final String mdcIncludes;
054    private final String mdcExcludes;
055    private final String mdcRequired;
056
057    private final String eventPrefix;
058
059    private final String mdcPrefix;
060
061    private final boolean compressBody;
062
063    private final FlumeEventFactory factory;
064
065    private Timer timer = new Timer("FlumeEvent", 5000);
066    private volatile long count = 0;
067
068    /**
069     * Which Manager will be used by the appender instance.
070     */
071    private enum ManagerType {
072        AVRO, EMBEDDED, PERSISTENT;
073
074        public static ManagerType getType(final String type) {
075            return valueOf(type.toUpperCase(Locale.US));
076        }
077    }
078
079    private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
080            final boolean ignoreExceptions, final String includes, final String excludes, final String required,
081            final String mdcPrefix, final String eventPrefix, final boolean compress, final FlumeEventFactory factory,
082            final Property[] properties, final AbstractFlumeManager manager) {
083        super(name, filter, layout, ignoreExceptions, properties);
084        this.manager = manager;
085        this.mdcIncludes = includes;
086        this.mdcExcludes = excludes;
087        this.mdcRequired = required;
088        this.eventPrefix = eventPrefix;
089        this.mdcPrefix = mdcPrefix;
090        this.compressBody = compress;
091        this.factory = factory == null ? this : factory;
092    }
093
094    /**
095     * Publish the event.
096     * @param event The LogEvent.
097     */
098    @Override
099    public void append(final LogEvent event) {
100        final String name = event.getLoggerName();
101        if (name != null) {
102            for (final String pkg : EXCLUDED_PACKAGES) {
103                if (name.startsWith(pkg)) {
104                    return;
105                }
106            }
107        }
108        timer.startOrResume();
109        final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
110            eventPrefix, compressBody);
111        flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
112        if (update()) {
113            String msg = timer.stop();
114            LOGGER.debug(msg);
115        } else {
116            timer.pause();
117        }
118        manager.send(flumeEvent);
119    }
120
121    private synchronized boolean update() {
122        if (++count == 5000) {
123            count = 0;
124            return true;
125        }
126        return false;
127    }
128
129    @Override
130    public boolean stop(final long timeout, final TimeUnit timeUnit) {
131        setStopping();
132        boolean stopped = super.stop(timeout, timeUnit, false);
133        stopped &= manager.stop(timeout, timeUnit);
134        setStopped();
135        return stopped;
136    }
137
138    /**
139     * Create a Flume event.
140     * @param event The Log4j LogEvent.
141     * @param includes comma separated list of mdc elements to include.
142     * @param excludes comma separated list of mdc elements to exclude.
143     * @param required comma separated list of mdc elements that must be present with a value.
144     * @param mdcPrefix The prefix to add to MDC key names.
145     * @param eventPrefix The prefix to add to event fields.
146     * @param compress If true the body will be compressed.
147     * @return A Flume Event.
148     */
149    @Override
150    public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
151                                  final String required, final String mdcPrefix, final String eventPrefix,
152                                  final boolean compress) {
153        return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
154            eventPrefix, compressBody);
155    }
156
157    /**
158     * Create a Flume Avro Appender.
159     * @param agents An array of Agents.
160     * @param properties Properties to pass to the embedded agent.
161     * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
162     * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
163     * @param type Avro (default), Embedded, or Persistent.
164     * @param dataDir The directory where the Flume FileChannel should write its data.
165     * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is
166     *                          1000.
167     * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
168     * @param agentRetries The number of times to retry an agent before failing to the next agent.
169     * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch.
170     * @param name The name of the Appender.
171     * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
172     *               they are propagated to the caller.
173     * @param excludes A comma separated list of MDC elements to exclude.
174     * @param includes A comma separated list of MDC elements to include.
175     * @param required A comma separated list of MDC elements that are required.
176     * @param mdcPrefix The prefix to add to MDC key names.
177     * @param eventPrefix The prefix to add to event key names.
178     * @param compressBody If true the event body will be compressed.
179     * @param batchSize Number of events to include in a batch. Defaults to 1.
180     * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
181     * @param factory The factory to use to create Flume events.
182     * @param layout The layout to format the event.
183     * @param filter A Filter to filter events.
184     *
185     * @return A Flume Avro Appender.
186     */
187    @PluginFactory
188    public static FlumeAppender createAppender(@PluginElement("Agents") final Agent[] agents,
189                                               @PluginElement("Properties") final Property[] properties,
190                                               @PluginAttribute("hosts") final String hosts,
191                                               @PluginAttribute("embedded") final String embedded,
192                                               @PluginAttribute("type") final String type,
193                                               @PluginAttribute("dataDir") final String dataDir,
194                                               @PluginAliases("connectTimeout")
195                                               @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
196                                               @PluginAliases("requestTimeout")
197                                               @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
198                                               @PluginAttribute("agentRetries") final String agentRetries,
199                                               @PluginAliases("maxDelay") // deprecated
200                                               @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
201                                               @PluginAttribute("name") final String name,
202                                               @PluginAttribute("ignoreExceptions") final String ignore,
203                                               @PluginAttribute("mdcExcludes") final String excludes,
204                                               @PluginAttribute("mdcIncludes") final String includes,
205                                               @PluginAttribute("mdcRequired") final String required,
206                                               @PluginAttribute("mdcPrefix") final String mdcPrefix,
207                                               @PluginAttribute("eventPrefix") final String eventPrefix,
208                                               @PluginAttribute("compress") final String compressBody,
209                                               @PluginAttribute("batchSize") final String batchSize,
210                                               @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
211                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
212                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
213                                               @PluginElement("Filter") final Filter filter) {
214
215        final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
216            (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0;
217        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
218        final boolean compress = Booleans.parseBoolean(compressBody, true);
219        ManagerType managerType;
220        if (type != null) {
221            if (embed && embedded != null) {
222                try {
223                    managerType = ManagerType.getType(type);
224                    LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
225                } catch (final Exception ex) {
226                    LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
227                        " is invalid.");
228                    managerType = ManagerType.EMBEDDED;
229                }
230            } else {
231                try {
232                    managerType = ManagerType.getType(type);
233                } catch (final Exception ex) {
234                    LOGGER.warn("Type " + type + " is invalid.");
235                    managerType = ManagerType.EMBEDDED;
236                }
237            }
238        }  else if (embed) {
239           managerType = ManagerType.EMBEDDED;
240        }  else {
241           managerType = ManagerType.AVRO;
242        }
243
244        final int batchCount = Integers.parseInt(batchSize, 1);
245        final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
246        final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
247        final int retries = Integers.parseInt(agentRetries, 0);
248        final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
249        final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
250
251        if (layout == null) {
252            final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
253            layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
254                    mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
255                    null);
256        }
257
258        if (name == null) {
259            LOGGER.error("No name provided for Appender");
260            return null;
261        }
262
263        AbstractFlumeManager manager;
264
265        switch (managerType) {
266            case EMBEDDED:
267                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
268                break;
269            case AVRO:
270                manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
271                break;
272            case PERSISTENT:
273                manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries,
274                    connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
275                break;
276            default:
277                LOGGER.debug("No manager type specified. Defaulting to AVRO");
278                manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
279        }
280
281        if (manager == null) {
282            return null;
283        }
284
285        return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
286            excludes, required, mdcPrefix, eventPrefix, compress, factory, Property.EMPTY_ARRAY, manager);
287    }
288
289    private static Agent[] getAgents(Agent[] agents, final String hosts) {
290        if (agents == null || agents.length == 0) {
291            if (hosts != null && !hosts.isEmpty()) {
292                LOGGER.debug("Parsing agents from hosts parameter");
293                final String[] hostports = hosts.split(",");
294                agents = new Agent[hostports.length];
295                for(int i = 0; i < hostports.length; ++i) {
296                    final String[] h = hostports[i].split(":");
297                    agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null);
298                }
299            } else {
300                LOGGER.debug("No agents provided, using defaults");
301                agents = new Agent[] {Agent.createAgent(null, null)};
302            }
303        }
304
305        LOGGER.debug("Using agents {}", agents);
306        return agents;
307    }
308}