001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.logging.log4j.core.Appender;
026import org.apache.logging.log4j.core.Filter;
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LogEvent;
029import org.apache.logging.log4j.core.appender.AbstractAppender;
030import org.apache.logging.log4j.core.config.Node;
031import org.apache.logging.log4j.core.config.Property;
032import org.apache.logging.log4j.core.config.plugins.Plugin;
033import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
034import org.apache.logging.log4j.core.config.plugins.PluginElement;
035import org.apache.logging.log4j.core.config.plugins.PluginFactory;
036import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
037import org.apache.logging.log4j.core.layout.PatternLayout;
038import org.apache.logging.log4j.util.Strings;
039
040/**
041 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
042 * <p>
043 * Requires the JeroMQ jar (LGPL as of 0.3.5)
044 * </p>
045 */
046// TODO
047// Some methods are synchronized because a ZMQ.Socket is not thread-safe
048// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
049// some issue on threads owning certain resources as opposed to others.
050@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
051public final class JeroMqAppender extends AbstractAppender {
052
053    private static final int DEFAULT_BACKLOG = 100;
054
055    private static final int DEFAULT_IVL = 100;
056
057    private static final int DEFAULT_RCV_HWM = 1000;
058
059    private static final int DEFAULT_SND_HWM = 1000;
060
061    private final JeroMqManager manager;
062    private final List<String> endpoints;
063    private int sendRcFalse;
064    private int sendRcTrue;
065
066    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
067            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
068            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
069            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
070            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
071            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
072            final long tcpKeepAliveInterval, final boolean xpubVerbose, final Property[] properties) {
073        super(name, filter, layout, ignoreExceptions, properties);
074        this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
075            linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
076            sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
077            tcpKeepAliveInterval, xpubVerbose, endpoints);
078        this.endpoints = endpoints;
079    }
080
081    // The ZMQ.Socket class has other set methods that we do not cover because
082    // they throw unsupported operation exceptions.
083    @PluginFactory
084    public static JeroMqAppender createAppender(
085            // @formatter:off
086            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
087            @PluginElement("Layout") Layout<?> layout,
088            @PluginElement("Filter") final Filter filter,
089            @PluginElement("Properties") final Property[] properties,
090            // Super attributes
091            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
092            // ZMQ attributes; defaults picked from zmq.Options.
093            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
094            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
095            @PluginAttribute(value = "delayAttachOnConnect") final boolean delayAttachOnConnect,
096            @PluginAttribute(value = "identity") final byte[] identity,
097            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
098            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
099            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
100            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
101            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
102            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
103            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
104            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
105            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
106            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
107            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
108            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
109            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
110            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
111            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
112            @PluginAttribute(value = "xpubVerbose") final boolean xpubVerbose
113            // @formatter:on
114    ) {
115        if (layout == null) {
116            layout = PatternLayout.createDefaultLayout();
117        }
118        List<String> endpoints;
119        if (properties == null) {
120            endpoints = new ArrayList<>(0);
121        } else {
122            endpoints = new ArrayList<>(properties.length);
123            for (final Property property : properties) {
124                if ("endpoint".equalsIgnoreCase(property.getName())) {
125                    final String value = property.getValue();
126                    if (Strings.isNotEmpty(value)) {
127                        endpoints.add(value);
128                    }
129                }
130            }
131        }
132        LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
133                name, filter, layout, ignoreExceptions, endpoints);
134        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
135                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
136                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
137                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, null);
138    }
139
140    @Override
141    public synchronized void append(final LogEvent event) {
142        final Layout<? extends Serializable> layout = getLayout();
143        final byte[] formattedMessage = layout.toByteArray(event);
144        if (manager.send(getLayout().toByteArray(event))) {
145            sendRcTrue++;
146        } else {
147            sendRcFalse++;
148            LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
149        }
150    }
151
152    @Override
153    public boolean stop(final long timeout, final TimeUnit timeUnit) {
154        setStopping();
155        boolean stopped = super.stop(timeout, timeUnit, false);
156        stopped &= manager.stop(timeout, timeUnit);
157        setStopped();
158        return stopped;
159    }
160
161    // not public, handy for testing
162    int getSendRcFalse() {
163        return sendRcFalse;
164    }
165
166    // not public, handy for testing
167    int getSendRcTrue() {
168        return sendRcTrue;
169    }
170
171    // not public, handy for testing
172    void resetSendRcs() {
173        sendRcTrue = sendRcFalse = 0;
174    }
175
176    @Override
177    public String toString() {
178        return "JeroMqAppender{" +
179            "name=" + getName() +
180            ", state=" + getState() +
181            ", manager=" + manager +
182            ", endpoints=" + endpoints +
183            '}';
184    }
185}