View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.util.ReadOnlyStringMap;
33  import org.apache.logging.log4j.core.LogEvent;
34  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
35  import org.apache.logging.log4j.core.impl.ThrowableProxy;
36  import org.apache.logging.log4j.core.util.Patterns;
37  import org.apache.logging.log4j.core.util.UuidUtil;
38  import org.apache.logging.log4j.message.MapMessage;
39  import org.apache.logging.log4j.message.Message;
40  import org.apache.logging.log4j.message.StructuredDataId;
41  import org.apache.logging.log4j.message.StructuredDataMessage;
42  import org.apache.logging.log4j.util.Strings;
43  
44  /**
45   * Class that is both a Flume and Log4j Event.
46   */
47  public class FlumeEvent extends SimpleEvent implements LogEvent {
48  
49      static final String GUID = "guId";
50      /**
51       * Generated serial version ID.
52       */
53      private static final long serialVersionUID = -8988674608627854140L;
54  
55      private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
56  
57      private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
58  
59      private static final String EVENT_TYPE = "eventType";
60  
61      private static final String EVENT_ID = "eventId";
62  
63      private static final String TIMESTAMP = "timeStamp";
64  
65      private final LogEvent event;
66  
67      private final Map<String, String> contextMap = new HashMap<>();
68  
69      private final boolean compress;
70  
71      /**
72       * Construct the FlumeEvent.
73       * @param event The Log4j LogEvent.
74       * @param includes A comma separated list of MDC elements to include.
75       * @param excludes A comma separated list of MDC elements to exclude.
76       * @param required A comma separated list of MDC elements that are required to be defined.
77       * @param mdcPrefix The value to prefix to MDC keys.
78       * @param eventPrefix The value to prefix to event keys.
79       * @param compress If true the event body should be compressed.
80       */
81      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
82                        String mdcPrefix, String eventPrefix, final boolean compress) {
83          this.event = event;
84          this.compress = compress;
85          final Map<String, String> headers = getHeaders();
86          headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
87          if (mdcPrefix == null) {
88              mdcPrefix = DEFAULT_MDC_PREFIX;
89          }
90          if (eventPrefix == null) {
91              eventPrefix = DEFAULT_EVENT_PREFIX;
92          }
93          final Map<String, String> mdc = event.getContextData().toMap();
94          if (includes != null) {
95              final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
96              if (array.length > 0) {
97                  for (String str : array) {
98                      str = str.trim();
99                      if (mdc.containsKey(str)) {
100                         contextMap.put(str, mdc.get(str));
101                     }
102                 }
103             }
104         } else if (excludes != null) {
105             final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
106             if (array.length > 0) {
107                 final List<String> list = new ArrayList<>(array.length);
108                 for (final String value : array) {
109                     list.add(value.trim());
110                 }
111                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
112                     if (!list.contains(entry.getKey())) {
113                         contextMap.put(entry.getKey(), entry.getValue());
114                     }
115                 }
116             }
117         } else {
118             contextMap.putAll(mdc);
119         }
120 
121         if (required != null) {
122             final String[] array = required.split(Patterns.COMMA_SEPARATOR);
123             if (array.length > 0) {
124                 for (String str : array) {
125                     str = str.trim();
126                     if (!mdc.containsKey(str)) {
127                         throw new LoggingException("Required key " + str + " is missing from the MDC");
128                     }
129                 }
130             }
131         }
132         final String guid =  UuidUtil.getTimeBasedUuid().toString();
133         final Message message = event.getMessage();
134         if (message instanceof MapMessage) {
135             // Add the guid to the Map so that it can be included in the Layout.
136             ((MapMessage) message).put(GUID, guid);
137             if (message instanceof StructuredDataMessage) {
138                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
139             }
140             addMapData(eventPrefix, headers, (MapMessage) message);
141         } else {
142             headers.put(GUID, guid);
143         }
144 
145         addContextData(mdcPrefix, headers, contextMap);
146     }
147 
148     protected void addStructuredData(final String prefix, final Map<String, String> fields,
149                                      final StructuredDataMessage msg) {
150         fields.put(prefix + EVENT_TYPE, msg.getType());
151         final StructuredDataId id = msg.getId();
152         fields.put(prefix + EVENT_ID, id.getName());
153     }
154 
155     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
156         final Map<String, String> data = msg.getData();
157         for (final Map.Entry<String, String> entry : data.entrySet()) {
158             fields.put(prefix + entry.getKey(), entry.getValue());
159         }
160     }
161 
162     protected void addContextData(final String prefix, final Map<String, String> fields,
163                                   final Map<String, String> context) {
164         final Map<String, String> map = new HashMap<>();
165         for (final Map.Entry<String, String> entry : context.entrySet()) {
166             if (entry.getKey() != null && entry.getValue() != null) {
167                 fields.put(prefix + entry.getKey(), entry.getValue());
168                 map.put(prefix + entry.getKey(), entry.getValue());
169             }
170         }
171         context.clear();
172         context.putAll(map);
173     }
174 
175 	@Override
176 	public LogEvent toImmutable() {
177 		return Log4jLogEvent.createMemento(this);
178 	}
179 
180     /**
181      * Set the body in the event.
182      * @param body The body to add to the event.
183      */
184     @Override
185     public void setBody(final byte[] body) {
186         if (body == null || body.length == 0) {
187             super.setBody(new byte[0]);
188             return;
189         }
190         if (compress) {
191             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
192             try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
193                 os.write(body);
194             } catch (final IOException ioe) {
195                 throw new LoggingException("Unable to compress message", ioe);
196             }
197             super.setBody(baos.toByteArray());
198         } else {
199             super.setBody(body);
200         }
201     }
202 
203     /**
204      * Get the Frequently Qualified Class Name.
205      * @return the FQCN String.
206      */
207     @Override
208     public String getLoggerFqcn() {
209         return event.getLoggerFqcn();
210     }
211 
212     /**
213      * Returns the logging Level.
214      * @return the Level.
215      */
216     @Override
217     public Level getLevel() {
218         return event.getLevel();
219     }
220 
221     /**
222      * Returns the logger name.
223      * @return the logger name.
224      */
225     @Override
226     public String getLoggerName() {
227         return event.getLoggerName();
228     }
229 
230     /**
231      * Returns the StackTraceElement for the caller of the logging API.
232      * @return the StackTraceElement of the caller.
233      */
234     @Override
235     public StackTraceElement getSource() {
236         return event.getSource();
237     }
238 
239     /**
240      * Returns the Message.
241      * @return the Message.
242      */
243     @Override
244     public Message getMessage() {
245         return event.getMessage();
246     }
247 
248     /**
249      * Returns the Marker.
250      * @return the Marker.
251      */
252     @Override
253     public Marker getMarker() {
254         return event.getMarker();
255     }
256 
257     /**
258      * Returns the ID of the Thread.
259      * @return the ID of the Thread.
260      */
261     @Override
262     public long getThreadId() {
263         return event.getThreadId();
264     }
265 
266     /**
267      * Returns the priority of the Thread.
268      * @return the priority of the Thread.
269      */
270     @Override
271     public int getThreadPriority() {
272         return event.getThreadPriority();
273     }
274 
275     /**
276      * Returns the name of the Thread.
277      * @return the name of the Thread.
278      */
279     @Override
280     public String getThreadName() {
281         return event.getThreadName();
282     }
283 
284     /**
285      * Returns the event timestamp.
286      * @return the event timestamp.
287      */
288     @Override
289     public long getTimeMillis() {
290         return event.getTimeMillis();
291     }
292 
293     /**
294      * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
295      * or a dummy value if it is known that this value will not be used downstream.
296      * @return the event nanosecond timestamp.
297      */
298     @Override
299     public long getNanoTime() {
300         return event.getNanoTime();
301     }
302 
303     /**
304      * Returns the Throwable associated with the event, if any.
305      * @return the Throwable.
306      */
307     @Override
308     public Throwable getThrown() {
309         return event.getThrown();
310     }
311 
312     /**
313      * Returns the Throwable associated with the event, if any.
314      * @return the Throwable.
315      */
316     @Override
317     public ThrowableProxy getThrownProxy() {
318         return event.getThrownProxy();
319     }
320 
321     /**
322      * Returns a copy of the context Map.
323      * @return a copy of the context Map.
324      */
325     @Override
326     public Map<String, String> getContextMap() {
327         return contextMap;
328     }
329 
330     /**
331      * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
332      * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
333      */
334     @Override
335     public ReadOnlyStringMap getContextData() {
336         return event.getContextData();
337     }
338 
339     /**
340      * Returns a copy of the context stack.
341      * @return a copy of the context stack.
342      */
343     @Override
344     public ThreadContext.ContextStack getContextStack() {
345         return event.getContextStack();
346     }
347 
348     @Override
349     public boolean isIncludeLocation() {
350         return event.isIncludeLocation();
351     }
352 
353     @Override
354     public void setIncludeLocation(final boolean includeLocation) {
355         event.setIncludeLocation(includeLocation);
356     }
357 
358     @Override
359     public boolean isEndOfBatch() {
360         return event.isEndOfBatch();
361     }
362 
363     @Override
364     public void setEndOfBatch(final boolean endOfBatch) {
365         event.setEndOfBatch(endOfBatch);
366     }
367 
368 }