1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.util.Arrays;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.logging.log4j.LogManager;
25 import org.apache.logging.log4j.core.appender.AbstractManager;
26 import org.apache.logging.log4j.core.appender.ManagerFactory;
27 import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
28 import org.apache.logging.log4j.util.PropertiesUtil;
29 import org.zeromq.ZMQ;
30
31
32
33
34
35
36 public class JeroMqManager extends AbstractManager {
37
38
39
40
41 public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
42
43
44
45
46 public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
47
48 private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
49 private static final ZMQ.Context CONTEXT;
50
51 static {
52 LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
53
54 final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
55 LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
56 CONTEXT = ZMQ.context(ioThreads);
57
58 final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
59 SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
60 if (enableShutdownHook) {
61 ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
62 @Override
63 public void run() {
64 CONTEXT.close();
65 }
66 });
67 }
68 }
69
70 private final ZMQ.Socket publisher;
71
72 private JeroMqManager(final String name, final JeroMqConfiguration config) {
73 super(null, name);
74 publisher = CONTEXT.socket(ZMQ.PUB);
75 publisher.setAffinity(config.affinity);
76 publisher.setBacklog(config.backlog);
77 publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
78 if (config.identity != null) {
79 publisher.setIdentity(config.identity);
80 }
81 publisher.setIPv4Only(config.ipv4Only);
82 publisher.setLinger(config.linger);
83 publisher.setMaxMsgSize(config.maxMsgSize);
84 publisher.setRcvHWM(config.rcvHwm);
85 publisher.setReceiveBufferSize(config.receiveBufferSize);
86 publisher.setReceiveTimeOut(config.receiveTimeOut);
87 publisher.setReconnectIVL(config.reconnectIVL);
88 publisher.setReconnectIVLMax(config.reconnectIVLMax);
89 publisher.setSendBufferSize(config.sendBufferSize);
90 publisher.setSendTimeOut(config.sendTimeOut);
91 publisher.setSndHWM(config.sndHwm);
92 publisher.setTCPKeepAlive(config.tcpKeepAlive);
93 publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
94 publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
95 publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
96 publisher.setXpubVerbose(config.xpubVerbose);
97 for (final String endpoint : config.endpoints) {
98 publisher.bind(endpoint);
99 }
100 LOGGER.debug("Created JeroMqManager with {}", config);
101 }
102
103 public boolean send(final byte[] data) {
104 return publisher.send(data);
105 }
106
107 @Override
108 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
109 publisher.close();
110 return true;
111 }
112
113 public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
114 final boolean delayAttachOnConnect, final byte[] identity,
115 final boolean ipv4Only, final long linger, final long maxMsgSize,
116 final long rcvHwm, final long receiveBufferSize,
117 final int receiveTimeOut, final long reconnectIVL,
118 final long reconnectIVLMax, final long sendBufferSize,
119 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
120 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
121 final long tcpKeepAliveInterval, final boolean xpubVerbose,
122 final List<String> endpoints) {
123 return getManager(name, FACTORY,
124 new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
125 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
126 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
127 endpoints));
128 }
129
130 public static ZMQ.Context getContext() {
131 return CONTEXT;
132 }
133
134 private static class JeroMqConfiguration {
135 private final long affinity;
136 private final long backlog;
137 private final boolean delayAttachOnConnect;
138 private final byte[] identity;
139 private final boolean ipv4Only;
140 private final long linger;
141 private final long maxMsgSize;
142 private final long rcvHwm;
143 private final long receiveBufferSize;
144 private final int receiveTimeOut;
145 private final long reconnectIVL;
146 private final long reconnectIVLMax;
147 private final long sendBufferSize;
148 private final int sendTimeOut;
149 private final long sndHwm;
150 private final int tcpKeepAlive;
151 private final long tcpKeepAliveCount;
152 private final long tcpKeepAliveIdle;
153 private final long tcpKeepAliveInterval;
154 private final boolean xpubVerbose;
155 private final List<String> endpoints;
156
157 private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
158 final byte[] identity, final boolean ipv4Only, final long linger,
159 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
160 final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
161 final long sendBufferSize, final int sendTimeOut, final long sndHwm,
162 final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
163 final long tcpKeepAliveInterval, final boolean xpubVerbose,
164 final List<String> endpoints) {
165 this.affinity = affinity;
166 this.backlog = backlog;
167 this.delayAttachOnConnect = delayAttachOnConnect;
168 this.identity = identity;
169 this.ipv4Only = ipv4Only;
170 this.linger = linger;
171 this.maxMsgSize = maxMsgSize;
172 this.rcvHwm = rcvHwm;
173 this.receiveBufferSize = receiveBufferSize;
174 this.receiveTimeOut = receiveTimeOut;
175 this.reconnectIVL = reconnectIVL;
176 this.reconnectIVLMax = reconnectIVLMax;
177 this.sendBufferSize = sendBufferSize;
178 this.sendTimeOut = sendTimeOut;
179 this.sndHwm = sndHwm;
180 this.tcpKeepAlive = tcpKeepAlive;
181 this.tcpKeepAliveCount = tcpKeepAliveCount;
182 this.tcpKeepAliveIdle = tcpKeepAliveIdle;
183 this.tcpKeepAliveInterval = tcpKeepAliveInterval;
184 this.xpubVerbose = xpubVerbose;
185 this.endpoints = endpoints;
186 }
187
188 @Override
189 public String toString() {
190 return "JeroMqConfiguration{" +
191 "affinity=" + affinity +
192 ", backlog=" + backlog +
193 ", delayAttachOnConnect=" + delayAttachOnConnect +
194 ", identity=" + Arrays.toString(identity) +
195 ", ipv4Only=" + ipv4Only +
196 ", linger=" + linger +
197 ", maxMsgSize=" + maxMsgSize +
198 ", rcvHwm=" + rcvHwm +
199 ", receiveBufferSize=" + receiveBufferSize +
200 ", receiveTimeOut=" + receiveTimeOut +
201 ", reconnectIVL=" + reconnectIVL +
202 ", reconnectIVLMax=" + reconnectIVLMax +
203 ", sendBufferSize=" + sendBufferSize +
204 ", sendTimeOut=" + sendTimeOut +
205 ", sndHwm=" + sndHwm +
206 ", tcpKeepAlive=" + tcpKeepAlive +
207 ", tcpKeepAliveCount=" + tcpKeepAliveCount +
208 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
209 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
210 ", xpubVerbose=" + xpubVerbose +
211 ", endpoints=" + endpoints +
212 '}';
213 }
214 }
215
216 private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
217 @Override
218 public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
219 return new JeroMqManager(name, data);
220 }
221 }
222 }