View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.io.Serializable;
23  import java.net.ConnectException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.Socket;
27  import java.net.UnknownHostException;
28  import java.util.HashMap;
29  import java.util.Map;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.logging.log4j.core.Layout;
33  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34  import org.apache.logging.log4j.core.appender.ManagerFactory;
35  import org.apache.logging.log4j.core.appender.OutputStreamManager;
36  import org.apache.logging.log4j.util.Strings;
37  
38  /**
39   * Manager of TCP Socket connections.
40   */
41  public class TcpSocketManager extends AbstractSocketManager {
42      /**
43        The default reconnection delay (30000 milliseconds or 30 seconds).
44       */
45      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
46      /**
47        The default port number of remote logging server (4560).
48       */
49      private static final int DEFAULT_PORT = 4560;
50  
51      private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
52  
53      private final int reconnectionDelay;
54  
55      private Reconnector connector;
56  
57      private Socket socket;
58  
59      private final boolean retry;
60  
61      private final boolean immediateFail;
62      
63      private final int connectTimeoutMillis;
64  
65      /**
66       * The Constructor.
67       * @param name The unique name of this connection.
68       * @param os The OutputStream.
69       * @param sock The Socket.
70       * @param inetAddress The Internet address of the host.
71       * @param host The name of the host.
72       * @param port The port number on the host.
73       * @param connectTimeoutMillis the connect timeout in milliseconds.
74       * @param delay Reconnection interval.
75       * @param immediateFail
76       * @param layout The Layout.
77       */
78      public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
79                              final String host, final int port, int connectTimeoutMillis, final int delay,
80                              final boolean immediateFail, final Layout<? extends Serializable> layout) {
81          super(name, os, inetAddress, host, port, layout);
82          this.connectTimeoutMillis = connectTimeoutMillis;
83          this.reconnectionDelay = delay;
84          this.socket = sock;
85          this.immediateFail = immediateFail;
86          retry = delay > 0;
87          if (sock == null) {
88              connector = new Reconnector(this);
89              connector.setDaemon(true);
90              connector.setPriority(Thread.MIN_PRIORITY);
91              connector.start();
92          }
93      }
94  
95      /**
96       * Obtain a TcpSocketManager.
97       * @param host The host to connect to.
98       * @param port The port on the host.
99       * @param connectTimeoutMillis the connect timeout in milliseconds
100      * @param delayMillis The interval to pause between retries.
101      * @return A TcpSocketManager.
102      */
103     public static TcpSocketManager getSocketManager(final String host, int port, int connectTimeoutMillis,
104             int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105         if (Strings.isEmpty(host)) {
106             throw new IllegalArgumentException("A host name is required");
107         }
108         if (port <= 0) {
109             port = DEFAULT_PORT;
110         }
111         if (delayMillis == 0) {
112             delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113         }
114         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115                 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116     }
117 
118     @Override
119     protected void write(final byte[] bytes, final int offset, final int length)  {
120         if (socket == null) {
121             if (connector != null && !immediateFail) {
122                 connector.latch();
123             }
124             if (socket == null) {
125                 final String msg = "Error writing to " + getName() + " socket not available";
126                 throw new AppenderLoggingException(msg);
127             }
128         }
129         synchronized (this) {
130             try {
131                 getOutputStream().write(bytes, offset, length);
132             } catch (final IOException ex) {
133                 if (retry && connector == null) {
134                     connector = new Reconnector(this);
135                     connector.setDaemon(true);
136                     connector.setPriority(Thread.MIN_PRIORITY);
137                     connector.start();
138                 }
139                 final String msg = "Error writing to " + getName();
140                 throw new AppenderLoggingException(msg, ex);
141             }
142         }
143     }
144 
145     @Override
146     protected synchronized void close() {
147         super.close();
148         if (connector != null) {
149             connector.shutdown();
150             connector.interrupt();
151             connector = null;
152         }
153     }
154 
155     public int getConnectTimeoutMillis() {
156         return connectTimeoutMillis;
157     }
158 
159     /**
160      * Gets this TcpSocketManager's content format. Specified by:
161      * <ul>
162      * <li>Key: "protocol" Value: "tcp"</li>
163      * <li>Key: "direction" Value: "out"</li>
164      * </ul>
165      * 
166      * @return Map of content format keys supporting TcpSocketManager
167      */
168     @Override
169     public Map<String, String> getContentFormat() {
170         final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
171         result.put("protocol", "tcp");
172         result.put("direction", "out");
173         return result;
174     }
175 
176     /**
177      * Handles reconnecting to a Thread.
178      */
179     private class Reconnector extends Thread {
180 
181         private final CountDownLatch latch = new CountDownLatch(1);
182 
183         private boolean shutdown = false;
184 
185         private final Object owner;
186 
187         public Reconnector(final OutputStreamManager owner) {
188             this.owner = owner;
189         }
190 
191         public void latch()  {
192             try {
193                 latch.await();
194             } catch (final InterruptedException ex) {
195                 // Ignore the exception.
196             }
197         }
198 
199         public void shutdown() {
200             shutdown = true;
201         }
202 
203         @Override
204         public void run() {
205             while (!shutdown) {
206                 try {
207                     sleep(reconnectionDelay);
208                     final Socket sock = createSocket(inetAddress, port);
209                     final OutputStream newOS = sock.getOutputStream();
210                     synchronized (owner) {
211                         try {
212                             getOutputStream().close();
213                         } catch (final IOException ioe) {
214                             // Ignore this.
215                         }
216 
217                         setOutputStream(newOS);
218                         socket = sock;
219                         connector = null;
220                         shutdown = true;
221                     }
222                     LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
223                 } catch (final InterruptedException ie) {
224                     LOGGER.debug("Reconnection interrupted.");
225                 } catch (final ConnectException ex) {
226                     LOGGER.debug(host + ':' + port + " refused connection");
227                 } catch (final IOException ioe) {
228                     LOGGER.debug("Unable to reconnect to " + host + ':' + port);
229                 } finally {
230                     latch.countDown();
231                 }
232             }
233         }
234     }
235 
236     protected Socket createSocket(final InetAddress host, final int port) throws IOException {
237         return createSocket(host.getHostName(), port);
238     }
239 
240     protected Socket createSocket(final String host, final int port) throws IOException {
241         final InetSocketAddress address = new InetSocketAddress(host, port);
242         final Socket newSocket = new Socket();
243         newSocket.connect(address, connectTimeoutMillis);
244         return newSocket;
245     }
246 
247     /**
248      * Data for the factory.
249      */
250     private static class FactoryData {
251         private final String host;
252         private final int port;
253         private final int connectTimeoutMillis;
254         private final int delayMillis;
255         private final boolean immediateFail;
256         private final Layout<? extends Serializable> layout;
257 
258         public FactoryData(final String host, final int port, int connectTimeoutMillis, final int delayMillis,
259                            final boolean immediateFail, final Layout<? extends Serializable> layout) {
260             this.host = host;
261             this.port = port;
262             this.connectTimeoutMillis = connectTimeoutMillis;
263             this.delayMillis = delayMillis;
264             this.immediateFail = immediateFail;
265             this.layout = layout;
266         }
267     }
268 
269     /**
270      * Factory to create a TcpSocketManager.
271      */
272     protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
273         @Override
274         public TcpSocketManager createManager(final String name, final FactoryData data) {
275 
276             InetAddress inetAddress;
277             OutputStream os;
278             try {
279                 inetAddress = InetAddress.getByName(data.host);
280             } catch (final UnknownHostException ex) {
281                 LOGGER.error("Could not find address of " + data.host, ex);
282                 return null;
283             }
284             try {
285                 final Socket socket = new Socket(data.host, data.port);
286                 os = socket.getOutputStream();
287                 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
288                         data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
289             } catch (final IOException ex) {
290                 LOGGER.error("TcpSocketManager (" + name + ") " + ex);
291                 os = new ByteArrayOutputStream();
292             }
293             if (data.delayMillis == 0) {
294                 return null;
295             }
296             return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
297                     data.delayMillis, data.immediateFail, data.layout);
298         }
299     }
300 
301 }