001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.kafka;
019
020import java.io.Serializable;
021import java.util.Objects;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025
026import org.apache.logging.log4j.core.AbstractLifeCycle;
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.Layout;
030import org.apache.logging.log4j.core.LogEvent;
031import org.apache.logging.log4j.core.appender.AbstractAppender;
032import org.apache.logging.log4j.core.config.Configuration;
033import org.apache.logging.log4j.core.config.Node;
034import org.apache.logging.log4j.core.config.Property;
035import org.apache.logging.log4j.core.config.plugins.Plugin;
036import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
037import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
038import org.apache.logging.log4j.core.layout.SerializedLayout;
039
040/**
041 * Sends log events to an Apache Kafka topic.
042 */
043@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
044public final class KafkaAppender extends AbstractAppender {
045
046    /**
047     * Builds KafkaAppender instances.
048     * @param <B> The type to build
049     */
050    public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
051            implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
052
053        @PluginAttribute("topic")
054        private String topic;
055
056        @PluginAttribute("key")
057        private String key;
058
059        @PluginAttribute(value = "syncSend", defaultBoolean = true)
060        private boolean syncSend;
061
062        @SuppressWarnings("resource")
063        @Override
064        public KafkaAppender build() {
065            final Layout<? extends Serializable> layout = getLayout();
066            if (layout == null) {
067                AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
068                return null;
069            }
070            final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
071                getName(), topic, syncSend, getPropertyArray(), key);
072            return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
073                    getPropertyArray());
074        }
075
076        public String getTopic() {
077            return topic;
078        }
079
080        public boolean isSyncSend() {
081            return syncSend;
082        }
083
084        public B setTopic(final String topic) {
085            this.topic = topic;
086            return asBuilder();
087        }
088
089        public B setSyncSend(final boolean syncSend) {
090            this.syncSend = syncSend;
091            return asBuilder();
092        }
093
094    }
095
096    @Deprecated
097    public static KafkaAppender createAppender(
098            final Layout<? extends Serializable> layout,
099            final Filter filter,
100            final String name,
101            final boolean ignoreExceptions,
102            final String topic,
103            final Property[] properties,
104            final Configuration configuration,
105            final String key) {
106
107        if (layout == null) {
108            AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
109            return null;
110        }
111        final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic,
112            true, properties, key);
113        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null);
114    }
115
116    /**
117     * Creates a builder for a KafkaAppender.
118     * @return a builder for a KafkaAppender.
119     */
120    @PluginBuilderFactory
121    public static <B extends Builder<B>> B newBuilder() {
122        return new Builder<B>().asBuilder();
123    }
124
125    private final KafkaManager manager;
126
127    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
128            final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties) {
129        super(name, filter, layout, ignoreExceptions, properties);
130        this.manager = Objects.requireNonNull(manager, "manager");
131    }
132
133    @Override
134    public void append(final LogEvent event) {
135        if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
136            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
137        } else {
138            try {
139                tryAppend(event);
140            } catch (final Exception e) {
141                error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
142            }
143        }
144    }
145
146    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
147        final Layout<? extends Serializable> layout = getLayout();
148        byte[] data;
149        if (layout instanceof SerializedLayout) {
150            final byte[] header = layout.getHeader();
151            final byte[] body = layout.toByteArray(event);
152            data = new byte[header.length + body.length];
153            System.arraycopy(header, 0, data, 0, header.length);
154            System.arraycopy(body, 0, data, header.length, body.length);
155        } else {
156            data = layout.toByteArray(event);
157        }
158        manager.send(data);
159    }
160
161    @Override
162    public void start() {
163        super.start();
164        manager.startup();
165    }
166
167    @Override
168    public boolean stop(final long timeout, final TimeUnit timeUnit) {
169        setStopping();
170        boolean stopped = super.stop(timeout, timeUnit, false);
171        stopped &= manager.stop(timeout, timeUnit);
172        setStopped();
173        return stopped;
174    }
175
176    @Override
177    public String toString() {
178        return "KafkaAppender{" +
179            "name=" + getName() +
180            ", state=" + getState() +
181            ", topic=" + manager.getTopic() +
182            '}';
183    }
184}