001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.util.Arrays; 021import java.util.List; 022import java.util.concurrent.TimeUnit; 023 024import org.apache.logging.log4j.LogManager; 025import org.apache.logging.log4j.core.appender.AbstractManager; 026import org.apache.logging.log4j.core.appender.ManagerFactory; 027import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; 028import org.apache.logging.log4j.util.PropertiesUtil; 029import org.zeromq.ZMQ; 030 031/** 032 * Manager for publishing messages via JeroMq. 033 * 034 * @since 2.6 035 */ 036public class JeroMqManager extends AbstractManager { 037 038 /** 039 * System property to enable shutdown hook. 040 */ 041 public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; 042 043 /** 044 * System property to control JeroMQ I/O thread count. 045 */ 046 public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; 047 048 private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory(); 049 private static final ZMQ.Context CONTEXT; 050 051 static { 052 LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString()); 053 054 final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); 055 LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads); 056 CONTEXT = ZMQ.context(ioThreads); 057 058 final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty( 059 SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); 060 if (enableShutdownHook) { 061 ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { 062 @Override 063 public void run() { 064 CONTEXT.close(); 065 } 066 }); 067 } 068 } 069 070 private final ZMQ.Socket publisher; 071 072 private JeroMqManager(final String name, final JeroMqConfiguration config) { 073 super(null, name); 074 publisher = CONTEXT.socket(ZMQ.PUB); 075 publisher.setAffinity(config.affinity); 076 publisher.setBacklog(config.backlog); 077 publisher.setDelayAttachOnConnect(config.delayAttachOnConnect); 078 if (config.identity != null) { 079 publisher.setIdentity(config.identity); 080 } 081 publisher.setIPv4Only(config.ipv4Only); 082 publisher.setLinger(config.linger); 083 publisher.setMaxMsgSize(config.maxMsgSize); 084 publisher.setRcvHWM(config.rcvHwm); 085 publisher.setReceiveBufferSize(config.receiveBufferSize); 086 publisher.setReceiveTimeOut(config.receiveTimeOut); 087 publisher.setReconnectIVL(config.reconnectIVL); 088 publisher.setReconnectIVLMax(config.reconnectIVLMax); 089 publisher.setSendBufferSize(config.sendBufferSize); 090 publisher.setSendTimeOut(config.sendTimeOut); 091 publisher.setSndHWM(config.sndHwm); 092 publisher.setTCPKeepAlive(config.tcpKeepAlive); 093 publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount); 094 publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle); 095 publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval); 096 publisher.setXpubVerbose(config.xpubVerbose); 097 for (final String endpoint : config.endpoints) { 098 publisher.bind(endpoint); 099 } 100 LOGGER.debug("Created JeroMqManager with {}", config); 101 } 102 103 public boolean send(final byte[] data) { 104 return publisher.send(data); 105 } 106 107 @Override 108 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 109 publisher.close(); 110 return true; 111 } 112 113 public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog, 114 final boolean delayAttachOnConnect, final byte[] identity, 115 final boolean ipv4Only, final long linger, final long maxMsgSize, 116 final long rcvHwm, final long receiveBufferSize, 117 final int receiveTimeOut, final long reconnectIVL, 118 final long reconnectIVLMax, final long sendBufferSize, 119 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive, 120 final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 121 final long tcpKeepAliveInterval, final boolean xpubVerbose, 122 final List<String> endpoints) { 123 return getManager(name, FACTORY, 124 new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, 125 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, 126 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, 127 endpoints)); 128 } 129 130 public static ZMQ.Context getContext() { 131 return CONTEXT; 132 } 133 134 private static class JeroMqConfiguration { 135 private final long affinity; 136 private final long backlog; 137 private final boolean delayAttachOnConnect; 138 private final byte[] identity; 139 private final boolean ipv4Only; 140 private final long linger; 141 private final long maxMsgSize; 142 private final long rcvHwm; 143 private final long receiveBufferSize; 144 private final int receiveTimeOut; 145 private final long reconnectIVL; 146 private final long reconnectIVLMax; 147 private final long sendBufferSize; 148 private final int sendTimeOut; 149 private final long sndHwm; 150 private final int tcpKeepAlive; 151 private final long tcpKeepAliveCount; 152 private final long tcpKeepAliveIdle; 153 private final long tcpKeepAliveInterval; 154 private final boolean xpubVerbose; 155 private final List<String> endpoints; 156 157 private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect, 158 final byte[] identity, final boolean ipv4Only, final long linger, 159 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, 160 final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax, 161 final long sendBufferSize, final int sendTimeOut, final long sndHwm, 162 final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 163 final long tcpKeepAliveInterval, final boolean xpubVerbose, 164 final List<String> endpoints) { 165 this.affinity = affinity; 166 this.backlog = backlog; 167 this.delayAttachOnConnect = delayAttachOnConnect; 168 this.identity = identity; 169 this.ipv4Only = ipv4Only; 170 this.linger = linger; 171 this.maxMsgSize = maxMsgSize; 172 this.rcvHwm = rcvHwm; 173 this.receiveBufferSize = receiveBufferSize; 174 this.receiveTimeOut = receiveTimeOut; 175 this.reconnectIVL = reconnectIVL; 176 this.reconnectIVLMax = reconnectIVLMax; 177 this.sendBufferSize = sendBufferSize; 178 this.sendTimeOut = sendTimeOut; 179 this.sndHwm = sndHwm; 180 this.tcpKeepAlive = tcpKeepAlive; 181 this.tcpKeepAliveCount = tcpKeepAliveCount; 182 this.tcpKeepAliveIdle = tcpKeepAliveIdle; 183 this.tcpKeepAliveInterval = tcpKeepAliveInterval; 184 this.xpubVerbose = xpubVerbose; 185 this.endpoints = endpoints; 186 } 187 188 @Override 189 public String toString() { 190 return "JeroMqConfiguration{" + 191 "affinity=" + affinity + 192 ", backlog=" + backlog + 193 ", delayAttachOnConnect=" + delayAttachOnConnect + 194 ", identity=" + Arrays.toString(identity) + 195 ", ipv4Only=" + ipv4Only + 196 ", linger=" + linger + 197 ", maxMsgSize=" + maxMsgSize + 198 ", rcvHwm=" + rcvHwm + 199 ", receiveBufferSize=" + receiveBufferSize + 200 ", receiveTimeOut=" + receiveTimeOut + 201 ", reconnectIVL=" + reconnectIVL + 202 ", reconnectIVLMax=" + reconnectIVLMax + 203 ", sendBufferSize=" + sendBufferSize + 204 ", sendTimeOut=" + sendTimeOut + 205 ", sndHwm=" + sndHwm + 206 ", tcpKeepAlive=" + tcpKeepAlive + 207 ", tcpKeepAliveCount=" + tcpKeepAliveCount + 208 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle + 209 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval + 210 ", xpubVerbose=" + xpubVerbose + 211 ", endpoints=" + endpoints + 212 '}'; 213 } 214 } 215 216 private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> { 217 @Override 218 public JeroMqManager createManager(final String name, final JeroMqConfiguration data) { 219 return new JeroMqManager(name, data); 220 } 221 } 222}