001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.io.OutputStream; 022import java.io.Serializable; 023import java.net.ConnectException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.Socket; 027import java.net.UnknownHostException; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.CountDownLatch; 031 032import org.apache.logging.log4j.core.Layout; 033import org.apache.logging.log4j.core.appender.AppenderLoggingException; 034import org.apache.logging.log4j.core.appender.ManagerFactory; 035import org.apache.logging.log4j.core.appender.OutputStreamManager; 036import org.apache.logging.log4j.util.Strings; 037 038/** 039 * Manager of TCP Socket connections. 040 */ 041public class TcpSocketManager extends AbstractSocketManager { 042 /** 043 The default reconnection delay (30000 milliseconds or 30 seconds). 044 */ 045 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 046 /** 047 The default port number of remote logging server (4560). 048 */ 049 private static final int DEFAULT_PORT = 4560; 050 051 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory(); 052 053 private final int reconnectionDelay; 054 055 private Reconnector connector; 056 057 private Socket socket; 058 059 private final boolean retry; 060 061 private final boolean immediateFail; 062 063 private final int connectTimeoutMillis; 064 065 /** 066 * The Constructor. 067 * @param name The unique name of this connection. 068 * @param os The OutputStream. 069 * @param sock The Socket. 070 * @param inetAddress The Internet address of the host. 071 * @param host The name of the host. 072 * @param port The port number on the host. 073 * @param connectTimeoutMillis the connect timeout in milliseconds. 074 * @param delay Reconnection interval. 075 * @param immediateFail 076 * @param layout The Layout. 077 */ 078 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress, 079 final String host, final int port, int connectTimeoutMillis, final int delay, 080 final boolean immediateFail, final Layout<? extends Serializable> layout) { 081 super(name, os, inetAddress, host, port, layout); 082 this.connectTimeoutMillis = connectTimeoutMillis; 083 this.reconnectionDelay = delay; 084 this.socket = sock; 085 this.immediateFail = immediateFail; 086 retry = delay > 0; 087 if (sock == null) { 088 connector = new Reconnector(this); 089 connector.setDaemon(true); 090 connector.setPriority(Thread.MIN_PRIORITY); 091 connector.start(); 092 } 093 } 094 095 /** 096 * Obtain a TcpSocketManager. 097 * @param host The host to connect to. 098 * @param port The port on the host. 099 * @param connectTimeoutMillis the connect timeout in milliseconds 100 * @param delayMillis The interval to pause between retries. 101 * @return A TcpSocketManager. 102 */ 103 public static TcpSocketManager getSocketManager(final String host, int port, int connectTimeoutMillis, 104 int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) { 105 if (Strings.isEmpty(host)) { 106 throw new IllegalArgumentException("A host name is required"); 107 } 108 if (port <= 0) { 109 port = DEFAULT_PORT; 110 } 111 if (delayMillis == 0) { 112 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 113 } 114 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port, 115 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY); 116 } 117 118 @Override 119 protected void write(final byte[] bytes, final int offset, final int length) { 120 if (socket == null) { 121 if (connector != null && !immediateFail) { 122 connector.latch(); 123 } 124 if (socket == null) { 125 final String msg = "Error writing to " + getName() + " socket not available"; 126 throw new AppenderLoggingException(msg); 127 } 128 } 129 synchronized (this) { 130 try { 131 getOutputStream().write(bytes, offset, length); 132 } catch (final IOException ex) { 133 if (retry && connector == null) { 134 connector = new Reconnector(this); 135 connector.setDaemon(true); 136 connector.setPriority(Thread.MIN_PRIORITY); 137 connector.start(); 138 } 139 final String msg = "Error writing to " + getName(); 140 throw new AppenderLoggingException(msg, ex); 141 } 142 } 143 } 144 145 @Override 146 protected synchronized void close() { 147 super.close(); 148 if (connector != null) { 149 connector.shutdown(); 150 connector.interrupt(); 151 connector = null; 152 } 153 } 154 155 public int getConnectTimeoutMillis() { 156 return connectTimeoutMillis; 157 } 158 159 /** 160 * Gets this TcpSocketManager's content format. Specified by: 161 * <ul> 162 * <li>Key: "protocol" Value: "tcp"</li> 163 * <li>Key: "direction" Value: "out"</li> 164 * </ul> 165 * 166 * @return Map of content format keys supporting TcpSocketManager 167 */ 168 @Override 169 public Map<String, String> getContentFormat() { 170 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat()); 171 result.put("protocol", "tcp"); 172 result.put("direction", "out"); 173 return result; 174 } 175 176 /** 177 * Handles reconnecting to a Thread. 178 */ 179 private class Reconnector extends Thread { 180 181 private final CountDownLatch latch = new CountDownLatch(1); 182 183 private boolean shutdown = false; 184 185 private final Object owner; 186 187 public Reconnector(final OutputStreamManager owner) { 188 this.owner = owner; 189 } 190 191 public void latch() { 192 try { 193 latch.await(); 194 } catch (final InterruptedException ex) { 195 // Ignore the exception. 196 } 197 } 198 199 public void shutdown() { 200 shutdown = true; 201 } 202 203 @Override 204 public void run() { 205 while (!shutdown) { 206 try { 207 sleep(reconnectionDelay); 208 final Socket sock = createSocket(inetAddress, port); 209 final OutputStream newOS = sock.getOutputStream(); 210 synchronized (owner) { 211 try { 212 getOutputStream().close(); 213 } catch (final IOException ioe) { 214 // Ignore this. 215 } 216 217 setOutputStream(newOS); 218 socket = sock; 219 connector = null; 220 shutdown = true; 221 } 222 LOGGER.debug("Connection to " + host + ':' + port + " reestablished."); 223 } catch (final InterruptedException ie) { 224 LOGGER.debug("Reconnection interrupted."); 225 } catch (final ConnectException ex) { 226 LOGGER.debug(host + ':' + port + " refused connection"); 227 } catch (final IOException ioe) { 228 LOGGER.debug("Unable to reconnect to " + host + ':' + port); 229 } finally { 230 latch.countDown(); 231 } 232 } 233 } 234 } 235 236 protected Socket createSocket(final InetAddress host, final int port) throws IOException { 237 return createSocket(host.getHostName(), port); 238 } 239 240 protected Socket createSocket(final String host, final int port) throws IOException { 241 final InetSocketAddress address = new InetSocketAddress(host, port); 242 final Socket newSocket = new Socket(); 243 newSocket.connect(address, connectTimeoutMillis); 244 return newSocket; 245 } 246 247 /** 248 * Data for the factory. 249 */ 250 private static class FactoryData { 251 private final String host; 252 private final int port; 253 private final int connectTimeoutMillis; 254 private final int delayMillis; 255 private final boolean immediateFail; 256 private final Layout<? extends Serializable> layout; 257 258 public FactoryData(final String host, final int port, int connectTimeoutMillis, final int delayMillis, 259 final boolean immediateFail, final Layout<? extends Serializable> layout) { 260 this.host = host; 261 this.port = port; 262 this.connectTimeoutMillis = connectTimeoutMillis; 263 this.delayMillis = delayMillis; 264 this.immediateFail = immediateFail; 265 this.layout = layout; 266 } 267 } 268 269 /** 270 * Factory to create a TcpSocketManager. 271 */ 272 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> { 273 @Override 274 public TcpSocketManager createManager(final String name, final FactoryData data) { 275 276 InetAddress inetAddress; 277 OutputStream os; 278 try { 279 inetAddress = InetAddress.getByName(data.host); 280 } catch (final UnknownHostException ex) { 281 LOGGER.error("Could not find address of " + data.host, ex); 282 return null; 283 } 284 try { 285 final Socket socket = new Socket(data.host, data.port); 286 os = socket.getOutputStream(); 287 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, 288 data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout); 289 } catch (final IOException ex) { 290 LOGGER.error("TcpSocketManager (" + name + ") " + ex); 291 os = new ByteArrayOutputStream(); 292 } 293 if (data.delayMillis == 0) { 294 return null; 295 } 296 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis, 297 data.delayMillis, data.immediateFail, data.layout); 298 } 299 } 300 301}