View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.annotation.Annotation;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.EnumMap;
28  import java.util.Iterator;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.report.ConsoleStream;
39  import org.apache.maven.surefire.testset.TestSetFailedException;
40  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41  import org.junit.internal.runners.ErrorReportingRunner;
42  import org.junit.runner.Description;
43  import org.junit.runner.Runner;
44  import org.junit.runner.manipulation.Filter;
45  import org.junit.runner.manipulation.NoTestsRemainException;
46  import org.junit.runner.notification.RunNotifier;
47  import org.junit.runners.ParentRunner;
48  import org.junit.runners.Suite;
49  import org.junit.runners.model.InitializationError;
50  import org.junit.runners.model.RunnerBuilder;
51  
52  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58  
59  @SuppressWarnings( { "javadoc", "checkstyle:javadoctype" } )
60  /**
61   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
62   * the suites and classes can be reused in parallel methods.
63   * <pre>
64   * JUnitCoreParameters parameters = ...;
65   * ParallelComputerBuilder builder = new ParallelComputerBuilder(parameters);
66   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
67   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
68   * Class<?>[] tests = {...};
69   * new JUnitCore().run(computer, tests);
70   * </pre>
71   * Note that the type has always at least one thread even if unspecified. The capacity in
72   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
73   * altogether.
74   * <br>
75   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
76   * {@code true}.
77   * <pre>
78   * computer.describeStopped(true);
79   * </pre>
80   *
81   * @author Tibor Digana (tibor17)
82   * @since 2.16
83   */
84  public final class ParallelComputerBuilder
85  {
86      private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
87  
88      private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
89  
90      private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
91  
92      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
93  
94      private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
95  
96      private final ConsoleStream logger;
97  
98      private boolean useSeparatePools;
99  
100     private int totalPoolSize;
101 
102     private JUnitCoreParameters parameters;
103 
104     private boolean optimize;
105 
106     private boolean runningInTests;
107 
108     /**
109      * Calling {@link #useSeparatePools()}.
110      * Can be used only in unit tests.
111      * Do NOT call this constructor in production.
112      */
113     ParallelComputerBuilder( ConsoleStream logger )
114     {
115         this.logger = logger;
116         runningInTests = true;
117         useSeparatePools();
118         parallelGroups.put( SUITES, 0 );
119         parallelGroups.put( CLASSES, 0 );
120         parallelGroups.put( METHODS, 0 );
121     }
122 
123     public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
124     {
125         this( logger );
126         runningInTests = false;
127         this.parameters = parameters;
128     }
129 
130     public ParallelComputer buildComputer()
131     {
132         return new PC();
133     }
134 
135     ParallelComputerBuilder useSeparatePools()
136     {
137         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138         useSeparatePools = true;
139         return this;
140     }
141 
142     ParallelComputerBuilder useOnePool()
143     {
144         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
145         useSeparatePools = false;
146         return this;
147     }
148 
149     /**
150      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
151      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
152      *                      limited.
153      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
154      */
155     ParallelComputerBuilder useOnePool( int totalPoolSize )
156     {
157         if ( totalPoolSize < 1 )
158         {
159             throw new IllegalArgumentException( "Size of common pool is less than 1." );
160         }
161         this.totalPoolSize = totalPoolSize;
162         useSeparatePools = false;
163         return this;
164     }
165 
166     boolean isOptimized()
167     {
168         return optimize;
169     }
170 
171     ParallelComputerBuilder optimize( boolean optimize )
172     {
173         this.optimize = optimize;
174         return this;
175     }
176 
177     ParallelComputerBuilder parallelSuites()
178     {
179         return parallel( SUITES );
180     }
181 
182     ParallelComputerBuilder parallelSuites( int nThreads )
183     {
184         return parallel( nThreads, SUITES );
185     }
186 
187     ParallelComputerBuilder parallelClasses()
188     {
189         return parallel( CLASSES );
190     }
191 
192     ParallelComputerBuilder parallelClasses( int nThreads )
193     {
194         return parallel( nThreads, CLASSES );
195     }
196 
197     ParallelComputerBuilder parallelMethods()
198     {
199         return parallel( METHODS );
200     }
201 
202     ParallelComputerBuilder parallelMethods( int nThreads )
203     {
204         return parallel( nThreads, METHODS );
205     }
206 
207     private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
208     {
209         if ( nThreads < 0 )
210         {
211             throw new IllegalArgumentException( "negative nThreads " + nThreads );
212         }
213 
214         if ( parallelType == null )
215         {
216             throw new IllegalArgumentException( "null parallelType" );
217         }
218 
219         parallelGroups.put( parallelType, nThreads );
220         return this;
221     }
222 
223     private ParallelComputerBuilder parallel( Type parallelType )
224     {
225         return parallel( Integer.MAX_VALUE, parallelType );
226     }
227 
228     private double parallelTestsTimeoutInSeconds()
229     {
230         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
231     }
232 
233     private double parallelTestsTimeoutForcedInSeconds()
234     {
235         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
236     }
237 
238     @SuppressWarnings( "unchecked" )
239     private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
240     {
241         try
242         {
243             Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
244             return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
245         }
246         catch ( ClassNotFoundException e )
247         {
248             return null;
249         }
250     }
251 
252     final class PC
253         extends ParallelComputer
254     {
255         private final SingleThreadScheduler notThreadSafeTests =
256             new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
257 
258         private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
259 
260         private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
261 
262         private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
263 
264         private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
265 
266         private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
267 
268         private int poolCapacity;
269 
270         private boolean splitPool;
271 
272         private final Map<Type, Integer> allGroups;
273 
274         private long nestedClassesChildren;
275 
276         private volatile Scheduler master;
277 
278         private PC()
279         {
280             super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
281             allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
282             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
283             splitPool = ParallelComputerBuilder.this.useSeparatePools;
284         }
285 
286         Collection<ParentRunner> getSuites()
287         {
288             return suites;
289         }
290 
291         Collection<ParentRunner> getNestedSuites()
292         {
293             return nestedSuites;
294         }
295 
296         Collection<ParentRunner> getClasses()
297         {
298             return classes;
299         }
300 
301         Collection<ParentRunner> getNestedClasses()
302         {
303             return nestedClasses;
304         }
305 
306         Collection<Runner> getNotParallelRunners()
307         {
308             return notParallelRunners;
309         }
310 
311         int getPoolCapacity()
312         {
313             return poolCapacity;
314         }
315 
316         boolean isSplitPool()
317         {
318             return splitPool;
319         }
320 
321         @Override
322         protected ShutdownResult describeStopped( boolean shutdownNow )
323         {
324             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
325             final Scheduler m = master;
326             if ( m != null )
327             {
328                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
329                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
330                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
331             }
332             return shutdownResult;
333         }
334 
335         @Override
336         boolean shutdownThreadPoolsAwaitingKilled()
337         {
338             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
339             final Scheduler m = master;
340             if ( m != null )
341             {
342                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
343             }
344             return notInterrupted;
345         }
346 
347         @Override
348         public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
349             throws InitializationError
350         {
351             try
352             {
353                 super.getSuite( builder, cls );
354                 populateChildrenFromSuites();
355 
356                 WrappedRunners suiteSuites = wrapRunners( suites );
357                 WrappedRunners suiteClasses = wrapRunners( classes );
358 
359                 long suitesCount = suites.size();
360                 long classesCount = classes.size() + nestedClasses.size();
361                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
362                 if ( !ParallelComputerBuilder.this.runningInTests )
363                 {
364                     determineThreadCounts( suitesCount, classesCount, methodsCount );
365                 }
366 
367                 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
368             }
369             catch ( TestSetFailedException e )
370             {
371                 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
372             }
373         }
374 
375         @Override
376         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
377             throws Throwable
378         {
379             Runner runner = super.getRunner( builder, testClass );
380             if ( canSchedule( runner ) )
381             {
382                 if ( !isThreadSafe( runner ) )
383                 {
384                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
385                     notParallelRunners.add( runner );
386                 }
387                 else if ( runner instanceof Suite )
388                 {
389                     suites.add( (Suite) runner );
390                 }
391                 else
392                 {
393                     classes.add( (ParentRunner) runner );
394                 }
395             }
396             else
397             {
398                 notParallelRunners.add( runner );
399             }
400             return runner;
401         }
402 
403         private void determineThreadCounts( long suites, long classes, long methods )
404             throws TestSetFailedException
405         {
406             RunnerCounter counts = null;
407             if ( ParallelComputerBuilder.this.optimize )
408             {
409                 counts = new RunnerCounter( suites, classes, methods );
410             }
411             Concurrency concurrency =
412                     resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
413             allGroups.put( SUITES, concurrency.suites );
414             allGroups.put( CLASSES, concurrency.classes );
415             allGroups.put( METHODS, concurrency.methods );
416             poolCapacity = concurrency.capacity;
417             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
418         }
419 
420         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
421             throws InitializationError
422         {
423             // Do NOT use allGroups here.
424             long childrenCounter = 0;
425             ArrayList<Runner> runs = new ArrayList<Runner>();
426             for ( T runner : runners )
427             {
428                 if ( runner != null )
429                 {
430                     int children = countChildren( runner );
431                     childrenCounter += children;
432                     runs.add( runner );
433                 }
434             }
435             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
436         }
437 
438         private int countChildren( Runner runner )
439         {
440             Description description = runner.getDescription();
441             Collection children = description == null ? null : description.getChildren();
442             return children == null ? 0 : children.size();
443         }
444 
445         private ExecutorService createPool( int poolSize )
446         {
447             return poolSize < Integer.MAX_VALUE
448                 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
449                 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
450         }
451 
452         private Scheduler createMaster( ExecutorService pool, int poolSize )
453         {
454             // can be 0, 1, 2 or 3
455             final int finalRunnersCounter = countFinalRunners();
456 
457             final SchedulingStrategy strategy;
458             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
459             {
460                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
461             }
462             else if ( pool != null && poolSize == Integer.MAX_VALUE )
463             {
464                 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
465             }
466             else
467             {
468                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
469             }
470             return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
471         }
472 
473         private int countFinalRunners()
474         {
475             int counter = notParallelRunners.isEmpty() ? 0 : 1;
476 
477             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
478             {
479                 ++counter;
480             }
481 
482             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
483             {
484                 ++counter;
485             }
486 
487             return counter;
488         }
489 
490         private void populateChildrenFromSuites()
491         {
492             // Do NOT use allGroups here.
493             Filter filter = new SuiteFilter();
494             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
495             {
496                 ParentRunner suite = it.next();
497                 try
498                 {
499                     suite.filter( filter );
500                 }
501                 catch ( NoTestsRemainException e )
502                 {
503                     it.remove();
504                 }
505             }
506         }
507 
508         private int totalPoolSize()
509         {
510             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
511             {
512                 int total = 0;
513                 for ( int nThreads : allGroups.values() )
514                 {
515                     total += nThreads;
516                     if ( total < 0 )
517                     {
518                         total = Integer.MAX_VALUE;
519                         break;
520                     }
521                 }
522                 return total;
523             }
524             else
525             {
526                 return poolCapacity;
527             }
528         }
529 
530         private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
531             throws InitializationError
532         {
533             int parallelSuites = allGroups.get( SUITES );
534             int parallelClasses = allGroups.get( CLASSES );
535             int parallelMethods = allGroups.get( METHODS );
536             int poolSize = totalPoolSize();
537             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
538             master = createMaster( commonPool, poolSize );
539 
540             if ( suiteSuites != null )
541             {
542                 // a scheduler for parallel suites
543                 if ( commonPool != null && parallelSuites > 0 )
544                 {
545                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
546                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
547                 }
548                 else
549                 {
550                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
551                 }
552             }
553 
554             // schedulers for parallel classes
555             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
556             allSuites.addAll( nestedSuites );
557             if ( suiteClasses != null )
558             {
559                 allSuites.add( suiteClasses );
560             }
561             if ( !allSuites.isEmpty() )
562             {
563                 setSchedulers( allSuites, parallelClasses, commonPool );
564             }
565 
566             // schedulers for parallel methods
567             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
568             allClasses.addAll( nestedClasses );
569             if ( !allClasses.isEmpty() )
570             {
571                 setSchedulers( allClasses, parallelMethods, commonPool );
572             }
573 
574             // resulting runner for Computer#getSuite() scheduled by master scheduler
575             ParentRunner all = createFinalRunner( removeNullRunners(
576                 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
577             ) );
578             all.setScheduler( master );
579             return all;
580         }
581 
582         private ParentRunner createFinalRunner( List<Runner> runners )
583             throws InitializationError
584         {
585             return new Suite( null, runners )
586             {
587                 @Override
588                 public void run( RunNotifier notifier )
589                 {
590                     try
591                     {
592                         beforeRunQuietly();
593                         super.run( notifier );
594                     }
595                     finally
596                     {
597                         afterRunQuietly();
598                     }
599                 }
600             };
601         }
602 
603         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
604         {
605             if ( commonPool != null )
606             {
607                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
608                 boolean doParallel = poolSize > 0;
609                 for ( ParentRunner runner : runners )
610                 {
611                     runner.setScheduler(
612                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
613                 }
614             }
615             else
616             {
617                 ExecutorService pool = null;
618                 if ( poolSize == Integer.MAX_VALUE )
619                 {
620                     pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
621                 }
622                 else if ( poolSize > 0 )
623                 {
624                     pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
625                 }
626                 boolean doParallel = pool != null;
627                 for ( ParentRunner runner : runners )
628                 {
629                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
630                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
631                 }
632             }
633         }
634 
635         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
636                                            Balancer concurrency )
637         {
638             SchedulingStrategy strategy =
639                     doParallel & pool != null
640                     ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
641                     : new InvokerStrategy( ParallelComputerBuilder.this.logger );
642             return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
643         }
644 
645         private Scheduler createScheduler( int poolSize )
646         {
647             final SchedulingStrategy strategy;
648             if ( poolSize == Integer.MAX_VALUE )
649             {
650                 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
651             }
652             else if ( poolSize == 0 )
653             {
654                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
655             }
656             else
657             {
658                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
659             }
660             return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
661         }
662 
663         private boolean canSchedule( Runner runner )
664         {
665             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
666         }
667 
668         private boolean isThreadSafe( Runner runner )
669         {
670             return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
671         }
672 
673         private class SuiteFilter
674             extends Filter
675         {
676             // Do NOT use allGroups in SuiteFilter.
677 
678             @Override
679             public boolean shouldRun( Description description )
680             {
681                 return true;
682             }
683 
684             @Override
685             public void apply( Object child )
686                 throws NoTestsRemainException
687             {
688                 super.apply( child );
689                 if ( child instanceof ParentRunner )
690                 {
691                     ParentRunner runner = ( ParentRunner ) child;
692                     if ( !isThreadSafe( runner ) )
693                     {
694                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
695                     }
696                     else if ( child instanceof Suite )
697                     {
698                         nestedSuites.add( (Suite) child );
699                     }
700                     else
701                     {
702                         ParentRunner parentRunner = (ParentRunner) child;
703                         nestedClasses.add( parentRunner );
704                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
705                     }
706                 }
707             }
708 
709             @Override
710             public String describe()
711             {
712                 return "";
713             }
714         }
715     }
716 
717     private static Suite createSuite( Collection<Runner> runners )
718         throws InitializationError
719     {
720         final List<Runner> onlyRunners = removeNullRunners( runners );
721         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
722         {
723         };
724     }
725 
726     private static List<Runner> removeNullRunners( Collection<Runner> runners )
727     {
728         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
729         onlyRunners.removeAll( NULL_SINGLETON );
730         return onlyRunners;
731     }
732 }