1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.maven.surefire.junitcore.pc;
20
21 import java.util.Collection;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.CopyOnWriteArraySet;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
30 import org.junit.runner.Description;
31 import org.junit.runners.model.RunnerScheduler;
32
33 /**
34 * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
35 * a master scheduler can shutdown slaves.
36 * <br>
37 * The scheduler objects should be first created (and wired) and set in runners
38 * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
39 * <br>
40 * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
41 *
42 * @author Tibor Digana (tibor17)
43 * @since 2.16
44 */
45 public class Scheduler implements RunnerScheduler {
46 private final Balancer balancer;
47
48 private final SchedulingStrategy strategy;
49
50 private final Set<Controller> slaves = new CopyOnWriteArraySet<>();
51
52 private final Description description;
53
54 private final ConsoleLogger logger;
55
56 private volatile boolean shutdown = false;
57
58 private volatile boolean started = false;
59
60 private volatile boolean finished = false;
61
62 private volatile Controller masterController;
63
64 /**
65 * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
66 * <br>
67 * You can use it with one infinite thread pool shared in strategies across all
68 * suites, class runners, etc.
69 *
70 * @param logger console logger
71 * @param description JUnit description of class
72 * @param strategy scheduling strategy
73 */
74 public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy) {
75 this(logger, description, strategy, -1);
76 }
77
78 /**
79 * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
80 * <br>
81 * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
82 * {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
83 * or {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
84 *
85 * @param logger current logger implementation
86 * @param description description of current runner
87 * @param strategy scheduling strategy with a shared thread pool
88 * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
89 * @throws NullPointerException if null <code>strategy</code>
90 */
91 public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy, int concurrency) {
92 this(logger, description, strategy, BalancerFactory.createBalancer(concurrency));
93 }
94
95 /**
96 * New instances should be used by schedulers with limited concurrency by <code>balancer</code>
97 * against other groups of schedulers. The schedulers share one pool.
98 * <br>
99 * Unlike in {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)} which was
100 * limiting the <code>concurrency</code> of children of a runner where this scheduler was set, {@code this}
101 * <code>balancer</code> is limiting the concurrency of all children in runners having schedulers created by this
102 * constructor.
103 *
104 * @param logger current logger implementation
105 * @param description description of current runner
106 * @param strategy scheduling strategy which may share threads with other strategy
107 * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
108 * @throws NullPointerException if null <code>strategy</code> or <code>balancer</code>
109 */
110 public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy, Balancer balancer) {
111 strategy.setDefaultShutdownHandler(newShutdownHandler());
112 this.logger = logger;
113 this.description = description;
114 this.strategy = strategy;
115 this.balancer = balancer;
116 masterController = null;
117 }
118
119 /**
120 * Can be used by e.g. a runner having parallel classes in use case with parallel
121 * suites, classes and methods sharing the same thread pool.
122 *
123 * @param logger current logger implementation
124 * @param description description of current runner
125 * @param masterScheduler scheduler sharing own threads with this slave
126 * @param strategy scheduling strategy for this scheduler
127 * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
128 * @throws NullPointerException if null <code>masterScheduler</code>, <code>strategy</code> or <code>balancer</code>
129 */
130 public Scheduler(
131 ConsoleLogger logger,
132 Description description,
133 Scheduler masterScheduler,
134 SchedulingStrategy strategy,
135 Balancer balancer) {
136 this(logger, description, strategy, balancer);
137 strategy.setDefaultShutdownHandler(newShutdownHandler());
138 masterScheduler.register(this);
139 }
140
141 /**
142 * @param logger console logger
143 * @param description JUnit description of class
144 * @param masterScheduler a reference to
145 * {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)}
146 * or {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)}
147 * @param strategy scheduling strategy
148 * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
149 *
150 * @see #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)
151 * @see #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)
152 */
153 public Scheduler(
154 ConsoleLogger logger,
155 Description description,
156 Scheduler masterScheduler,
157 SchedulingStrategy strategy,
158 int concurrency) {
159 this(logger, description, strategy, concurrency);
160 strategy.setDefaultShutdownHandler(newShutdownHandler());
161 masterScheduler.register(this);
162 }
163
164 /**
165 * Should be used with individual pools on suites, classes and methods, see
166 * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
167 * <br>
168 * Cached thread pool is infinite and can be always shared.
169 *
170 * @param logger console logger
171 * @param description JUnit description of class
172 * @param masterScheduler parent scheduler
173 * @param strategy scheduling strategy
174 */
175 public Scheduler(
176 ConsoleLogger logger, Description description, Scheduler masterScheduler, SchedulingStrategy strategy) {
177 this(logger, description, masterScheduler, strategy, 0);
178 }
179
180 private void setController(Controller masterController) {
181 if (masterController == null) {
182 throw new NullPointerException("null ExecutionController");
183 }
184 this.masterController = masterController;
185 }
186
187 /**
188 * @param slave a slave scheduler to register
189 * @return {@code true} if successfully registered the <code>slave</code>.
190 */
191 private boolean register(Scheduler slave) {
192 boolean canRegister = slave != null && slave != this;
193 if (canRegister) {
194 Controller controller = new Controller(slave);
195 canRegister = !slaves.contains(controller);
196 if (canRegister) {
197 slaves.add(controller);
198 slave.setController(controller);
199 }
200 }
201 return canRegister;
202 }
203
204 /**
205 * @return {@code true} if new tasks can be scheduled.
206 */
207 private boolean canSchedule() {
208 return !shutdown && (masterController == null || masterController.canSchedule());
209 }
210
211 protected void logQuietly(Throwable t) {
212 logger.error(t);
213 }
214
215 protected void logQuietly(String msg) {
216 logger.warning(msg);
217 }
218
219 /**
220 * Attempts to stop all actively executing tasks and immediately returns a collection
221 * of descriptions of those tasks which have started prior to this call.
222 * <br>
223 * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
224 * If <code>shutdownNow</code> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
225 *
226 * @param stopNow if {@code true} interrupts waiting test methods
227 * @return collection of descriptions started before shutting down
228 */
229 protected ShutdownResult describeStopped(boolean stopNow) {
230 Collection<Description> executedTests = new ConcurrentLinkedQueue<>();
231 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<>();
232 stop(executedTests, incompleteTests, false, stopNow);
233 return new ShutdownResult(executedTests, incompleteTests);
234 }
235
236 /**
237 * Stop/Shutdown/Interrupt scheduler and its children (if any).
238 *
239 * @param executedTests Started tests which have finished normally or abruptly till called this method.
240 * @param incompleteTests Started tests which have finished incomplete due to shutdown.
241 * @param tryCancelFutures Useful to set to {@code false} if a timeout is specified in plugin config.
242 * When the runner of
243 * {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
244 * is finished in
245 * {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
246 * all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
247 * See the unit test {@code ParallelComputerBuilder#timeoutAndForcedShutdown()}.
248 * @param stopNow Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
249 * {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
250 * {@link Thread#interrupt()}.
251 */
252 private void stop(
253 Collection<Description> executedTests,
254 Collection<Description> incompleteTests,
255 boolean tryCancelFutures,
256 boolean stopNow) {
257 shutdown = true;
258 try {
259 if (started && !ParallelComputerUtil.isUnusedDescription(description)) {
260 if (executedTests != null) {
261 executedTests.add(description);
262 }
263
264 if (incompleteTests != null && !finished) {
265 incompleteTests.add(description);
266 }
267 }
268
269 for (Controller slave : slaves) {
270 slave.stop(executedTests, incompleteTests, tryCancelFutures, stopNow);
271 }
272 } finally {
273 try {
274 balancer.releaseAllPermits();
275 } finally {
276 if (stopNow) {
277 strategy.stopNow();
278 } else if (tryCancelFutures) {
279 strategy.stop();
280 } else {
281 strategy.disable();
282 }
283 }
284 }
285 }
286
287 protected boolean shutdownThreadPoolsAwaitingKilled() {
288 if (masterController == null) {
289 stop(null, null, true, false);
290 boolean isNotInterrupted = true;
291 if (strategy != null) {
292 isNotInterrupted = strategy.destroy();
293 }
294 for (Controller slave : slaves) {
295 isNotInterrupted &= slave.destroy();
296 }
297 return isNotInterrupted;
298 } else {
299 throw new UnsupportedOperationException("cannot call this method if this is not a master scheduler");
300 }
301 }
302
303 protected void beforeExecute() {}
304
305 protected void afterExecute() {}
306
307 @Override
308 public void schedule(Runnable childStatement) {
309 if (childStatement == null) {
310 logQuietly("cannot schedule null");
311 } else if (canSchedule() && strategy.canSchedule()) {
312 try {
313 boolean isNotInterrupted = balancer.acquirePermit();
314 if (isNotInterrupted && !shutdown) {
315 Runnable task = wrapTask(childStatement);
316 strategy.schedule(task);
317 started = true;
318 }
319 } catch (RejectedExecutionException e) {
320 stop(null, null, true, false);
321 } catch (Throwable t) {
322 balancer.releasePermit();
323 logQuietly(t);
324 }
325 }
326 }
327
328 @Override
329 public void finished() {
330 try {
331 strategy.finished();
332 } catch (InterruptedException e) {
333 logQuietly(e);
334 } finally {
335 finished = true;
336 }
337 }
338
339 private Runnable wrapTask(final Runnable task) {
340 return new Runnable() {
341 @Override
342 public void run() {
343 try {
344 beforeExecute();
345 task.run();
346 } finally {
347 try {
348 afterExecute();
349 } finally {
350 balancer.releasePermit();
351 }
352 }
353 }
354 };
355 }
356
357 protected ShutdownHandler newShutdownHandler() {
358 return new ShutdownHandler();
359 }
360
361 /**
362 * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
363 */
364 private final class Controller {
365 private final Scheduler slave;
366
367 private Controller(Scheduler slave) {
368 this.slave = slave;
369 }
370
371 /**
372 * @return {@code true} if new children can be scheduled.
373 */
374 boolean canSchedule() {
375 return Scheduler.this.canSchedule();
376 }
377
378 void stop(
379 Collection<Description> executedTests,
380 Collection<Description> incompleteTests,
381 boolean tryCancelFutures,
382 boolean shutdownNow) {
383 slave.stop(executedTests, incompleteTests, tryCancelFutures, shutdownNow);
384 }
385
386 /**
387 * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
388 */
389 boolean destroy() {
390 return slave.strategy.destroy();
391 }
392
393 @Override
394 public int hashCode() {
395 return slave.hashCode();
396 }
397
398 @Override
399 public boolean equals(Object o) {
400 return o == this || (o instanceof Controller) && slave.equals(((Controller) o).slave);
401 }
402 }
403
404 /**
405 * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
406 * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
407 * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
408 * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
409 * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
410 * schedulers recursively as well.
411 */
412 public class ShutdownHandler implements RejectedExecutionHandler {
413 private volatile RejectedExecutionHandler poolHandler;
414
415 protected ShutdownHandler() {
416 poolHandler = null;
417 }
418
419 public void setRejectedExecutionHandler(RejectedExecutionHandler poolHandler) {
420 this.poolHandler = poolHandler;
421 }
422
423 @Override
424 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
425 if (executor.isShutdown()) {
426 Scheduler.this.stop(null, null, true, false);
427 }
428 final RejectedExecutionHandler poolHandler = this.poolHandler;
429 if (poolHandler != null) {
430 poolHandler.rejectedExecution(r, executor);
431 }
432 }
433 }
434 }