View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.core.LogEvent;
33  import org.apache.logging.log4j.core.impl.ThrowableProxy;
34  import org.apache.logging.log4j.core.util.Patterns;
35  import org.apache.logging.log4j.core.util.UuidUtil;
36  import org.apache.logging.log4j.message.MapMessage;
37  import org.apache.logging.log4j.message.Message;
38  import org.apache.logging.log4j.message.StructuredDataId;
39  import org.apache.logging.log4j.message.StructuredDataMessage;
40  import org.apache.logging.log4j.util.Strings;
41  
42  /**
43   * Class that is both a Flume and Log4j Event.
44   */
45  public class FlumeEvent extends SimpleEvent implements LogEvent {
46  
47      static final String GUID = "guId";
48      /**
49       * Generated serial version ID.
50       */
51      private static final long serialVersionUID = -8988674608627854140L;
52  
53      private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
54  
55      private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
56  
57      private static final String EVENT_TYPE = "eventType";
58  
59      private static final String EVENT_ID = "eventId";
60  
61      private static final String TIMESTAMP = "timeStamp";
62  
63      private final LogEvent event;
64  
65      private final Map<String, String> contextMap = new HashMap<>();
66  
67      private final boolean compress;
68  
69      /**
70       * Construct the FlumeEvent.
71       * @param event The Log4j LogEvent.
72       * @param includes A comma separated list of MDC elements to include.
73       * @param excludes A comma separated list of MDC elements to exclude.
74       * @param required A comma separated list of MDC elements that are required to be defined.
75       * @param mdcPrefix The value to prefix to MDC keys.
76       * @param eventPrefix The value to prefix to event keys.
77       * @param compress If true the event body should be compressed.
78       */
79      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
80                        String mdcPrefix, String eventPrefix, final boolean compress) {
81          this.event = event;
82          this.compress = compress;
83          final Map<String, String> headers = getHeaders();
84          headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
85          if (mdcPrefix == null) {
86              mdcPrefix = DEFAULT_MDC_PREFIX;
87          }
88          if (eventPrefix == null) {
89              eventPrefix = DEFAULT_EVENT_PREFIX;
90          }
91          final Map<String, String> mdc = event.getContextMap();
92          if (includes != null) {
93              final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
94              if (array.length > 0) {
95                  for (String str : array) {
96                      str = str.trim();
97                      if (mdc.containsKey(str)) {
98                          contextMap.put(str, mdc.get(str));
99                      }
100                 }
101             }
102         } else if (excludes != null) {
103             final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
104             if (array.length > 0) {
105                 final List<String> list = new ArrayList<>(array.length);
106                 for (final String value : array) {
107                     list.add(value.trim());
108                 }
109                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
110                     if (!list.contains(entry.getKey())) {
111                         contextMap.put(entry.getKey(), entry.getValue());
112                     }
113                 }
114             }
115         } else {
116             contextMap.putAll(mdc);
117         }
118 
119         if (required != null) {
120             final String[] array = required.split(Patterns.COMMA_SEPARATOR);
121             if (array.length > 0) {
122                 for (String str : array) {
123                     str = str.trim();
124                     if (!mdc.containsKey(str)) {
125                         throw new LoggingException("Required key " + str + " is missing from the MDC");
126                     }
127                 }
128             }
129         }
130         final String guid =  UuidUtil.getTimeBasedUuid().toString();
131         final Message message = event.getMessage();
132         if (message instanceof MapMessage) {
133             // Add the guid to the Map so that it can be included in the Layout.
134             ((MapMessage) message).put(GUID, guid);
135             if (message instanceof StructuredDataMessage) {
136                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
137             }
138             addMapData(eventPrefix, headers, (MapMessage) message);
139         } else {
140             headers.put(GUID, guid);
141         }
142 
143         addContextData(mdcPrefix, headers, contextMap);
144     }
145 
146     protected void addStructuredData(final String prefix, final Map<String, String> fields,
147                                      final StructuredDataMessage msg) {
148         fields.put(prefix + EVENT_TYPE, msg.getType());
149         final StructuredDataId id = msg.getId();
150         fields.put(prefix + EVENT_ID, id.getName());
151     }
152 
153     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
154         final Map<String, String> data = msg.getData();
155         for (final Map.Entry<String, String> entry : data.entrySet()) {
156             fields.put(prefix + entry.getKey(), entry.getValue());
157         }
158     }
159 
160     protected void addContextData(final String prefix, final Map<String, String> fields,
161                                   final Map<String, String> context) {
162         final Map<String, String> map = new HashMap<>();
163         for (final Map.Entry<String, String> entry : context.entrySet()) {
164             if (entry.getKey() != null && entry.getValue() != null) {
165                 fields.put(prefix + entry.getKey(), entry.getValue());
166                 map.put(prefix + entry.getKey(), entry.getValue());
167             }
168         }
169         context.clear();
170         context.putAll(map);
171     }
172 
173     /**
174      * Set the body in the event.
175      * @param body The body to add to the event.
176      */
177     @Override
178     public void setBody(final byte[] body) {
179         if (body == null || body.length == 0) {
180             super.setBody(new byte[0]);
181             return;
182         }
183         if (compress) {
184             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185             try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
186                 os.write(body);
187             } catch (final IOException ioe) {
188                 throw new LoggingException("Unable to compress message", ioe);
189             }
190             super.setBody(baos.toByteArray());
191         } else {
192             super.setBody(body);
193         }
194     }
195 
196     /**
197      * Get the Frequently Qualified Class Name.
198      * @return the FQCN String.
199      */
200     @Override
201     public String getLoggerFqcn() {
202         return event.getLoggerFqcn();
203     }
204 
205     /**
206      * Returns the logging Level.
207      * @return the Level.
208      */
209     @Override
210     public Level getLevel() {
211         return event.getLevel();
212     }
213 
214     /**
215      * Returns the logger name.
216      * @return the logger name.
217      */
218     @Override
219     public String getLoggerName() {
220         return event.getLoggerName();
221     }
222 
223     /**
224      * Returns the StackTraceElement for the caller of the logging API.
225      * @return the StackTraceElement of the caller.
226      */
227     @Override
228     public StackTraceElement getSource() {
229         return event.getSource();
230     }
231 
232     /**
233      * Returns the Message.
234      * @return the Message.
235      */
236     @Override
237     public Message getMessage() {
238         return event.getMessage();
239     }
240 
241     /**
242      * Returns the Marker.
243      * @return the Marker.
244      */
245     @Override
246     public Marker getMarker() {
247         return event.getMarker();
248     }
249 
250     /**
251      * Returns the name of the Thread.
252      * @return the name of the Thread.
253      */
254     @Override
255     public String getThreadName() {
256         return event.getThreadName();
257     }
258 
259     /**
260      * Returns the event timestamp.
261      * @return the event timestamp.
262      */
263     @Override
264     public long getTimeMillis() {
265         return event.getTimeMillis();
266     }
267 
268     /**
269      * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
270      * or a dummy value if it is known that this value will not be used downstream.
271      * @return the event nanosecond timestamp.
272      */
273     @Override
274     public long getNanoTime() {
275         return event.getNanoTime();
276     }
277 
278     /**
279      * Returns the Throwable associated with the event, if any.
280      * @return the Throwable.
281      */
282     @Override
283     public Throwable getThrown() {
284         return event.getThrown();
285     }
286 
287     /**
288      * Returns the Throwable associated with the event, if any.
289      * @return the Throwable.
290      */
291     @Override
292     public ThrowableProxy getThrownProxy() {
293         return event.getThrownProxy();
294     }
295 
296     /**
297      * Returns a copy of the context Map.
298      * @return a copy of the context Map.
299      */
300     @Override
301     public Map<String, String> getContextMap() {
302         return contextMap;
303     }
304 
305     /**
306      * Returns a copy of the context stack.
307      * @return a copy of the context stack.
308      */
309     @Override
310     public ThreadContext.ContextStack getContextStack() {
311         return event.getContextStack();
312     }
313 
314     @Override
315     public boolean isIncludeLocation() {
316         return event.isIncludeLocation();
317     }
318 
319     @Override
320     public void setIncludeLocation(final boolean includeLocation) {
321         event.setIncludeLocation(includeLocation);
322     }
323 
324     @Override
325     public boolean isEndOfBatch() {
326         return event.isEndOfBatch();
327     }
328 
329     @Override
330     public void setEndOfBatch(final boolean endOfBatch) {
331         event.setEndOfBatch(endOfBatch);
332     }
333 }