View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.report.ConsoleStream;
23  import org.junit.runner.Description;
24  import org.junit.runners.model.RunnerScheduler;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.PrintStream;
28  import java.util.Collection;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArraySet;
32  import java.util.concurrent.RejectedExecutionException;
33  import java.util.concurrent.RejectedExecutionHandler;
34  import java.util.concurrent.ThreadPoolExecutor;
35  
/**
 * Schedules tests, controls thread resources, awaits the completion of tests and of other schedulers, and
 * lets a master scheduler shut down its slaves.
 * <br>
 * The scheduler objects should first be created (and wired) and then set in the runners via
 * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
 * <br>
 * A new instance of a scheduling strategy should be passed to the constructor of this scheduler.
 *
 * @author Tibor Digana (tibor17)
 * @since 2.16
 */
48  public class Scheduler
49      implements RunnerScheduler
50  {
51      private final Balancer balancer;
52  
53      private final SchedulingStrategy strategy;
54  
55      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56  
57      private final Description description;
58  
59      private final ConsoleStream logger;
60  
61      private volatile boolean shutdown = false;
62  
63      private volatile boolean started = false;
64  
65      private volatile boolean finished = false;
66  
67      private volatile Controller masterController;
68  
/**
 * Creates a scheduler without a master, delegating to
 * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)}
 * with the concurrency value {@code -1}.
 * <br>
 * Use it e.g. when parallel classes have their own non-shared thread pool and methods use another pool,
 * or with one infinite thread pool shared in strategies across all suites, class runners, etc.
 *
 * @param logger      console logger
 * @param description JUnit description of class
 * @param strategy    scheduling strategy
 */
public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy )
{
    this( logger, description, strategy, -1 );
}
83  
/**
 * Creates a scheduler without a master whose balancer is obtained from
 * {@link BalancerFactory#createBalancer(int)} for the given <tt>concurrency</tt>.
 * <br>
 * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
 * Set this scheduler in e.g. one suite of classes, then every individual class runner should reference
 * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
 * or {@link #Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
 *
 * @param logger      current logger implementation
 * @param description description of current runner
 * @param strategy    scheduling strategy with a shared thread pool
 * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
 * @throws NullPointerException if null <tt>strategy</tt>
 */
public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, int concurrency )
{
    this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
}
101 
/**
 * Creates a scheduler without a master which limits its concurrency by the given <tt>balancer</tt>.
 * All other constructors end up here.
 * <br>
 * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
 * against other groups of schedulers. The schedulers share one pool.
 * <br>
 * Unlike {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)}, which
 * limits the <tt>concurrency</tt> of children of the single runner where the scheduler was set, {@code this}
 * <tt>balancer</tt> limits the concurrency of all children in runners having schedulers created by this
 * constructor.
 * <br>
 * A handler obtained from {@link #newShutdownHandler()} is installed in the <tt>strategy</tt> before
 * any field of this object is assigned.
 *
 * @param logger      current logger implementation
 * @param description description of current runner
 * @param strategy    scheduling strategy which may share threads with other strategy
 * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)};
 *                    it is stored as is, a {@code null} value is not rejected here
 * @throws NullPointerException if null <tt>strategy</tt>
 */
public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, Balancer balancer )
{
    ShutdownHandler handler = newShutdownHandler();
    strategy.setDefaultShutdownHandler( handler );

    this.balancer = balancer;
    this.strategy = strategy;
    this.description = description;
    this.logger = logger;
    // masterController keeps its default null: this scheduler has no master yet
}
126 
/**
 * Creates a slave scheduler and registers it at <tt>masterScheduler</tt>.
 * <br>
 * Can be used by e.g. a runner having parallel classes in use case with parallel
 * suites, classes and methods sharing the same thread pool.
 * <br>
 * The shutdown handler is installed in the <tt>strategy</tt> exactly once, by the delegated
 * constructor; it is intentionally not installed a second time here.
 *
 * @param logger          current logger implementation
 * @param description     description of current runner
 * @param masterScheduler scheduler sharing own threads with this slave
 * @param strategy        scheduling strategy for this scheduler
 * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
 * @throws NullPointerException if null <tt>masterScheduler</tt> or <tt>strategy</tt>
 */
public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
                  SchedulingStrategy strategy, Balancer balancer )
{
    // the delegated constructor already calls strategy.setDefaultShutdownHandler( newShutdownHandler() )
    this( logger, description, strategy, balancer );
    masterScheduler.register( this );
}
145 
/**
 * Creates a slave scheduler with a balancer built for the given <tt>concurrency</tt> and registers it
 * at <tt>masterScheduler</tt>.
 * <br>
 * The shutdown handler is installed in the <tt>strategy</tt> exactly once, by the delegated
 * constructor; it is intentionally not installed a second time here.
 *
 * @param logger          console logger
 * @param description     JUnit description of class
 * @param masterScheduler a reference to
 * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)}
 *                        or {@link #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)}
 * @param strategy        scheduling strategy
 * @param concurrency     determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
 * @throws NullPointerException if null <tt>masterScheduler</tt> or <tt>strategy</tt>
 *
 * @see #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)
 * @see #Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)
 */
public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
                  SchedulingStrategy strategy, int concurrency )
{
    // the delegated constructor chain already calls strategy.setDefaultShutdownHandler( newShutdownHandler() )
    this( logger, description, strategy, concurrency );
    masterScheduler.register( this );
}
165 
/**
 * Creates a slave scheduler registered at <tt>masterScheduler</tt>, delegating to
 * {@link #Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}
 * with the concurrency value {@code 0}.
 * <br>
 * Should be used with individual pools on suites, classes and methods, see
 * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
 * <br>
 * Cached thread pool is infinite and can be always shared.
 *
 * @param logger          console logger
 * @param description     JUnit description of class
 * @param masterScheduler parent scheduler
 * @param strategy        scheduling strategy
 */
public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
                  SchedulingStrategy strategy )
{
    this( logger, description, masterScheduler, strategy, 0 );
}
182 
183     private void setController( Controller masterController )
184     {
185         if ( masterController == null )
186         {
187             throw new NullPointerException( "null ExecutionController" );
188         }
189         this.masterController = masterController;
190     }
191 
192     /**
193      * @param slave a slave scheduler to register
194      * @return {@code true} if successfully registered the <tt>slave</tt>.
195      */
196     private boolean register( Scheduler slave )
197     {
198         boolean canRegister = slave != null && slave != this;
199         if ( canRegister )
200         {
201             Controller controller = new Controller( slave );
202             canRegister = !slaves.contains( controller );
203             if ( canRegister )
204             {
205                 slaves.add( controller );
206                 slave.setController( controller );
207             }
208         }
209         return canRegister;
210     }
211 
212     /**
213      * @return {@code true} if new tasks can be scheduled.
214      */
215     private boolean canSchedule()
216     {
217         return !shutdown && ( masterController == null || masterController.canSchedule() );
218     }
219 
220     protected void logQuietly( Throwable t )
221     {
222         ByteArrayOutputStream out = new ByteArrayOutputStream();
223         PrintStream stream = new PrintStream( out );
224         try
225         {
226             t.printStackTrace( stream );
227         }
228         finally
229         {
230             stream.close();
231         }
232         logger.println( out.toString() );
233     }
234 
235     protected void logQuietly( String msg )
236     {
237         logger.println( msg );
238     }
239 
240     /**
241      * Attempts to stop all actively executing tasks and immediately returns a collection
242      * of descriptions of those tasks which have started prior to this call.
243      * <br>
244      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
245      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
246      *
247      * @param stopNow if {@code true} interrupts waiting test methods
248      * @return collection of descriptions started before shutting down
249      */
250     protected ShutdownResult describeStopped( boolean stopNow )
251     {
252         Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
253         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
254         stop( executedTests, incompleteTests, false, stopNow );
255         return new ShutdownResult( executedTests, incompleteTests );
256     }
257 
258     /**
259      * Stop/Shutdown/Interrupt scheduler and its children (if any).
260      *
261      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
262      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
263      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
264      *                            When the runner of
265      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
266      *                            is finished in
267      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
268      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
269      *                            See the unit test {@code ParallelComputerBuilder#timeoutAndForcedShutdown()}.
270      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
271      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
272      *                            {@link Thread#interrupt()}.
273      */
274     private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
275                        boolean tryCancelFutures, boolean stopNow )
276     {
277         shutdown = true;
278         try
279         {
280             if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
281             {
282                 if ( executedTests != null )
283                 {
284                     executedTests.add( description );
285                 }
286 
287                 if ( incompleteTests != null && !finished )
288                 {
289                     incompleteTests.add( description );
290                 }
291             }
292 
293             for ( Controller slave : slaves )
294             {
295                 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
296             }
297         }
298         finally
299         {
300             try
301             {
302                 balancer.releaseAllPermits();
303             }
304             finally
305             {
306                 if ( stopNow )
307                 {
308                     strategy.stopNow();
309                 }
310                 else if ( tryCancelFutures )
311                 {
312                     strategy.stop();
313                 }
314                 else
315                 {
316                     strategy.disable();
317                 }
318             }
319         }
320     }
321 
322     protected boolean shutdownThreadPoolsAwaitingKilled()
323     {
324         if ( masterController == null )
325         {
326             stop( null, null, true, false );
327             boolean isNotInterrupted = true;
328             if ( strategy != null )
329             {
330                 isNotInterrupted = strategy.destroy();
331             }
332             for ( Controller slave : slaves )
333             {
334                 isNotInterrupted &= slave.destroy();
335             }
336             return isNotInterrupted;
337         }
338         else
339         {
340             throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
341         }
342     }
343 
344     protected void beforeExecute()
345     {
346     }
347 
348     protected void afterExecute()
349     {
350     }
351 
352     @Override
353     public void schedule( Runnable childStatement )
354     {
355         if ( childStatement == null )
356         {
357             logQuietly( "cannot schedule null" );
358         }
359         else if ( canSchedule() && strategy.canSchedule() )
360         {
361             try
362             {
363                 boolean isNotInterrupted = balancer.acquirePermit();
364                 if ( isNotInterrupted && !shutdown )
365                 {
366                     Runnable task = wrapTask( childStatement );
367                     strategy.schedule( task );
368                     started = true;
369                 }
370             }
371             catch ( RejectedExecutionException e )
372             {
373                 stop( null, null, true, false );
374             }
375             catch ( Throwable t )
376             {
377                 balancer.releasePermit();
378                 logQuietly( t );
379             }
380         }
381     }
382 
383     @Override
384     public void finished()
385     {
386         try
387         {
388             strategy.finished();
389         }
390         catch ( InterruptedException e )
391         {
392             logQuietly( e );
393         }
394         finally
395         {
396             finished = true;
397         }
398     }
399 
400     private Runnable wrapTask( final Runnable task )
401     {
402         return new Runnable()
403         {
404             @Override
405             public void run()
406             {
407                 try
408                 {
409                     beforeExecute();
410                     task.run();
411                 }
412                 finally
413                 {
414                     try
415                     {
416                         afterExecute();
417                     }
418                     finally
419                     {
420                         balancer.releasePermit();
421                     }
422                 }
423             }
424         };
425     }
426 
427     protected ShutdownHandler newShutdownHandler()
428     {
429         return new ShutdownHandler();
430     }
431 
432     /**
433      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
434      */
435     private final class Controller
436     {
437         private final Scheduler slave;
438 
439         private Controller( Scheduler slave )
440         {
441             this.slave = slave;
442         }
443 
444         /**
445          * @return {@code true} if new children can be scheduled.
446          */
447         boolean canSchedule()
448         {
449             return Scheduler.this.canSchedule();
450         }
451 
452         void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
453                    boolean tryCancelFutures, boolean shutdownNow )
454         {
455             slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
456         }
457 
458         /**
459          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
460          */
461         boolean destroy()
462         {
463             return slave.strategy.destroy();
464         }
465 
466         @Override
467         public int hashCode()
468         {
469             return slave.hashCode();
470         }
471 
472         @Override
473         public boolean equals( Object o )
474         {
475             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
476         }
477     }
478 
479     /**
480      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
481      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
482      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
483      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
484      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
485      * schedulers recursively as well.
486      */
487     public class ShutdownHandler
488         implements RejectedExecutionHandler
489     {
490         private volatile RejectedExecutionHandler poolHandler;
491 
492         protected ShutdownHandler()
493         {
494             poolHandler = null;
495         }
496 
497         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
498         {
499             this.poolHandler = poolHandler;
500         }
501 
502         @Override
503         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
504         {
505             if ( executor.isShutdown() )
506             {
507                 Scheduler.this.stop( null, null, true, false );
508             }
509             final RejectedExecutionHandler poolHandler = this.poolHandler;
510             if ( poolHandler != null )
511             {
512                 poolHandler.rejectedExecution( r, executor );
513             }
514         }
515     }
516 }