001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.config;
018
019import java.util.Date;
020import java.util.Queue;
021import java.util.concurrent.Callable;
022import java.util.concurrent.ScheduledExecutorService;
023import java.util.concurrent.ScheduledFuture;
024import java.util.concurrent.ScheduledThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.logging.log4j.Logger;
028import org.apache.logging.log4j.core.AbstractLifeCycle;
029import org.apache.logging.log4j.core.util.CronExpression;
030import org.apache.logging.log4j.core.util.Log4jThreadFactory;
031import org.apache.logging.log4j.status.StatusLogger;
032
033/**
034 *
035 */
036public class ConfigurationScheduler extends AbstractLifeCycle {
037
038    private static final Logger LOGGER = StatusLogger.getLogger();
039    private static final String SIMPLE_NAME = "Log4j2 " + ConfigurationScheduler.class.getSimpleName();
040    private static final int MAX_SCHEDULED_ITEMS = 5;
041
042    private volatile ScheduledExecutorService executorService;
043    private int scheduledItems = 0;
044    private final String name;
045
046    public ConfigurationScheduler() {
047        this(SIMPLE_NAME);
048    }
049
050    public ConfigurationScheduler(final String name) {
051        this.name = name;
052    }
053
054    @Override
055    public void start() {
056        super.start();
057    }
058
059    @Override
060    public boolean stop(final long timeout, final TimeUnit timeUnit) {
061        setStopping();
062        if (isExecutorServiceSet()) {
063            LOGGER.debug("{} shutting down threads in {}", name, getExecutorService());
064            executorService.shutdown();
065            try {
066                executorService.awaitTermination(timeout, timeUnit);
067            } catch (final InterruptedException ie) {
068                executorService.shutdownNow();
069                try {
070                    executorService.awaitTermination(timeout, timeUnit);
071                } catch (final InterruptedException inner) {
072                    LOGGER.warn("{} stopped but some scheduled services may not have completed.", name);
073                }
074                // Preserve interrupt status
075                Thread.currentThread().interrupt();
076            }
077        }
078        setStopped();
079        return true;
080    }
081
082    public boolean isExecutorServiceSet() {
083        return executorService != null;
084    }
085
086    /**
087     * Increment the number of threads in the pool.
088     */
089    public void incrementScheduledItems() {
090        if (isExecutorServiceSet()) {
091            LOGGER.error("{} attempted to increment scheduled items after start", name);
092        } else {
093            ++scheduledItems;
094        }
095    }
096
097    /**
098     * Decrement the number of threads in the pool
099     */
100    public void decrementScheduledItems() {
101        if (!isStarted() && scheduledItems > 0) {
102            --scheduledItems;
103        }
104    }
105
106    /**
107     * Creates and executes a ScheduledFuture that becomes enabled after the given delay.
108     * @param <V> The result type returned by this Future
109     * @param callable the function to execute.
110     * @param delay the time from now to delay execution.
111     * @param unit the time unit of the delay parameter.
112     * @return a ScheduledFuture that can be used to extract result or cancel.
113     *
114     */
115    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
116        return getExecutorService().schedule(callable, delay, unit);
117    }
118
119    /**
120     * Creates and executes a one-shot action that becomes enabled after the given delay.
121     * @param command the task to execute.
122     * @param delay the time from now to delay execution.
123     * @param unit the time unit of the delay parameter.
124     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null
125     * upon completion.
126     */
127    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
128        return getExecutorService().schedule(command, delay, unit);
129    }
130
131
132    /**
133     * Creates and executes an action that first based on a cron expression.
134     * @param cronExpression the cron expression describing the schedule.
135     * @param command The Runnable to run,
136     * @return a ScheduledFuture representing the next time the command will run.
137     */
138    public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Runnable command) {
139        return scheduleWithCron(cronExpression, new Date(), command);
140    }
141
142    /**
143     * Creates and executes an action that first based on a cron expression.
144     * @param cronExpression the cron expression describing the schedule.
145     * @param startDate The time to use as the time to begin the cron expression. Defaults to the current date and time.
146     * @param command The Runnable to run,
147     * @return a ScheduledFuture representing the next time the command will run.
148     */
149    public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Date startDate, final Runnable command) {
150        final Date fireDate = cronExpression.getNextValidTimeAfter(startDate == null ? new Date() : startDate);
151        final CronRunnable runnable = new CronRunnable(command, cronExpression);
152        final ScheduledFuture<?> future = schedule(runnable, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
153        final CronScheduledFuture<?> cronScheduledFuture = new CronScheduledFuture<>(future, fireDate);
154        runnable.setScheduledFuture(cronScheduledFuture);
155        LOGGER.debug("{} scheduled cron expression {} to fire at {}", name, cronExpression.getCronExpression(), fireDate);
156        return cronScheduledFuture;
157    }
158
159
160    /**
161     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
162     * with the given period; that is executions will commence after initialDelay then initialDelay+period,
163     * then initialDelay + 2 * period, and so on.
164     * @param command the task to execute.
165     * @param initialDelay the time to delay first execution.
166     * @param period the period between successive executions.
167     * @param unit the time unit of the initialDelay and period parameters.
168     * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
169     * exception upon cancellation
170     */
171    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
172        return getExecutorService().scheduleAtFixedRate(command, initialDelay, period, unit);
173    }
174
175    /**
176     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and
177     * subsequently with the given delay between the termination of one execution and the commencement of the next.
178     * @param command the task to execute.
179     * @param initialDelay the time to delay first execution.
180     * @param delay the delay between the termination of one execution and the commencement of the next.
181     * @param unit the time unit of the initialDelay and delay parameters
182     * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
183     * exception upon cancellation
184     */
185    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
186        return getExecutorService().scheduleWithFixedDelay(command, initialDelay, delay, unit);
187    }
188
189    public long nextFireInterval(final Date fireDate) {
190        return fireDate.getTime() - new Date().getTime();
191    }
192
193    private ScheduledExecutorService getExecutorService() {
194        if (executorService == null) {
195            synchronized (this) {
196                if (executorService == null) {
197                    if (scheduledItems > 0) {
198                        LOGGER.debug("{} starting {} threads", name, scheduledItems);
199                        scheduledItems = Math.min(scheduledItems, MAX_SCHEDULED_ITEMS);
200                        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(scheduledItems,
201                                Log4jThreadFactory.createDaemonThreadFactory("Scheduled"));
202                        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
203                        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
204                        this.executorService = executor;
205
206                    } else {
207                        LOGGER.debug("{}: No scheduled items", name);
208                    }
209                }
210            }
211        }
212        return executorService;
213    }
214
215    public class CronRunnable implements Runnable {
216
217        private final CronExpression cronExpression;
218        private final Runnable runnable;
219        private CronScheduledFuture<?> scheduledFuture;
220
221        public CronRunnable(final Runnable runnable, final CronExpression cronExpression) {
222            this.cronExpression = cronExpression;
223            this.runnable = runnable;
224        }
225
226        public void setScheduledFuture(final CronScheduledFuture<?> future) {
227            this.scheduledFuture = future;
228        }
229
230        @Override
231        public void run() {
232            try {
233                final long millis = scheduledFuture.getFireTime().getTime() - System.currentTimeMillis();
234                if (millis > 0) {
235                    LOGGER.debug("{} Cron thread woke up {} millis early. Sleeping", name, millis);
236                    try {
237                        Thread.sleep(millis);
238                    } catch (final InterruptedException ie) {
239                        // Ignore the interruption.
240                    }
241                }
242                runnable.run();
243            } catch(final Throwable ex) {
244                LOGGER.error("{} caught error running command", name, ex);
245            } finally {
246                final Date fireDate = cronExpression.getNextValidTimeAfter(new Date());
247                final ScheduledFuture<?> future = schedule(this, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
248                LOGGER.debug("{} Cron expression {} scheduled to fire again at {}", name, cronExpression.getCronExpression(),
249                        fireDate);
250                scheduledFuture.reset(future, fireDate);
251            }
252        }
253
254        @Override
255        public String toString() {
256            return "CronRunnable{" + cronExpression.getCronExpression() + " - " + scheduledFuture.getFireTime();
257        }
258    }
259
260    @Override
261    public String toString() {
262        final StringBuilder sb = new StringBuilder("ConfigurationScheduler [name=");
263        sb.append(name);
264        sb.append(", [");
265        if (executorService != null) {
266            final Queue<Runnable> queue = ((ScheduledThreadPoolExecutor) executorService).getQueue();
267            boolean first = true;
268            for (final Runnable runnable : queue) {
269                if (!first) {
270                    sb.append(", ");
271                }
272                sb.append(runnable.toString());
273                first = false;
274            }
275        }
276        sb.append("]");
277        return sb.toString();
278    }
279
280}