View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.io.Serializable;
21  import java.util.Objects;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.apache.logging.log4j.core.AbstractLifeCycle;
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.Layout;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.appender.AbstractAppender;
32  import org.apache.logging.log4j.core.config.Configuration;
33  import org.apache.logging.log4j.core.config.Node;
34  import org.apache.logging.log4j.core.config.Property;
35  import org.apache.logging.log4j.core.config.plugins.Plugin;
36  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38  import org.apache.logging.log4j.core.layout.SerializedLayout;
39  
40  /**
41   * Sends log events to an Apache Kafka topic.
42   */
43  @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
44  public final class KafkaAppender extends AbstractAppender {
45  
46      /**
47       * Builds KafkaAppender instances.
48       * @param <B> The type to build
49       */
50      public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
51              implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
52  
53          @PluginAttribute("topic")
54          private String topic;
55  
56          @PluginAttribute("key")
57          private String key;
58  
59          @PluginAttribute(value = "syncSend", defaultBoolean = true)
60          private boolean syncSend;
61  
62          @SuppressWarnings("resource")
63          @Override
64          public KafkaAppender build() {
65              final Layout<? extends Serializable> layout = getLayout();
66              if (layout == null) {
67                  AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
68                  return null;
69              }
70              final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
71                  getName(), topic, syncSend, getPropertyArray(), key);
72              return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
73                      getPropertyArray());
74          }
75  
76          public String getTopic() {
77              return topic;
78          }
79  
80          public boolean isSyncSend() {
81              return syncSend;
82          }
83  
84          public B setTopic(final String topic) {
85              this.topic = topic;
86              return asBuilder();
87          }
88  
89          public B setSyncSend(final boolean syncSend) {
90              this.syncSend = syncSend;
91              return asBuilder();
92          }
93  
94      }
95  
96      @Deprecated
97      public static KafkaAppender createAppender(
98              final Layout<? extends Serializable> layout,
99              final Filter filter,
100             final String name,
101             final boolean ignoreExceptions,
102             final String topic,
103             final Property[] properties,
104             final Configuration configuration,
105             final String key) {
106 
107         if (layout == null) {
108             AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
109             return null;
110         }
111         final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic,
112             true, properties, key);
113         return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null);
114     }
115 
116     /**
117      * Creates a builder for a KafkaAppender.
118      * @return a builder for a KafkaAppender.
119      */
120     @PluginBuilderFactory
121     public static <B extends Builder<B>> B newBuilder() {
122         return new Builder<B>().asBuilder();
123     }
124 
125     private final KafkaManager manager;
126 
127     private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
128             final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties) {
129         super(name, filter, layout, ignoreExceptions, properties);
130         this.manager = Objects.requireNonNull(manager, "manager");
131     }
132 
133     @Override
134     public void append(final LogEvent event) {
135         if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
136             LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
137         } else {
138             try {
139                 tryAppend(event);
140             } catch (final Exception e) {
141                 error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
142             }
143         }
144     }
145 
146     private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
147         final Layout<? extends Serializable> layout = getLayout();
148         byte[] data;
149         if (layout instanceof SerializedLayout) {
150             final byte[] header = layout.getHeader();
151             final byte[] body = layout.toByteArray(event);
152             data = new byte[header.length + body.length];
153             System.arraycopy(header, 0, data, 0, header.length);
154             System.arraycopy(body, 0, data, header.length, body.length);
155         } else {
156             data = layout.toByteArray(event);
157         }
158         manager.send(data);
159     }
160 
161     @Override
162     public void start() {
163         super.start();
164         manager.startup();
165     }
166 
167     @Override
168     public boolean stop(final long timeout, final TimeUnit timeUnit) {
169         setStopping();
170         boolean stopped = super.stop(timeout, timeUnit, false);
171         stopped &= manager.stop(timeout, timeUnit);
172         setStopped();
173         return stopped;
174     }
175 
176     @Override
177     public String toString() {
178         return "KafkaAppender{" +
179             "name=" + getName() +
180             ", state=" + getState() +
181             ", topic=" + manager.getTopic() +
182             '}';
183     }
184 }