View Javadoc
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19  package org.apache.maven.surefire.junitcore.pc;
20  
21  import java.util.Collection;
22  import java.util.Set;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.CopyOnWriteArraySet;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.RejectedExecutionHandler;
27  import java.util.concurrent.ThreadPoolExecutor;
28  
29  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
30  import org.junit.runner.Description;
31  import org.junit.runners.model.RunnerScheduler;
32  
33  /**
34   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
35   * a master scheduler can shutdown slaves.
36   * <br>
37   * The scheduler objects should be first created (and wired) and set in runners
38   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
39   * <br>
40   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
41   *
42   * @author Tibor Digana (tibor17)
43   * @since 2.16
44   */
45  public class Scheduler implements RunnerScheduler {
46      private final Balancer balancer;
47  
48      private final SchedulingStrategy strategy;
49  
50      private final Set<Controller> slaves = new CopyOnWriteArraySet<>();
51  
52      private final Description description;
53  
54      private final ConsoleLogger logger;
55  
56      private volatile boolean shutdown = false;
57  
58      private volatile boolean started = false;
59  
60      private volatile boolean finished = false;
61  
62      private volatile Controller masterController;
63  
64      /**
65       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
66       * <br>
67       * You can use it with one infinite thread pool shared in strategies across all
68       * suites, class runners, etc.
69       *
70       * @param logger          console logger
71       * @param description     JUnit description of class
72       * @param strategy        scheduling strategy
73       */
74      public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy) {
75          this(logger, description, strategy, -1);
76      }
77  
78      /**
79       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
80       * <br>
81       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
82       * {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
83       * or {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
84       *
85       * @param logger current logger implementation
86       * @param description description of current runner
87       * @param strategy    scheduling strategy with a shared thread pool
88       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
89       * @throws NullPointerException if null <code>strategy</code>
90       */
91      public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy, int concurrency) {
92          this(logger, description, strategy, BalancerFactory.createBalancer(concurrency));
93      }
94  
95      /**
96       * New instances should be used by schedulers with limited concurrency by <code>balancer</code>
97       * against other groups of schedulers. The schedulers share one pool.
98       * <br>
99       * Unlike in {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)} which was
100      * limiting the <code>concurrency</code> of children of a runner where this scheduler was set, {@code this}
101      * <code>balancer</code> is limiting the concurrency of all children in runners having schedulers created by this
102      * constructor.
103      *
104      * @param logger current logger implementation
105      * @param description description of current runner
106      * @param strategy    scheduling strategy which may share threads with other strategy
107      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
108      * @throws NullPointerException if null <code>strategy</code> or <code>balancer</code>
109      */
110     public Scheduler(ConsoleLogger logger, Description description, SchedulingStrategy strategy, Balancer balancer) {
111         strategy.setDefaultShutdownHandler(newShutdownHandler());
112         this.logger = logger;
113         this.description = description;
114         this.strategy = strategy;
115         this.balancer = balancer;
116         masterController = null;
117     }
118 
119     /**
120      * Can be used by e.g. a runner having parallel classes in use case with parallel
121      * suites, classes and methods sharing the same thread pool.
122      *
123      * @param logger current logger implementation
124      * @param description     description of current runner
125      * @param masterScheduler scheduler sharing own threads with this slave
126      * @param strategy        scheduling strategy for this scheduler
127      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
128      * @throws NullPointerException if null <code>masterScheduler</code>, <code>strategy</code> or <code>balancer</code>
129      */
130     public Scheduler(
131             ConsoleLogger logger,
132             Description description,
133             Scheduler masterScheduler,
134             SchedulingStrategy strategy,
135             Balancer balancer) {
136         this(logger, description, strategy, balancer);
137         strategy.setDefaultShutdownHandler(newShutdownHandler());
138         masterScheduler.register(this);
139     }
140 
141     /**
142      * @param logger          console logger
143      * @param description     JUnit description of class
144      * @param masterScheduler a reference to
145      * {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)}
146      *                        or {@link #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)}
147      * @param strategy        scheduling strategy
148      * @param concurrency     determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
149      *
150      * @see #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)
151      * @see #Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)
152      */
153     public Scheduler(
154             ConsoleLogger logger,
155             Description description,
156             Scheduler masterScheduler,
157             SchedulingStrategy strategy,
158             int concurrency) {
159         this(logger, description, strategy, concurrency);
160         strategy.setDefaultShutdownHandler(newShutdownHandler());
161         masterScheduler.register(this);
162     }
163 
164     /**
165      * Should be used with individual pools on suites, classes and methods, see
166      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
167      * <br>
168      * Cached thread pool is infinite and can be always shared.
169      *
170      * @param logger          console logger
171      * @param description     JUnit description of class
172      * @param masterScheduler parent scheduler
173      * @param strategy        scheduling strategy
174      */
175     public Scheduler(
176             ConsoleLogger logger, Description description, Scheduler masterScheduler, SchedulingStrategy strategy) {
177         this(logger, description, masterScheduler, strategy, 0);
178     }
179 
180     private void setController(Controller masterController) {
181         if (masterController == null) {
182             throw new NullPointerException("null ExecutionController");
183         }
184         this.masterController = masterController;
185     }
186 
187     /**
188      * @param slave a slave scheduler to register
189      * @return {@code true} if successfully registered the <code>slave</code>.
190      */
191     private boolean register(Scheduler slave) {
192         boolean canRegister = slave != null && slave != this;
193         if (canRegister) {
194             Controller controller = new Controller(slave);
195             canRegister = !slaves.contains(controller);
196             if (canRegister) {
197                 slaves.add(controller);
198                 slave.setController(controller);
199             }
200         }
201         return canRegister;
202     }
203 
204     /**
205      * @return {@code true} if new tasks can be scheduled.
206      */
207     private boolean canSchedule() {
208         return !shutdown && (masterController == null || masterController.canSchedule());
209     }
210 
211     protected void logQuietly(Throwable t) {
212         logger.error(t);
213     }
214 
215     protected void logQuietly(String msg) {
216         logger.warning(msg);
217     }
218 
219     /**
220      * Attempts to stop all actively executing tasks and immediately returns a collection
221      * of descriptions of those tasks which have started prior to this call.
222      * <br>
223      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
224      * If <code>shutdownNow</code> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
225      *
226      * @param stopNow if {@code true} interrupts waiting test methods
227      * @return collection of descriptions started before shutting down
228      */
229     protected ShutdownResult describeStopped(boolean stopNow) {
230         Collection<Description> executedTests = new ConcurrentLinkedQueue<>();
231         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<>();
232         stop(executedTests, incompleteTests, false, stopNow);
233         return new ShutdownResult(executedTests, incompleteTests);
234     }
235 
236     /**
237      * Stop/Shutdown/Interrupt scheduler and its children (if any).
238      *
239      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
240      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
241      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
242      *                            When the runner of
243      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
244      *                            is finished in
245      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
246      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
247      *                            See the unit test {@code ParallelComputerBuilder#timeoutAndForcedShutdown()}.
248      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
249      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
250      *                            {@link Thread#interrupt()}.
251      */
252     private void stop(
253             Collection<Description> executedTests,
254             Collection<Description> incompleteTests,
255             boolean tryCancelFutures,
256             boolean stopNow) {
257         shutdown = true;
258         try {
259             if (started && !ParallelComputerUtil.isUnusedDescription(description)) {
260                 if (executedTests != null) {
261                     executedTests.add(description);
262                 }
263 
264                 if (incompleteTests != null && !finished) {
265                     incompleteTests.add(description);
266                 }
267             }
268 
269             for (Controller slave : slaves) {
270                 slave.stop(executedTests, incompleteTests, tryCancelFutures, stopNow);
271             }
272         } finally {
273             try {
274                 balancer.releaseAllPermits();
275             } finally {
276                 if (stopNow) {
277                     strategy.stopNow();
278                 } else if (tryCancelFutures) {
279                     strategy.stop();
280                 } else {
281                     strategy.disable();
282                 }
283             }
284         }
285     }
286 
287     protected boolean shutdownThreadPoolsAwaitingKilled() {
288         if (masterController == null) {
289             stop(null, null, true, false);
290             boolean isNotInterrupted = true;
291             if (strategy != null) {
292                 isNotInterrupted = strategy.destroy();
293             }
294             for (Controller slave : slaves) {
295                 isNotInterrupted &= slave.destroy();
296             }
297             return isNotInterrupted;
298         } else {
299             throw new UnsupportedOperationException("cannot call this method if this is not a master scheduler");
300         }
301     }
302 
303     protected void beforeExecute() {}
304 
305     protected void afterExecute() {}
306 
307     @Override
308     public void schedule(Runnable childStatement) {
309         if (childStatement == null) {
310             logQuietly("cannot schedule null");
311         } else if (canSchedule() && strategy.canSchedule()) {
312             try {
313                 boolean isNotInterrupted = balancer.acquirePermit();
314                 if (isNotInterrupted && !shutdown) {
315                     Runnable task = wrapTask(childStatement);
316                     strategy.schedule(task);
317                     started = true;
318                 }
319             } catch (RejectedExecutionException e) {
320                 stop(null, null, true, false);
321             } catch (Throwable t) {
322                 balancer.releasePermit();
323                 logQuietly(t);
324             }
325         }
326     }
327 
328     @Override
329     public void finished() {
330         try {
331             strategy.finished();
332         } catch (InterruptedException e) {
333             logQuietly(e);
334         } finally {
335             finished = true;
336         }
337     }
338 
339     private Runnable wrapTask(final Runnable task) {
340         return new Runnable() {
341             @Override
342             public void run() {
343                 try {
344                     beforeExecute();
345                     task.run();
346                 } finally {
347                     try {
348                         afterExecute();
349                     } finally {
350                         balancer.releasePermit();
351                     }
352                 }
353             }
354         };
355     }
356 
357     protected ShutdownHandler newShutdownHandler() {
358         return new ShutdownHandler();
359     }
360 
361     /**
362      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
363      */
364     private final class Controller {
365         private final Scheduler slave;
366 
367         private Controller(Scheduler slave) {
368             this.slave = slave;
369         }
370 
371         /**
372          * @return {@code true} if new children can be scheduled.
373          */
374         boolean canSchedule() {
375             return Scheduler.this.canSchedule();
376         }
377 
378         void stop(
379                 Collection<Description> executedTests,
380                 Collection<Description> incompleteTests,
381                 boolean tryCancelFutures,
382                 boolean shutdownNow) {
383             slave.stop(executedTests, incompleteTests, tryCancelFutures, shutdownNow);
384         }
385 
386         /**
387          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
388          */
389         boolean destroy() {
390             return slave.strategy.destroy();
391         }
392 
393         @Override
394         public int hashCode() {
395             return slave.hashCode();
396         }
397 
398         @Override
399         public boolean equals(Object o) {
400             return o == this || (o instanceof Controller) && slave.equals(((Controller) o).slave);
401         }
402     }
403 
404     /**
405      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
406      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
407      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
408      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
409      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
410      * schedulers recursively as well.
411      */
412     public class ShutdownHandler implements RejectedExecutionHandler {
413         private volatile RejectedExecutionHandler poolHandler;
414 
415         protected ShutdownHandler() {
416             poolHandler = null;
417         }
418 
419         public void setRejectedExecutionHandler(RejectedExecutionHandler poolHandler) {
420             this.poolHandler = poolHandler;
421         }
422 
423         @Override
424         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
425             if (executor.isShutdown()) {
426                 Scheduler.this.stop(null, null, true, false);
427             }
428             final RejectedExecutionHandler poolHandler = this.poolHandler;
429             if (poolHandler != null) {
430                 poolHandler.rejectedExecution(r, executor);
431             }
432         }
433     }
434 }