View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.ByteBuffer;
23  import java.util.Objects;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.logging.log4j.core.Layout;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
29  import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
30  import org.apache.logging.log4j.core.util.Constants;
31  
32  /**
33   * Manages an OutputStream so that it can be shared by multiple Appenders and will
34   * allow appenders to reconfigure without requiring a new stream.
35   */
36  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
37      protected final Layout<?> layout;
38      protected ByteBuffer byteBuffer;
39      private volatile OutputStream outputStream;
40      private boolean skipFooter;
41  
42      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
43              final boolean writeHeader) {
44          // Can't use new ctor because it throws an exception
45          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
46      }
47  
48      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
49              final boolean writeHeader, final int bufferSize) {
50          // Can't use new ctor because it throws an exception
51          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
52      }
53  
54      /**
55       * @since 2.6
56       * @deprecated
57       */
58      @Deprecated
59      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
60              final boolean writeHeader, final ByteBuffer byteBuffer) {
61          super(null, streamName);
62          this.outputStream = os;
63          this.layout = layout;
64          if (writeHeader && layout != null) {
65              final byte[] header = layout.getHeader();
66              if (header != null) {
67                  try {
68                      getOutputStream().write(header, 0, header.length);
69                  } catch (final IOException e) {
70                      logError("Unable to write header", e);
71                  }
72              }
73          }
74          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
75      }
76  
77      /**
78       * @since 2.7
79       */
80      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
81              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
82              final ByteBuffer byteBuffer) {
83          super(loggerContext, streamName);
84          if (createOnDemand && os != null) {
85              LOGGER.error(
86                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
87                      streamName);
88          }
89          this.layout = layout;
90          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
91          this.outputStream = os;
92          if (writeHeader && layout != null) {
93              final byte[] header = layout.getHeader();
94              if (header != null) {
95                  try {
96                      getOutputStream().write(header, 0, header.length);
97                  } catch (final IOException e) {
98                      logError("Unable to write header for " + streamName, e);
99                  }
100             }
101         }
102     }
103 
104     /**
105      * Creates a Manager.
106      *
107      * @param name The name of the stream to manage.
108      * @param data The data to pass to the Manager.
109      * @param factory The factory to use to create the Manager.
110      * @param <T> The type of the OutputStreamManager.
111      * @return An OutputStreamManager.
112      */
113     public static <T> OutputStreamManager getManager(final String name, final T data,
114                                                  final ManagerFactory<? extends OutputStreamManager, T> factory) {
115         return AbstractManager.getManager(name, factory, data);
116     }
117 
118     @SuppressWarnings("unused")
119     protected OutputStream createOutputStream() throws IOException {
120         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
121     }
122 
123     /**
124      * Indicate whether the footer should be skipped or not.
125      * @param skipFooter true if the footer should be skipped.
126      */
127     public void skipFooter(final boolean skipFooter) {
128         this.skipFooter = skipFooter;
129     }
130 
131     /**
132      * Default hook to write footer during close.
133      */
134     @Override
135     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
136         writeFooter();
137         return closeOutputStream();
138     }
139 
140     /**
141      * Writes the footer.
142      */
143     protected void writeFooter() {
144         if (layout == null || skipFooter) {
145             return;
146         }
147         final byte[] footer = layout.getFooter();
148         if (footer != null) {
149             write(footer);
150         }
151     }
152 
153     /**
154      * Returns the status of the stream.
155      * @return true if the stream is open, false if it is not.
156      */
157     public boolean isOpen() {
158         return getCount() > 0;
159     }
160 
161     public boolean hasOutputStream() {
162         return outputStream != null;
163     }
164 
165     protected OutputStream getOutputStream() throws IOException {
166         if (outputStream == null) {
167             outputStream = createOutputStream();
168         }
169         return outputStream;
170     }
171 
172     protected void setOutputStream(final OutputStream os) {
173         final byte[] header = layout.getHeader();
174         if (header != null) {
175             try {
176                 os.write(header, 0, header.length);
177                 this.outputStream = os; // only update field if os.write() succeeded
178             } catch (final IOException ioe) {
179                 logError("Unable to write header", ioe);
180             }
181         } else {
182             this.outputStream = os;
183         }
184     }
185 
186     /**
187      * Some output streams synchronize writes while others do not.
188      * @param bytes The serialized Log event.
189      * @throws AppenderLoggingException if an error occurs.
190      */
191     protected void write(final byte[] bytes)  {
192         write(bytes, 0, bytes.length, false);
193     }
194 
195     /**
196      * Some output streams synchronize writes while others do not.
197      * @param bytes The serialized Log event.
198      * @param immediateFlush If true, flushes after writing.
199      * @throws AppenderLoggingException if an error occurs.
200      */
201     protected void write(final byte[] bytes, final boolean immediateFlush)  {
202         write(bytes, 0, bytes.length, immediateFlush);
203     }
204 
205     @Override
206     public void writeBytes(final byte[] data, final int offset, final int length) {
207         write(data, offset, length, false);
208     }
209 
210     /**
211      * Some output streams synchronize writes while others do not. Synchronizing here insures that
212      * log events won't be intertwined.
213      * @param bytes The serialized Log event.
214      * @param offset The offset into the byte array.
215      * @param length The number of bytes to write.
216      * @throws AppenderLoggingException if an error occurs.
217      */
218     protected void write(final byte[] bytes, final int offset, final int length) {
219         writeBytes(bytes, offset, length);
220     }
221 
222     /**
223      * Some output streams synchronize writes while others do not. Synchronizing here insures that
224      * log events won't be intertwined.
225      * @param bytes The serialized Log event.
226      * @param offset The offset into the byte array.
227      * @param length The number of bytes to write.
228      * @param immediateFlush flushes immediately after writing.
229      * @throws AppenderLoggingException if an error occurs.
230      */
231     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
232         if (immediateFlush && byteBuffer.position() == 0) {
233             writeToDestination(bytes, offset, length);
234             flushDestination();
235             return;
236         }
237         if (length >= byteBuffer.capacity()) {
238             // if request length exceeds buffer capacity, flush the buffer and write the data directly
239             flush();
240             writeToDestination(bytes, offset, length);
241         } else {
242             if (length > byteBuffer.remaining()) {
243                 flush();
244             }
245             byteBuffer.put(bytes, offset, length);
246         }
247         if (immediateFlush) {
248             flush();
249         }
250     }
251 
252     /**
253      * Writes the specified section of the specified byte array to the stream.
254      *
255      * @param bytes the array containing data
256      * @param offset from where to write
257      * @param length how many bytes to write
258      * @since 2.6
259      */
260     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
261         try {
262             getOutputStream().write(bytes, offset, length);
263         } catch (final IOException ex) {
264             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
265         }
266     }
267 
268     /**
269      * Calls {@code flush()} on the underlying output stream.
270      * @since 2.6
271      */
272     protected synchronized void flushDestination() {
273         final OutputStream stream = outputStream; // access volatile field only once per method
274         if (stream != null) {
275             try {
276                 stream.flush();
277             } catch (final IOException ex) {
278                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
279             }
280         }
281     }
282 
283     /**
284      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
285      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
286      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
287      *
288      * @see #flushDestination()
289      * @since 2.6
290      */
291     protected synchronized void flushBuffer(final ByteBuffer buf) {
292         buf.flip();
293         if (buf.remaining() > 0) {
294             writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
295         }
296         buf.clear();
297     }
298 
299     /**
300      * Flushes any buffers.
301      */
302     public synchronized void flush() {
303         flushBuffer(byteBuffer);
304         flushDestination();
305     }
306 
307     protected synchronized boolean closeOutputStream() {
308         flush();
309         final OutputStream stream = outputStream; // access volatile field only once per method
310         if (stream == null || stream == System.out || stream == System.err) {
311             return true;
312         }
313         try {
314             stream.close();
315         } catch (final IOException ex) {
316             logError("Unable to close stream", ex);
317             return false;
318         }
319         return true;
320     }
321 
322     /**
323      * Returns this {@code ByteBufferDestination}'s buffer.
324      * @return the buffer
325      * @since 2.6
326      */
327     @Override
328     public ByteBuffer getByteBuffer() {
329         return byteBuffer;
330     }
331 
332     /**
333      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
334      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
335      * <p>
336      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
337      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
338      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
339      * buffer.
340      * </p><p>
341      * To just flush the buffered contents to the underlying stream, call
342      * {@link #flushBuffer(ByteBuffer)} directly instead.
343      * </p>
344      *
345      * @param buf the buffer whose contents to write the the destination
346      * @return the specified buffer
347      * @since 2.6
348      */
349     @Override
350     public ByteBuffer drain(final ByteBuffer buf) {
351         flushBuffer(buf);
352         return buf;
353     }
354 
355     @Override
356     public void writeBytes(final ByteBuffer data) {
357         if (data.remaining() == 0) {
358           return;
359         }
360         synchronized (this) {
361           ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
362         }
363     }
364 }