001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.config;
018
019import java.util.Date;
020import java.util.Queue;
021import java.util.concurrent.Callable;
022import java.util.concurrent.ScheduledExecutorService;
023import java.util.concurrent.ScheduledFuture;
024import java.util.concurrent.ScheduledThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.logging.log4j.Logger;
028import org.apache.logging.log4j.core.AbstractLifeCycle;
029import org.apache.logging.log4j.core.util.CronExpression;
030import org.apache.logging.log4j.core.util.Log4jThreadFactory;
031import org.apache.logging.log4j.status.StatusLogger;
032
033/**
034 *
035 */
036public class ConfigurationScheduler extends AbstractLifeCycle {
037
038    private static final Logger LOGGER = StatusLogger.getLogger();
039    private static final String SIMPLE_NAME = "Log4j2 " + ConfigurationScheduler.class.getSimpleName();
040    private static final int MAX_SCHEDULED_ITEMS = 5;
041
042    private ScheduledExecutorService executorService;
043    private int scheduledItems = 0;
044    private final String name;
045
046    public ConfigurationScheduler() {
047        this(SIMPLE_NAME);
048    }
049
050    public ConfigurationScheduler(final String name) {
051        super();
052        this.name = name;
053    }
054
055    @Override
056    public void start() {
057        super.start();
058    }
059
060    @Override
061    public boolean stop(final long timeout, final TimeUnit timeUnit) {
062        setStopping();
063        if (isExecutorServiceSet()) {
064            LOGGER.debug("{} shutting down threads in {}", name, getExecutorService());
065            executorService.shutdown();
066            try {
067                executorService.awaitTermination(timeout, timeUnit);
068            } catch (final InterruptedException ie) {
069                executorService.shutdownNow();
070                try {
071                    executorService.awaitTermination(timeout, timeUnit);
072                } catch (final InterruptedException inner) {
073                    LOGGER.warn("{} stopped but some scheduled services may not have completed.", name);
074                }
075                // Preserve interrupt status
076                Thread.currentThread().interrupt();
077            }
078        }
079        setStopped();
080        return true;
081    }
082
083    public boolean isExecutorServiceSet() {
084        return executorService != null;
085    }
086
087    /**
088     * Increment the number of threads in the pool.
089     */
090    public void incrementScheduledItems() {
091        if (isExecutorServiceSet()) {
092            LOGGER.error("{} attempted to increment scheduled items after start", name);
093        } else {
094            ++scheduledItems;
095        }
096    }
097
098    /**
099     * Decrement the number of threads in the pool
100     */
101    public void decrementScheduledItems() {
102        if (!isStarted() && scheduledItems > 0) {
103            --scheduledItems;
104        }
105    }
106
107    /**
108     * Creates and executes a ScheduledFuture that becomes enabled after the given delay.
109     * @param <V> The result type returned by this Future
110     * @param callable the function to execute.
111     * @param delay the time from now to delay execution.
112     * @param unit the time unit of the delay parameter.
113     * @return a ScheduledFuture that can be used to extract result or cancel.
114     *
115     */
116    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
117        return getExecutorService().schedule(callable, delay, unit);
118    }
119
120    /**
121     * Creates and executes a one-shot action that becomes enabled after the given delay.
122     * @param command the task to execute.
123     * @param delay the time from now to delay execution.
124     * @param unit the time unit of the delay parameter.
125     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null
126     * upon completion.
127     */
128    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
129        return getExecutorService().schedule(command, delay, unit);
130    }
131
132
133    /**
134     * Creates and executes an action that first based on a cron expression.
135     * @param cronExpression the cron expression describing the schedule.
136     * @param command The Runnable to run,
137     * @return a ScheduledFuture representing the next time the command will run.
138     */
139    public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Runnable command) {
140        return scheduleWithCron(cronExpression, new Date(), command);
141    }
142
143    /**
144     * Creates and executes an action that first based on a cron expression.
145     * @param cronExpression the cron expression describing the schedule.
146     * @param startDate The time to use as the time to begin the cron expression. Defaults to the current date and time.
147     * @param command The Runnable to run,
148     * @return a ScheduledFuture representing the next time the command will run.
149     */
150    public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Date startDate, final Runnable command) {
151        final Date fireDate = cronExpression.getNextValidTimeAfter(startDate == null ? new Date() : startDate);
152        final CronRunnable runnable = new CronRunnable(command, cronExpression);
153        final ScheduledFuture<?> future = schedule(runnable, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
154        final CronScheduledFuture<?> cronScheduledFuture = new CronScheduledFuture<>(future, fireDate);
155        runnable.setScheduledFuture(cronScheduledFuture);
156        LOGGER.debug("{} scheduled cron expression {} to fire at {}", name, cronExpression.getCronExpression(), fireDate);
157        return cronScheduledFuture;
158    }
159
160
161    /**
162     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
163     * with the given period; that is executions will commence after initialDelay then initialDelay+period,
164     * then initialDelay + 2 * period, and so on.
165     * @param command the task to execute.
166     * @param initialDelay the time to delay first execution.
167     * @param period the period between successive executions.
168     * @param unit the time unit of the initialDelay and period parameters.
169     * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
170     * exception upon cancellation
171     */
172    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
173        return getExecutorService().scheduleAtFixedRate(command, initialDelay, period, unit);
174    }
175
176    /**
177     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and
178     * subsequently with the given delay between the termination of one execution and the commencement of the next.
179     * @param command the task to execute.
180     * @param initialDelay the time to delay first execution.
181     * @param delay the delay between the termination of one execution and the commencement of the next.
182     * @param unit the time unit of the initialDelay and delay parameters
183     * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
184     * exception upon cancellation
185     */
186    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
187        return getExecutorService().scheduleWithFixedDelay(command, initialDelay, delay, unit);
188    }
189
190    public long nextFireInterval(final Date fireDate) {
191        return fireDate.getTime() - new Date().getTime();
192    }
193
194    private ScheduledExecutorService getExecutorService() {
195        if (executorService == null) {
196            if (scheduledItems > 0) {
197                LOGGER.debug("{} starting {} threads", name, scheduledItems);
198                scheduledItems = Math.min(scheduledItems, MAX_SCHEDULED_ITEMS);
199                final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(scheduledItems,
200                        Log4jThreadFactory.createDaemonThreadFactory("Scheduled"));
201                executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
202                executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
203                this.executorService = executor;
204
205            } else {
206                LOGGER.debug("{}: No scheduled items", name);
207            }
208        }
209        return executorService;
210    }
211
212    public class CronRunnable implements Runnable {
213
214        private final CronExpression cronExpression;
215        private final Runnable runnable;
216        private CronScheduledFuture<?> scheduledFuture;
217
218        public CronRunnable(final Runnable runnable, final CronExpression cronExpression) {
219            this.cronExpression = cronExpression;
220            this.runnable = runnable;
221        }
222
223        public void setScheduledFuture(final CronScheduledFuture<?> future) {
224            this.scheduledFuture = future;
225        }
226
227        @Override
228        public void run() {
229            try {
230                final long millis = scheduledFuture.getFireTime().getTime() - System.currentTimeMillis();
231                if (millis > 0) {
232                    LOGGER.debug("{} Cron thread woke up {} millis early. Sleeping", name, millis);
233                    try {
234                        Thread.sleep(millis);
235                    } catch (final InterruptedException ie) {
236                        // Ignore the interruption.
237                    }
238                }
239                runnable.run();
240            } catch(final Throwable ex) {
241                LOGGER.error("{} caught error running command", name, ex);
242            } finally {
243                final Date fireDate = cronExpression.getNextValidTimeAfter(new Date());
244                final ScheduledFuture<?> future = schedule(this, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
245                LOGGER.debug("{} Cron expression {} scheduled to fire again at {}", name, cronExpression.getCronExpression(),
246                        fireDate);
247                scheduledFuture.reset(future, fireDate);
248            }
249        }
250
251        @Override
252        public String toString() {
253            return "CronRunnable{" + cronExpression.getCronExpression() + " - " + scheduledFuture.getFireTime();
254        }
255    }
256
257    @Override
258    public String toString() {
259        final StringBuilder sb = new StringBuilder("ConfigurationScheduler [name=");
260        sb.append(name);
261        sb.append(", [");
262        if (executorService != null) {
263            final Queue<Runnable> queue = ((ScheduledThreadPoolExecutor) executorService).getQueue();
264            boolean first = true;
265            for (final Runnable runnable : queue) {
266                if (!first) {
267                    sb.append(", ");
268                }
269                sb.append(runnable.toString());
270                first = false;
271            }
272        }
273        sb.append("]");
274        return sb.toString();
275    }
276
277}