1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import org.apache.logging.log4j.core.AbstractLifeCycle;
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.Layout;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.appender.AbstractAppender;
32 import org.apache.logging.log4j.core.config.Configuration;
33 import org.apache.logging.log4j.core.config.Node;
34 import org.apache.logging.log4j.core.config.Property;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38 import org.apache.logging.log4j.core.layout.SerializedLayout;
39
40
41
42
43 @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
44 public final class KafkaAppender extends AbstractAppender {
45
46
47
48
49
50 public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
51 implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
52
53 @PluginAttribute("topic")
54 private String topic;
55
56 @PluginAttribute("key")
57 private String key;
58
59 @PluginAttribute(value = "syncSend", defaultBoolean = true)
60 private boolean syncSend;
61
62 @SuppressWarnings("resource")
63 @Override
64 public KafkaAppender build() {
65 final Layout<? extends Serializable> layout = getLayout();
66 if (layout == null) {
67 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
68 return null;
69 }
70 final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
71 getName(), topic, syncSend, getPropertyArray(), key);
72 return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
73 getPropertyArray());
74 }
75
76 public String getTopic() {
77 return topic;
78 }
79
80 public boolean isSyncSend() {
81 return syncSend;
82 }
83
84 public B setTopic(final String topic) {
85 this.topic = topic;
86 return asBuilder();
87 }
88
89 public B setSyncSend(final boolean syncSend) {
90 this.syncSend = syncSend;
91 return asBuilder();
92 }
93
94 }
95
96 @Deprecated
97 public static KafkaAppender createAppender(
98 final Layout<? extends Serializable> layout,
99 final Filter filter,
100 final String name,
101 final boolean ignoreExceptions,
102 final String topic,
103 final Property[] properties,
104 final Configuration configuration,
105 final String key) {
106
107 if (layout == null) {
108 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
109 return null;
110 }
111 final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic,
112 true, properties, key);
113 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null);
114 }
115
116
117
118
119
120 @PluginBuilderFactory
121 public static <B extends Builder<B>> B newBuilder() {
122 return new Builder<B>().asBuilder();
123 }
124
125 private final KafkaManager manager;
126
127 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
128 final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties) {
129 super(name, filter, layout, ignoreExceptions, properties);
130 this.manager = Objects.requireNonNull(manager, "manager");
131 }
132
133 @Override
134 public void append(final LogEvent event) {
135 if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
136 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
137 } else {
138 try {
139 tryAppend(event);
140 } catch (final Exception e) {
141 error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
142 }
143 }
144 }
145
146 private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
147 final Layout<? extends Serializable> layout = getLayout();
148 byte[] data;
149 if (layout instanceof SerializedLayout) {
150 final byte[] header = layout.getHeader();
151 final byte[] body = layout.toByteArray(event);
152 data = new byte[header.length + body.length];
153 System.arraycopy(header, 0, data, 0, header.length);
154 System.arraycopy(body, 0, data, header.length, body.length);
155 } else {
156 data = layout.toByteArray(event);
157 }
158 manager.send(data);
159 }
160
161 @Override
162 public void start() {
163 super.start();
164 manager.startup();
165 }
166
167 @Override
168 public boolean stop(final long timeout, final TimeUnit timeUnit) {
169 setStopping();
170 boolean stopped = super.stop(timeout, timeUnit, false);
171 stopped &= manager.stop(timeout, timeUnit);
172 setStopped();
173 return stopped;
174 }
175
176 @Override
177 public String toString() {
178 return "KafkaAppender{" +
179 "name=" + getName() +
180 ", state=" + getState() +
181 ", topic=" + manager.getTopic() +
182 '}';
183 }
184 }