1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21
22 import org.apache.logging.log4j.core.Filter;
23 import org.apache.logging.log4j.core.Layout;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.appender.AbstractAppender;
26 import org.apache.logging.log4j.core.config.Property;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
29 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
30 import org.apache.logging.log4j.core.config.plugins.PluginElement;
31 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32 import org.apache.logging.log4j.core.layout.Rfc5424Layout;
33 import org.apache.logging.log4j.core.net.Facility;
34 import org.apache.logging.log4j.core.util.Booleans;
35 import org.apache.logging.log4j.core.util.Integers;
36
37
38
39
40 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
41 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
42
43 private static final long serialVersionUID = 1L;
44 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
45 private static final int DEFAULT_MAX_DELAY = 60000;
46
47 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
48
49 private final AbstractFlumeManager manager;
50
51 private final String mdcIncludes;
52 private final String mdcExcludes;
53 private final String mdcRequired;
54
55 private final String eventPrefix;
56
57 private final String mdcPrefix;
58
59 private final boolean compressBody;
60
61 private final FlumeEventFactory factory;
62
63
64
65
66 private enum ManagerType {
67 AVRO, EMBEDDED, PERSISTENT;
68
69 public static ManagerType getType(final String type) {
70 return valueOf(type.toUpperCase(Locale.US));
71 }
72 }
73
74 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
75 final boolean ignoreExceptions, final String includes, final String excludes,
76 final String required, final String mdcPrefix, final String eventPrefix,
77 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
78 super(name, filter, layout, ignoreExceptions);
79 this.manager = manager;
80 this.mdcIncludes = includes;
81 this.mdcExcludes = excludes;
82 this.mdcRequired = required;
83 this.eventPrefix = eventPrefix;
84 this.mdcPrefix = mdcPrefix;
85 this.compressBody = compress;
86 this.factory = factory == null ? this : factory;
87 }
88
89
90
91
92
93 @Override
94 public void append(final LogEvent event) {
95 final String name = event.getLoggerName();
96 if (name != null) {
97 for (final String pkg : EXCLUDED_PACKAGES) {
98 if (name.startsWith(pkg)) {
99 return;
100 }
101 }
102 }
103 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
104 eventPrefix, compressBody);
105 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
106 manager.send(flumeEvent);
107 }
108
109 @Override
110 public void stop() {
111 super.stop();
112 manager.release();
113 }
114
115
116
117
118
119
120
121
122
123
124
125
126 @Override
127 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
128 final String required, final String mdcPrefix, final String eventPrefix,
129 final boolean compress) {
130 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
131 eventPrefix, compressBody);
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 @PluginFactory
165 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
166 @PluginElement("Properties") final Property[] properties,
167 @PluginAttribute("embedded") final String embedded,
168 @PluginAttribute("type") final String type,
169 @PluginAttribute("dataDir") final String dataDir,
170 @PluginAliases("connectTimeout")
171 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
172 @PluginAliases("requestTimeout")
173 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
174 @PluginAttribute("agentRetries") final String agentRetries,
175 @PluginAliases("maxDelay")
176 @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
177 @PluginAttribute("name") final String name,
178 @PluginAttribute("ignoreExceptions") final String ignore,
179 @PluginAttribute("mdcExcludes") final String excludes,
180 @PluginAttribute("mdcIncludes") final String includes,
181 @PluginAttribute("mdcRequired") final String required,
182 @PluginAttribute("mdcPrefix") final String mdcPrefix,
183 @PluginAttribute("eventPrefix") final String eventPrefix,
184 @PluginAttribute("compress") final String compressBody,
185 @PluginAttribute("batchSize") final String batchSize,
186 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
187 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
188 @PluginElement("Layout") Layout<? extends Serializable> layout,
189 @PluginElement("Filter") final Filter filter) {
190
191 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
192 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
193 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
194 final boolean compress = Booleans.parseBoolean(compressBody, true);
195 ManagerType managerType;
196 if (type != null) {
197 if (embed && embedded != null) {
198 try {
199 managerType = ManagerType.getType(type);
200 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
201 } catch (final Exception ex) {
202 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
203 " is invalid.");
204 managerType = ManagerType.EMBEDDED;
205 }
206 } else {
207 try {
208 managerType = ManagerType.getType(type);
209 } catch (final Exception ex) {
210 LOGGER.warn("Type " + type + " is invalid.");
211 managerType = ManagerType.EMBEDDED;
212 }
213 }
214 } else if (embed) {
215 managerType = ManagerType.EMBEDDED;
216 } else {
217 managerType = ManagerType.AVRO;
218 }
219
220 final int batchCount = Integers.parseInt(batchSize, 1);
221 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
222 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
223 final int retries = Integers.parseInt(agentRetries, 0);
224 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
225 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
226
227 if (layout == null) {
228 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
229 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
230 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
231 null);
232 }
233
234 if (name == null) {
235 LOGGER.error("No name provided for Appender");
236 return null;
237 }
238
239 AbstractFlumeManager manager;
240
241 switch (managerType) {
242 case EMBEDDED:
243 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
244 break;
245 case AVRO:
246 if (agents == null || agents.length == 0) {
247 LOGGER.debug("No agents provided, using defaults");
248 agents = new Agent[] {Agent.createAgent(null, null)};
249 }
250 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
251 break;
252 case PERSISTENT:
253 if (agents == null || agents.length == 0) {
254 LOGGER.debug("No agents provided, using defaults");
255 agents = new Agent[] {Agent.createAgent(null, null)};
256 }
257 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
258 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
259 break;
260 default:
261 LOGGER.debug("No manager type specified. Defaulting to AVRO");
262 if (agents == null || agents.length == 0) {
263 LOGGER.debug("No agents provided, using defaults");
264 agents = new Agent[] {Agent.createAgent(null, null)};
265 }
266 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
267 }
268
269 if (manager == null) {
270 return null;
271 }
272
273 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
274 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
275 }
276 }