1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.config;
18
19 import java.util.Date;
20 import java.util.Queue;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.logging.log4j.Logger;
28 import org.apache.logging.log4j.core.AbstractLifeCycle;
29 import org.apache.logging.log4j.core.util.CronExpression;
30 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
31 import org.apache.logging.log4j.status.StatusLogger;
32
33
34
35
36 public class ConfigurationScheduler extends AbstractLifeCycle {
37
38 private static final Logger LOGGER = StatusLogger.getLogger();
39 private static final String SIMPLE_NAME = "Log4j2 " + ConfigurationScheduler.class.getSimpleName();
40 private static final int MAX_SCHEDULED_ITEMS = 5;
41
42 private ScheduledExecutorService executorService;
43 private int scheduledItems = 0;
44 private final String name;
45
46 public ConfigurationScheduler() {
47 this(SIMPLE_NAME);
48 }
49
50 public ConfigurationScheduler(final String name) {
51 super();
52 this.name = name;
53 }
54
55 @Override
56 public void start() {
57 super.start();
58 }
59
60 @Override
61 public boolean stop(final long timeout, final TimeUnit timeUnit) {
62 setStopping();
63 if (isExecutorServiceSet()) {
64 LOGGER.debug("{} shutting down threads in {}", name, getExecutorService());
65 executorService.shutdown();
66 try {
67 executorService.awaitTermination(timeout, timeUnit);
68 } catch (final InterruptedException ie) {
69 executorService.shutdownNow();
70 try {
71 executorService.awaitTermination(timeout, timeUnit);
72 } catch (final InterruptedException inner) {
73 LOGGER.warn("{} stopped but some scheduled services may not have completed.", name);
74 }
75
76 Thread.currentThread().interrupt();
77 }
78 }
79 setStopped();
80 return true;
81 }
82
83 public boolean isExecutorServiceSet() {
84 return executorService != null;
85 }
86
87
88
89
90 public void incrementScheduledItems() {
91 if (isExecutorServiceSet()) {
92 LOGGER.error("{} attempted to increment scheduled items after start", name);
93 } else {
94 ++scheduledItems;
95 }
96 }
97
98
99
100
101 public void decrementScheduledItems() {
102 if (!isStarted() && scheduledItems > 0) {
103 --scheduledItems;
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116 public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
117 return getExecutorService().schedule(callable, delay, unit);
118 }
119
120
121
122
123
124
125
126
127
128 public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
129 return getExecutorService().schedule(command, delay, unit);
130 }
131
132
133
134
135
136
137
138
139 public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Runnable command) {
140 return scheduleWithCron(cronExpression, new Date(), command);
141 }
142
143
144
145
146
147
148
149
150 public CronScheduledFuture<?> scheduleWithCron(final CronExpression cronExpression, final Date startDate, final Runnable command) {
151 final Date fireDate = cronExpression.getNextValidTimeAfter(startDate == null ? new Date() : startDate);
152 final CronRunnable runnable = new CronRunnable(command, cronExpression);
153 final ScheduledFuture<?> future = schedule(runnable, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
154 final CronScheduledFuture<?> cronScheduledFuture = new CronScheduledFuture<>(future, fireDate);
155 runnable.setScheduledFuture(cronScheduledFuture);
156 LOGGER.debug("{} scheduled cron expression {} to fire at {}", name, cronExpression.getCronExpression(), fireDate);
157 return cronScheduledFuture;
158 }
159
160
161
162
163
164
165
166
167
168
169
170
171
172 public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
173 return getExecutorService().scheduleAtFixedRate(command, initialDelay, period, unit);
174 }
175
176
177
178
179
180
181
182
183
184
185
186 public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
187 return getExecutorService().scheduleWithFixedDelay(command, initialDelay, delay, unit);
188 }
189
190 public long nextFireInterval(final Date fireDate) {
191 return fireDate.getTime() - new Date().getTime();
192 }
193
194 private ScheduledExecutorService getExecutorService() {
195 if (executorService == null) {
196 if (scheduledItems > 0) {
197 LOGGER.debug("{} starting {} threads", name, scheduledItems);
198 scheduledItems = Math.min(scheduledItems, MAX_SCHEDULED_ITEMS);
199 final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(scheduledItems,
200 Log4jThreadFactory.createDaemonThreadFactory("Scheduled"));
201 executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
202 executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
203 this.executorService = executor;
204
205 } else {
206 LOGGER.debug("{}: No scheduled items", name);
207 }
208 }
209 return executorService;
210 }
211
212 public class CronRunnable implements Runnable {
213
214 private final CronExpression cronExpression;
215 private final Runnable runnable;
216 private CronScheduledFuture<?> scheduledFuture;
217
218 public CronRunnable(final Runnable runnable, final CronExpression cronExpression) {
219 this.cronExpression = cronExpression;
220 this.runnable = runnable;
221 }
222
223 public void setScheduledFuture(final CronScheduledFuture<?> future) {
224 this.scheduledFuture = future;
225 }
226
227 @Override
228 public void run() {
229 try {
230 final long millis = scheduledFuture.getFireTime().getTime() - System.currentTimeMillis();
231 if (millis > 0) {
232 LOGGER.debug("{} Cron thread woke up {} millis early. Sleeping", name, millis);
233 try {
234 Thread.sleep(millis);
235 } catch (final InterruptedException ie) {
236
237 }
238 }
239 runnable.run();
240 } catch(final Throwable ex) {
241 LOGGER.error("{} caught error running command", name, ex);
242 } finally {
243 final Date fireDate = cronExpression.getNextValidTimeAfter(new Date());
244 final ScheduledFuture<?> future = schedule(this, nextFireInterval(fireDate), TimeUnit.MILLISECONDS);
245 LOGGER.debug("{} Cron expression {} scheduled to fire again at {}", name, cronExpression.getCronExpression(),
246 fireDate);
247 scheduledFuture.reset(future, fireDate);
248 }
249 }
250
251 @Override
252 public String toString() {
253 return "CronRunnable{" + cronExpression.getCronExpression() + " - " + scheduledFuture.getFireTime();
254 }
255 }
256
257 @Override
258 public String toString() {
259 final StringBuilder sb = new StringBuilder("ConfigurationScheduler [name=");
260 sb.append(name);
261 sb.append(", [");
262 if (executorService != null) {
263 final Queue<Runnable> queue = ((ScheduledThreadPoolExecutor) executorService).getQueue();
264 boolean first = true;
265 for (final Runnable runnable : queue) {
266 if (!first) {
267 sb.append(", ");
268 }
269 sb.append(runnable.toString());
270 first = false;
271 }
272 }
273 sb.append("]");
274 return sb.toString();
275 }
276
277 }