/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */

package org.apache.logging.log4j.core.appender.mom.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractManager;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.util.Log4jThread;

/**
 * Manager that owns a Kafka {@link Producer} and publishes byte-array messages to a single topic.
 *
 * <p>The producer is created by {@link #startup()} and closed by
 * {@link #releaseSub(long, TimeUnit)}. Until {@link #startup()} has been called,
 * {@link #send(byte[])} does nothing.</p>
 */
public class KafkaManager extends AbstractManager {

    /**
     * Fallback value, in milliseconds, used when the {@code timeout.ms} property is not configured.
     */
    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";

    /**
     * package-private access for testing.
     */
    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();

    private final Properties config = new Properties();
    private Producer<byte[], byte[]> producer;
    private final int timeoutMillis;

    private final String topic;
    private final String key;
    private final boolean syncSend;

    /**
     * Builds the producer configuration; the producer itself is not created until {@link #startup()}.
     *
     * @param loggerContext the logger context passed to the superclass
     * @param name the manager name passed to the superclass
     * @param topic the Kafka topic records are sent to; must not be {@code null}
     * @param syncSend when {@code true}, {@link #send(byte[])} waits for the send to complete
     * @param properties extra producer properties, applied after the built-in defaults so they can
     *        override them; {@code null} is treated as "no extra properties"
     * @param key the record key, or {@code null} for no key; a key containing <code>${</code> is
     *        resolved through the configuration's substitutor on every send
     * @throws NullPointerException if {@code topic} is {@code null}
     * @throws NumberFormatException if the {@code timeout.ms} property is not a parsable integer
     */
    public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
            final Property[] properties, final String key) {
        super(loggerContext, name);
        this.topic = Objects.requireNonNull(topic, "topic");
        this.syncSend = syncSend;
        // Built-in defaults; any of these may be overridden by the supplied properties below.
        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        config.setProperty("batch.size", "0");
        // Tolerate a missing property array instead of failing with a bare NullPointerException.
        if (properties != null) {
            for (final Property property : properties) {
                config.setProperty(property.getName(), property.getValue());
            }
        }

        this.key = key;

        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
    }

    /**
     * Closes the producer, waiting at most the given timeout for the close to finish.
     *
     * @param timeout how long to wait; when not positive, the configured {@code timeout.ms} is used
     * @param timeUnit the unit of {@code timeout}
     * @return always {@code true}
     */
    @Override
    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
        if (timeout > 0) {
            closeProducer(timeout, timeUnit);
        } else {
            closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return true;
    }

    /**
     * Closes the producer on a separate daemon thread and waits up to the given timeout for it.
     *
     * @param timeout how long to wait for the close thread
     * @param timeUnit the unit of {@code timeout}
     */
    private void closeProducer(final long timeout, final TimeUnit timeUnit) {
        // Capture the reference once so the close thread acts on exactly the instance checked here.
        final Producer<byte[], byte[]> producerToClose = producer;
        if (producerToClose == null) {
            return;
        }
        // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
        final Thread closeThread = new Log4jThread(new Runnable() {
            @Override
            public void run() {
                producerToClose.close();
            }
        }, "KafkaManager-CloseThread");
        closeThread.setDaemon(true); // avoid blocking JVM shutdown
        closeThread.start();
        try {
            closeThread.join(timeUnit.toMillis(timeout));
        } catch (final InterruptedException ignore) {
            // Stop waiting, but preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a message to the configured topic. Does nothing if no producer has been started.
     *
     * <p>In synchronous mode this waits up to the configured timeout for the send to complete. In
     * asynchronous mode a failure reported by the producer is logged through the status logger.</p>
     *
     * @param msg the record value
     * @throws ExecutionException if a synchronous send completed exceptionally
     * @throws InterruptedException if the thread was interrupted while waiting for a synchronous send
     * @throws TimeoutException if a synchronous send did not complete within the configured timeout
     */
    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer == null) {
            return;
        }
        final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, resolveKey(), msg);
        if (syncSend) {
            final Future<RecordMetadata> response = producer.send(newRecord);
            response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } else {
            producer.send(newRecord, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception e) {
                    if (e != null) {
                        LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                    }
                }
            });
        }
    }

    /**
     * Returns the UTF-8 bytes of the record key, or {@code null} when no key is configured.
     */
    private byte[] resolveKey() {
        if (key == null) {
            return null;
        }
        if (key.contains("${")) {
            // The key holds a lookup expression, so it is substituted anew for each record.
            return getLoggerContext().getConfiguration().getStrSubstitutor().replace(key)
                    .getBytes(StandardCharsets.UTF_8);
        }
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Creates the producer from the collected configuration if this manager does not have one yet.
     *
     * <p>Repeated calls keep the existing producer rather than replacing it, so an earlier producer
     * is never dropped without being closed.</p>
     */
    public void startup() {
        if (producer == null) {
            producer = producerFactory.newKafkaProducer(config);
        }
    }

    /**
     * Returns the Kafka topic this manager sends to.
     *
     * @return the topic name, never {@code null}
     */
    public String getTopic() {
        return topic;
    }

}