1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.logging.log4j.core.Appender;
24 import org.apache.logging.log4j.core.Filter;
25 import org.apache.logging.log4j.core.Layout;
26 import org.apache.logging.log4j.core.LogEvent;
27 import org.apache.logging.log4j.core.appender.AbstractAppender;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.config.plugins.Plugin;
30 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
31 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
32 import org.apache.logging.log4j.core.config.plugins.PluginElement;
33 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
34 import org.apache.logging.log4j.core.layout.Rfc5424Layout;
35 import org.apache.logging.log4j.core.net.Facility;
36 import org.apache.logging.log4j.core.util.Booleans;
37 import org.apache.logging.log4j.core.util.Integers;
38 import org.apache.logging.log4j.util.Timer;
39
40
41
42
43 @Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
44 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
45
46 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
47 private static final int DEFAULT_MAX_DELAY = 60000;
48
49 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
50
51 private final AbstractFlumeManager manager;
52
53 private final String mdcIncludes;
54 private final String mdcExcludes;
55 private final String mdcRequired;
56
57 private final String eventPrefix;
58
59 private final String mdcPrefix;
60
61 private final boolean compressBody;
62
63 private final FlumeEventFactory factory;
64
65 private Timer timer = new Timer("FlumeEvent", 5000);
66 private volatile long count = 0;
67
68
69
70
71 private enum ManagerType {
72 AVRO, EMBEDDED, PERSISTENT;
73
74 public static ManagerType getType(final String type) {
75 return valueOf(type.toUpperCase(Locale.US));
76 }
77 }
78
79 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
80 final boolean ignoreExceptions, final String includes, final String excludes, final String required,
81 final String mdcPrefix, final String eventPrefix, final boolean compress, final FlumeEventFactory factory,
82 final Property[] properties, final AbstractFlumeManager manager) {
83 super(name, filter, layout, ignoreExceptions, properties);
84 this.manager = manager;
85 this.mdcIncludes = includes;
86 this.mdcExcludes = excludes;
87 this.mdcRequired = required;
88 this.eventPrefix = eventPrefix;
89 this.mdcPrefix = mdcPrefix;
90 this.compressBody = compress;
91 this.factory = factory == null ? this : factory;
92 }
93
94
95
96
97
98 @Override
99 public void append(final LogEvent event) {
100 final String name = event.getLoggerName();
101 if (name != null) {
102 for (final String pkg : EXCLUDED_PACKAGES) {
103 if (name.startsWith(pkg)) {
104 return;
105 }
106 }
107 }
108 timer.startOrResume();
109 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
110 eventPrefix, compressBody);
111 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
112 if (update()) {
113 String msg = timer.stop();
114 LOGGER.debug(msg);
115 } else {
116 timer.pause();
117 }
118 manager.send(flumeEvent);
119 }
120
121 private synchronized boolean update() {
122 if (++count == 5000) {
123 count = 0;
124 return true;
125 }
126 return false;
127 }
128
129 @Override
130 public boolean stop(final long timeout, final TimeUnit timeUnit) {
131 setStopping();
132 boolean stopped = super.stop(timeout, timeUnit, false);
133 stopped &= manager.stop(timeout, timeUnit);
134 setStopped();
135 return stopped;
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149 @Override
150 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
151 final String required, final String mdcPrefix, final String eventPrefix,
152 final boolean compress) {
153 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
154 eventPrefix, compressBody);
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 @PluginFactory
188 public static FlumeAppender createAppender(@PluginElement("Agents") final Agent[] agents,
189 @PluginElement("Properties") final Property[] properties,
190 @PluginAttribute("hosts") final String hosts,
191 @PluginAttribute("embedded") final String embedded,
192 @PluginAttribute("type") final String type,
193 @PluginAttribute("dataDir") final String dataDir,
194 @PluginAliases("connectTimeout")
195 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
196 @PluginAliases("requestTimeout")
197 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
198 @PluginAttribute("agentRetries") final String agentRetries,
199 @PluginAliases("maxDelay")
200 @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
201 @PluginAttribute("name") final String name,
202 @PluginAttribute("ignoreExceptions") final String ignore,
203 @PluginAttribute("mdcExcludes") final String excludes,
204 @PluginAttribute("mdcIncludes") final String includes,
205 @PluginAttribute("mdcRequired") final String required,
206 @PluginAttribute("mdcPrefix") final String mdcPrefix,
207 @PluginAttribute("eventPrefix") final String eventPrefix,
208 @PluginAttribute("compress") final String compressBody,
209 @PluginAttribute("batchSize") final String batchSize,
210 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
211 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
212 @PluginElement("Layout") Layout<? extends Serializable> layout,
213 @PluginElement("Filter") final Filter filter) {
214
215 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
216 (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0;
217 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
218 final boolean compress = Booleans.parseBoolean(compressBody, true);
219 ManagerType managerType;
220 if (type != null) {
221 if (embed && embedded != null) {
222 try {
223 managerType = ManagerType.getType(type);
224 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
225 } catch (final Exception ex) {
226 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
227 " is invalid.");
228 managerType = ManagerType.EMBEDDED;
229 }
230 } else {
231 try {
232 managerType = ManagerType.getType(type);
233 } catch (final Exception ex) {
234 LOGGER.warn("Type " + type + " is invalid.");
235 managerType = ManagerType.EMBEDDED;
236 }
237 }
238 } else if (embed) {
239 managerType = ManagerType.EMBEDDED;
240 } else {
241 managerType = ManagerType.AVRO;
242 }
243
244 final int batchCount = Integers.parseInt(batchSize, 1);
245 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
246 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
247 final int retries = Integers.parseInt(agentRetries, 0);
248 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
249 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
250
251 if (layout == null) {
252 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
253 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
254 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
255 null);
256 }
257
258 if (name == null) {
259 LOGGER.error("No name provided for Appender");
260 return null;
261 }
262
263 AbstractFlumeManager manager;
264
265 switch (managerType) {
266 case EMBEDDED:
267 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
268 break;
269 case AVRO:
270 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
271 break;
272 case PERSISTENT:
273 manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries,
274 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
275 break;
276 default:
277 LOGGER.debug("No manager type specified. Defaulting to AVRO");
278 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
279 }
280
281 if (manager == null) {
282 return null;
283 }
284
285 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
286 excludes, required, mdcPrefix, eventPrefix, compress, factory, Property.EMPTY_ARRAY, manager);
287 }
288
289 private static Agent[] getAgents(Agent[] agents, final String hosts) {
290 if (agents == null || agents.length == 0) {
291 if (hosts != null && !hosts.isEmpty()) {
292 LOGGER.debug("Parsing agents from hosts parameter");
293 final String[] hostports = hosts.split(",");
294 agents = new Agent[hostports.length];
295 for(int i = 0; i < hostports.length; ++i) {
296 final String[] h = hostports[i].split(":");
297 agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null);
298 }
299 } else {
300 LOGGER.debug("No agents provided, using defaults");
301 agents = new Agent[] {Agent.createAgent(null, null)};
302 }
303 }
304
305 LOGGER.debug("Using agents {}", agents);
306 return agents;
307 }
308 }