View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.junit.runner.Description;
23  import org.junit.runners.model.RunnerScheduler;
24  
25  import java.util.Collection;
26  import java.util.Set;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.CopyOnWriteArraySet;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadPoolExecutor;
32  
33  /**
34   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
35   * a master scheduler can shutdown slaves.
36   * <p/>
37   * The scheduler objects should be first created (and wired) and set in runners
38   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
39   * <p/>
40   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
41   *
42   * @author Tibor Digana (tibor17)
43   * @since 2.16
44   */
45  public class Scheduler
46      implements RunnerScheduler
47  {
48      private final Balancer balancer;
49  
50      private final SchedulingStrategy strategy;
51  
52      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
53  
54      private final Description description;
55  
56      private volatile boolean shutdown = false;
57  
58      private volatile boolean started = false;
59  
60      private volatile boolean finished = false;
61  
62      private volatile Controller masterController;
63  
64      /**
65       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
66       * <p/>
67       * You can use it with one infinite thread pool shared in strategies across all
68       * suites, class runners, etc.
69       */
70      public Scheduler( Description description, SchedulingStrategy strategy )
71      {
72          this( description, strategy, -1 );
73      }
74  
75      /**
76       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
77       * <p/>
78       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
79       * {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy)}
80       * or {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
81       *
82       * @param description description of current runner
83       * @param strategy    scheduling strategy with a shared thread pool
84       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
85       * @throws NullPointerException if null <tt>strategy</tt>
86       */
87      public Scheduler( Description description, SchedulingStrategy strategy, int concurrency )
88      {
89          this( description, strategy, BalancerFactory.createBalancer( concurrency ) );
90      }
91  
92      /**
93       * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
94       * against other groups of schedulers. The schedulers share one pool.
95       * <p/>
96       * Unlike in {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} which was limiting
97       * the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em> <tt>balancer</tt>
98       * is limiting the concurrency of all children in runners having schedulers created by this constructor.
99       *
100      * @param description description of current runner
101      * @param strategy    scheduling strategy which may share threads with other strategy
102      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
103      * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
104      */
105     public Scheduler( Description description, SchedulingStrategy strategy, Balancer balancer )
106     {
107         strategy.setDefaultShutdownHandler( newShutdownHandler() );
108         this.description = description;
109         this.strategy = strategy;
110         this.balancer = balancer;
111         masterController = null;
112     }
113 
114     /**
115      * Can be used by e.g. a runner having parallel classes in use case with parallel
116      * suites, classes and methods sharing the same thread pool.
117      *
118      * @param description     description of current runner
119      * @param masterScheduler scheduler sharing own threads with this slave
120      * @param strategy        scheduling strategy for this scheduler
121      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
122      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
123      */
124     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy,
125                       Balancer balancer )
126     {
127         this( description, strategy, balancer );
128         strategy.setDefaultShutdownHandler( newShutdownHandler() );
129         masterScheduler.register( this );
130     }
131 
132     /**
133      * @param masterScheduler a reference to {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)}
134      *                        or {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy)}
135      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy)
136      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)
137      */
138     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy, int concurrency )
139     {
140         this( description, strategy, concurrency );
141         strategy.setDefaultShutdownHandler( newShutdownHandler() );
142         masterScheduler.register( this );
143     }
144 
145     /**
146      * Should be used with individual pools on suites, classes and methods, see
147      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
148      * <p/>
149      * Cached thread pool is infinite and can be always shared.
150      */
151     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy )
152     {
153         this( description, masterScheduler, strategy, 0 );
154     }
155 
156     private void setController( Controller masterController )
157     {
158         if ( masterController == null )
159         {
160             throw new NullPointerException( "null ExecutionController" );
161         }
162         this.masterController = masterController;
163     }
164 
165     /**
166      * @param slave a slave scheduler to register
167      * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
168      */
169     private boolean register( Scheduler slave )
170     {
171         boolean canRegister = slave != null && slave != this;
172         if ( canRegister )
173         {
174             Controller controller = new Controller( slave );
175             canRegister = !slaves.contains( controller );
176             if ( canRegister )
177             {
178                 slaves.add( controller );
179                 slave.setController( controller );
180             }
181         }
182         return canRegister;
183     }
184 
185     /**
186      * @return <tt>true</tt> if new tasks can be scheduled.
187      */
188     private boolean canSchedule()
189     {
190         return !shutdown && ( masterController == null || masterController.canSchedule() );
191     }
192 
193     protected void logQuietly( Throwable t )
194     {
195         t.printStackTrace( System.out );
196     }
197 
198     protected void logQuietly( String msg )
199     {
200         System.out.println( msg );
201     }
202 
203     /**
204      * Attempts to stop all actively executing tasks and immediately returns a collection
205      * of descriptions of those tasks which have started prior to this call.
206      * <p/>
207      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
208      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
209      *
210      * @param stopNow if <tt>true</tt> interrupts waiting test methods
211      * @return collection of descriptions started before shutting down
212      */
213     protected ShutdownResult describeStopped( boolean stopNow )
214     {
215         Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
216         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
217         stop( executedTests, incompleteTests, false, stopNow );
218         return new ShutdownResult( executedTests, incompleteTests );
219     }
220 
221     /**
222      * Stop/Shutdown/Interrupt scheduler and its children (if any).
223      *
224      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
225      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
226      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
227      *                            When the runner of
228      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
229      *                            is finished in
230      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
231      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
232      *                            See the unit test <em>ParallelComputerBuilder#timeoutAndForcedShutdown()</em>.
233      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
234      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
235      *                            {@link Thread#interrupt()}.
236      */
237     private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
238                        boolean tryCancelFutures, boolean stopNow )
239     {
240         shutdown = true;
241         try
242         {
243             if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
244             {
245                 if ( executedTests != null )
246                 {
247                     executedTests.add( description );
248                 }
249 
250                 if ( incompleteTests != null && !finished )
251                 {
252                     incompleteTests.add( description );
253                 }
254             }
255 
256             for ( Controller slave : slaves )
257             {
258                 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
259             }
260         }
261         finally
262         {
263             try
264             {
265                 balancer.releaseAllPermits();
266             }
267             finally
268             {
269                 if ( stopNow )
270                 {
271                     strategy.stopNow();
272                 }
273                 else if ( tryCancelFutures )
274                 {
275                     strategy.stop();
276                 }
277                 else
278                 {
279                     strategy.disable();
280                 }
281             }
282         }
283     }
284 
285     protected boolean shutdownThreadPoolsAwaitingKilled()
286     {
287         if ( masterController == null )
288         {
289             stop( null, null, true, false );
290             boolean isNotInterrupted = true;
291             if ( strategy != null )
292             {
293                 isNotInterrupted = strategy.destroy();
294             }
295             for ( Controller slave : slaves )
296             {
297                 isNotInterrupted &= slave.destroy();
298             }
299             return isNotInterrupted;
300         }
301         else
302         {
303             throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
304         }
305     }
306 
307     protected void beforeExecute()
308     {
309     }
310 
311     protected void afterExecute()
312     {
313     }
314 
315     public void schedule( Runnable childStatement )
316     {
317         if ( childStatement == null )
318         {
319             logQuietly( "cannot schedule null" );
320         }
321         else if ( canSchedule() && strategy.canSchedule() )
322         {
323             try
324             {
325                 boolean isNotInterrupted = balancer.acquirePermit();
326                 if ( isNotInterrupted && !shutdown )
327                 {
328                     Runnable task = wrapTask( childStatement );
329                     strategy.schedule( task );
330                     started = true;
331                 }
332             }
333             catch ( RejectedExecutionException e )
334             {
335                 stop( null, null, true, false );
336             }
337             catch ( Throwable t )
338             {
339                 balancer.releasePermit();
340                 logQuietly( t );
341             }
342         }
343     }
344 
345     public void finished()
346     {
347         try
348         {
349             strategy.finished();
350         }
351         catch ( InterruptedException e )
352         {
353             logQuietly( e );
354         }
355         finally
356         {
357             finished = true;
358         }
359     }
360 
361     private Runnable wrapTask( final Runnable task )
362     {
363         return new Runnable()
364         {
365             public void run()
366             {
367                 try
368                 {
369                     beforeExecute();
370                     task.run();
371                 }
372                 finally
373                 {
374                     try
375                     {
376                         afterExecute();
377                     }
378                     finally
379                     {
380                         balancer.releasePermit();
381                     }
382                 }
383             }
384         };
385     }
386 
387     protected ShutdownHandler newShutdownHandler()
388     {
389         return new ShutdownHandler();
390     }
391 
392     /**
393      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
394      */
395     private final class Controller
396     {
397         private final Scheduler slave;
398 
399         private Controller( Scheduler slave )
400         {
401             this.slave = slave;
402         }
403 
404         /**
405          * @return <tt>true</tt> if new children can be scheduled.
406          */
407         boolean canSchedule()
408         {
409             return Scheduler.this.canSchedule();
410         }
411 
412         void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
413                    boolean tryCancelFutures, boolean shutdownNow )
414         {
415             slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
416         }
417 
418         /**
419          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
420          */
421         boolean destroy()
422         {
423             return slave.strategy.destroy();
424         }
425 
426         @Override
427         public int hashCode()
428         {
429             return slave.hashCode();
430         }
431 
432         @Override
433         public boolean equals( Object o )
434         {
435             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
436         }
437     }
438 
439     /**
440      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
441      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
442      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
443      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
444      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
445      * schedulers recursively as well.
446      */
447     public class ShutdownHandler
448         implements RejectedExecutionHandler
449     {
450         private volatile RejectedExecutionHandler poolHandler;
451 
452         protected ShutdownHandler()
453         {
454             poolHandler = null;
455         }
456 
457         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
458         {
459             this.poolHandler = poolHandler;
460         }
461 
462         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
463         {
464             if ( executor.isShutdown() )
465             {
466                 Scheduler.this.stop( null, null, true, false );
467             }
468             final RejectedExecutionHandler poolHandler = this.poolHandler;
469             if ( poolHandler != null )
470             {
471                 poolHandler.rejectedExecution( r, executor );
472             }
473         }
474     }
475 }