001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.kafka; 019 020import java.nio.charset.StandardCharsets; 021import java.util.Objects; 022import java.util.Properties; 023import java.util.concurrent.ExecutionException; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.TimeoutException; 027 028import org.apache.kafka.clients.producer.Producer; 029import org.apache.kafka.clients.producer.ProducerRecord; 030import org.apache.kafka.clients.producer.RecordMetadata; 031import org.apache.logging.log4j.core.LoggerContext; 032import org.apache.logging.log4j.core.appender.AbstractManager; 033import org.apache.logging.log4j.core.appender.ManagerFactory; 034import org.apache.logging.log4j.core.config.Property; 035import org.apache.logging.log4j.core.util.Log4jThread; 036 037public class KafkaManager extends AbstractManager { 038 039 public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; 040 041 /** 042 * package-private access for testing. 043 */ 044 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); 045 046 private final Properties config = new Properties(); 047 private Producer<byte[], byte[]> producer; 048 private final int timeoutMillis; 049 050 private final String topic; 051 private final String key; 052 private final boolean syncSend; 053 private static final KafkaManagerFactory factory = new KafkaManagerFactory(); 054 055 /* 056 * The Constructor should have been declared private as all Managers are create 057 * by the internal factory; 058 */ 059 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, 060 final boolean syncSend, final Property[] properties, final String key) { 061 super(loggerContext, name); 062 this.topic = Objects.requireNonNull(topic, "topic"); 063 this.syncSend = syncSend; 064 065 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 066 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 067 config.setProperty("batch.size", "0"); 068 069 for (final Property property : properties) { 070 config.setProperty(property.getName(), property.getValue()); 071 } 072 073 this.key = key; 074 075 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); 076 } 077 078 @Override 079 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 080 if (timeout > 0) { 081 closeProducer(timeout, timeUnit); 082 } else { 083 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS); 084 } 085 return true; 086 } 087 088 private void closeProducer(final long timeout, final TimeUnit timeUnit) { 089 if (producer != null) { 090 // This thread is a workaround for this Kafka issue: 091 // https://issues.apache.org/jira/browse/KAFKA-1660 092 final Thread closeThread = new Log4jThread(() -> { 093 if (producer != null) { 094 producer.close(); 095 } 096 }, "KafkaManager-CloseThread"); 097 closeThread.setDaemon(true); // avoid blocking JVM shutdown 098 closeThread.start(); 099 try { 100 closeThread.join(timeUnit.toMillis(timeout)); 101 } catch (final InterruptedException ignore) { 102 Thread.currentThread().interrupt(); 103 // ignore 104 } 105 } 106 } 107 108 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { 109 if (producer != null) { 110 byte[] newKey = null; 111 112 if (key != null && key.contains("${")) { 113 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key) 114 .getBytes(StandardCharsets.UTF_8); 115 } else if (key != null) { 116 newKey = key.getBytes(StandardCharsets.UTF_8); 117 } 118 119 final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg); 120 if (syncSend) { 121 final Future<RecordMetadata> response = producer.send(newRecord); 122 response.get(timeoutMillis, TimeUnit.MILLISECONDS); 123 } else { 124 producer.send(newRecord, (metadata, e) -> { 125 if (e != null) { 126 LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e); 127 } 128 }); 129 } 130 } 131 } 132 133 public void startup() { 134 if (producer == null) { 135 producer = producerFactory.newKafkaProducer(config); 136 } 137 } 138 139 public String getTopic() { 140 return topic; 141 } 142 143 public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic, 144 final boolean syncSend, final Property[] properties, final String key) { 145 StringBuilder sb = new StringBuilder(name); 146 for (Property prop : properties) { 147 sb.append(" ").append(prop.getName()).append("=").append(prop.getValue()); 148 } 149 return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key)); 150 } 151 152 private static class FactoryData { 153 private final LoggerContext loggerContext; 154 private final String topic; 155 private final boolean syncSend; 156 private final Property[] properties; 157 private final String key; 158 159 public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend, 160 final Property[] properties, final String key) { 161 this.loggerContext = loggerContext; 162 this.topic = topic; 163 this.syncSend = syncSend; 164 this.properties = properties; 165 this.key = key; 166 } 167 168 } 169 170 private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> { 171 @Override 172 public KafkaManager createManager(String name, FactoryData data) { 173 return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key); 174 } 175 } 176 177}