001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.io.Serializable; 022import java.net.ConnectException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.Socket; 026import java.net.UnknownHostException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.concurrent.CountDownLatch; 032 033import org.apache.logging.log4j.core.Layout; 034import org.apache.logging.log4j.core.appender.AppenderLoggingException; 035import org.apache.logging.log4j.core.appender.ManagerFactory; 036import org.apache.logging.log4j.core.appender.OutputStreamManager; 037import org.apache.logging.log4j.core.util.Closer; 038import org.apache.logging.log4j.core.util.Log4jThread; 039import org.apache.logging.log4j.core.util.NullOutputStream; 040import org.apache.logging.log4j.util.Strings; 041 042/** 043 * Manager of TCP Socket connections. 044 */ 045public class TcpSocketManager extends AbstractSocketManager { 046 /** 047 * The default reconnection delay (30000 milliseconds or 30 seconds). 048 */ 049 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 050 /** 051 * The default port number of remote logging server (4560). 052 */ 053 private static final int DEFAULT_PORT = 4560; 054 055 private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>(); 056 057 private final int reconnectionDelayMillis; 058 059 private Reconnector reconnector; 060 061 private Socket socket; 062 063 private final SocketOptions socketOptions; 064 065 private final boolean retry; 066 067 private final boolean immediateFail; 068 069 private final int connectTimeoutMillis; 070 071 /** 072 * Constructs. 073 * 074 * @param name 075 * The unique name of this connection. 076 * @param os 077 * The OutputStream. 078 * @param socket 079 * The Socket. 080 * @param inetAddress 081 * The Internet address of the host. 082 * @param host 083 * The name of the host. 084 * @param port 085 * The port number on the host. 086 * @param connectTimeoutMillis 087 * the connect timeout in milliseconds. 088 * @param reconnectionDelayMillis 089 * Reconnection interval. 090 * @param immediateFail 091 * True if the write should fail if no socket is immediately available. 092 * @param layout 093 * The Layout. 094 * @param bufferSize 095 * The buffer size. 096 * @deprecated Use 097 * {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}. 098 */ 099 @Deprecated 100 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 101 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 102 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 103 final int bufferSize) { 104 this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail, 105 layout, bufferSize, null); 106 } 107 108 /** 109 * Constructs. 110 * 111 * @param name 112 * The unique name of this connection. 113 * @param os 114 * The OutputStream. 115 * @param socket 116 * The Socket. 117 * @param inetAddress 118 * The Internet address of the host. 119 * @param host 120 * The name of the host. 121 * @param port 122 * The port number on the host. 123 * @param connectTimeoutMillis 124 * the connect timeout in milliseconds. 125 * @param reconnectionDelayMillis 126 * Reconnection interval. 127 * @param immediateFail 128 * True if the write should fail if no socket is immediately available. 129 * @param layout 130 * The Layout. 131 * @param bufferSize 132 * The buffer size. 133 */ 134 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 135 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 136 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 137 final int bufferSize, final SocketOptions socketOptions) { 138 super(name, os, inetAddress, host, port, layout, true, bufferSize); 139 this.connectTimeoutMillis = connectTimeoutMillis; 140 this.reconnectionDelayMillis = reconnectionDelayMillis; 141 this.socket = socket; 142 this.immediateFail = immediateFail; 143 this.retry = reconnectionDelayMillis > 0; 144 if (socket == null) { 145 this.reconnector = createReconnector(); 146 this.reconnector.start(); 147 } 148 this.socketOptions = socketOptions; 149 } 150 151 /** 152 * Obtains a TcpSocketManager. 153 * 154 * @param host 155 * The host to connect to. 156 * @param port 157 * The port on the host. 158 * @param connectTimeoutMillis 159 * the connect timeout in milliseconds 160 * @param reconnectDelayMillis 161 * The interval to pause between retries. 162 * @param bufferSize 163 * The buffer size. 164 * @return A TcpSocketManager. 165 * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}. 166 */ 167 @Deprecated 168 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis, 169 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 170 final int bufferSize) { 171 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, 172 bufferSize, null); 173 } 174 175 /** 176 * Obtains a TcpSocketManager. 177 * 178 * @param host 179 * The host to connect to. 180 * @param port 181 * The port on the host. 182 * @param connectTimeoutMillis 183 * the connect timeout in milliseconds 184 * @param reconnectDelayMillis 185 * The interval to pause between retries. 186 * @param bufferSize 187 * The buffer size. 188 * @return A TcpSocketManager. 189 */ 190 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis, 191 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 192 final int bufferSize, final SocketOptions socketOptions) { 193 if (Strings.isEmpty(host)) { 194 throw new IllegalArgumentException("A host name is required"); 195 } 196 if (port <= 0) { 197 port = DEFAULT_PORT; 198 } 199 if (reconnectDelayMillis == 0) { 200 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 201 } 202 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port, 203 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY); 204 } 205 206 @SuppressWarnings("sync-override") // synchronization on "this" is done within the method 207 @Override 208 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) { 209 if (socket == null) { 210 if (reconnector != null && !immediateFail) { 211 reconnector.latch(); 212 } 213 if (socket == null) { 214 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available"); 215 } 216 } 217 synchronized (this) { 218 try { 219 writeAndFlush(bytes, offset, length, immediateFlush); 220 } catch (final IOException causeEx) { 221 final String config = inetAddress + ":" + port; 222 if (retry && reconnector == null) { 223 reconnector = createReconnector(); 224 try { 225 reconnector.reconnect(); 226 } catch (final IOException reconnEx) { 227 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}", 228 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); 229 reconnector.start(); 230 throw new AppenderLoggingException( 231 String.format("Error sending to %s for %s", getName(), config), causeEx); 232 } 233 try { 234 writeAndFlush(bytes, offset, length, immediateFlush); 235 } catch (final IOException e) { 236 throw new AppenderLoggingException( 237 String.format("Error writing to %s after reestablishing connection for %s", getName(), 238 config), 239 causeEx); 240 } 241 return; 242 } 243 final String message = String.format("Error writing to %s for connection %s", getName(), config); 244 throw new AppenderLoggingException(message, causeEx); 245 } 246 } 247 } 248 249 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) 250 throws IOException { 251 @SuppressWarnings("resource") // outputStream is managed by this class 252 final OutputStream outputStream = getOutputStream(); 253 outputStream.write(bytes, offset, length); 254 if (immediateFlush) { 255 outputStream.flush(); 256 } 257 } 258 259 @Override 260 protected synchronized boolean closeOutputStream() { 261 final boolean closed = super.closeOutputStream(); 262 if (reconnector != null) { 263 reconnector.shutdown(); 264 reconnector.interrupt(); 265 reconnector = null; 266 } 267 final Socket oldSocket = socket; 268 socket = null; 269 if (oldSocket != null) { 270 try { 271 oldSocket.close(); 272 } catch (final IOException e) { 273 LOGGER.error("Could not close socket {}", socket); 274 return false; 275 } 276 } 277 return closed; 278 } 279 280 public int getConnectTimeoutMillis() { 281 return connectTimeoutMillis; 282 } 283 284 /** 285 * Gets this TcpSocketManager's content format. Specified by: 286 * <ul> 287 * <li>Key: "protocol" Value: "tcp"</li> 288 * <li>Key: "direction" Value: "out"</li> 289 * </ul> 290 * 291 * @return Map of content format keys supporting TcpSocketManager 292 */ 293 @Override 294 public Map<String, String> getContentFormat() { 295 final Map<String, String> result = new HashMap<>(super.getContentFormat()); 296 result.put("protocol", "tcp"); 297 result.put("direction", "out"); 298 return result; 299 } 300 301 /** 302 * Handles reconnecting to a Socket on a Thread. 303 */ 304 private class Reconnector extends Log4jThread { 305 306 private final CountDownLatch latch = new CountDownLatch(1); 307 308 private boolean shutdown = false; 309 310 private final Object owner; 311 312 public Reconnector(final OutputStreamManager owner) { 313 super("TcpSocketManager-Reconnector"); 314 this.owner = owner; 315 } 316 317 public void latch() { 318 try { 319 latch.await(); 320 } catch (final InterruptedException ex) { 321 // Ignore the exception. 322 } 323 } 324 325 public void shutdown() { 326 shutdown = true; 327 } 328 329 @Override 330 public void run() { 331 while (!shutdown) { 332 try { 333 sleep(reconnectionDelayMillis); 334 reconnect(); 335 } catch (final InterruptedException ie) { 336 LOGGER.debug("Reconnection interrupted."); 337 } catch (final ConnectException ex) { 338 LOGGER.debug("{}:{} refused connection", host, port); 339 } catch (final IOException ioe) { 340 LOGGER.debug("Unable to reconnect to {}:{}", host, port); 341 } finally { 342 latch.countDown(); 343 } 344 } 345 } 346 347 void reconnect() throws IOException { 348 List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port); 349 if (socketAddresses.size() == 1) { 350 LOGGER.debug("Reconnecting " + socketAddresses.get(0)); 351 connect(socketAddresses.get(0)); 352 } else { 353 IOException ioe = null; 354 for (InetSocketAddress socketAddress : socketAddresses) { 355 try { 356 LOGGER.debug("Reconnecting " + socketAddress); 357 connect(socketAddress); 358 return; 359 } catch (IOException ex) { 360 ioe = ex; 361 } 362 } 363 throw ioe; 364 } 365 } 366 367 private void connect(InetSocketAddress socketAddress) throws IOException { 368 final Socket sock = createSocket(socketAddress); 369 @SuppressWarnings("resource") // newOS is managed by the enclosing Manager. 370 final OutputStream newOS = sock.getOutputStream(); 371 InetAddress prev = socket != null ? socket.getInetAddress() : null; 372 synchronized (owner) { 373 Closer.closeSilently(getOutputStream()); 374 setOutputStream(newOS); 375 socket = sock; 376 reconnector = null; 377 shutdown = true; 378 } 379 String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ? 380 "reestablished" : "established"; 381 LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket); 382 } 383 384 @Override 385 public String toString() { 386 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]"; 387 } 388 } 389 390 private Reconnector createReconnector() { 391 final Reconnector recon = new Reconnector(this); 392 recon.setDaemon(true); 393 recon.setPriority(Thread.MIN_PRIORITY); 394 return recon; 395 } 396 397 protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException { 398 return createSocket(socketAddress, socketOptions, connectTimeoutMillis); 399 } 400 401 protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions, 402 final int connectTimeoutMillis) throws IOException { 403 LOGGER.debug("Creating socket {}", socketAddress.toString()); 404 final Socket newSocket = new Socket(); 405 if (socketOptions != null) { 406 // Not sure which options must be applied before or after the connect() call. 407 socketOptions.apply(newSocket); 408 } 409 newSocket.connect(socketAddress, connectTimeoutMillis); 410 if (socketOptions != null) { 411 // Not sure which options must be applied before or after the connect() call. 412 socketOptions.apply(newSocket); 413 } 414 return newSocket; 415 } 416 417 /** 418 * Data for the factory. 419 */ 420 static class FactoryData { 421 protected final String host; 422 protected final int port; 423 protected final int connectTimeoutMillis; 424 protected final int reconnectDelayMillis; 425 protected final boolean immediateFail; 426 protected final Layout<? extends Serializable> layout; 427 protected final int bufferSize; 428 protected final SocketOptions socketOptions; 429 430 public FactoryData(final String host, final int port, final int connectTimeoutMillis, 431 final int reconnectDelayMillis, final boolean immediateFail, 432 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) { 433 this.host = host; 434 this.port = port; 435 this.connectTimeoutMillis = connectTimeoutMillis; 436 this.reconnectDelayMillis = reconnectDelayMillis; 437 this.immediateFail = immediateFail; 438 this.layout = layout; 439 this.bufferSize = bufferSize; 440 this.socketOptions = socketOptions; 441 } 442 443 @Override 444 public String toString() { 445 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis 446 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail 447 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]"; 448 } 449 } 450 451 /** 452 * Factory to create a TcpSocketManager. 453 * 454 * @param <M> 455 * The manager type. 456 * @param <T> 457 * The factory data type. 458 */ 459 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData> 460 implements ManagerFactory<M, T> { 461 462 static HostResolver resolver = new HostResolver(); 463 464 @SuppressWarnings("resource") 465 @Override 466 public M createManager(final String name, final T data) { 467 InetAddress inetAddress; 468 OutputStream os; 469 try { 470 inetAddress = InetAddress.getByName(data.host); 471 } catch (final UnknownHostException ex) { 472 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex); 473 return null; 474 } 475 Socket socket = null; 476 try { 477 // LOG4J2-1042 478 socket = createSocket(data); 479 os = socket.getOutputStream(); 480 return createManager(name, os, socket, inetAddress, data); 481 } catch (final IOException ex) { 482 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex); 483 os = NullOutputStream.getInstance(); 484 } 485 if (data.reconnectDelayMillis == 0) { 486 Closer.closeSilently(socket); 487 return null; 488 } 489 return createManager(name, os, null, inetAddress, data); 490 } 491 492 @SuppressWarnings("unchecked") 493 M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) { 494 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, 495 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, 496 data.bufferSize, data.socketOptions); 497 } 498 499 Socket createSocket(final T data) throws IOException { 500 List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port); 501 IOException ioe = null; 502 for (InetSocketAddress socketAddress : socketAddresses) { 503 try { 504 return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis); 505 } catch (IOException ex) { 506 ioe = ex; 507 } 508 } 509 throw new IOException(errorMessage(data, socketAddresses) , ioe); 510 } 511 512 protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) { 513 StringBuilder sb = new StringBuilder("Unable to create socket for "); 514 sb.append(data.host).append(" at port ").append(data.port); 515 if (socketAddresses.size() == 1) { 516 if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) { 517 sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress()); 518 sb.append(" and port ").append(socketAddresses.get(0).getPort()); 519 } 520 } else { 521 sb.append(" using ip addresses and ports "); 522 for (int i = 0; i < socketAddresses.size(); ++i) { 523 if (i > 0) { 524 sb.append(", "); 525 sb.append(socketAddresses.get(i).getAddress().getHostAddress()); 526 sb.append(":").append(socketAddresses.get(i).getPort()); 527 } 528 } 529 } 530 return sb.toString(); 531 } 532 533 } 534 535 /** 536 * This method is only for unit testing. It is not Thread-safe. 537 * @param resolver the HostResolver. 538 */ 539 public static void setHostResolver(HostResolver resolver) { 540 TcpSocketManagerFactory.resolver = resolver; 541 } 542 543 public static class HostResolver { 544 545 public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException { 546 InetAddress[] addresses = InetAddress.getAllByName(host); 547 List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length); 548 for (InetAddress address: addresses) { 549 socketAddresses.add(new InetSocketAddress(address, port)); 550 } 551 return socketAddresses; 552 } 553 } 554 555 /** 556 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 557 */ 558 public SocketOptions getSocketOptions() { 559 return socketOptions; 560 } 561 562 /** 563 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 564 */ 565 public Socket getSocket() { 566 return socket; 567 } 568 569 public int getReconnectionDelayMillis() { 570 return reconnectionDelayMillis; 571 } 572 573 @Override 574 public String toString() { 575 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector 576 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail=" 577 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress 578 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count=" 579 + count + "]"; 580 } 581 582}