View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.Buffer;
23  import java.nio.ByteBuffer;
24  import java.util.Objects;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LoggerContext;
29  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
30  import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
31  import org.apache.logging.log4j.core.util.Constants;
32  
33  /**
34   * Manages an OutputStream so that it can be shared by multiple Appenders and will
35   * allow appenders to reconfigure without requiring a new stream.
36   */
37  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
38      protected final Layout<?> layout;
39      protected ByteBuffer byteBuffer;
40      private volatile OutputStream outputStream;
41      private boolean skipFooter;
42  
43      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
44              final boolean writeHeader) {
45          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
46      }
47  
48      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
49              final boolean writeHeader, final int bufferSize) {
50          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
51      }
52  
53      /**
54       * @since 2.6
55       * @deprecated
56       */
57      @Deprecated
58      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
59              final boolean writeHeader, final ByteBuffer byteBuffer) {
60          super(null, streamName);
61          this.outputStream = os;
62          this.layout = layout;
63          if (writeHeader && layout != null) {
64              final byte[] header = layout.getHeader();
65              if (header != null) {
66                  try {
67                      getOutputStream().write(header, 0, header.length);
68                  } catch (final IOException e) {
69                      logError("Unable to write header", e);
70                  }
71              }
72          }
73          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
74      }
75  
76      /**
77       * @since 2.7
78       */
79      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
80              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
81              final ByteBuffer byteBuffer) {
82          super(loggerContext, streamName);
83          if (createOnDemand && os != null) {
84              LOGGER.error(
85                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
86                      streamName);
87          }
88          this.layout = layout;
89          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
90          this.outputStream = os;
91          if (writeHeader && layout != null) {
92              final byte[] header = layout.getHeader();
93              if (header != null) {
94                  try {
95                      getOutputStream().write(header, 0, header.length);
96                  } catch (final IOException e) {
97                      logError("Unable to write header for " + streamName, e);
98                  }
99              }
100         }
101     }
102 
103     /**
104      * Creates a Manager.
105      *
106      * @param name The name of the stream to manage.
107      * @param data The data to pass to the Manager.
108      * @param factory The factory to use to create the Manager.
109      * @param <T> The type of the OutputStreamManager.
110      * @return An OutputStreamManager.
111      */
112     public static <T> OutputStreamManager getManager(final String name, final T data,
113                                                  final ManagerFactory<? extends OutputStreamManager, T> factory) {
114         return AbstractManager.getManager(name, factory, data);
115     }
116 
117     @SuppressWarnings("unused")
118     protected OutputStream createOutputStream() throws IOException {
119         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
120     }
121 
122     /**
123      * Indicate whether the footer should be skipped or not.
124      * @param skipFooter true if the footer should be skipped.
125      */
126     public void skipFooter(final boolean skipFooter) {
127         this.skipFooter = skipFooter;
128     }
129 
130     /**
131      * Default hook to write footer during close.
132      */
133     @Override
134     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
135         writeFooter();
136         return closeOutputStream();
137     }
138 
139     /**
140      * Writes the footer.
141      */
142     protected void writeFooter() {
143         if (layout == null || skipFooter) {
144             return;
145         }
146         final byte[] footer = layout.getFooter();
147         if (footer != null) {
148             write(footer);
149         }
150     }
151 
152     /**
153      * Returns the status of the stream.
154      * @return true if the stream is open, false if it is not.
155      */
156     public boolean isOpen() {
157         return getCount() > 0;
158     }
159 
160     public boolean hasOutputStream() {
161         return outputStream != null;
162     }
163 
164     protected OutputStream getOutputStream() throws IOException {
165         if (outputStream == null) {
166             outputStream = createOutputStream();
167         }
168         return outputStream;
169     }
170 
171     protected void setOutputStream(final OutputStream os) {
172         final byte[] header = layout.getHeader();
173         if (header != null) {
174             try {
175                 os.write(header, 0, header.length);
176                 this.outputStream = os; // only update field if os.write() succeeded
177             } catch (final IOException ioe) {
178                 logError("Unable to write header", ioe);
179             }
180         } else {
181             this.outputStream = os;
182         }
183     }
184 
185     /**
186      * Some output streams synchronize writes while others do not.
187      * @param bytes The serialized Log event.
188      * @throws AppenderLoggingException if an error occurs.
189      */
190     protected void write(final byte[] bytes)  {
191         write(bytes, 0, bytes.length, false);
192     }
193 
194     /**
195      * Some output streams synchronize writes while others do not.
196      * @param bytes The serialized Log event.
197      * @param immediateFlush If true, flushes after writing.
198      * @throws AppenderLoggingException if an error occurs.
199      */
200     protected void write(final byte[] bytes, final boolean immediateFlush)  {
201         write(bytes, 0, bytes.length, immediateFlush);
202     }
203 
204     @Override
205     public void writeBytes(final byte[] data, final int offset, final int length) {
206         write(data, offset, length, false);
207     }
208 
209     /**
210      * Some output streams synchronize writes while others do not. Synchronizing here insures that
211      * log events won't be intertwined.
212      * @param bytes The serialized Log event.
213      * @param offset The offset into the byte array.
214      * @param length The number of bytes to write.
215      * @throws AppenderLoggingException if an error occurs.
216      */
217     protected void write(final byte[] bytes, final int offset, final int length) {
218         writeBytes(bytes, offset, length);
219     }
220 
221     /**
222      * Some output streams synchronize writes while others do not. Synchronizing here insures that
223      * log events won't be intertwined.
224      * @param bytes The serialized Log event.
225      * @param offset The offset into the byte array.
226      * @param length The number of bytes to write.
227      * @param immediateFlush flushes immediately after writing.
228      * @throws AppenderLoggingException if an error occurs.
229      */
230     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
231         if (immediateFlush && byteBuffer.position() == 0) {
232             writeToDestination(bytes, offset, length);
233             flushDestination();
234             return;
235         }
236         if (length >= byteBuffer.capacity()) {
237             // if request length exceeds buffer capacity, flush the buffer and write the data directly
238             flush();
239             writeToDestination(bytes, offset, length);
240         } else {
241             if (length > byteBuffer.remaining()) {
242                 flush();
243             }
244             byteBuffer.put(bytes, offset, length);
245         }
246         if (immediateFlush) {
247             flush();
248         }
249     }
250 
251     /**
252      * Writes the specified section of the specified byte array to the stream.
253      *
254      * @param bytes the array containing data
255      * @param offset from where to write
256      * @param length how many bytes to write
257      * @since 2.6
258      */
259     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
260         try {
261             getOutputStream().write(bytes, offset, length);
262         } catch (final IOException ex) {
263             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
264         }
265     }
266 
267     /**
268      * Calls {@code flush()} on the underlying output stream.
269      * @since 2.6
270      */
271     protected synchronized void flushDestination() {
272         final OutputStream stream = outputStream; // access volatile field only once per method
273         if (stream != null) {
274             try {
275                 stream.flush();
276             } catch (final IOException ex) {
277                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
278             }
279         }
280     }
281 
282     /**
283      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
284      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
285      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
286      *
287      * @see #flushDestination()
288      * @since 2.6
289      */
290     protected synchronized void flushBuffer(final ByteBuffer buf) {
291         ((Buffer) buf).flip();
292         if (buf.remaining() > 0) {
293             writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
294         }
295         buf.clear();
296     }
297 
298     /**
299      * Flushes any buffers.
300      */
301     public synchronized void flush() {
302         flushBuffer(byteBuffer);
303         flushDestination();
304     }
305 
306     protected synchronized boolean closeOutputStream() {
307         flush();
308         final OutputStream stream = outputStream; // access volatile field only once per method
309         if (stream == null || stream == System.out || stream == System.err) {
310             return true;
311         }
312         try {
313             stream.close();
314         } catch (final IOException ex) {
315             logError("Unable to close stream", ex);
316             return false;
317         }
318         return true;
319     }
320 
321     /**
322      * Returns this {@code ByteBufferDestination}'s buffer.
323      * @return the buffer
324      * @since 2.6
325      */
326     @Override
327     public ByteBuffer getByteBuffer() {
328         return byteBuffer;
329     }
330 
331     /**
332      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
333      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
334      * <p>
335      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
336      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
337      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
338      * buffer.
339      * </p><p>
340      * To just flush the buffered contents to the underlying stream, call
341      * {@link #flushBuffer(ByteBuffer)} directly instead.
342      * </p>
343      *
344      * @param buf the buffer whose contents to write the the destination
345      * @return the specified buffer
346      * @since 2.6
347      */
348     @Override
349     public ByteBuffer drain(final ByteBuffer buf) {
350         flushBuffer(buf);
351         return buf;
352     }
353 
354     @Override
355     public void writeBytes(final ByteBuffer data) {
356         if (data.remaining() == 0) {
357           return;
358         }
359         synchronized (this) {
360           ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
361         }
362     }
363 }