001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.logging.log4j.core.Appender; 026import org.apache.logging.log4j.core.Filter; 027import org.apache.logging.log4j.core.Layout; 028import org.apache.logging.log4j.core.LogEvent; 029import org.apache.logging.log4j.core.appender.AbstractAppender; 030import org.apache.logging.log4j.core.config.Node; 031import org.apache.logging.log4j.core.config.Property; 032import org.apache.logging.log4j.core.config.plugins.Plugin; 033import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 034import org.apache.logging.log4j.core.config.plugins.PluginElement; 035import org.apache.logging.log4j.core.config.plugins.PluginFactory; 036import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 037import org.apache.logging.log4j.core.layout.PatternLayout; 038import org.apache.logging.log4j.util.Strings; 039 040/** 041 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. 042 * <p> 043 * Requires the JeroMQ jar (LGPL as of 0.3.5) 044 * </p> 045 */ 046// TODO 047// Some methods are synchronized because a ZMQ.Socket is not thread-safe 048// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be 049// some issue on threads owning certain resources as opposed to others. 050@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) 051public final class JeroMqAppender extends AbstractAppender { 052 053 private static final int DEFAULT_BACKLOG = 100; 054 055 private static final int DEFAULT_IVL = 100; 056 057 private static final int DEFAULT_RCV_HWM = 1000; 058 059 private static final int DEFAULT_SND_HWM = 1000; 060 061 private final JeroMqManager manager; 062 private final List<String> endpoints; 063 private int sendRcFalse; 064 private int sendRcTrue; 065 066 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 067 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, 068 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, 069 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, 070 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, 071 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 072 final long tcpKeepAliveInterval, final boolean xpubVerbose, final Property[] properties) { 073 super(name, filter, layout, ignoreExceptions, properties); 074 this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, 075 linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, 076 sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, 077 tcpKeepAliveInterval, xpubVerbose, endpoints); 078 this.endpoints = endpoints; 079 } 080 081 // The ZMQ.Socket class has other set methods that we do not cover because 082 // they throw unsupported operation exceptions. 083 @PluginFactory 084 public static JeroMqAppender createAppender( 085 // @formatter:off 086 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, 087 @PluginElement("Layout") Layout<?> layout, 088 @PluginElement("Filter") final Filter filter, 089 @PluginElement("Properties") final Property[] properties, 090 // Super attributes 091 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, 092 // ZMQ attributes; defaults picked from zmq.Options. 093 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, 094 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, 095 @PluginAttribute(value = "delayAttachOnConnect") final boolean delayAttachOnConnect, 096 @PluginAttribute(value = "identity") final byte[] identity, 097 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, 098 @PluginAttribute(value = "linger", defaultLong = -1) final long linger, 099 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, 100 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, 101 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, 102 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, 103 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, 104 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, 105 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, 106 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, 107 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, 108 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, 109 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, 110 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, 111 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, 112 @PluginAttribute(value = "xpubVerbose") final boolean xpubVerbose 113 // @formatter:on 114 ) { 115 if (layout == null) { 116 layout = PatternLayout.createDefaultLayout(); 117 } 118 List<String> endpoints; 119 if (properties == null) { 120 endpoints = new ArrayList<>(0); 121 } else { 122 endpoints = new ArrayList<>(properties.length); 123 for (final Property property : properties) { 124 if ("endpoint".equalsIgnoreCase(property.getName())) { 125 final String value = property.getValue(); 126 if (Strings.isNotEmpty(value)) { 127 endpoints.add(value); 128 } 129 } 130 } 131 } 132 LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", 133 name, filter, layout, ignoreExceptions, endpoints); 134 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, 135 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, 136 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, 137 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, null); 138 } 139 140 @Override 141 public synchronized void append(final LogEvent event) { 142 final Layout<? extends Serializable> layout = getLayout(); 143 final byte[] formattedMessage = layout.toByteArray(event); 144 if (manager.send(getLayout().toByteArray(event))) { 145 sendRcTrue++; 146 } else { 147 sendRcFalse++; 148 LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); 149 } 150 } 151 152 @Override 153 public boolean stop(final long timeout, final TimeUnit timeUnit) { 154 setStopping(); 155 boolean stopped = super.stop(timeout, timeUnit, false); 156 stopped &= manager.stop(timeout, timeUnit); 157 setStopped(); 158 return stopped; 159 } 160 161 // not public, handy for testing 162 int getSendRcFalse() { 163 return sendRcFalse; 164 } 165 166 // not public, handy for testing 167 int getSendRcTrue() { 168 return sendRcTrue; 169 } 170 171 // not public, handy for testing 172 void resetSendRcs() { 173 sendRcTrue = sendRcFalse = 0; 174 } 175 176 @Override 177 public String toString() { 178 return "JeroMqAppender{" + 179 "name=" + getName() + 180 ", state=" + getState() + 181 ", manager=" + manager + 182 ", endpoints=" + endpoints + 183 '}'; 184 } 185}