View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.ByteBuffer;
23  import java.util.Objects;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.logging.log4j.core.Layout;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
29  import org.apache.logging.log4j.core.util.Constants;
30  
31  /**
32   * Manages an OutputStream so that it can be shared by multiple Appenders and will
33   * allow appenders to reconfigure without requiring a new stream.
34   */
35  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
36      protected final Layout<?> layout;
37      protected ByteBuffer byteBuffer;
38      private volatile OutputStream os;
39      private boolean skipFooter;
40  
41      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
42              final boolean writeHeader) {
43          // Can't use new ctor because it throws an exception
44          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
45      }
46  
47      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
48              final boolean writeHeader, final int bufferSize) {
49          // Can't use new ctor because it throws an exception
50          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
51      }
52  
53      /**
54       * @since 2.6
55       * @deprecated
56       */
57      @Deprecated
58      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
59              final boolean writeHeader, final ByteBuffer byteBuffer) {
60          super(null, streamName);
61          this.os = os;
62          this.layout = layout;
63          if (writeHeader && layout != null) {
64              final byte[] header = layout.getHeader();
65              if (header != null) {
66                  try {
67                      getOutputStream().write(header, 0, header.length);
68                  } catch (final IOException e) {
69                      logError("Unable to write header", e);
70                  }
71              }
72          }
73          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
74      }
75  
76      /**
77       * @since 2.7
78       */
79      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
80              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
81              final ByteBuffer byteBuffer) {
82          super(loggerContext, streamName);
83          if (createOnDemand && os != null) {
84              LOGGER.error(
85                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
86                      streamName);
87          }
88          this.layout = layout;
89          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
90          this.os = os;
91          if (writeHeader && layout != null) {
92              final byte[] header = layout.getHeader();
93              if (header != null) {
94                  try {
95                      getOutputStream().write(header, 0, header.length);
96                  } catch (final IOException e) {
97                      logError("Unable to write header for " + streamName, e);
98                  }
99              }
100         }
101     }
102 
103     /**
104      * Creates a Manager.
105      *
106      * @param name The name of the stream to manage.
107      * @param data The data to pass to the Manager.
108      * @param factory The factory to use to create the Manager.
109      * @param <T> The type of the OutputStreamManager.
110      * @return An OutputStreamManager.
111      */
112     public static <T> OutputStreamManager getManager(final String name, final T data,
113                                                  final ManagerFactory<? extends OutputStreamManager, T> factory) {
114         return AbstractManager.getManager(name, factory, data);
115     }
116 
117     @SuppressWarnings("unused")
118     protected OutputStream createOutputStream() throws IOException {
119         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
120     }
121     
122     /**
123      * Indicate whether the footer should be skipped or not.
124      * @param skipFooter true if the footer should be skipped.
125      */
126     public void skipFooter(final boolean skipFooter) {
127         this.skipFooter = skipFooter;
128     }
129 
130     /**
131      * Default hook to write footer during close.
132      */
133     @Override
134     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
135         writeFooter();
136         return closeOutputStream();
137     }
138 
139     /**
140      * Writes the footer.
141      */
142     protected void writeFooter() {
143         if (layout == null || skipFooter) {
144             return;
145         }
146         final byte[] footer = layout.getFooter();
147         if (footer != null) {
148             write(footer);
149         }
150     }
151 
152     /**
153      * Returns the status of the stream.
154      * @return true if the stream is open, false if it is not.
155      */
156     public boolean isOpen() {
157         return getCount() > 0;
158     }
159 
160     public boolean hasOutputStream() {
161         return os != null;
162     }
163 
164     protected OutputStream getOutputStream() throws IOException {
165         if (os == null) {
166             os = createOutputStream();
167         }
168         return os;
169     }
170 
171     protected void setOutputStream(final OutputStream os) {
172         final byte[] header = layout.getHeader();
173         if (header != null) {
174             try {
175                 os.write(header, 0, header.length);
176                 this.os = os; // only update field if os.write() succeeded
177             } catch (final IOException ioe) {
178                 logError("Unable to write header", ioe);
179             }
180         } else {
181             this.os = os;
182         }
183     }
184 
185     /**
186      * Some output streams synchronize writes while others do not.
187      * @param bytes The serialized Log event.
188      * @throws AppenderLoggingException if an error occurs.
189      */
190     protected void write(final byte[] bytes)  {
191         write(bytes, 0, bytes.length, false);
192     }
193 
194     /**
195      * Some output streams synchronize writes while others do not.
196      * @param bytes The serialized Log event.
197      * @param immediateFlush If true, flushes after writing.
198      * @throws AppenderLoggingException if an error occurs.
199      */
200     protected void write(final byte[] bytes, final boolean immediateFlush)  {
201         write(bytes, 0, bytes.length, immediateFlush);
202     }
203 
204     /**
205      * Some output streams synchronize writes while others do not. Synchronizing here insures that
206      * log events won't be intertwined.
207      * @param bytes The serialized Log event.
208      * @param offset The offset into the byte array.
209      * @param length The number of bytes to write.
210      * @throws AppenderLoggingException if an error occurs.
211      */
212     protected void write(final byte[] bytes, final int offset, final int length) {
213         write(bytes, offset, length, false);
214     }
215 
216     /**
217      * Some output streams synchronize writes while others do not. Synchronizing here insures that
218      * log events won't be intertwined.
219      * @param bytes The serialized Log event.
220      * @param offset The offset into the byte array.
221      * @param length The number of bytes to write.
222      * @param immediateFlush flushes immediately after writing.
223      * @throws AppenderLoggingException if an error occurs.
224      */
225     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
226         if (immediateFlush && byteBuffer.position() == 0) {
227             writeToDestination(bytes, offset, length);
228             flushDestination();
229             return;
230         }
231         if (length >= byteBuffer.capacity()) {
232             // if request length exceeds buffer capacity, flush the buffer and write the data directly
233             flush();
234             writeToDestination(bytes, offset, length);
235         } else {
236             if (length > byteBuffer.remaining()) {
237                 flush();
238             }
239             byteBuffer.put(bytes, offset, length);
240         }
241         if (immediateFlush) {
242             flush();
243         }
244     }
245 
246     /**
247      * Writes the specified section of the specified byte array to the stream.
248      *
249      * @param bytes the array containing data
250      * @param offset from where to write
251      * @param length how many bytes to write
252      * @since 2.6
253      */
254     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
255         try {
256             getOutputStream().write(bytes, offset, length);
257         } catch (final IOException ex) {
258             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
259         }
260     }
261 
262     /**
263      * Calls {@code flush()} on the underlying output stream.
264      * @since 2.6
265      */
266     protected synchronized void flushDestination() {
267         final OutputStream stream = os; // access volatile field only once per method
268         if (stream != null) {
269             try {
270                 stream.flush();
271             } catch (final IOException ex) {
272                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
273             }
274         }
275     }
276 
277     /**
278      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
279      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
280      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
281      *
282      * @see #flushDestination()
283      * @since 2.6
284      */
285     protected synchronized void flushBuffer(final ByteBuffer buf) {
286         buf.flip();
287         if (buf.limit() > 0) {
288             writeToDestination(buf.array(), 0, buf.limit());
289         }
290         buf.clear();
291     }
292 
293     /**
294      * Flushes any buffers.
295      */
296     public synchronized void flush() {
297         flushBuffer(byteBuffer);
298         flushDestination();
299     }
300 
301     protected synchronized boolean closeOutputStream() {
302         flush();
303         final OutputStream stream = os; // access volatile field only once per method
304         if (stream == null || stream == System.out || stream == System.err) {
305             return true;
306         }
307         try {
308             stream.close();
309         } catch (final IOException ex) {
310             logError("Unable to close stream", ex);
311             return false;
312         }
313         return true;
314     }
315 
316     /**
317      * Returns this {@code ByteBufferDestination}'s buffer.
318      * @return the buffer
319      * @since 2.6
320      */
321     @Override
322     public ByteBuffer getByteBuffer() {
323         return byteBuffer;
324     }
325 
326     /**
327      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
328      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
329      * <p>
330      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
331      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
332      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
333      * buffer.
334      * </p><p>
335      * To just flush the buffered contents to the underlying stream, call
336      * {@link #flushBuffer(ByteBuffer)} directly instead.
337      * </p>
338      *
339      * @param buf the buffer whose contents to write the the destination
340      * @return the specified buffer
341      * @since 2.6
342      */
343     @Override
344     public ByteBuffer drain(final ByteBuffer buf) {
345         flushBuffer(buf);
346         return buf;
347     }
348 }