001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.config; 018 019import java.util.Date; 020import java.util.Queue; 021import java.util.concurrent.Callable; 022import java.util.concurrent.ScheduledExecutorService; 023import java.util.concurrent.ScheduledFuture; 024import java.util.concurrent.ScheduledThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.logging.log4j.Logger; 028import org.apache.logging.log4j.core.AbstractLifeCycle; 029import org.apache.logging.log4j.core.util.CronExpression; 030import org.apache.logging.log4j.core.util.Log4jThreadFactory; 031import org.apache.logging.log4j.status.StatusLogger; 032 033/** 034 * 035 */ 036public class ConfigurationScheduler extends AbstractLifeCycle { 037 038 private static final Logger LOGGER = StatusLogger.getLogger(); 039 private static final String SIMPLE_NAME = "Log4j2 " + ConfigurationScheduler.class.getSimpleName(); 040 private static final int MAX_SCHEDULED_ITEMS = 5; 041 042 private volatile ScheduledExecutorService executorService; 043 private int scheduledItems = 0; 044 private final String name; 045 046 public ConfigurationScheduler() { 047 this(SIMPLE_NAME); 048 } 049 050 public ConfigurationScheduler(final String name) { 051 this.name = name; 052 } 053 054 @Override 055 public void start() { 056 super.start(); 057 } 058 059 @Override 060 public boolean stop(final long timeout, final TimeUnit timeUnit) { 061 setStopping(); 062 if (isExecutorServiceSet()) { 063 LOGGER.debug("{} shutting down threads in {}", name, getExecutorService()); 064 executorService.shutdown(); 065 try { 066 executorService.awaitTermination(timeout, timeUnit); 067 } catch (final InterruptedException ie) { 068 executorService.shutdownNow(); 069 try { 070 executorService.awaitTermination(timeout, timeUnit); 071 } catch (final InterruptedException inner) { 072 LOGGER.warn("{} stopped but some scheduled services may not have completed.", name); 073 } 074 // Preserve interrupt status 075 Thread.currentThread().interrupt(); 076 } 077 } 078 setStopped(); 079 return true; 080 } 081 082 public boolean isExecutorServiceSet() { 083 return executorService != null; 084 } 085 086 /** 087 * Increment the number of threads in the pool. 088 */ 089 public void incrementScheduledItems() { 090 if (isExecutorServiceSet()) { 091 LOGGER.error("{} attempted to increment scheduled items after start", name); 092 } else { 093 ++scheduledItems; 094 } 095 } 096 097 /** 098 * Decrement the number of threads in the pool 099 */ 100 public void decrementScheduledItems() { 101 if (!isStarted() && scheduledItems > 0) { 102 --scheduledItems; 103 } 104 } 105 106 /** 107 * Creates and executes a ScheduledFuture that becomes enabled after the given delay. 108 * @param <V> The result type returned by this Future 109 * @param callable the function to execute. 110 * @param delay the time from now to delay execution. 111 * @param unit the time unit of the delay parameter. 112 * @return a ScheduledFuture that can be used to extract result or cancel. 113 * 114 */ 115 public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { 116 return getExecutorService().schedule(callable, delay, unit); 117 } 118 119 /** 120 * Creates and executes a one-shot action that becomes enabled after the given delay. 121 * @param command the task to execute. 122 * @param delay the time from now to delay execution. 123 * @param unit the time unit of the delay parameter. 124 * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null 125 * upon completion. 126 */ 127 public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { 128 return getExecutorService().schedule(command, delay, unit); 129 } 130 131 132 /** 133 * Creates and executes an action that first based on a cron expression. 134 * @param cronExpression the cron expression describing the schedule. 135 * @param command The Runnable to run, 136 * @return a ScheduledFuture representing the next time the command will run. 137 */ 138 public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Runnable command) { 139 return scheduleWithCron(cronExpression, new Date(), command); 140 } 141 142 /** 143 * Creates and executes an action that first based on a cron expression. 144 * @param cronExpression the cron expression describing the schedule. 145 * @param startDate The time to use as the time to begin the cron expression. Defaults to the current date and time. 146 * @param command The Runnable to run, 147 * @return a ScheduledFuture representing the next time the command will run. 148 */ 149 public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Date startDate, final Runnable command) { 150 final Date fireDate = cronExpression.getNextValidTimeAfter(startDate == null ? new Date() : startDate); 151 final CronRunnable runnable = new CronRunnable(command, cronExpression); 152 final ScheduledFuture<?> future = schedule(runnable, nextFireInterval(fireDate), TimeUnit.MILLISECONDS); 153 final CronScheduledFuture<?> cronScheduledFuture = new CronScheduledFuture<>(future, fireDate); 154 runnable.setScheduledFuture(cronScheduledFuture); 155 LOGGER.debug("{} scheduled cron expression {} to fire at {}", name, cronExpression.getCronExpression(), fireDate); 156 return cronScheduledFuture; 157 } 158 159 160 /** 161 * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently 162 * with the given period; that is executions will commence after initialDelay then initialDelay+period, 163 * then initialDelay + 2 * period, and so on. 164 * @param command the task to execute. 165 * @param initialDelay the time to delay first execution. 166 * @param period the period between successive executions. 167 * @param unit the time unit of the initialDelay and period parameters. 168 * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an 169 * exception upon cancellation 170 */ 171 public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { 172 return getExecutorService().scheduleAtFixedRate(command, initialDelay, period, unit); 173 } 174 175 /** 176 * Creates and executes a periodic action that becomes enabled first after the given initial delay, and 177 * subsequently with the given delay between the termination of one execution and the commencement of the next. 178 * @param command the task to execute. 179 * @param initialDelay the time to delay first execution. 180 * @param delay the delay between the termination of one execution and the commencement of the next. 181 * @param unit the time unit of the initialDelay and delay parameters 182 * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an 183 * exception upon cancellation 184 */ 185 public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { 186 return getExecutorService().scheduleWithFixedDelay(command, initialDelay, delay, unit); 187 } 188 189 public long nextFireInterval(final Date fireDate) { 190 return fireDate.getTime() - new Date().getTime(); 191 } 192 193 private ScheduledExecutorService getExecutorService() { 194 if (executorService == null) { 195 synchronized (this) { 196 if (executorService == null) { 197 if (scheduledItems > 0) { 198 LOGGER.debug("{} starting {} threads", name, scheduledItems); 199 scheduledItems = Math.min(scheduledItems, MAX_SCHEDULED_ITEMS); 200 final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(scheduledItems, 201 Log4jThreadFactory.createDaemonThreadFactory("Scheduled")); 202 executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); 203 executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 204 this.executorService = executor; 205 206 } else { 207 LOGGER.debug("{}: No scheduled items", name); 208 } 209 } 210 } 211 } 212 return executorService; 213 } 214 215 public class CronRunnable implements Runnable { 216 217 private final CronExpression cronExpression; 218 private final Runnable runnable; 219 private CronScheduledFuture<?> scheduledFuture; 220 221 public CronRunnable(final Runnable runnable, final CronExpression cronExpression) { 222 this.cronExpression = cronExpression; 223 this.runnable = runnable; 224 } 225 226 public void setScheduledFuture(final CronScheduledFuture<?> future) { 227 this.scheduledFuture = future; 228 } 229 230 @Override 231 public void run() { 232 try { 233 final long millis = scheduledFuture.getFireTime().getTime() - System.currentTimeMillis(); 234 if (millis > 0) { 235 LOGGER.debug("{} Cron thread woke up {} millis early. Sleeping", name, millis); 236 try { 237 Thread.sleep(millis); 238 } catch (final InterruptedException ie) { 239 // Ignore the interruption. 240 } 241 } 242 runnable.run(); 243 } catch(final Throwable ex) { 244 LOGGER.error("{} caught error running command", name, ex); 245 } finally { 246 final Date fireDate = cronExpression.getNextValidTimeAfter(new Date()); 247 final ScheduledFuture<?> future = schedule(this, nextFireInterval(fireDate), TimeUnit.MILLISECONDS); 248 LOGGER.debug("{} Cron expression {} scheduled to fire again at {}", name, cronExpression.getCronExpression(), 249 fireDate); 250 scheduledFuture.reset(future, fireDate); 251 } 252 } 253 254 @Override 255 public String toString() { 256 return "CronRunnable{" + cronExpression.getCronExpression() + " - " + scheduledFuture.getFireTime(); 257 } 258 } 259 260 @Override 261 public String toString() { 262 final StringBuilder sb = new StringBuilder("ConfigurationScheduler [name="); 263 sb.append(name); 264 sb.append(", ["); 265 if (executorService != null) { 266 final Queue<Runnable> queue = ((ScheduledThreadPoolExecutor) executorService).getQueue(); 267 boolean first = true; 268 for (final Runnable runnable : queue) { 269 if (!first) { 270 sb.append(", "); 271 } 272 sb.append(runnable.toString()); 273 first = false; 274 } 275 } 276 sb.append("]"); 277 return sb.toString(); 278 } 279 280}