001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.kafka; 019 020import java.nio.charset.StandardCharsets; 021import java.util.Objects; 022import java.util.Properties; 023import java.util.concurrent.ExecutionException; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.TimeoutException; 027 028import org.apache.kafka.clients.producer.Callback; 029import org.apache.kafka.clients.producer.Producer; 030import org.apache.kafka.clients.producer.ProducerRecord; 031import org.apache.kafka.clients.producer.RecordMetadata; 032import org.apache.logging.log4j.core.LoggerContext; 033import org.apache.logging.log4j.core.appender.AbstractManager; 034import org.apache.logging.log4j.core.appender.ManagerFactory; 035import org.apache.logging.log4j.core.config.Property; 036import org.apache.logging.log4j.core.util.Log4jThread; 037 038public class KafkaManager extends AbstractManager { 039 040 public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; 041 042 /** 043 * package-private access for testing. 044 */ 045 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); 046 047 private final Properties config = new Properties(); 048 private Producer<byte[], byte[]> producer; 049 private final int timeoutMillis; 050 051 private final String topic; 052 private final String key; 053 private final boolean syncSend; 054 private static final KafkaManagerFactory factory = new KafkaManagerFactory(); 055 056 /* 057 * The Constructor should have been declared private as all Managers are create by the internal factory; 058 */ 059 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, 060 final Property[] properties, final String key) { 061 super(loggerContext, name); 062 this.topic = Objects.requireNonNull(topic, "topic"); 063 this.syncSend = syncSend; 064 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 065 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 066 config.setProperty("batch.size", "0"); 067 for (final Property property : properties) { 068 config.setProperty(property.getName(), property.getValue()); 069 } 070 071 this.key = key; 072 073 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); 074 } 075 076 @Override 077 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 078 if (timeout > 0) { 079 closeProducer(timeout, timeUnit); 080 } else { 081 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS); 082 } 083 return true; 084 } 085 086 private void closeProducer(final long timeout, final TimeUnit timeUnit) { 087 if (producer != null) { 088 // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660 089 final Thread closeThread = new Log4jThread(new Runnable() { 090 @Override 091 public void run() { 092 if (producer != null) { 093 producer.close(); 094 } 095 } 096 }, "KafkaManager-CloseThread"); 097 closeThread.setDaemon(true); // avoid blocking JVM shutdown 098 closeThread.start(); 099 try { 100 closeThread.join(timeUnit.toMillis(timeout)); 101 } catch (final InterruptedException ignore) { 102 Thread.currentThread().interrupt(); 103 // ignore 104 } 105 } 106 } 107 108 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { 109 if (producer != null) { 110 byte[] newKey = null; 111 112 if(key != null && key.contains("${")) { 113 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8); 114 } else if (key != null) { 115 newKey = key.getBytes(StandardCharsets.UTF_8); 116 } 117 118 final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg); 119 if (syncSend) { 120 final Future<RecordMetadata> response = producer.send(newRecord); 121 response.get(timeoutMillis, TimeUnit.MILLISECONDS); 122 } else { 123 producer.send(newRecord, new Callback() { 124 @Override 125 public void onCompletion(final RecordMetadata metadata, final Exception e) { 126 if (e != null) { 127 LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e); 128 } 129 } 130 }); 131 } 132 } 133 } 134 135 public void startup() { 136 producer = producerFactory.newKafkaProducer(config); 137 } 138 139 public String getTopic() { 140 return topic; 141 } 142 143 public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic, 144 final boolean syncSend, final Property[] properties, final String key) { 145 StringBuilder sb = new StringBuilder(name); 146 for (Property prop: properties) { 147 sb.append(" ").append(prop.getName()).append("=").append(prop.getValue()); 148 } 149 return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key)); 150 } 151 152 private static class FactoryData { 153 private final LoggerContext loggerContext; 154 private final String topic; 155 private final boolean syncSend; 156 private final Property[] properties; 157 private final String key; 158 159 public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend, 160 final Property[] properties, final String key) { 161 this.loggerContext = loggerContext; 162 this.topic = topic; 163 this.syncSend = syncSend; 164 this.properties = properties; 165 this.key = key; 166 } 167 168 } 169 170 private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> { 171 @Override 172 public KafkaManager createManager(String name, FactoryData data) { 173 return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key); 174 } 175 } 176 177}