001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.io.Serializable; 022import java.net.ConnectException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.Socket; 026import java.net.UnknownHostException; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.concurrent.CountDownLatch; 030 031import org.apache.logging.log4j.core.Layout; 032import org.apache.logging.log4j.core.appender.AppenderLoggingException; 033import org.apache.logging.log4j.core.appender.ManagerFactory; 034import org.apache.logging.log4j.core.appender.OutputStreamManager; 035import org.apache.logging.log4j.core.util.Closer; 036import org.apache.logging.log4j.core.util.Log4jThread; 037import org.apache.logging.log4j.core.util.NullOutputStream; 038import org.apache.logging.log4j.util.Strings; 039 040/** 041 * Manager of TCP Socket connections. 042 */ 043public class TcpSocketManager extends AbstractSocketManager { 044 /** 045 * The default reconnection delay (30000 milliseconds or 30 seconds). 046 */ 047 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 048 /** 049 * The default port number of remote logging server (4560). 050 */ 051 private static final int DEFAULT_PORT = 4560; 052 053 private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>(); 054 055 private final int reconnectionDelayMillis; 056 057 private Reconnector reconnector; 058 059 private Socket socket; 060 061 private final SocketOptions socketOptions; 062 063 private final boolean retry; 064 065 private final boolean immediateFail; 066 067 private final int connectTimeoutMillis; 068 069 /** 070 * Constructs. 071 * 072 * @param name 073 * The unique name of this connection. 074 * @param os 075 * The OutputStream. 076 * @param socket 077 * The Socket. 078 * @param inetAddress 079 * The Internet address of the host. 080 * @param host 081 * The name of the host. 082 * @param port 083 * The port number on the host. 084 * @param connectTimeoutMillis 085 * the connect timeout in milliseconds. 086 * @param reconnectionDelayMillis 087 * Reconnection interval. 088 * @param immediateFail 089 * True if the write should fail if no socket is immediately available. 090 * @param layout 091 * The Layout. 092 * @param bufferSize 093 * The buffer size. 094 * @deprecated Use 095 * {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}. 096 */ 097 @Deprecated 098 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 099 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 100 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 101 final int bufferSize) { 102 this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail, 103 layout, bufferSize, null); 104 } 105 106 /** 107 * Constructs. 108 * 109 * @param name 110 * The unique name of this connection. 111 * @param os 112 * The OutputStream. 113 * @param socket 114 * The Socket. 115 * @param inetAddress 116 * The Internet address of the host. 117 * @param host 118 * The name of the host. 119 * @param port 120 * The port number on the host. 121 * @param connectTimeoutMillis 122 * the connect timeout in milliseconds. 123 * @param reconnectionDelayMillis 124 * Reconnection interval. 125 * @param immediateFail 126 * True if the write should fail if no socket is immediately available. 127 * @param layout 128 * The Layout. 129 * @param bufferSize 130 * The buffer size. 131 */ 132 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 133 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 134 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 135 final int bufferSize, final SocketOptions socketOptions) { 136 super(name, os, inetAddress, host, port, layout, true, bufferSize); 137 this.connectTimeoutMillis = connectTimeoutMillis; 138 this.reconnectionDelayMillis = reconnectionDelayMillis; 139 this.socket = socket; 140 this.immediateFail = immediateFail; 141 this.retry = reconnectionDelayMillis > 0; 142 if (socket == null) { 143 this.reconnector = createReconnector(); 144 this.reconnector.start(); 145 } 146 this.socketOptions = socketOptions; 147 } 148 149 /** 150 * Obtains a TcpSocketManager. 151 * 152 * @param host 153 * The host to connect to. 154 * @param port 155 * The port on the host. 156 * @param connectTimeoutMillis 157 * the connect timeout in milliseconds 158 * @param reconnectDelayMillis 159 * The interval to pause between retries. 160 * @param bufferSize 161 * The buffer size. 162 * @return A TcpSocketManager. 163 * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}. 164 */ 165 @Deprecated 166 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis, 167 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 168 final int bufferSize) { 169 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, 170 bufferSize, null); 171 } 172 173 /** 174 * Obtains a TcpSocketManager. 175 * 176 * @param host 177 * The host to connect to. 178 * @param port 179 * The port on the host. 180 * @param connectTimeoutMillis 181 * the connect timeout in milliseconds 182 * @param reconnectDelayMillis 183 * The interval to pause between retries. 184 * @param bufferSize 185 * The buffer size. 186 * @return A TcpSocketManager. 187 */ 188 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis, 189 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 190 final int bufferSize, final SocketOptions socketOptions) { 191 if (Strings.isEmpty(host)) { 192 throw new IllegalArgumentException("A host name is required"); 193 } 194 if (port <= 0) { 195 port = DEFAULT_PORT; 196 } 197 if (reconnectDelayMillis == 0) { 198 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 199 } 200 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port, 201 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY); 202 } 203 204 @SuppressWarnings("sync-override") // synchronization on "this" is done within the method 205 @Override 206 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) { 207 if (socket == null) { 208 if (reconnector != null && !immediateFail) { 209 reconnector.latch(); 210 } 211 if (socket == null) { 212 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available"); 213 } 214 } 215 synchronized (this) { 216 try { 217 writeAndFlush(bytes, offset, length, immediateFlush); 218 } catch (final IOException causeEx) { 219 if (retry && reconnector == null) { 220 final String config = inetAddress + ":" + port; 221 reconnector = createReconnector(); 222 try { 223 reconnector.reconnect(); 224 } catch (final IOException reconnEx) { 225 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}", 226 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); 227 reconnector.start(); 228 throw new AppenderLoggingException( 229 String.format("Error sending to %s for %s", getName(), config), causeEx); 230 } 231 try { 232 writeAndFlush(bytes, offset, length, immediateFlush); 233 } catch (final IOException e) { 234 throw new AppenderLoggingException( 235 String.format("Error writing to %s after reestablishing connection for %s", getName(), 236 config), 237 causeEx); 238 } 239 } 240 } 241 } 242 } 243 244 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) 245 throws IOException { 246 @SuppressWarnings("resource") // outputStream is managed by this class 247 final OutputStream outputStream = getOutputStream(); 248 outputStream.write(bytes, offset, length); 249 if (immediateFlush) { 250 outputStream.flush(); 251 } 252 } 253 254 @Override 255 protected synchronized boolean closeOutputStream() { 256 final boolean closed = super.closeOutputStream(); 257 if (reconnector != null) { 258 reconnector.shutdown(); 259 reconnector.interrupt(); 260 reconnector = null; 261 } 262 final Socket oldSocket = socket; 263 socket = null; 264 if (oldSocket != null) { 265 try { 266 oldSocket.close(); 267 } catch (final IOException e) { 268 LOGGER.error("Could not close socket {}", socket); 269 return false; 270 } 271 } 272 return closed; 273 } 274 275 public int getConnectTimeoutMillis() { 276 return connectTimeoutMillis; 277 } 278 279 /** 280 * Gets this TcpSocketManager's content format. Specified by: 281 * <ul> 282 * <li>Key: "protocol" Value: "tcp"</li> 283 * <li>Key: "direction" Value: "out"</li> 284 * </ul> 285 * 286 * @return Map of content format keys supporting TcpSocketManager 287 */ 288 @Override 289 public Map<String, String> getContentFormat() { 290 final Map<String, String> result = new HashMap<>(super.getContentFormat()); 291 result.put("protocol", "tcp"); 292 result.put("direction", "out"); 293 return result; 294 } 295 296 /** 297 * Handles reconnecting to a Socket on a Thread. 298 */ 299 private class Reconnector extends Log4jThread { 300 301 private final CountDownLatch latch = new CountDownLatch(1); 302 303 private boolean shutdown = false; 304 305 private final Object owner; 306 307 public Reconnector(final OutputStreamManager owner) { 308 super("TcpSocketManager-Reconnector"); 309 this.owner = owner; 310 } 311 312 public void latch() { 313 try { 314 latch.await(); 315 } catch (final InterruptedException ex) { 316 // Ignore the exception. 317 } 318 } 319 320 public void shutdown() { 321 shutdown = true; 322 } 323 324 @Override 325 public void run() { 326 while (!shutdown) { 327 try { 328 sleep(reconnectionDelayMillis); 329 reconnect(); 330 } catch (final InterruptedException ie) { 331 LOGGER.debug("Reconnection interrupted."); 332 } catch (final ConnectException ex) { 333 LOGGER.debug("{}:{} refused connection", host, port); 334 } catch (final IOException ioe) { 335 LOGGER.debug("Unable to reconnect to {}:{}", host, port); 336 } finally { 337 latch.countDown(); 338 } 339 } 340 } 341 342 void reconnect() throws IOException { 343 final Socket sock = createSocket(inetAddress.getHostName(), port); 344 @SuppressWarnings("resource") // newOS is managed by the enclosing Manager. 345 final OutputStream newOS = sock.getOutputStream(); 346 synchronized (owner) { 347 Closer.closeSilently(getOutputStream()); 348 setOutputStream(newOS); 349 socket = sock; 350 reconnector = null; 351 shutdown = true; 352 } 353 LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket); 354 } 355 356 @Override 357 public String toString() { 358 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]"; 359 } 360 } 361 362 private Reconnector createReconnector() { 363 final Reconnector recon = new Reconnector(this); 364 recon.setDaemon(true); 365 recon.setPriority(Thread.MIN_PRIORITY); 366 return recon; 367 } 368 369 protected Socket createSocket(final String host, final int port) throws IOException { 370 return createSocket(host, port, socketOptions, connectTimeoutMillis); 371 } 372 373 protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions, 374 final int connectTimeoutMillis) throws IOException { 375 LOGGER.debug("Creating socket {}:{}", host, port); 376 final Socket newSocket = new Socket(); 377 if (socketOptions != null) { 378 // Not sure which options must be applied before or after the connect() call. 379 socketOptions.apply(newSocket); 380 } 381 newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis); 382 if (socketOptions != null) { 383 // Not sure which options must be applied before or after the connect() call. 384 socketOptions.apply(newSocket); 385 } 386 return newSocket; 387 } 388 389 /** 390 * Data for the factory. 391 */ 392 static class FactoryData { 393 protected final String host; 394 protected final int port; 395 protected final int connectTimeoutMillis; 396 protected final int reconnectDelayMillis; 397 protected final boolean immediateFail; 398 protected final Layout<? extends Serializable> layout; 399 protected final int bufferSize; 400 protected final SocketOptions socketOptions; 401 402 public FactoryData(final String host, final int port, final int connectTimeoutMillis, 403 final int reconnectDelayMillis, final boolean immediateFail, 404 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) { 405 this.host = host; 406 this.port = port; 407 this.connectTimeoutMillis = connectTimeoutMillis; 408 this.reconnectDelayMillis = reconnectDelayMillis; 409 this.immediateFail = immediateFail; 410 this.layout = layout; 411 this.bufferSize = bufferSize; 412 this.socketOptions = socketOptions; 413 } 414 415 @Override 416 public String toString() { 417 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis 418 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail 419 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]"; 420 } 421 } 422 423 /** 424 * Factory to create a TcpSocketManager. 425 * 426 * @param <M> 427 * The manager type. 428 * @param <T> 429 * The factory data type. 430 */ 431 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData> 432 implements ManagerFactory<M, T> { 433 434 @SuppressWarnings("resource") 435 @Override 436 public M createManager(final String name, final T data) { 437 InetAddress inetAddress; 438 OutputStream os; 439 try { 440 inetAddress = InetAddress.getByName(data.host); 441 } catch (final UnknownHostException ex) { 442 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex); 443 return null; 444 } 445 Socket socket = null; 446 try { 447 // LOG4J2-1042 448 socket = createSocket(data); 449 os = socket.getOutputStream(); 450 return createManager(name, os, socket, inetAddress, data); 451 } catch (final IOException ex) { 452 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex); 453 os = NullOutputStream.getInstance(); 454 } 455 if (data.reconnectDelayMillis == 0) { 456 Closer.closeSilently(socket); 457 return null; 458 } 459 return createManager(name, os, null, inetAddress, data); 460 } 461 462 @SuppressWarnings("unchecked") 463 M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) { 464 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, 465 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, 466 data.bufferSize, data.socketOptions); 467 } 468 469 Socket createSocket(final T data) throws IOException { 470 return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis); 471 } 472 473 } 474 475 /** 476 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 477 */ 478 public SocketOptions getSocketOptions() { 479 return socketOptions; 480 } 481 482 /** 483 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 484 */ 485 public Socket getSocket() { 486 return socket; 487 } 488 489 public int getReconnectionDelayMillis() { 490 return reconnectionDelayMillis; 491 } 492 493 @Override 494 public String toString() { 495 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector 496 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail=" 497 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress 498 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count=" 499 + count + "]"; 500 } 501 502}