001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.kafka; 019 020import java.io.Serializable; 021import java.util.Objects; 022import java.util.concurrent.ExecutionException; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.TimeoutException; 025 026import org.apache.logging.log4j.core.AbstractLifeCycle; 027import org.apache.logging.log4j.core.Appender; 028import org.apache.logging.log4j.core.Filter; 029import org.apache.logging.log4j.core.Layout; 030import org.apache.logging.log4j.core.LogEvent; 031import org.apache.logging.log4j.core.appender.AbstractAppender; 032import org.apache.logging.log4j.core.config.Configuration; 033import org.apache.logging.log4j.core.config.Node; 034import org.apache.logging.log4j.core.config.Property; 035import org.apache.logging.log4j.core.config.plugins.Plugin; 036import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 037import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; 038import org.apache.logging.log4j.core.layout.SerializedLayout; 039 040/** 041 * Sends log events to an Apache Kafka topic. 042 */ 043@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) 044public final class KafkaAppender extends AbstractAppender { 045 046 /** 047 * Builds KafkaAppender instances. 048 * @param <B> The type to build 049 */ 050 public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B> 051 implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> { 052 053 @PluginAttribute("topic") 054 private String topic; 055 056 @PluginAttribute("key") 057 private String key; 058 059 @PluginAttribute(value = "syncSend", defaultBoolean = true) 060 private boolean syncSend; 061 062 @SuppressWarnings("resource") 063 @Override 064 public KafkaAppender build() { 065 final Layout<? extends Serializable> layout = getLayout(); 066 if (layout == null) { 067 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); 068 return null; 069 } 070 final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), 071 getName(), topic, syncSend, getPropertyArray(), key); 072 return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager, 073 getPropertyArray()); 074 } 075 076 public String getTopic() { 077 return topic; 078 } 079 080 public boolean isSyncSend() { 081 return syncSend; 082 } 083 084 public B setTopic(final String topic) { 085 this.topic = topic; 086 return asBuilder(); 087 } 088 089 public B setSyncSend(final boolean syncSend) { 090 this.syncSend = syncSend; 091 return asBuilder(); 092 } 093 094 } 095 096 @Deprecated 097 public static KafkaAppender createAppender( 098 final Layout<? extends Serializable> layout, 099 final Filter filter, 100 final String name, 101 final boolean ignoreExceptions, 102 final String topic, 103 final Property[] properties, 104 final Configuration configuration, 105 final String key) { 106 107 if (layout == null) { 108 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); 109 return null; 110 } 111 final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, 112 true, properties, key); 113 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null); 114 } 115 116 /** 117 * Creates a builder for a KafkaAppender. 118 * @return a builder for a KafkaAppender. 119 */ 120 @PluginBuilderFactory 121 public static <B extends Builder<B>> B newBuilder() { 122 return new Builder<B>().asBuilder(); 123 } 124 125 private final KafkaManager manager; 126 127 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, 128 final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties) { 129 super(name, filter, layout, ignoreExceptions, properties); 130 this.manager = Objects.requireNonNull(manager, "manager"); 131 } 132 133 @Override 134 public void append(final LogEvent event) { 135 if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) { 136 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); 137 } else { 138 try { 139 tryAppend(event); 140 } catch (final Exception e) { 141 error("Unable to write to Kafka in appender [" + getName() + "]", event, e); 142 } 143 } 144 } 145 146 private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException { 147 final Layout<? extends Serializable> layout = getLayout(); 148 byte[] data; 149 if (layout instanceof SerializedLayout) { 150 final byte[] header = layout.getHeader(); 151 final byte[] body = layout.toByteArray(event); 152 data = new byte[header.length + body.length]; 153 System.arraycopy(header, 0, data, 0, header.length); 154 System.arraycopy(body, 0, data, header.length, body.length); 155 } else { 156 data = layout.toByteArray(event); 157 } 158 manager.send(data); 159 } 160 161 @Override 162 public void start() { 163 super.start(); 164 manager.startup(); 165 } 166 167 @Override 168 public boolean stop(final long timeout, final TimeUnit timeUnit) { 169 setStopping(); 170 boolean stopped = super.stop(timeout, timeUnit, false); 171 stopped &= manager.stop(timeout, timeUnit); 172 setStopped(); 173 return stopped; 174 } 175 176 @Override 177 public String toString() { 178 return "KafkaAppender{" + 179 "name=" + getName() + 180 ", state=" + getState() + 181 ", topic=" + manager.getTopic() + 182 '}'; 183 } 184}