View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.util.Arrays;
21  import java.util.List;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.logging.log4j.LogManager;
25  import org.apache.logging.log4j.core.appender.AbstractManager;
26  import org.apache.logging.log4j.core.appender.ManagerFactory;
27  import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
28  import org.apache.logging.log4j.util.PropertiesUtil;
29  import org.zeromq.ZMQ;
30  
31  /**
32   * Manager for publishing messages via JeroMq.
33   *
34   * @since 2.6
35   */
36  public class JeroMqManager extends AbstractManager {
37  
38      /**
39       * System property to enable shutdown hook.
40       */
41      public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
42  
43      /**
44       * System property to control JeroMQ I/O thread count.
45       */
46      public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
47  
48      private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
49      private static final ZMQ.Context CONTEXT;
50  
51      static {
52          LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
53  
54          final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
55          LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
56          CONTEXT = ZMQ.context(ioThreads);
57  
58          final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
59              SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
60          if (enableShutdownHook) {
61              ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
62                  @Override
63                  public void run() {
64                      CONTEXT.close();
65                  }
66              });
67          }
68      }
69  
70      private final ZMQ.Socket publisher;
71  
72      private JeroMqManager(final String name, final JeroMqConfiguration config) {
73          super(null, name);
74          publisher = CONTEXT.socket(ZMQ.PUB);
75          publisher.setAffinity(config.affinity);
76          publisher.setBacklog(config.backlog);
77          publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
78          if (config.identity != null) {
79              publisher.setIdentity(config.identity);
80          }
81          publisher.setIPv4Only(config.ipv4Only);
82          publisher.setLinger(config.linger);
83          publisher.setMaxMsgSize(config.maxMsgSize);
84          publisher.setRcvHWM(config.rcvHwm);
85          publisher.setReceiveBufferSize(config.receiveBufferSize);
86          publisher.setReceiveTimeOut(config.receiveTimeOut);
87          publisher.setReconnectIVL(config.reconnectIVL);
88          publisher.setReconnectIVLMax(config.reconnectIVLMax);
89          publisher.setSendBufferSize(config.sendBufferSize);
90          publisher.setSendTimeOut(config.sendTimeOut);
91          publisher.setSndHWM(config.sndHwm);
92          publisher.setTCPKeepAlive(config.tcpKeepAlive);
93          publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
94          publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
95          publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
96          publisher.setXpubVerbose(config.xpubVerbose);
97          for (final String endpoint : config.endpoints) {
98              publisher.bind(endpoint);
99          }
100         LOGGER.debug("Created JeroMqManager with {}", config);
101     }
102 
103     public boolean send(final byte[] data) {
104         return publisher.send(data);
105     }
106 
107     @Override
108     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
109         publisher.close();
110         return true;
111     }
112 
113     public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
114                                                  final boolean delayAttachOnConnect, final byte[] identity,
115                                                  final boolean ipv4Only, final long linger, final long maxMsgSize,
116                                                  final long rcvHwm, final long receiveBufferSize,
117                                                  final int receiveTimeOut, final long reconnectIVL,
118                                                  final long reconnectIVLMax, final long sendBufferSize,
119                                                  final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
120                                                  final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
121                                                  final long tcpKeepAliveInterval, final boolean xpubVerbose,
122                                                  final List<String> endpoints) {
123         return getManager(name, FACTORY,
124             new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
125                 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
126                 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
127                 endpoints));
128     }
129 
130     public static ZMQ.Context getContext() {
131         return CONTEXT;
132     }
133 
134     private static class JeroMqConfiguration {
135         private final long affinity;
136         private final long backlog;
137         private final boolean delayAttachOnConnect;
138         private final byte[] identity;
139         private final boolean ipv4Only;
140         private final long linger;
141         private final long maxMsgSize;
142         private final long rcvHwm;
143         private final long receiveBufferSize;
144         private final int receiveTimeOut;
145         private final long reconnectIVL;
146         private final long reconnectIVLMax;
147         private final long sendBufferSize;
148         private final int sendTimeOut;
149         private final long sndHwm;
150         private final int tcpKeepAlive;
151         private final long tcpKeepAliveCount;
152         private final long tcpKeepAliveIdle;
153         private final long tcpKeepAliveInterval;
154         private final boolean xpubVerbose;
155         private final List<String> endpoints;
156 
157         private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
158                                     final byte[] identity, final boolean ipv4Only, final long linger,
159                                     final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
160                                     final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
161                                     final long sendBufferSize, final int sendTimeOut, final long sndHwm,
162                                     final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
163                                     final long tcpKeepAliveInterval, final boolean xpubVerbose,
164                                     final List<String> endpoints) {
165             this.affinity = affinity;
166             this.backlog = backlog;
167             this.delayAttachOnConnect = delayAttachOnConnect;
168             this.identity = identity;
169             this.ipv4Only = ipv4Only;
170             this.linger = linger;
171             this.maxMsgSize = maxMsgSize;
172             this.rcvHwm = rcvHwm;
173             this.receiveBufferSize = receiveBufferSize;
174             this.receiveTimeOut = receiveTimeOut;
175             this.reconnectIVL = reconnectIVL;
176             this.reconnectIVLMax = reconnectIVLMax;
177             this.sendBufferSize = sendBufferSize;
178             this.sendTimeOut = sendTimeOut;
179             this.sndHwm = sndHwm;
180             this.tcpKeepAlive = tcpKeepAlive;
181             this.tcpKeepAliveCount = tcpKeepAliveCount;
182             this.tcpKeepAliveIdle = tcpKeepAliveIdle;
183             this.tcpKeepAliveInterval = tcpKeepAliveInterval;
184             this.xpubVerbose = xpubVerbose;
185             this.endpoints = endpoints;
186         }
187 
188         @Override
189         public String toString() {
190             return "JeroMqConfiguration{" +
191                 "affinity=" + affinity +
192                 ", backlog=" + backlog +
193                 ", delayAttachOnConnect=" + delayAttachOnConnect +
194                 ", identity=" + Arrays.toString(identity) +
195                 ", ipv4Only=" + ipv4Only +
196                 ", linger=" + linger +
197                 ", maxMsgSize=" + maxMsgSize +
198                 ", rcvHwm=" + rcvHwm +
199                 ", receiveBufferSize=" + receiveBufferSize +
200                 ", receiveTimeOut=" + receiveTimeOut +
201                 ", reconnectIVL=" + reconnectIVL +
202                 ", reconnectIVLMax=" + reconnectIVLMax +
203                 ", sendBufferSize=" + sendBufferSize +
204                 ", sendTimeOut=" + sendTimeOut +
205                 ", sndHwm=" + sndHwm +
206                 ", tcpKeepAlive=" + tcpKeepAlive +
207                 ", tcpKeepAliveCount=" + tcpKeepAliveCount +
208                 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
209                 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
210                 ", xpubVerbose=" + xpubVerbose +
211                 ", endpoints=" + endpoints +
212                 '}';
213         }
214     }
215 
216     private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
217         @Override
218         public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
219             return new JeroMqManager(name, data);
220         }
221     }
222 }