1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
34 import org.apache.logging.log4j.core.impl.ThrowableProxy;
35 import org.apache.logging.log4j.core.time.Instant;
36 import org.apache.logging.log4j.core.util.Patterns;
37 import org.apache.logging.log4j.core.util.UuidUtil;
38 import org.apache.logging.log4j.message.MapMessage;
39 import org.apache.logging.log4j.message.Message;
40 import org.apache.logging.log4j.message.StructuredDataId;
41 import org.apache.logging.log4j.message.StructuredDataMessage;
42 import org.apache.logging.log4j.util.ReadOnlyStringMap;
43 import org.apache.logging.log4j.util.Strings;
44
45
46
47
48 public class FlumeEvent extends SimpleEvent implements LogEvent {
49
50 static final String GUID = "guId";
51
52
53
54 private static final long serialVersionUID = -8988674608627854140L;
55
56 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
57
58 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
59
60 private static final String EVENT_TYPE = "eventType";
61
62 private static final String EVENT_ID = "eventId";
63
64 private static final String TIMESTAMP = "timeStamp";
65
66 private final LogEvent event;
67
68 private final Map<String, String> contextMap = new HashMap<>();
69
70 private final boolean compress;
71
72
73
74
75
76
77
78
79
80
81
82 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
83 String mdcPrefix, String eventPrefix, final boolean compress) {
84 this.event = event;
85 this.compress = compress;
86 final Map<String, String> headers = getHeaders();
87 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
88 if (mdcPrefix == null) {
89 mdcPrefix = DEFAULT_MDC_PREFIX;
90 }
91 if (eventPrefix == null) {
92 eventPrefix = DEFAULT_EVENT_PREFIX;
93 }
94 final Map<String, String> mdc = event.getContextData().toMap();
95 if (includes != null) {
96 final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
97 if (array.length > 0) {
98 for (String str : array) {
99 str = str.trim();
100 if (mdc.containsKey(str)) {
101 contextMap.put(str, mdc.get(str));
102 }
103 }
104 }
105 } else if (excludes != null) {
106 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
107 if (array.length > 0) {
108 final List<String> list = new ArrayList<>(array.length);
109 for (final String value : array) {
110 list.add(value.trim());
111 }
112 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
113 if (!list.contains(entry.getKey())) {
114 contextMap.put(entry.getKey(), entry.getValue());
115 }
116 }
117 }
118 } else {
119 contextMap.putAll(mdc);
120 }
121
122 if (required != null) {
123 final String[] array = required.split(Patterns.COMMA_SEPARATOR);
124 if (array.length > 0) {
125 for (String str : array) {
126 str = str.trim();
127 if (!mdc.containsKey(str)) {
128 throw new LoggingException("Required key " + str + " is missing from the MDC");
129 }
130 }
131 }
132 }
133 final String guid = UuidUtil.getTimeBasedUuid().toString();
134 final Message message = event.getMessage();
135 if (message instanceof MapMessage) {
136
137 @SuppressWarnings("unchecked")
138 final
139 MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
140 stringMapMessage.put(GUID, guid);
141 if (message instanceof StructuredDataMessage) {
142 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
143 }
144 addMapData(eventPrefix, headers, stringMapMessage);
145 } else {
146 headers.put(GUID, guid);
147 }
148
149 addContextData(mdcPrefix, headers, contextMap);
150 }
151
152 protected void addStructuredData(final String prefix, final Map<String, String> fields,
153 final StructuredDataMessage msg) {
154 fields.put(prefix + EVENT_TYPE, msg.getType());
155 final StructuredDataId id = msg.getId();
156 fields.put(prefix + EVENT_ID, id.getName());
157 }
158
159 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
160 final Map<String, String> data = msg.getData();
161 for (final Map.Entry<String, String> entry : data.entrySet()) {
162 fields.put(prefix + entry.getKey(), entry.getValue());
163 }
164 }
165
166 protected void addContextData(final String prefix, final Map<String, String> fields,
167 final Map<String, String> context) {
168 final Map<String, String> map = new HashMap<>();
169 for (final Map.Entry<String, String> entry : context.entrySet()) {
170 if (entry.getKey() != null && entry.getValue() != null) {
171 fields.put(prefix + entry.getKey(), entry.getValue());
172 map.put(prefix + entry.getKey(), entry.getValue());
173 }
174 }
175 context.clear();
176 context.putAll(map);
177 }
178
179 @Override
180 public LogEvent toImmutable() {
181 return Log4jLogEvent.createMemento(this);
182 }
183
184
185
186
187
188 @Override
189 public void setBody(final byte[] body) {
190 if (body == null || body.length == 0) {
191 super.setBody(new byte[0]);
192 return;
193 }
194 if (compress) {
195 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
196 try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
197 os.write(body);
198 } catch (final IOException ioe) {
199 throw new LoggingException("Unable to compress message", ioe);
200 }
201 super.setBody(baos.toByteArray());
202 } else {
203 super.setBody(body);
204 }
205 }
206
207
208
209
210
211 @Override
212 public String getLoggerFqcn() {
213 return event.getLoggerFqcn();
214 }
215
216
217
218
219
220 @Override
221 public Level getLevel() {
222 return event.getLevel();
223 }
224
225
226
227
228
229 @Override
230 public String getLoggerName() {
231 return event.getLoggerName();
232 }
233
234
235
236
237
238 @Override
239 public StackTraceElement getSource() {
240 return event.getSource();
241 }
242
243
244
245
246
247 @Override
248 public Message getMessage() {
249 return event.getMessage();
250 }
251
252
253
254
255
256 @Override
257 public Marker getMarker() {
258 return event.getMarker();
259 }
260
261
262
263
264
265 @Override
266 public long getThreadId() {
267 return event.getThreadId();
268 }
269
270
271
272
273
274 @Override
275 public int getThreadPriority() {
276 return event.getThreadPriority();
277 }
278
279
280
281
282
283 @Override
284 public String getThreadName() {
285 return event.getThreadName();
286 }
287
288
289
290
291
292 @Override
293 public long getTimeMillis() {
294 return event.getTimeMillis();
295 }
296
297
298
299
300
301 @Override
302 public Instant getInstant() {
303 return event.getInstant();
304 }
305
306
307
308
309
310
311 @Override
312 public long getNanoTime() {
313 return event.getNanoTime();
314 }
315
316
317
318
319
320 @Override
321 public Throwable getThrown() {
322 return event.getThrown();
323 }
324
325
326
327
328
329 @Override
330 public ThrowableProxy getThrownProxy() {
331 return event.getThrownProxy();
332 }
333
334
335
336
337
338 @Override
339 public Map<String, String> getContextMap() {
340 return contextMap;
341 }
342
343
344
345
346
347 @Override
348 public ReadOnlyStringMap getContextData() {
349 return event.getContextData();
350 }
351
352
353
354
355
356 @Override
357 public ThreadContext.ContextStack getContextStack() {
358 return event.getContextStack();
359 }
360
361 @Override
362 public boolean isIncludeLocation() {
363 return event.isIncludeLocation();
364 }
365
366 @Override
367 public void setIncludeLocation(final boolean includeLocation) {
368 event.setIncludeLocation(includeLocation);
369 }
370
371 @Override
372 public boolean isEndOfBatch() {
373 return event.isEndOfBatch();
374 }
375
376 @Override
377 public void setEndOfBatch(final boolean endOfBatch) {
378 event.setEndOfBatch(endOfBatch);
379 }
380
381 }