View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.config;
18  
19  import java.util.Date;
20  import java.util.Queue;
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.concurrent.ScheduledFuture;
24  import java.util.concurrent.ScheduledThreadPoolExecutor;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.logging.log4j.Logger;
28  import org.apache.logging.log4j.core.AbstractLifeCycle;
29  import org.apache.logging.log4j.core.util.CronExpression;
30  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
31  import org.apache.logging.log4j.status.StatusLogger;
32  
33  /**
34   *
35   */
36  public class ConfigurationScheduler extends AbstractLifeCycle {
37  
38      private static final Logger LOGGER = StatusLogger.getLogger();
39      private static final String SIMPLE_NAME = "Log4j2 " + ConfigurationScheduler.class.getSimpleName();
40      private static final int MAX_SCHEDULED_ITEMS = 5;
41  
42      private ScheduledExecutorService executorService;
43      private int scheduledItems = 0;
44      private final String name;
45  
46      public ConfigurationScheduler() {
47          this(SIMPLE_NAME);
48      }
49  
50      public ConfigurationScheduler(final String name) {
51          super();
52          this.name = name;
53      }
54  
55      @Override
56      public void start() {
57          super.start();
58      }
59  
60      @Override
61      public boolean stop(final long timeout, final TimeUnit timeUnit) {
62          setStopping();
63          if (isExecutorServiceSet()) {
64              LOGGER.debug("{} shutting down threads in {}", name, getExecutorService());
65              executorService.shutdown();
66              try {
67                  executorService.awaitTermination(timeout, timeUnit);
68              } catch (final InterruptedException ie) {
69                  executorService.shutdownNow();
70                  try {
71                      executorService.awaitTermination(timeout, timeUnit);
72                  } catch (final InterruptedException inner) {
73                      LOGGER.warn("{} stopped but some scheduled services may not have completed.", name);
74                  }
75                  // Preserve interrupt status
76                  Thread.currentThread().interrupt();
77              }
78          }
79          setStopped();
80          return true;
81      }
82  
83      public boolean isExecutorServiceSet() {
84          return executorService != null;
85      }
86  
87      /**
88       * Increment the number of threads in the pool.
89       */
90      public void incrementScheduledItems() {
91          if (isExecutorServiceSet()) {
92              LOGGER.error("{} attempted to increment scheduled items after start", name);
93          } else {
94              ++scheduledItems;
95          }
96      }
97  
98      /**
99       * Decrement the number of threads in the pool
100      */
101     public void decrementScheduledItems() {
102         if (!isStarted() && scheduledItems > 0) {
103             --scheduledItems;
104         }
105     }
106 
107     /**
108      * Creates and executes a ScheduledFuture that becomes enabled after the given delay.
109      * @param <V> The result type returned by this Future
110      * @param callable the function to execute.
111      * @param delay the time from now to delay execution.
112      * @param unit the time unit of the delay parameter.
113      * @return a ScheduledFuture that can be used to extract result or cancel.
114      *
115      */
116     public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
117         return getExecutorService().schedule(callable, delay, unit);
118     }
119 
120     /**
121      * Creates and executes a one-shot action that becomes enabled after the given delay.
122      * @param command the task to execute.
123      * @param delay the time from now to delay execution.
124      * @param unit the time unit of the delay parameter.
125      * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null
126      * upon completion.
127      */
128     public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
129         return getExecutorService().schedule(command, delay, unit);
130     }
131 
132 
133     /**
134      * Creates and executes an action that first based on a cron expression.
135      * @param cronExpression the cron expression describing the schedule.
136      * @param command The Runnable to run,
137      * @return a ScheduledFuture representing the next time the command will run.
138      */
139     public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Runnable command) {
140         return scheduleWithCron(cronExpression, new Date(), command);
141     }
142 
143     /**
144      * Creates and executes an action that first based on a cron expression.
145      * @param cronExpression the cron expression describing the schedule.
146      * @param startDate The time to use as the time to begin the cron expression. Defaults to the current date and time.
147      * @param command The Runnable to run,
148      * @return a ScheduledFuture representing the next time the command will run.
149      */
150     public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Date startDate, final Runnable command) {
151         final Date fireDate = cronExpression.getNextValidTimeAfter(startDate == null ? new Date() : startDate);
152         final CronRunnable runnable = new CronRunnable(command, cronExpression);
153         final ScheduledFuture<?> future = schedule(runnable, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
154         final CronScheduledFuture<?> cronScheduledFuture = new CronScheduledFuture<>(future, fireDate);
155         runnable.setScheduledFuture(cronScheduledFuture);
156         LOGGER.debug("{} scheduled cron expression {} to fire at {}", name, cronExpression.getCronExpression(), fireDate);
157         return cronScheduledFuture;
158     }
159 
160 
161     /**
162      * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
163      * with the given period; that is executions will commence after initialDelay then initialDelay+period,
164      * then initialDelay + 2 * period, and so on.
165      * @param command the task to execute.
166      * @param initialDelay the time to delay first execution.
167      * @param period the period between successive executions.
168      * @param unit the time unit of the initialDelay and period parameters.
169      * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
170      * exception upon cancellation
171      */
172     public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
173         return getExecutorService().scheduleAtFixedRate(command, initialDelay, period, unit);
174     }
175 
176     /**
177      * Creates and executes a periodic action that becomes enabled first after the given initial delay, and
178      * subsequently with the given delay between the termination of one execution and the commencement of the next.
179      * @param command the task to execute.
180      * @param initialDelay the time to delay first execution.
181      * @param delay the delay between the termination of one execution and the commencement of the next.
182      * @param unit the time unit of the initialDelay and delay parameters
183      * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an
184      * exception upon cancellation
185      */
186     public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
187         return getExecutorService().scheduleWithFixedDelay(command, initialDelay, delay, unit);
188     }
189 
190     public long nextFireInterval(final Date fireDate) {
191         return fireDate.getTime() - new Date().getTime();
192     }
193 
194     private ScheduledExecutorService getExecutorService() {
195         if (executorService == null) {
196             if (scheduledItems > 0) {
197                 LOGGER.debug("{} starting {} threads", name, scheduledItems);
198                 scheduledItems = Math.min(scheduledItems, MAX_SCHEDULED_ITEMS);
199                 final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(scheduledItems,
200                         Log4jThreadFactory.createDaemonThreadFactory("Scheduled"));
201                 executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
202                 executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
203                 this.executorService = executor;
204 
205             } else {
206                 LOGGER.debug("{}: No scheduled items", name);
207             }
208         }
209         return executorService;
210     }
211 
212     public class CronRunnable implements Runnable {
213 
214         private final CronExpression cronExpression;
215         private final Runnable runnable;
216         private CronScheduledFuture<?> scheduledFuture;
217 
218         public CronRunnable(final Runnable runnable, final CronExpression cronExpression) {
219             this.cronExpression = cronExpression;
220             this.runnable = runnable;
221         }
222 
223         public void setScheduledFuture(final CronScheduledFuture<?> future) {
224             this.scheduledFuture = future;
225         }
226 
227         @Override
228         public void run() {
229             try {
230                 final long millis = scheduledFuture.getFireTime().getTime() - System.currentTimeMillis();
231                 if (millis > 0) {
232                     LOGGER.debug("{} Cron thread woke up {} millis early. Sleeping", name, millis);
233                     try {
234                         Thread.sleep(millis);
235                     } catch (final InterruptedException ie) {
236                         // Ignore the interruption.
237                     }
238                 }
239                 runnable.run();
240             } catch(final Throwable ex) {
241                 LOGGER.error("{} caught error running command", name, ex);
242             } finally {
243                 final Date fireDate = cronExpression.getNextValidTimeAfter(new Date());
244                 final ScheduledFuture<?> future = schedule(this, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
245                 LOGGER.debug("{} Cron expression {} scheduled to fire again at {}", name, cronExpression.getCronExpression(),
246                         fireDate);
247                 scheduledFuture.reset(future, fireDate);
248             }
249         }
250 
251         @Override
252         public String toString() {
253             return "CronRunnable{" + cronExpression.getCronExpression() + " - " + scheduledFuture.getFireTime();
254         }
255     }
256 
257     @Override
258     public String toString() {
259         final StringBuilder sb = new StringBuilder("ConfigurationScheduler [name=");
260         sb.append(name);
261         sb.append(", [");
262         if (executorService != null) {
263             final Queue<Runnable> queue = ((ScheduledThreadPoolExecutor) executorService).getQueue();
264             boolean first = true;
265             for (final Runnable runnable : queue) {
266                 if (!first) {
267                     sb.append(", ");
268                 }
269                 sb.append(runnable.toString());
270                 first = false;
271             }
272         }
273         sb.append("]");
274         return sb.toString();
275     }
276 
277 }