001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import java.io.ByteArrayOutputStream; 020 import java.io.IOException; 021 import java.io.OutputStream; 022 import java.io.Serializable; 023 import java.net.ConnectException; 024 import java.net.InetAddress; 025 import java.net.Socket; 026 import java.net.UnknownHostException; 027 import java.util.HashMap; 028 import java.util.Map; 029 import java.util.concurrent.CountDownLatch; 030 031 import org.apache.logging.log4j.core.Layout; 032 import org.apache.logging.log4j.core.appender.AppenderLoggingException; 033 import org.apache.logging.log4j.core.appender.ManagerFactory; 034 import org.apache.logging.log4j.core.appender.OutputStreamManager; 035 import org.apache.logging.log4j.util.Strings; 036 037 /** 038 * Manager of TCP Socket connections. 039 */ 040 public class TcpSocketManager extends AbstractSocketManager { 041 /** 042 The default reconnection delay (30000 milliseconds or 30 seconds). 043 */ 044 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 045 /** 046 The default port number of remote logging server (4560). 047 */ 048 private static final int DEFAULT_PORT = 4560; 049 050 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory(); 051 052 private final int reconnectionDelay; 053 054 private Reconnector connector = null; 055 056 private Socket socket; 057 058 private final boolean retry; 059 060 private final boolean immediateFail; 061 062 /** 063 * The Constructor. 064 * @param name The unique name of this connection. 065 * @param os The OutputStream. 066 * @param sock The Socket. 067 * @param inetAddress The internet address of the host. 068 * @param host The name of the host. 069 * @param port The port number on the host. 070 * @param delay Reconnection interval. 071 * @param immediateFail 072 * @param layout The Layout. 073 */ 074 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress, 075 final String host, final int port, final int delay, final boolean immediateFail, 076 final Layout<? extends Serializable> layout) { 077 super(name, os, inetAddress, host, port, layout); 078 this.reconnectionDelay = delay; 079 this.socket = sock; 080 this.immediateFail = immediateFail; 081 retry = delay > 0; 082 if (sock == null) { 083 connector = new Reconnector(this); 084 connector.setDaemon(true); 085 connector.setPriority(Thread.MIN_PRIORITY); 086 connector.start(); 087 } 088 } 089 090 /** 091 * Obtain a TcpSocketManager. 092 * @param host The host to connect to. 093 * @param port The port on the host. 094 * @param delayMillis The interval to pause between retries. 095 * @return A TcpSocketManager. 096 */ 097 public static TcpSocketManager getSocketManager(final String host, int port, int delayMillis, 098 final boolean immediateFail, final Layout<? extends Serializable> layout ) { 099 if (Strings.isEmpty(host)) { 100 throw new IllegalArgumentException("A host name is required"); 101 } 102 if (port <= 0) { 103 port = DEFAULT_PORT; 104 } 105 if (delayMillis == 0) { 106 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 107 } 108 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, 109 new FactoryData(host, port, delayMillis, immediateFail, layout), FACTORY); 110 } 111 112 @Override 113 protected void write(final byte[] bytes, final int offset, final int length) { 114 if (socket == null) { 115 if (connector != null && !immediateFail) { 116 connector.latch(); 117 } 118 if (socket == null) { 119 final String msg = "Error writing to " + getName() + " socket not available"; 120 throw new AppenderLoggingException(msg); 121 } 122 } 123 synchronized (this) { 124 try { 125 getOutputStream().write(bytes, offset, length); 126 } catch (final IOException ex) { 127 if (retry && connector == null) { 128 connector = new Reconnector(this); 129 connector.setDaemon(true); 130 connector.setPriority(Thread.MIN_PRIORITY); 131 connector.start(); 132 } 133 final String msg = "Error writing to " + getName(); 134 throw new AppenderLoggingException(msg, ex); 135 } 136 } 137 } 138 139 @Override 140 protected synchronized void close() { 141 super.close(); 142 if (connector != null) { 143 connector.shutdown(); 144 connector.interrupt(); 145 connector = null; 146 } 147 } 148 149 /** 150 * Gets this TcpSocketManager's content format. Specified by: 151 * <ul> 152 * <li>Key: "protocol" Value: "tcp"</li> 153 * <li>Key: "direction" Value: "out"</li> 154 * </ul> 155 * 156 * @return Map of content format keys supporting TcpSocketManager 157 */ 158 @Override 159 public Map<String, String> getContentFormat() { 160 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat()); 161 result.put("protocol", "tcp"); 162 result.put("direction", "out"); 163 return result; 164 } 165 166 /** 167 * Handles reconnecting to a Thread. 168 */ 169 private class Reconnector extends Thread { 170 171 private final CountDownLatch latch = new CountDownLatch(1); 172 173 private boolean shutdown = false; 174 175 private final Object owner; 176 177 public Reconnector(final OutputStreamManager owner) { 178 this.owner = owner; 179 } 180 181 public void latch() { 182 try { 183 latch.await(); 184 } catch (final InterruptedException ex) { 185 // Ignore the exception. 186 } 187 } 188 189 public void shutdown() { 190 shutdown = true; 191 } 192 193 @Override 194 public void run() { 195 while (!shutdown) { 196 try { 197 sleep(reconnectionDelay); 198 final Socket sock = createSocket(inetAddress, port); 199 final OutputStream newOS = sock.getOutputStream(); 200 synchronized (owner) { 201 try { 202 getOutputStream().close(); 203 } catch (final IOException ioe) { 204 // Ignore this. 205 } 206 207 setOutputStream(newOS); 208 socket = sock; 209 connector = null; 210 shutdown = true; 211 } 212 LOGGER.debug("Connection to " + host + ':' + port + " reestablished."); 213 } catch (final InterruptedException ie) { 214 LOGGER.debug("Reconnection interrupted."); 215 } catch (final ConnectException ex) { 216 LOGGER.debug(host + ':' + port + " refused connection"); 217 } catch (final IOException ioe) { 218 LOGGER.debug("Unable to reconnect to " + host + ':' + port); 219 } finally { 220 latch.countDown(); 221 } 222 } 223 } 224 } 225 226 protected Socket createSocket(final InetAddress host, final int port) throws IOException { 227 return createSocket(host.getHostName(), port); 228 } 229 230 protected Socket createSocket(final String host, final int port) throws IOException { 231 return new Socket(host, port); 232 } 233 234 /** 235 * Data for the factory. 236 */ 237 private static class FactoryData { 238 private final String host; 239 private final int port; 240 private final int delayMillis; 241 private final boolean immediateFail; 242 private final Layout<? extends Serializable> layout; 243 244 public FactoryData(final String host, final int port, final int delayMillis, final boolean immediateFail, 245 final Layout<? extends Serializable> layout) { 246 this.host = host; 247 this.port = port; 248 this.delayMillis = delayMillis; 249 this.immediateFail = immediateFail; 250 this.layout = layout; 251 } 252 } 253 254 /** 255 * Factory to create a TcpSocketManager. 256 */ 257 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> { 258 @Override 259 public TcpSocketManager createManager(final String name, final FactoryData data) { 260 261 InetAddress inetAddress; 262 OutputStream os; 263 try { 264 inetAddress = InetAddress.getByName(data.host); 265 } catch (final UnknownHostException ex) { 266 LOGGER.error("Could not find address of " + data.host, ex); 267 return null; 268 } 269 try { 270 final Socket socket = new Socket(data.host, data.port); 271 os = socket.getOutputStream(); 272 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, data.delayMillis, 273 data.immediateFail, data.layout); 274 } catch (final IOException ex) { 275 LOGGER.error("TcpSocketManager (" + name + ") " + ex); 276 os = new ByteArrayOutputStream(); 277 } 278 if (data.delayMillis == 0) { 279 return null; 280 } 281 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.delayMillis, data.immediateFail, 282 data.layout); 283 } 284 } 285 }