001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.util.Arrays;
021import java.util.List;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.logging.log4j.LogManager;
025import org.apache.logging.log4j.core.appender.AbstractManager;
026import org.apache.logging.log4j.core.appender.ManagerFactory;
027import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
028import org.apache.logging.log4j.util.PropertiesUtil;
029import org.zeromq.ZMQ;
030
031/**
032 * Manager for publishing messages via JeroMq.
033 *
034 * @since 2.6
035 */
036public class JeroMqManager extends AbstractManager {
037
038    /**
039     * System property to enable shutdown hook.
040     */
041    public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
042
043    /**
044     * System property to control JeroMQ I/O thread count.
045     */
046    public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
047
048    private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
049    private static final ZMQ.Context CONTEXT;
050
051    static {
052        LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
053
054        final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
055        LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
056        CONTEXT = ZMQ.context(ioThreads);
057
058        final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
059            SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
060        if (enableShutdownHook) {
061            ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
062                @Override
063                public void run() {
064                    CONTEXT.close();
065                }
066            });
067        }
068    }
069
070    private final ZMQ.Socket publisher;
071
072    private JeroMqManager(final String name, final JeroMqConfiguration config) {
073        super(null, name);
074        publisher = CONTEXT.socket(ZMQ.PUB);
075        publisher.setAffinity(config.affinity);
076        publisher.setBacklog(config.backlog);
077        publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
078        if (config.identity != null) {
079            publisher.setIdentity(config.identity);
080        }
081        publisher.setIPv4Only(config.ipv4Only);
082        publisher.setLinger(config.linger);
083        publisher.setMaxMsgSize(config.maxMsgSize);
084        publisher.setRcvHWM(config.rcvHwm);
085        publisher.setReceiveBufferSize(config.receiveBufferSize);
086        publisher.setReceiveTimeOut(config.receiveTimeOut);
087        publisher.setReconnectIVL(config.reconnectIVL);
088        publisher.setReconnectIVLMax(config.reconnectIVLMax);
089        publisher.setSendBufferSize(config.sendBufferSize);
090        publisher.setSendTimeOut(config.sendTimeOut);
091        publisher.setSndHWM(config.sndHwm);
092        publisher.setTCPKeepAlive(config.tcpKeepAlive);
093        publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
094        publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
095        publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
096        publisher.setXpubVerbose(config.xpubVerbose);
097        for (final String endpoint : config.endpoints) {
098            publisher.bind(endpoint);
099        }
100        LOGGER.debug("Created JeroMqManager with {}", config);
101    }
102
103    public boolean send(final byte[] data) {
104        return publisher.send(data);
105    }
106
107    @Override
108    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
109        publisher.close();
110        return true;
111    }
112
113    public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
114                                                 final boolean delayAttachOnConnect, final byte[] identity,
115                                                 final boolean ipv4Only, final long linger, final long maxMsgSize,
116                                                 final long rcvHwm, final long receiveBufferSize,
117                                                 final int receiveTimeOut, final long reconnectIVL,
118                                                 final long reconnectIVLMax, final long sendBufferSize,
119                                                 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
120                                                 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
121                                                 final long tcpKeepAliveInterval, final boolean xpubVerbose,
122                                                 final List<String> endpoints) {
123        return getManager(name, FACTORY,
124            new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
125                rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
126                sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
127                endpoints));
128    }
129
130    public static ZMQ.Context getContext() {
131        return CONTEXT;
132    }
133
134    private static class JeroMqConfiguration {
135        private final long affinity;
136        private final long backlog;
137        private final boolean delayAttachOnConnect;
138        private final byte[] identity;
139        private final boolean ipv4Only;
140        private final long linger;
141        private final long maxMsgSize;
142        private final long rcvHwm;
143        private final long receiveBufferSize;
144        private final int receiveTimeOut;
145        private final long reconnectIVL;
146        private final long reconnectIVLMax;
147        private final long sendBufferSize;
148        private final int sendTimeOut;
149        private final long sndHwm;
150        private final int tcpKeepAlive;
151        private final long tcpKeepAliveCount;
152        private final long tcpKeepAliveIdle;
153        private final long tcpKeepAliveInterval;
154        private final boolean xpubVerbose;
155        private final List<String> endpoints;
156
157        private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
158                                    final byte[] identity, final boolean ipv4Only, final long linger,
159                                    final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
160                                    final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
161                                    final long sendBufferSize, final int sendTimeOut, final long sndHwm,
162                                    final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
163                                    final long tcpKeepAliveInterval, final boolean xpubVerbose,
164                                    final List<String> endpoints) {
165            this.affinity = affinity;
166            this.backlog = backlog;
167            this.delayAttachOnConnect = delayAttachOnConnect;
168            this.identity = identity;
169            this.ipv4Only = ipv4Only;
170            this.linger = linger;
171            this.maxMsgSize = maxMsgSize;
172            this.rcvHwm = rcvHwm;
173            this.receiveBufferSize = receiveBufferSize;
174            this.receiveTimeOut = receiveTimeOut;
175            this.reconnectIVL = reconnectIVL;
176            this.reconnectIVLMax = reconnectIVLMax;
177            this.sendBufferSize = sendBufferSize;
178            this.sendTimeOut = sendTimeOut;
179            this.sndHwm = sndHwm;
180            this.tcpKeepAlive = tcpKeepAlive;
181            this.tcpKeepAliveCount = tcpKeepAliveCount;
182            this.tcpKeepAliveIdle = tcpKeepAliveIdle;
183            this.tcpKeepAliveInterval = tcpKeepAliveInterval;
184            this.xpubVerbose = xpubVerbose;
185            this.endpoints = endpoints;
186        }
187
188        @Override
189        public String toString() {
190            return "JeroMqConfiguration{" +
191                "affinity=" + affinity +
192                ", backlog=" + backlog +
193                ", delayAttachOnConnect=" + delayAttachOnConnect +
194                ", identity=" + Arrays.toString(identity) +
195                ", ipv4Only=" + ipv4Only +
196                ", linger=" + linger +
197                ", maxMsgSize=" + maxMsgSize +
198                ", rcvHwm=" + rcvHwm +
199                ", receiveBufferSize=" + receiveBufferSize +
200                ", receiveTimeOut=" + receiveTimeOut +
201                ", reconnectIVL=" + reconnectIVL +
202                ", reconnectIVLMax=" + reconnectIVLMax +
203                ", sendBufferSize=" + sendBufferSize +
204                ", sendTimeOut=" + sendTimeOut +
205                ", sndHwm=" + sndHwm +
206                ", tcpKeepAlive=" + tcpKeepAlive +
207                ", tcpKeepAliveCount=" + tcpKeepAliveCount +
208                ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
209                ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
210                ", xpubVerbose=" + xpubVerbose +
211                ", endpoints=" + endpoints +
212                '}';
213        }
214    }
215
216    private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
217        @Override
218        public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
219            return new JeroMqManager(name, data);
220        }
221    }
222}