001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.nio.ByteBuffer;
023import java.util.Objects;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.logging.log4j.core.Layout;
027import org.apache.logging.log4j.core.LoggerContext;
028import org.apache.logging.log4j.core.layout.ByteBufferDestination;
029import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
030import org.apache.logging.log4j.core.util.Constants;
031
032/**
033 * Manages an OutputStream so that it can be shared by multiple Appenders and will
034 * allow appenders to reconfigure without requiring a new stream.
035 */
036public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
037    protected final Layout<?> layout;
038    protected ByteBuffer byteBuffer;
039    private volatile OutputStream outputStream;
040    private boolean skipFooter;
041
042    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
043            final boolean writeHeader) {
044        // Can't use new ctor because it throws an exception
045        this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
046    }
047
048    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
049            final boolean writeHeader, final int bufferSize) {
050        // Can't use new ctor because it throws an exception
051        this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
052    }
053
054    /**
055     * @since 2.6
056     * @deprecated
057     */
058    @Deprecated
059    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
060            final boolean writeHeader, final ByteBuffer byteBuffer) {
061        super(null, streamName);
062        this.outputStream = os;
063        this.layout = layout;
064        if (writeHeader && layout != null) {
065            final byte[] header = layout.getHeader();
066            if (header != null) {
067                try {
068                    getOutputStream().write(header, 0, header.length);
069                } catch (final IOException e) {
070                    logError("Unable to write header", e);
071                }
072            }
073        }
074        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
075    }
076
077    /**
078     * @since 2.7
079     */
080    protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
081            final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
082            final ByteBuffer byteBuffer) {
083        super(loggerContext, streamName);
084        if (createOnDemand && os != null) {
085            LOGGER.error(
086                    "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
087                    streamName);
088        }
089        this.layout = layout;
090        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
091        this.outputStream = os;
092        if (writeHeader && layout != null) {
093            final byte[] header = layout.getHeader();
094            if (header != null) {
095                try {
096                    getOutputStream().write(header, 0, header.length);
097                } catch (final IOException e) {
098                    logError("Unable to write header for " + streamName, e);
099                }
100            }
101        }
102    }
103
104    /**
105     * Creates a Manager.
106     *
107     * @param name The name of the stream to manage.
108     * @param data The data to pass to the Manager.
109     * @param factory The factory to use to create the Manager.
110     * @param <T> The type of the OutputStreamManager.
111     * @return An OutputStreamManager.
112     */
113    public static <T> OutputStreamManager getManager(final String name, final T data,
114                                                 final ManagerFactory<? extends OutputStreamManager, T> factory) {
115        return AbstractManager.getManager(name, factory, data);
116    }
117
118    @SuppressWarnings("unused")
119    protected OutputStream createOutputStream() throws IOException {
120        throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
121    }
122
123    /**
124     * Indicate whether the footer should be skipped or not.
125     * @param skipFooter true if the footer should be skipped.
126     */
127    public void skipFooter(final boolean skipFooter) {
128        this.skipFooter = skipFooter;
129    }
130
131    /**
132     * Default hook to write footer during close.
133     */
134    @Override
135    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
136        writeFooter();
137        return closeOutputStream();
138    }
139
140    /**
141     * Writes the footer.
142     */
143    protected void writeFooter() {
144        if (layout == null || skipFooter) {
145            return;
146        }
147        final byte[] footer = layout.getFooter();
148        if (footer != null) {
149            write(footer);
150        }
151    }
152
153    /**
154     * Returns the status of the stream.
155     * @return true if the stream is open, false if it is not.
156     */
157    public boolean isOpen() {
158        return getCount() > 0;
159    }
160
161    public boolean hasOutputStream() {
162        return outputStream != null;
163    }
164
165    protected OutputStream getOutputStream() throws IOException {
166        if (outputStream == null) {
167            outputStream = createOutputStream();
168        }
169        return outputStream;
170    }
171
172    protected void setOutputStream(final OutputStream os) {
173        final byte[] header = layout.getHeader();
174        if (header != null) {
175            try {
176                os.write(header, 0, header.length);
177                this.outputStream = os; // only update field if os.write() succeeded
178            } catch (final IOException ioe) {
179                logError("Unable to write header", ioe);
180            }
181        } else {
182            this.outputStream = os;
183        }
184    }
185
186    /**
187     * Some output streams synchronize writes while others do not.
188     * @param bytes The serialized Log event.
189     * @throws AppenderLoggingException if an error occurs.
190     */
191    protected void write(final byte[] bytes)  {
192        write(bytes, 0, bytes.length, false);
193    }
194
195    /**
196     * Some output streams synchronize writes while others do not.
197     * @param bytes The serialized Log event.
198     * @param immediateFlush If true, flushes after writing.
199     * @throws AppenderLoggingException if an error occurs.
200     */
201    protected void write(final byte[] bytes, final boolean immediateFlush)  {
202        write(bytes, 0, bytes.length, immediateFlush);
203    }
204
205    @Override
206    public void writeBytes(final byte[] data, final int offset, final int length) {
207        write(data, offset, length, false);
208    }
209
210    /**
211     * Some output streams synchronize writes while others do not. Synchronizing here insures that
212     * log events won't be intertwined.
213     * @param bytes The serialized Log event.
214     * @param offset The offset into the byte array.
215     * @param length The number of bytes to write.
216     * @throws AppenderLoggingException if an error occurs.
217     */
218    protected void write(final byte[] bytes, final int offset, final int length) {
219        writeBytes(bytes, offset, length);
220    }
221
222    /**
223     * Some output streams synchronize writes while others do not. Synchronizing here insures that
224     * log events won't be intertwined.
225     * @param bytes The serialized Log event.
226     * @param offset The offset into the byte array.
227     * @param length The number of bytes to write.
228     * @param immediateFlush flushes immediately after writing.
229     * @throws AppenderLoggingException if an error occurs.
230     */
231    protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
232        if (immediateFlush && byteBuffer.position() == 0) {
233            writeToDestination(bytes, offset, length);
234            flushDestination();
235            return;
236        }
237        if (length >= byteBuffer.capacity()) {
238            // if request length exceeds buffer capacity, flush the buffer and write the data directly
239            flush();
240            writeToDestination(bytes, offset, length);
241        } else {
242            if (length > byteBuffer.remaining()) {
243                flush();
244            }
245            byteBuffer.put(bytes, offset, length);
246        }
247        if (immediateFlush) {
248            flush();
249        }
250    }
251
252    /**
253     * Writes the specified section of the specified byte array to the stream.
254     *
255     * @param bytes the array containing data
256     * @param offset from where to write
257     * @param length how many bytes to write
258     * @since 2.6
259     */
260    protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
261        try {
262            getOutputStream().write(bytes, offset, length);
263        } catch (final IOException ex) {
264            throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
265        }
266    }
267
268    /**
269     * Calls {@code flush()} on the underlying output stream.
270     * @since 2.6
271     */
272    protected synchronized void flushDestination() {
273        final OutputStream stream = outputStream; // access volatile field only once per method
274        if (stream != null) {
275            try {
276                stream.flush();
277            } catch (final IOException ex) {
278                throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
279            }
280        }
281    }
282
283    /**
284     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
285     * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
286     * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
287     *
288     * @see #flushDestination()
289     * @since 2.6
290     */
291    protected synchronized void flushBuffer(final ByteBuffer buf) {
292        buf.flip();
293        if (buf.remaining() > 0) {
294            writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
295        }
296        buf.clear();
297    }
298
299    /**
300     * Flushes any buffers.
301     */
302    public synchronized void flush() {
303        flushBuffer(byteBuffer);
304        flushDestination();
305    }
306
307    protected synchronized boolean closeOutputStream() {
308        flush();
309        final OutputStream stream = outputStream; // access volatile field only once per method
310        if (stream == null || stream == System.out || stream == System.err) {
311            return true;
312        }
313        try {
314            stream.close();
315        } catch (final IOException ex) {
316            logError("Unable to close stream", ex);
317            return false;
318        }
319        return true;
320    }
321
322    /**
323     * Returns this {@code ByteBufferDestination}'s buffer.
324     * @return the buffer
325     * @since 2.6
326     */
327    @Override
328    public ByteBuffer getByteBuffer() {
329        return byteBuffer;
330    }
331
332    /**
333     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
334     * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
335     * <p>
336     * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
337     * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
338     * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
339     * buffer.
340     * </p><p>
341     * To just flush the buffered contents to the underlying stream, call
342     * {@link #flushBuffer(ByteBuffer)} directly instead.
343     * </p>
344     *
345     * @param buf the buffer whose contents to write the the destination
346     * @return the specified buffer
347     * @since 2.6
348     */
349    @Override
350    public ByteBuffer drain(final ByteBuffer buf) {
351        flushBuffer(buf);
352        return buf;
353    }
354
355    @Override
356    public void writeBytes(final ByteBuffer data) {
357        if (data.remaining() == 0) {
358          return;
359        }
360        synchronized (this) {
361          ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
362        }
363    }
364}