View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.net.ConnectException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.Socket;
26  import java.net.UnknownHostException;
27  import java.util.ArrayList;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.concurrent.CountDownLatch;
32  
33  import org.apache.logging.log4j.core.Layout;
34  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
35  import org.apache.logging.log4j.core.appender.ManagerFactory;
36  import org.apache.logging.log4j.core.appender.OutputStreamManager;
37  import org.apache.logging.log4j.core.util.Closer;
38  import org.apache.logging.log4j.core.util.Log4jThread;
39  import org.apache.logging.log4j.core.util.NullOutputStream;
40  import org.apache.logging.log4j.util.Strings;
41  
42  /**
43   * Manager of TCP Socket connections.
44   */
45  public class TcpSocketManager extends AbstractSocketManager {
46      /**
47       * The default reconnection delay (30000 milliseconds or 30 seconds).
48       */
49      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
50      /**
51       * The default port number of remote logging server (4560).
52       */
53      private static final int DEFAULT_PORT = 4560;
54  
55      private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
56  
57      private final int reconnectionDelayMillis;
58  
59      private Reconnector reconnector;
60  
61      private Socket socket;
62  
63      private final SocketOptions socketOptions;
64  
65      private final boolean retry;
66  
67      private final boolean immediateFail;
68  
69      private final int connectTimeoutMillis;
70  
71      /**
72       * Constructs.
73       *
74       * @param name
75       *            The unique name of this connection.
76       * @param os
77       *            The OutputStream.
78       * @param socket
79       *            The Socket.
80       * @param inetAddress
81       *            The Internet address of the host.
82       * @param host
83       *            The name of the host.
84       * @param port
85       *            The port number on the host.
86       * @param connectTimeoutMillis
87       *            the connect timeout in milliseconds.
88       * @param reconnectionDelayMillis
89       *            Reconnection interval.
90       * @param immediateFail
91       *            True if the write should fail if no socket is immediately available.
92       * @param layout
93       *            The Layout.
94       * @param bufferSize
95       *            The buffer size.
96       * @deprecated Use
97       *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
98       */
99      @Deprecated
100     public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
101             final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
102             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
103             final int bufferSize) {
104         this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
105                 layout, bufferSize, null);
106     }
107 
108     /**
109      * Constructs.
110      *
111      * @param name
112      *            The unique name of this connection.
113      * @param os
114      *            The OutputStream.
115      * @param socket
116      *            The Socket.
117      * @param inetAddress
118      *            The Internet address of the host.
119      * @param host
120      *            The name of the host.
121      * @param port
122      *            The port number on the host.
123      * @param connectTimeoutMillis
124      *            the connect timeout in milliseconds.
125      * @param reconnectionDelayMillis
126      *            Reconnection interval.
127      * @param immediateFail
128      *            True if the write should fail if no socket is immediately available.
129      * @param layout
130      *            The Layout.
131      * @param bufferSize
132      *            The buffer size.
133      */
134     public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
135             final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
136             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
137             final int bufferSize, final SocketOptions socketOptions) {
138         super(name, os, inetAddress, host, port, layout, true, bufferSize);
139         this.connectTimeoutMillis = connectTimeoutMillis;
140         this.reconnectionDelayMillis = reconnectionDelayMillis;
141         this.socket = socket;
142         this.immediateFail = immediateFail;
143         this.retry = reconnectionDelayMillis > 0;
144         if (socket == null) {
145             this.reconnector = createReconnector();
146             this.reconnector.start();
147         }
148         this.socketOptions = socketOptions;
149     }
150 
151     /**
152      * Obtains a TcpSocketManager.
153      *
154      * @param host
155      *            The host to connect to.
156      * @param port
157      *            The port on the host.
158      * @param connectTimeoutMillis
159      *            the connect timeout in milliseconds
160      * @param reconnectDelayMillis
161      *            The interval to pause between retries.
162      * @param bufferSize
163      *            The buffer size.
164      * @return A TcpSocketManager.
165      * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
166      */
167     @Deprecated
168     public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
169             final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
170             final int bufferSize) {
171         return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
172                 bufferSize, null);
173     }
174 
175     /**
176      * Obtains a TcpSocketManager.
177      *
178      * @param host
179      *            The host to connect to.
180      * @param port
181      *            The port on the host.
182      * @param connectTimeoutMillis
183      *            the connect timeout in milliseconds
184      * @param reconnectDelayMillis
185      *            The interval to pause between retries.
186      * @param bufferSize
187      *            The buffer size.
188      * @return A TcpSocketManager.
189      */
190     public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
191             int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
192             final int bufferSize, final SocketOptions socketOptions) {
193         if (Strings.isEmpty(host)) {
194             throw new IllegalArgumentException("A host name is required");
195         }
196         if (port <= 0) {
197             port = DEFAULT_PORT;
198         }
199         if (reconnectDelayMillis == 0) {
200             reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
201         }
202         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
203                 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
204     }
205 
206     @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
207     @Override
208     protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
209         if (socket == null) {
210             if (reconnector != null && !immediateFail) {
211                 reconnector.latch();
212             }
213             if (socket == null) {
214                 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
215             }
216         }
217         synchronized (this) {
218             try {
219                 writeAndFlush(bytes, offset, length, immediateFlush);
220             } catch (final IOException causeEx) {
221                 if (retry && reconnector == null) {
222                     final String config = inetAddress + ":" + port;
223                     reconnector = createReconnector();
224                     try {
225                         reconnector.reconnect();
226                     } catch (final IOException reconnEx) {
227                         LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
228                                 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
229                         reconnector.start();
230                         throw new AppenderLoggingException(
231                                 String.format("Error sending to %s for %s", getName(), config), causeEx);
232                     }
233                     try {
234                         writeAndFlush(bytes, offset, length, immediateFlush);
235                     } catch (final IOException e) {
236                         throw new AppenderLoggingException(
237                                 String.format("Error writing to %s after reestablishing connection for %s", getName(),
238                                         config),
239                                 causeEx);
240                     }
241                 }
242             }
243         }
244     }
245 
246     private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
247             throws IOException {
248         @SuppressWarnings("resource") // outputStream is managed by this class
249         final OutputStream outputStream = getOutputStream();
250         outputStream.write(bytes, offset, length);
251         if (immediateFlush) {
252             outputStream.flush();
253         }
254     }
255 
256     @Override
257     protected synchronized boolean closeOutputStream() {
258         final boolean closed = super.closeOutputStream();
259         if (reconnector != null) {
260             reconnector.shutdown();
261             reconnector.interrupt();
262             reconnector = null;
263         }
264         final Socket oldSocket = socket;
265         socket = null;
266         if (oldSocket != null) {
267             try {
268                 oldSocket.close();
269             } catch (final IOException e) {
270                 LOGGER.error("Could not close socket {}", socket);
271                 return false;
272             }
273         }
274         return closed;
275     }
276 
277     public int getConnectTimeoutMillis() {
278         return connectTimeoutMillis;
279     }
280 
281     /**
282      * Gets this TcpSocketManager's content format. Specified by:
283      * <ul>
284      * <li>Key: "protocol" Value: "tcp"</li>
285      * <li>Key: "direction" Value: "out"</li>
286      * </ul>
287      *
288      * @return Map of content format keys supporting TcpSocketManager
289      */
290     @Override
291     public Map<String, String> getContentFormat() {
292         final Map<String, String> result = new HashMap<>(super.getContentFormat());
293         result.put("protocol", "tcp");
294         result.put("direction", "out");
295         return result;
296     }
297 
298     /**
299      * Handles reconnecting to a Socket on a Thread.
300      */
301     private class Reconnector extends Log4jThread {
302 
303         private final CountDownLatch latch = new CountDownLatch(1);
304 
305         private boolean shutdown = false;
306 
307         private final Object owner;
308 
309         public Reconnector(final OutputStreamManager owner) {
310             super("TcpSocketManager-Reconnector");
311             this.owner = owner;
312         }
313 
314         public void latch() {
315             try {
316                 latch.await();
317             } catch (final InterruptedException ex) {
318                 // Ignore the exception.
319             }
320         }
321 
322         public void shutdown() {
323             shutdown = true;
324         }
325 
326         @Override
327         public void run() {
328             while (!shutdown) {
329                 try {
330                     sleep(reconnectionDelayMillis);
331                     reconnect();
332                 } catch (final InterruptedException ie) {
333                     LOGGER.debug("Reconnection interrupted.");
334                 } catch (final ConnectException ex) {
335                     LOGGER.debug("{}:{} refused connection", host, port);
336                 } catch (final IOException ioe) {
337                     LOGGER.debug("Unable to reconnect to {}:{}", host, port);
338                 } finally {
339                     latch.countDown();
340                 }
341             }
342         }
343 
344         void reconnect() throws IOException {
345             List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port);
346             if (socketAddresses.size() == 1) {
347                 LOGGER.debug("Reconnecting " + socketAddresses.get(0));
348                 connect(socketAddresses.get(0));
349             } else {
350                 IOException ioe = null;
351                 for (InetSocketAddress socketAddress : socketAddresses) {
352                     try {
353                         LOGGER.debug("Reconnecting " + socketAddress);
354                         connect(socketAddress);
355                         return;
356                     } catch (IOException ex) {
357                         ioe = ex;
358                     }
359                 }
360                 throw ioe;
361             }
362         }
363 
364         private void connect(InetSocketAddress socketAddress) throws IOException {
365             final Socket sock = createSocket(socketAddress);
366             @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
367             final OutputStream newOS = sock.getOutputStream();
368             InetAddress prev = socket != null ? socket.getInetAddress() : null;
369             synchronized (owner) {
370                 Closer.closeSilently(getOutputStream());
371                 setOutputStream(newOS);
372                 socket = sock;
373                 reconnector = null;
374                 shutdown = true;
375             }
376             String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ?
377                     "reestablished" : "established";
378             LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket);
379         }
380 
381         @Override
382         public String toString() {
383             return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]";
384         }
385     }
386 
387     private Reconnector createReconnector() {
388         final Reconnector recon = new Reconnector(this);
389         recon.setDaemon(true);
390         recon.setPriority(Thread.MIN_PRIORITY);
391         return recon;
392     }
393 
394     protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException {
395         return createSocket(socketAddress, socketOptions, connectTimeoutMillis);
396     }
397 
398     protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions,
399             final int connectTimeoutMillis) throws IOException {
400         LOGGER.debug("Creating socket {}", socketAddress.toString());
401         final Socket newSocket = new Socket();
402         if (socketOptions != null) {
403             // Not sure which options must be applied before or after the connect() call.
404             socketOptions.apply(newSocket);
405         }
406         newSocket.connect(socketAddress, connectTimeoutMillis);
407         if (socketOptions != null) {
408             // Not sure which options must be applied before or after the connect() call.
409             socketOptions.apply(newSocket);
410         }
411         return newSocket;
412     }
413 
414     /**
415      * Data for the factory.
416      */
417     static class FactoryData {
418         protected final String host;
419         protected final int port;
420         protected final int connectTimeoutMillis;
421         protected final int reconnectDelayMillis;
422         protected final boolean immediateFail;
423         protected final Layout<? extends Serializable> layout;
424         protected final int bufferSize;
425         protected final SocketOptions socketOptions;
426 
427         public FactoryData(final String host, final int port, final int connectTimeoutMillis,
428                 final int reconnectDelayMillis, final boolean immediateFail,
429                 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
430             this.host = host;
431             this.port = port;
432             this.connectTimeoutMillis = connectTimeoutMillis;
433             this.reconnectDelayMillis = reconnectDelayMillis;
434             this.immediateFail = immediateFail;
435             this.layout = layout;
436             this.bufferSize = bufferSize;
437             this.socketOptions = socketOptions;
438         }
439 
440         @Override
441         public String toString() {
442             return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
443                     + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
444                     + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
445         }
446     }
447 
448     /**
449      * Factory to create a TcpSocketManager.
450      *
451      * @param <M>
452      *            The manager type.
453      * @param <T>
454      *            The factory data type.
455      */
456     protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
457             implements ManagerFactory<M, T> {
458 
459         static HostResolver resolver = new HostResolver();
460 
461         @SuppressWarnings("resource")
462         @Override
463         public M createManager(final String name, final T data) {
464             InetAddress inetAddress;
465             OutputStream os;
466             try {
467                 inetAddress = InetAddress.getByName(data.host);
468             } catch (final UnknownHostException ex) {
469                 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
470                 return null;
471             }
472             Socket socket = null;
473             try {
474                 // LOG4J2-1042
475                 socket = createSocket(data);
476                 os = socket.getOutputStream();
477                 return createManager(name, os, socket, inetAddress, data);
478             } catch (final IOException ex) {
479                 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
480                 os = NullOutputStream.getInstance();
481             }
482             if (data.reconnectDelayMillis == 0) {
483                 Closer.closeSilently(socket);
484                 return null;
485             }
486             return createManager(name, os, null, inetAddress, data);
487         }
488 
489         @SuppressWarnings("unchecked")
490         M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
491             return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
492                     data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
493                     data.bufferSize, data.socketOptions);
494         }
495 
496         Socket createSocket(final T data) throws IOException {
497             List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port);
498             IOException ioe = null;
499             for (InetSocketAddress socketAddress : socketAddresses) {
500                 try {
501                     return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis);
502                 } catch (IOException ex) {
503                     ioe = ex;
504                 }
505             }
506             throw new IOException(errorMessage(data, socketAddresses) , ioe);
507         }
508 
509         protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) {
510             StringBuilder sb = new StringBuilder("Unable to create socket for ");
511             sb.append(data.host).append(" at port ").append(data.port);
512             if (socketAddresses.size() == 1) {
513                 if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) {
514                     sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress());
515                     sb.append(" and port ").append(socketAddresses.get(0).getPort());
516                 }
517             } else {
518                 sb.append(" using ip addresses and ports ");
519                 for (int i = 0; i < socketAddresses.size(); ++i) {
520                     if (i > 0) {
521                         sb.append(", ");
522                         sb.append(socketAddresses.get(i).getAddress().getHostAddress());
523                         sb.append(":").append(socketAddresses.get(i).getPort());
524                     }
525                 }
526             }
527             return sb.toString();
528         }
529 
530     }
531 
532     /**
533      * This method is only for unit testing. It is not Thread-safe.
534      * @param resolver the HostResolver.
535      */
536     public static void setHostResolver(HostResolver resolver) {
537         TcpSocketManagerFactory.resolver = resolver;
538     }
539 
540     public static class HostResolver {
541 
542         public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException {
543             InetAddress[] addresses = InetAddress.getAllByName(host);
544             List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length);
545             for (InetAddress address: addresses) {
546                 socketAddresses.add(new InetSocketAddress(address, port));
547             }
548             return socketAddresses;
549         }
550     }
551 
552     /**
553      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
554      */
555     public SocketOptions getSocketOptions() {
556         return socketOptions;
557     }
558 
559     /**
560      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
561      */
562     public Socket getSocket() {
563         return socket;
564     }
565 
566     public int getReconnectionDelayMillis() {
567         return reconnectionDelayMillis;
568     }
569 
570     @Override
571     public String toString() {
572         return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
573                 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
574                 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
575                 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
576                 + count + "]";
577     }
578 
579 }