001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.io.Serializable; 022import java.net.ConnectException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.Socket; 026import java.net.UnknownHostException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.concurrent.CountDownLatch; 032 033import org.apache.logging.log4j.core.Layout; 034import org.apache.logging.log4j.core.appender.AppenderLoggingException; 035import org.apache.logging.log4j.core.appender.ManagerFactory; 036import org.apache.logging.log4j.core.appender.OutputStreamManager; 037import org.apache.logging.log4j.core.util.Closer; 038import org.apache.logging.log4j.core.util.Log4jThread; 039import org.apache.logging.log4j.core.util.NullOutputStream; 040import org.apache.logging.log4j.util.Strings; 041 042/** 043 * Manager of TCP Socket connections. 044 */ 045public class TcpSocketManager extends AbstractSocketManager { 046 /** 047 * The default reconnection delay (30000 milliseconds or 30 seconds). 048 */ 049 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 050 /** 051 * The default port number of remote logging server (4560). 052 */ 053 private static final int DEFAULT_PORT = 4560; 054 055 private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>(); 056 057 private final int reconnectionDelayMillis; 058 059 private Reconnector reconnector; 060 061 private Socket socket; 062 063 private final SocketOptions socketOptions; 064 065 private final boolean retry; 066 067 private final boolean immediateFail; 068 069 private final int connectTimeoutMillis; 070 071 /** 072 * Constructs. 073 * 074 * @param name 075 * The unique name of this connection. 076 * @param os 077 * The OutputStream. 078 * @param socket 079 * The Socket. 080 * @param inetAddress 081 * The Internet address of the host. 082 * @param host 083 * The name of the host. 084 * @param port 085 * The port number on the host. 086 * @param connectTimeoutMillis 087 * the connect timeout in milliseconds. 088 * @param reconnectionDelayMillis 089 * Reconnection interval. 090 * @param immediateFail 091 * True if the write should fail if no socket is immediately available. 092 * @param layout 093 * The Layout. 094 * @param bufferSize 095 * The buffer size. 096 * @deprecated Use 097 * {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}. 098 */ 099 @Deprecated 100 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 101 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 102 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 103 final int bufferSize) { 104 this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail, 105 layout, bufferSize, null); 106 } 107 108 /** 109 * Constructs. 110 * 111 * @param name 112 * The unique name of this connection. 113 * @param os 114 * The OutputStream. 115 * @param socket 116 * The Socket. 117 * @param inetAddress 118 * The Internet address of the host. 119 * @param host 120 * The name of the host. 121 * @param port 122 * The port number on the host. 123 * @param connectTimeoutMillis 124 * the connect timeout in milliseconds. 125 * @param reconnectionDelayMillis 126 * Reconnection interval. 127 * @param immediateFail 128 * True if the write should fail if no socket is immediately available. 129 * @param layout 130 * The Layout. 131 * @param bufferSize 132 * The buffer size. 133 */ 134 public TcpSocketManager(final String name, final OutputStream os, final Socket socket, 135 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, 136 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 137 final int bufferSize, final SocketOptions socketOptions) { 138 super(name, os, inetAddress, host, port, layout, true, bufferSize); 139 this.connectTimeoutMillis = connectTimeoutMillis; 140 this.reconnectionDelayMillis = reconnectionDelayMillis; 141 this.socket = socket; 142 this.immediateFail = immediateFail; 143 this.retry = reconnectionDelayMillis > 0; 144 if (socket == null) { 145 this.reconnector = createReconnector(); 146 this.reconnector.start(); 147 } 148 this.socketOptions = socketOptions; 149 } 150 151 /** 152 * Obtains a TcpSocketManager. 153 * 154 * @param host 155 * The host to connect to. 156 * @param port 157 * The port on the host. 158 * @param connectTimeoutMillis 159 * the connect timeout in milliseconds 160 * @param reconnectDelayMillis 161 * The interval to pause between retries. 162 * @param bufferSize 163 * The buffer size. 164 * @return A TcpSocketManager. 165 * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}. 166 */ 167 @Deprecated 168 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis, 169 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 170 final int bufferSize) { 171 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, 172 bufferSize, null); 173 } 174 175 /** 176 * Obtains a TcpSocketManager. 177 * 178 * @param host 179 * The host to connect to. 180 * @param port 181 * The port on the host. 182 * @param connectTimeoutMillis 183 * the connect timeout in milliseconds 184 * @param reconnectDelayMillis 185 * The interval to pause between retries. 186 * @param bufferSize 187 * The buffer size. 188 * @return A TcpSocketManager. 189 */ 190 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis, 191 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, 192 final int bufferSize, final SocketOptions socketOptions) { 193 if (Strings.isEmpty(host)) { 194 throw new IllegalArgumentException("A host name is required"); 195 } 196 if (port <= 0) { 197 port = DEFAULT_PORT; 198 } 199 if (reconnectDelayMillis == 0) { 200 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 201 } 202 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port, 203 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY); 204 } 205 206 @SuppressWarnings("sync-override") // synchronization on "this" is done within the method 207 @Override 208 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) { 209 if (socket == null) { 210 if (reconnector != null && !immediateFail) { 211 reconnector.latch(); 212 } 213 if (socket == null) { 214 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available"); 215 } 216 } 217 synchronized (this) { 218 try { 219 writeAndFlush(bytes, offset, length, immediateFlush); 220 } catch (final IOException causeEx) { 221 if (retry && reconnector == null) { 222 final String config = inetAddress + ":" + port; 223 reconnector = createReconnector(); 224 try { 225 reconnector.reconnect(); 226 } catch (final IOException reconnEx) { 227 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}", 228 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); 229 reconnector.start(); 230 throw new AppenderLoggingException( 231 String.format("Error sending to %s for %s", getName(), config), causeEx); 232 } 233 try { 234 writeAndFlush(bytes, offset, length, immediateFlush); 235 } catch (final IOException e) { 236 throw new AppenderLoggingException( 237 String.format("Error writing to %s after reestablishing connection for %s", getName(), 238 config), 239 causeEx); 240 } 241 } 242 } 243 } 244 } 245 246 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) 247 throws IOException { 248 @SuppressWarnings("resource") // outputStream is managed by this class 249 final OutputStream outputStream = getOutputStream(); 250 outputStream.write(bytes, offset, length); 251 if (immediateFlush) { 252 outputStream.flush(); 253 } 254 } 255 256 @Override 257 protected synchronized boolean closeOutputStream() { 258 final boolean closed = super.closeOutputStream(); 259 if (reconnector != null) { 260 reconnector.shutdown(); 261 reconnector.interrupt(); 262 reconnector = null; 263 } 264 final Socket oldSocket = socket; 265 socket = null; 266 if (oldSocket != null) { 267 try { 268 oldSocket.close(); 269 } catch (final IOException e) { 270 LOGGER.error("Could not close socket {}", socket); 271 return false; 272 } 273 } 274 return closed; 275 } 276 277 public int getConnectTimeoutMillis() { 278 return connectTimeoutMillis; 279 } 280 281 /** 282 * Gets this TcpSocketManager's content format. Specified by: 283 * <ul> 284 * <li>Key: "protocol" Value: "tcp"</li> 285 * <li>Key: "direction" Value: "out"</li> 286 * </ul> 287 * 288 * @return Map of content format keys supporting TcpSocketManager 289 */ 290 @Override 291 public Map<String, String> getContentFormat() { 292 final Map<String, String> result = new HashMap<>(super.getContentFormat()); 293 result.put("protocol", "tcp"); 294 result.put("direction", "out"); 295 return result; 296 } 297 298 /** 299 * Handles reconnecting to a Socket on a Thread. 300 */ 301 private class Reconnector extends Log4jThread { 302 303 private final CountDownLatch latch = new CountDownLatch(1); 304 305 private boolean shutdown = false; 306 307 private final Object owner; 308 309 public Reconnector(final OutputStreamManager owner) { 310 super("TcpSocketManager-Reconnector"); 311 this.owner = owner; 312 } 313 314 public void latch() { 315 try { 316 latch.await(); 317 } catch (final InterruptedException ex) { 318 // Ignore the exception. 319 } 320 } 321 322 public void shutdown() { 323 shutdown = true; 324 } 325 326 @Override 327 public void run() { 328 while (!shutdown) { 329 try { 330 sleep(reconnectionDelayMillis); 331 reconnect(); 332 } catch (final InterruptedException ie) { 333 LOGGER.debug("Reconnection interrupted."); 334 } catch (final ConnectException ex) { 335 LOGGER.debug("{}:{} refused connection", host, port); 336 } catch (final IOException ioe) { 337 LOGGER.debug("Unable to reconnect to {}:{}", host, port); 338 } finally { 339 latch.countDown(); 340 } 341 } 342 } 343 344 void reconnect() throws IOException { 345 List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port); 346 if (socketAddresses.size() == 1) { 347 LOGGER.debug("Reconnecting " + socketAddresses.get(0)); 348 connect(socketAddresses.get(0)); 349 } else { 350 IOException ioe = null; 351 for (InetSocketAddress socketAddress : socketAddresses) { 352 try { 353 LOGGER.debug("Reconnecting " + socketAddress); 354 connect(socketAddress); 355 return; 356 } catch (IOException ex) { 357 ioe = ex; 358 } 359 } 360 throw ioe; 361 } 362 } 363 364 private void connect(InetSocketAddress socketAddress) throws IOException { 365 final Socket sock = createSocket(socketAddress); 366 @SuppressWarnings("resource") // newOS is managed by the enclosing Manager. 367 final OutputStream newOS = sock.getOutputStream(); 368 InetAddress prev = socket != null ? socket.getInetAddress() : null; 369 synchronized (owner) { 370 Closer.closeSilently(getOutputStream()); 371 setOutputStream(newOS); 372 socket = sock; 373 reconnector = null; 374 shutdown = true; 375 } 376 String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ? 377 "reestablished" : "established"; 378 LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket); 379 } 380 381 @Override 382 public String toString() { 383 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]"; 384 } 385 } 386 387 private Reconnector createReconnector() { 388 final Reconnector recon = new Reconnector(this); 389 recon.setDaemon(true); 390 recon.setPriority(Thread.MIN_PRIORITY); 391 return recon; 392 } 393 394 protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException { 395 return createSocket(socketAddress, socketOptions, connectTimeoutMillis); 396 } 397 398 protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions, 399 final int connectTimeoutMillis) throws IOException { 400 LOGGER.debug("Creating socket {}", socketAddress.toString()); 401 final Socket newSocket = new Socket(); 402 if (socketOptions != null) { 403 // Not sure which options must be applied before or after the connect() call. 404 socketOptions.apply(newSocket); 405 } 406 newSocket.connect(socketAddress, connectTimeoutMillis); 407 if (socketOptions != null) { 408 // Not sure which options must be applied before or after the connect() call. 409 socketOptions.apply(newSocket); 410 } 411 return newSocket; 412 } 413 414 /** 415 * Data for the factory. 416 */ 417 static class FactoryData { 418 protected final String host; 419 protected final int port; 420 protected final int connectTimeoutMillis; 421 protected final int reconnectDelayMillis; 422 protected final boolean immediateFail; 423 protected final Layout<? extends Serializable> layout; 424 protected final int bufferSize; 425 protected final SocketOptions socketOptions; 426 427 public FactoryData(final String host, final int port, final int connectTimeoutMillis, 428 final int reconnectDelayMillis, final boolean immediateFail, 429 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) { 430 this.host = host; 431 this.port = port; 432 this.connectTimeoutMillis = connectTimeoutMillis; 433 this.reconnectDelayMillis = reconnectDelayMillis; 434 this.immediateFail = immediateFail; 435 this.layout = layout; 436 this.bufferSize = bufferSize; 437 this.socketOptions = socketOptions; 438 } 439 440 @Override 441 public String toString() { 442 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis 443 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail 444 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]"; 445 } 446 } 447 448 /** 449 * Factory to create a TcpSocketManager. 450 * 451 * @param <M> 452 * The manager type. 453 * @param <T> 454 * The factory data type. 455 */ 456 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData> 457 implements ManagerFactory<M, T> { 458 459 static HostResolver resolver = new HostResolver(); 460 461 @SuppressWarnings("resource") 462 @Override 463 public M createManager(final String name, final T data) { 464 InetAddress inetAddress; 465 OutputStream os; 466 try { 467 inetAddress = InetAddress.getByName(data.host); 468 } catch (final UnknownHostException ex) { 469 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex); 470 return null; 471 } 472 Socket socket = null; 473 try { 474 // LOG4J2-1042 475 socket = createSocket(data); 476 os = socket.getOutputStream(); 477 return createManager(name, os, socket, inetAddress, data); 478 } catch (final IOException ex) { 479 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex); 480 os = NullOutputStream.getInstance(); 481 } 482 if (data.reconnectDelayMillis == 0) { 483 Closer.closeSilently(socket); 484 return null; 485 } 486 return createManager(name, os, null, inetAddress, data); 487 } 488 489 @SuppressWarnings("unchecked") 490 M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) { 491 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, 492 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, 493 data.bufferSize, data.socketOptions); 494 } 495 496 Socket createSocket(final T data) throws IOException { 497 List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port); 498 IOException ioe = null; 499 for (InetSocketAddress socketAddress : socketAddresses) { 500 try { 501 return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis); 502 } catch (IOException ex) { 503 ioe = ex; 504 } 505 } 506 throw new IOException(errorMessage(data, socketAddresses) , ioe); 507 } 508 509 protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) { 510 StringBuilder sb = new StringBuilder("Unable to create socket for "); 511 sb.append(data.host).append(" at port ").append(data.port); 512 if (socketAddresses.size() == 1) { 513 if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) { 514 sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress()); 515 sb.append(" and port ").append(socketAddresses.get(0).getPort()); 516 } 517 } else { 518 sb.append(" using ip addresses and ports "); 519 for (int i = 0; i < socketAddresses.size(); ++i) { 520 if (i > 0) { 521 sb.append(", "); 522 sb.append(socketAddresses.get(i).getAddress().getHostAddress()); 523 sb.append(":").append(socketAddresses.get(i).getPort()); 524 } 525 } 526 } 527 return sb.toString(); 528 } 529 530 } 531 532 /** 533 * This method is only for unit testing. It is not Thread-safe. 534 * @param resolver the HostResolver. 535 */ 536 public static void setHostResolver(HostResolver resolver) { 537 TcpSocketManagerFactory.resolver = resolver; 538 } 539 540 public static class HostResolver { 541 542 public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException { 543 InetAddress[] addresses = InetAddress.getAllByName(host); 544 List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length); 545 for (InetAddress address: addresses) { 546 socketAddresses.add(new InetSocketAddress(address, port)); 547 } 548 return socketAddresses; 549 } 550 } 551 552 /** 553 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 554 */ 555 public SocketOptions getSocketOptions() { 556 return socketOptions; 557 } 558 559 /** 560 * USE AT YOUR OWN RISK, method is public for testing purpose only for now. 561 */ 562 public Socket getSocket() { 563 return socket; 564 } 565 566 public int getReconnectionDelayMillis() { 567 return reconnectionDelayMillis; 568 } 569 570 @Override 571 public String toString() { 572 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector 573 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail=" 574 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress 575 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count=" 576 + count + "]"; 577 } 578 579}