View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.api.report.ConsoleStream;
23  import org.junit.runner.Description;
24  import org.junit.runners.model.RunnerScheduler;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.PrintStream;
28  import java.util.Collection;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArraySet;
32  import java.util.concurrent.RejectedExecutionException;
33  import java.util.concurrent.RejectedExecutionHandler;
34  import java.util.concurrent.ThreadPoolExecutor;
35  
36  /**
37   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
38   * a master scheduler can shutdown slaves.
39   * <br>
40   * The scheduler objects should be first created (and wired) and set in runners
41   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
42   * <br>
43   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
44   *
45   * @author Tibor Digana (tibor17)
46   * @since 2.16
47   */
48  public class Scheduler
49      implements RunnerScheduler
50  {
51      private final Balancer balancer;
52  
53      private final SchedulingStrategy strategy;
54  
55      private final Set<Controller> slaves = new CopyOnWriteArraySet<>();
56  
57      private final Description description;
58  
59      private final ConsoleStream logger;
60  
61      private volatile boolean shutdown = false;
62  
63      private volatile boolean started = false;
64  
65      private volatile boolean finished = false;
66  
67      private volatile Controller masterController;
68  
69      /**
70       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
71       * <br>
72       * You can use it with one infinite thread pool shared in strategies across all
73       * suites, class runners, etc.
74       *
75       * @param logger          console logger
76       * @param description     JUnit description of class
77       * @param strategy        scheduling strategy
78       */
79      public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy )
80      {
81          this( logger, description, strategy, -1 );
82      }
83  
84      /**
85       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
86       * <br>
87       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
88       * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
89       * or {@link #Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
90       *
91       * @param logger current logger implementation
92       * @param description description of current runner
93       * @param strategy    scheduling strategy with a shared thread pool
94       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
95       * @throws NullPointerException if null <tt>strategy</tt>
96       */
97      public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, int concurrency )
98      {
99          this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
100     }
101 
102     /**
103      * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
104      * against other groups of schedulers. The schedulers share one pool.
105      * <br>
106      * Unlike in {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)} which was
107      * limiting the <tt>concurrency</tt> of children of a runner where this scheduler was set, {@code this}
108      * <tt>balancer</tt> is limiting the concurrency of all children in runners having schedulers created by this
109      * constructor.
110      *
111      * @param logger current logger implementation
112      * @param description description of current runner
113      * @param strategy    scheduling strategy which may share threads with other strategy
114      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
115      * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
116      */
117     public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, Balancer balancer )
118     {
119         strategy.setDefaultShutdownHandler( newShutdownHandler() );
120         this.logger = logger;
121         this.description = description;
122         this.strategy = strategy;
123         this.balancer = balancer;
124         masterController = null;
125     }
126 
127     /**
128      * Can be used by e.g. a runner having parallel classes in use case with parallel
129      * suites, classes and methods sharing the same thread pool.
130      *
131      * @param logger current logger implementation
132      * @param description     description of current runner
133      * @param masterScheduler scheduler sharing own threads with this slave
134      * @param strategy        scheduling strategy for this scheduler
135      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
136      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
137      */
138     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
139                       SchedulingStrategy strategy, Balancer balancer )
140     {
141         this( logger, description, strategy, balancer );
142         strategy.setDefaultShutdownHandler( newShutdownHandler() );
143         masterScheduler.register( this );
144     }
145 
146     /**
147      * @param logger          console logger
148      * @param description     JUnit description of class
149      * @param masterScheduler a reference to
150      * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)}
151      *                        or {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)}
152      * @param strategy        scheduling strategy
153      * @param concurrency     determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
154      *
155      * @see #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)
156      * @see #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)
157      */
158     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
159                       SchedulingStrategy strategy, int concurrency )
160     {
161         this( logger, description, strategy, concurrency );
162         strategy.setDefaultShutdownHandler( newShutdownHandler() );
163         masterScheduler.register( this );
164     }
165 
166     /**
167      * Should be used with individual pools on suites, classes and methods, see
168      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
169      * <br>
170      * Cached thread pool is infinite and can be always shared.
171      *
172      * @param logger          console logger
173      * @param description     JUnit description of class
174      * @param masterScheduler parent scheduler
175      * @param strategy        scheduling strategy
176      */
177     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
178                       SchedulingStrategy strategy )
179     {
180         this( logger, description, masterScheduler, strategy, 0 );
181     }
182 
183     private void setController( Controller masterController )
184     {
185         if ( masterController == null )
186         {
187             throw new NullPointerException( "null ExecutionController" );
188         }
189         this.masterController = masterController;
190     }
191 
192     /**
193      * @param slave a slave scheduler to register
194      * @return {@code true} if successfully registered the <tt>slave</tt>.
195      */
196     private boolean register( Scheduler slave )
197     {
198         boolean canRegister = slave != null && slave != this;
199         if ( canRegister )
200         {
201             Controller controller = new Controller( slave );
202             canRegister = !slaves.contains( controller );
203             if ( canRegister )
204             {
205                 slaves.add( controller );
206                 slave.setController( controller );
207             }
208         }
209         return canRegister;
210     }
211 
212     /**
213      * @return {@code true} if new tasks can be scheduled.
214      */
215     private boolean canSchedule()
216     {
217         return !shutdown && ( masterController == null || masterController.canSchedule() );
218     }
219 
220     protected void logQuietly( Throwable t )
221     {
222         ByteArrayOutputStream out = new ByteArrayOutputStream();
223         try ( PrintStream stream = new PrintStream( out ) )
224         {
225             t.printStackTrace( stream );
226         }
227         logger.println( out.toString() );
228     }
229 
230     protected void logQuietly( String msg )
231     {
232         logger.println( msg );
233     }
234 
235     /**
236      * Attempts to stop all actively executing tasks and immediately returns a collection
237      * of descriptions of those tasks which have started prior to this call.
238      * <br>
239      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
240      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
241      *
242      * @param stopNow if {@code true} interrupts waiting test methods
243      * @return collection of descriptions started before shutting down
244      */
245     protected ShutdownResult describeStopped( boolean stopNow )
246     {
247         Collection<Description> executedTests = new ConcurrentLinkedQueue<>();
248         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<>();
249         stop( executedTests, incompleteTests, false, stopNow );
250         return new ShutdownResult( executedTests, incompleteTests );
251     }
252 
253     /**
254      * Stop/Shutdown/Interrupt scheduler and its children (if any).
255      *
256      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
257      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
258      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
259      *                            When the runner of
260      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
261      *                            is finished in
262      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
263      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
264      *                            See the unit test {@code ParallelComputerBuilder#timeoutAndForcedShutdown()}.
265      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
266      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
267      *                            {@link Thread#interrupt()}.
268      */
269     private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
270                        boolean tryCancelFutures, boolean stopNow )
271     {
272         shutdown = true;
273         try
274         {
275             if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
276             {
277                 if ( executedTests != null )
278                 {
279                     executedTests.add( description );
280                 }
281 
282                 if ( incompleteTests != null && !finished )
283                 {
284                     incompleteTests.add( description );
285                 }
286             }
287 
288             for ( Controller slave : slaves )
289             {
290                 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
291             }
292         }
293         finally
294         {
295             try
296             {
297                 balancer.releaseAllPermits();
298             }
299             finally
300             {
301                 if ( stopNow )
302                 {
303                     strategy.stopNow();
304                 }
305                 else if ( tryCancelFutures )
306                 {
307                     strategy.stop();
308                 }
309                 else
310                 {
311                     strategy.disable();
312                 }
313             }
314         }
315     }
316 
317     protected boolean shutdownThreadPoolsAwaitingKilled()
318     {
319         if ( masterController == null )
320         {
321             stop( null, null, true, false );
322             boolean isNotInterrupted = true;
323             if ( strategy != null )
324             {
325                 isNotInterrupted = strategy.destroy();
326             }
327             for ( Controller slave : slaves )
328             {
329                 isNotInterrupted &= slave.destroy();
330             }
331             return isNotInterrupted;
332         }
333         else
334         {
335             throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
336         }
337     }
338 
339     protected void beforeExecute()
340     {
341     }
342 
343     protected void afterExecute()
344     {
345     }
346 
347     @Override
348     public void schedule( Runnable childStatement )
349     {
350         if ( childStatement == null )
351         {
352             logQuietly( "cannot schedule null" );
353         }
354         else if ( canSchedule() && strategy.canSchedule() )
355         {
356             try
357             {
358                 boolean isNotInterrupted = balancer.acquirePermit();
359                 if ( isNotInterrupted && !shutdown )
360                 {
361                     Runnable task = wrapTask( childStatement );
362                     strategy.schedule( task );
363                     started = true;
364                 }
365             }
366             catch ( RejectedExecutionException e )
367             {
368                 stop( null, null, true, false );
369             }
370             catch ( Throwable t )
371             {
372                 balancer.releasePermit();
373                 logQuietly( t );
374             }
375         }
376     }
377 
378     @Override
379     public void finished()
380     {
381         try
382         {
383             strategy.finished();
384         }
385         catch ( InterruptedException e )
386         {
387             logQuietly( e );
388         }
389         finally
390         {
391             finished = true;
392         }
393     }
394 
395     private Runnable wrapTask( final Runnable task )
396     {
397         return new Runnable()
398         {
399             @Override
400             public void run()
401             {
402                 try
403                 {
404                     beforeExecute();
405                     task.run();
406                 }
407                 finally
408                 {
409                     try
410                     {
411                         afterExecute();
412                     }
413                     finally
414                     {
415                         balancer.releasePermit();
416                     }
417                 }
418             }
419         };
420     }
421 
422     protected ShutdownHandler newShutdownHandler()
423     {
424         return new ShutdownHandler();
425     }
426 
427     /**
428      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
429      */
430     private final class Controller
431     {
432         private final Scheduler slave;
433 
434         private Controller( Scheduler slave )
435         {
436             this.slave = slave;
437         }
438 
439         /**
440          * @return {@code true} if new children can be scheduled.
441          */
442         boolean canSchedule()
443         {
444             return Scheduler.this.canSchedule();
445         }
446 
447         void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
448                    boolean tryCancelFutures, boolean shutdownNow )
449         {
450             slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
451         }
452 
453         /**
454          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
455          */
456         boolean destroy()
457         {
458             return slave.strategy.destroy();
459         }
460 
461         @Override
462         public int hashCode()
463         {
464             return slave.hashCode();
465         }
466 
467         @Override
468         public boolean equals( Object o )
469         {
470             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
471         }
472     }
473 
474     /**
475      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
476      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
477      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
478      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
479      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
480      * schedulers recursively as well.
481      */
482     public class ShutdownHandler
483         implements RejectedExecutionHandler
484     {
485         private volatile RejectedExecutionHandler poolHandler;
486 
487         protected ShutdownHandler()
488         {
489             poolHandler = null;
490         }
491 
492         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
493         {
494             this.poolHandler = poolHandler;
495         }
496 
497         @Override
498         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
499         {
500             if ( executor.isShutdown() )
501             {
502                 Scheduler.this.stop( null, null, true, false );
503             }
504             final RejectedExecutionHandler poolHandler = this.poolHandler;
505             if ( poolHandler != null )
506             {
507                 poolHandler.rejectedExecution( r, executor );
508             }
509         }
510     }
511 }