/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017
018package org.apache.logging.log4j.core.appender.mom.kafka;
019
020import java.io.Serializable;
021import java.util.Objects;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025
026import org.apache.logging.log4j.core.AbstractLifeCycle;
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.Layout;
030import org.apache.logging.log4j.core.LogEvent;
031import org.apache.logging.log4j.core.appender.AbstractAppender;
032import org.apache.logging.log4j.core.config.Configuration;
033import org.apache.logging.log4j.core.config.Node;
034import org.apache.logging.log4j.core.config.Property;
035import org.apache.logging.log4j.core.config.plugins.Plugin;
036import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
037import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
038import org.apache.logging.log4j.core.config.plugins.PluginElement;
039import org.apache.logging.log4j.core.layout.SerializedLayout;
040
041/**
042 * Sends log events to an Apache Kafka topic.
043 */
044@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
045public final class KafkaAppender extends AbstractAppender {
046
047    /**
048     * Builds KafkaAppender instances.
049     * @param <B> The type to build
050     */
051    public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
052            implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
053
054        @PluginAttribute("topic") 
055        private String topic;
056
057        @PluginAttribute("key")
058        private String key;
059        
060        @PluginAttribute(value = "syncSend", defaultBoolean = true)
061        private boolean syncSend;
062
063        @PluginElement("Properties") 
064        private Property[] properties;
065
066        @SuppressWarnings("resource")
067        @Override
068        public KafkaAppender build() {
069            final Layout<? extends Serializable> layout = getLayout();
070            if (layout == null) {
071                AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
072                return null;
073            }
074            final KafkaManager kafkaManager =
075                    new KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, properties, key);
076            return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager);
077        }
078
079        public String getTopic() {
080            return topic;
081        }
082
083        public boolean isSyncSend() {
084            return syncSend;
085        }
086
087        public Property[] getProperties() {
088            return properties;
089        }
090
091        public B setTopic(final String topic) {
092            this.topic = topic;
093            return asBuilder();
094        }
095
096        public B setSyncSend(final boolean syncSend) {
097            this.syncSend = syncSend;
098            return asBuilder();
099        }
100
101        public B setProperties(final Property[] properties) {
102            this.properties = properties;
103            return asBuilder();
104        }
105    }
106    
107    @Deprecated
108    public static KafkaAppender createAppender(
109            final Layout<? extends Serializable> layout,
110            final Filter filter,
111            final String name,
112            final boolean ignoreExceptions,
113            final String topic,
114            final Property[] properties,
115            final Configuration configuration,
116            final String key) {
117
118        if (layout == null) {
119            AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
120            return null;
121        }
122        final KafkaManager kafkaManager =
123                new KafkaManager(configuration.getLoggerContext(), name, topic, true, properties, key);
124        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
125    }
126
127    /**
128     * Creates a builder for a KafkaAppender.
129     * @return a builder for a KafkaAppender.
130     */
131    @PluginBuilderFactory
132    public static <B extends Builder<B>> B newBuilder() {
133        return new Builder<B>().asBuilder();
134    }
135
136    private final KafkaManager manager;
137
138    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
139            final boolean ignoreExceptions, final KafkaManager manager) {
140        super(name, filter, layout, ignoreExceptions);
141        this.manager = Objects.requireNonNull(manager, "manager");
142    }
143
144    @Override
145    public void append(final LogEvent event) {
146        if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
147            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
148        } else {
149            try {
150                tryAppend(event);
151            } catch (final Exception e) {
152                error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
153            }
154        }
155    }
156
157    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
158        final Layout<? extends Serializable> layout = getLayout();
159        byte[] data;
160        if (layout instanceof SerializedLayout) {
161            final byte[] header = layout.getHeader();
162            final byte[] body = layout.toByteArray(event);
163            data = new byte[header.length + body.length];
164            System.arraycopy(header, 0, data, 0, header.length);
165            System.arraycopy(body, 0, data, header.length, body.length);
166        } else {
167            data = layout.toByteArray(event);
168        }
169        manager.send(data);
170    }
171
172    @Override
173    public void start() {
174        super.start();
175        manager.startup();
176    }
177
178    @Override
179    public boolean stop(final long timeout, final TimeUnit timeUnit) {
180        setStopping();
181        boolean stopped = super.stop(timeout, timeUnit, false);
182        stopped &= manager.stop(timeout, timeUnit);
183        setStopped();
184        return stopped;
185    }
186
187    @Override
188    public String toString() {
189        return "KafkaAppender{" +
190            "name=" + getName() +
191            ", state=" + getState() +
192            ", topic=" + manager.getTopic() +
193            '}';
194    }
195}