View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.annotation.Annotation;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.EnumMap;
28  import java.util.Iterator;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.report.ConsoleStream;
39  import org.apache.maven.surefire.testset.TestSetFailedException;
40  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41  import org.junit.internal.runners.ErrorReportingRunner;
42  import org.junit.runner.Description;
43  import org.junit.runner.Runner;
44  import org.junit.runner.manipulation.Filter;
45  import org.junit.runner.manipulation.NoTestsRemainException;
46  import org.junit.runner.notification.RunNotifier;
47  import org.junit.runners.ParentRunner;
48  import org.junit.runners.Suite;
49  import org.junit.runners.model.InitializationError;
50  import org.junit.runners.model.RunnerBuilder;
51  
52  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58  
59  @SuppressWarnings( { "javadoc", "checkstyle:javadoctype" } )
60  /*
61   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
62   * the suites and classes can be reused in parallel methods.
63   * <pre>
64   * JUnitCoreParameters parameters = ...;
65   * ParallelComputerBuilder builder = new ParallelComputerBuilder(parameters);
66   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
67   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
68   * Class<?>[] tests = {...};
69   * new JUnitCore().run(computer, tests);
70   * </pre>
71   * Note that the type always has at least one thread even if unspecified. The capacity in
72   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
73   * altogether.
74   * <br>
75   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
76   * {@code true}.
77   * <pre>
78   * computer.describeStopped(true);
79   * </pre>
80   *
81   * @author Tibor Digana (tibor17)
82   * @since 2.16
83   */
84  public final class ParallelComputerBuilder
85  {
86      private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
87  
88      private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
89  
90      private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
91  
92      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
93  
94      private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
95  
96      private final ConsoleStream logger;
97  
98      private boolean useSeparatePools;
99  
100     private int totalPoolSize;
101 
102     private JUnitCoreParameters parameters;
103 
104     private boolean optimize;
105 
106     private boolean runningInTests;
107 
108     /**
109      * Calling {@link #useSeparatePools()}.
110      * Can be used only in unit tests.
111      * Do NOT call this constructor in production.
112      */
113     ParallelComputerBuilder( ConsoleStream logger )
114     {
115         this.logger = logger;
116         runningInTests = true;
117         useSeparatePools();
118         parallelGroups.put( SUITES, 0 );
119         parallelGroups.put( CLASSES, 0 );
120         parallelGroups.put( METHODS, 0 );
121     }
122 
123     public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
124     {
125         this( logger );
126         runningInTests = false;
127         this.parameters = parameters;
128     }
129 
130     public ParallelComputer buildComputer()
131     {
132         return new PC();
133     }
134 
135     ParallelComputerBuilder useSeparatePools()
136     {
137         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138         useSeparatePools = true;
139         return this;
140     }
141 
142     ParallelComputerBuilder useOnePool()
143     {
144         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
145         useSeparatePools = false;
146         return this;
147     }
148 
149     /**
150      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
151      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
152      *                      limited.
153      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
154      */
155     ParallelComputerBuilder useOnePool( int totalPoolSize )
156     {
157         if ( totalPoolSize < 1 )
158         {
159             throw new IllegalArgumentException( "Size of common pool is less than 1." );
160         }
161         this.totalPoolSize = totalPoolSize;
162         useSeparatePools = false;
163         return this;
164     }
165 
166     boolean isOptimized()
167     {
168         return optimize;
169     }
170 
171     ParallelComputerBuilder optimize( boolean optimize )
172     {
173         this.optimize = optimize;
174         return this;
175     }
176 
177     ParallelComputerBuilder parallelSuites()
178     {
179         return parallel( SUITES );
180     }
181 
182     ParallelComputerBuilder parallelSuites( int nThreads )
183     {
184         return parallel( nThreads, SUITES );
185     }
186 
187     ParallelComputerBuilder parallelClasses()
188     {
189         return parallel( CLASSES );
190     }
191 
192     ParallelComputerBuilder parallelClasses( int nThreads )
193     {
194         return parallel( nThreads, CLASSES );
195     }
196 
197     ParallelComputerBuilder parallelMethods()
198     {
199         return parallel( METHODS );
200     }
201 
202     ParallelComputerBuilder parallelMethods( int nThreads )
203     {
204         return parallel( nThreads, METHODS );
205     }
206 
207     private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
208     {
209         if ( nThreads < 0 )
210         {
211             throw new IllegalArgumentException( "negative nThreads " + nThreads );
212         }
213 
214         if ( parallelType == null )
215         {
216             throw new IllegalArgumentException( "null parallelType" );
217         }
218 
219         parallelGroups.put( parallelType, nThreads );
220         return this;
221     }
222 
223     private ParallelComputerBuilder parallel( Type parallelType )
224     {
225         return parallel( Integer.MAX_VALUE, parallelType );
226     }
227 
228     private double parallelTestsTimeoutInSeconds()
229     {
230         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
231     }
232 
233     private double parallelTestsTimeoutForcedInSeconds()
234     {
235         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
236     }
237 
238     @SuppressWarnings( "unchecked" )
239     private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
240     {
241         try
242         {
243             Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
244             return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
245         }
246         catch ( ClassNotFoundException e )
247         {
248             return null;
249         }
250     }
251 
252     final class PC
253         extends ParallelComputer
254     {
255         private final SingleThreadScheduler notThreadSafeTests =
256             new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
257 
258         private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
259 
260         private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
261 
262         private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
263 
264         private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
265 
266         private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
267 
268         private int poolCapacity;
269 
270         private boolean splitPool;
271 
272         private final Map<Type, Integer> allGroups;
273 
274         private long nestedClassesChildren;
275 
276         private volatile Scheduler master;
277 
278         private PC()
279         {
280             super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
281             allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
282             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
283             splitPool = ParallelComputerBuilder.this.useSeparatePools;
284         }
285 
286         Collection<ParentRunner> getSuites()
287         {
288             return suites;
289         }
290 
291         Collection<ParentRunner> getNestedSuites()
292         {
293             return nestedSuites;
294         }
295 
296         Collection<ParentRunner> getClasses()
297         {
298             return classes;
299         }
300 
301         Collection<ParentRunner> getNestedClasses()
302         {
303             return nestedClasses;
304         }
305 
306         Collection<Runner> getNotParallelRunners()
307         {
308             return notParallelRunners;
309         }
310 
311         int getPoolCapacity()
312         {
313             return poolCapacity;
314         }
315 
316         boolean isSplitPool()
317         {
318             return splitPool;
319         }
320 
321         @Override
322         protected ShutdownResult describeStopped( boolean shutdownNow )
323         {
324             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
325             final Scheduler m = master;
326             if ( m != null )
327             {
328                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
329                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
330                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
331             }
332             return shutdownResult;
333         }
334 
335         @Override
336         protected boolean shutdownThreadPoolsAwaitingKilled()
337         {
338             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
339             final Scheduler m = master;
340             if ( m != null )
341             {
342                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
343             }
344             return notInterrupted;
345         }
346 
347         @Override
348         public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
349             throws InitializationError
350         {
351             try
352             {
353                 super.getSuite( builder, cls );
354                 populateChildrenFromSuites();
355 
356                 WrappedRunners suiteSuites = wrapRunners( suites );
357                 WrappedRunners suiteClasses = wrapRunners( classes );
358 
359                 long suitesCount = suites.size();
360                 long classesCount = classes.size() + nestedClasses.size();
361                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
362                 if ( !ParallelComputerBuilder.this.runningInTests )
363                 {
364                     determineThreadCounts( suitesCount, classesCount, methodsCount );
365                 }
366 
367                 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
368             }
369             catch ( TestSetFailedException e )
370             {
371                 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
372             }
373         }
374 
375         @Override
376         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
377             throws Throwable
378         {
379             Runner runner = super.getRunner( builder, testClass );
380             if ( canSchedule( runner ) )
381             {
382                 if ( !isThreadSafe( runner ) )
383                 {
384                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
385                     notParallelRunners.add( runner );
386                 }
387                 else if ( runner instanceof Suite )
388                 {
389                     suites.add( (Suite) runner );
390                 }
391                 else
392                 {
393                     classes.add( (ParentRunner) runner );
394                 }
395             }
396             else
397             {
398                 notParallelRunners.add( runner );
399             }
400             return runner;
401         }
402 
403         private void determineThreadCounts( long suites, long classes, long methods )
404             throws TestSetFailedException
405         {
406             RunnerCounter counts =
407                     ParallelComputerBuilder.this.optimize ? new RunnerCounter( suites, classes, methods ) : null;
408             Concurrency concurrency =
409                     resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
410             allGroups.put( SUITES, concurrency.suites );
411             allGroups.put( CLASSES, concurrency.classes );
412             allGroups.put( METHODS, concurrency.methods );
413             poolCapacity = concurrency.capacity;
414             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
415         }
416 
417         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
418             throws InitializationError
419         {
420             // Do NOT use allGroups here.
421             long childrenCounter = 0;
422             ArrayList<Runner> runs = new ArrayList<Runner>();
423             for ( T runner : runners )
424             {
425                 if ( runner != null )
426                 {
427                     int children = countChildren( runner );
428                     childrenCounter += children;
429                     runs.add( runner );
430                 }
431             }
432             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
433         }
434 
435         private int countChildren( Runner runner )
436         {
437             Description description = runner.getDescription();
438             Collection children = description == null ? null : description.getChildren();
439             return children == null ? 0 : children.size();
440         }
441 
442         private ExecutorService createPool( int poolSize )
443         {
444             return poolSize < Integer.MAX_VALUE
445                 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
446                 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
447         }
448 
449         private Scheduler createMaster( ExecutorService pool, int poolSize )
450         {
451             // can be 0, 1, 2 or 3
452             final int finalRunnersCounter = countFinalRunners();
453 
454             final SchedulingStrategy strategy;
455             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
456             {
457                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
458             }
459             else if ( pool != null && poolSize == Integer.MAX_VALUE )
460             {
461                 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
462             }
463             else
464             {
465                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
466             }
467             return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
468         }
469 
470         private int countFinalRunners()
471         {
472             int counter = notParallelRunners.isEmpty() ? 0 : 1;
473 
474             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
475             {
476                 ++counter;
477             }
478 
479             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
480             {
481                 ++counter;
482             }
483 
484             return counter;
485         }
486 
487         private void populateChildrenFromSuites()
488         {
489             // Do NOT use allGroups here.
490             Filter filter = new SuiteFilter();
491             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
492             {
493                 ParentRunner suite = it.next();
494                 try
495                 {
496                     suite.filter( filter );
497                 }
498                 catch ( NoTestsRemainException e )
499                 {
500                     it.remove();
501                 }
502             }
503         }
504 
505         private int totalPoolSize()
506         {
507             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
508             {
509                 int total = 0;
510                 for ( int nThreads : allGroups.values() )
511                 {
512                     total += nThreads;
513                     if ( total < 0 )
514                     {
515                         total = Integer.MAX_VALUE;
516                         break;
517                     }
518                 }
519                 return total;
520             }
521             else
522             {
523                 return poolCapacity;
524             }
525         }
526 
527         private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
528             throws InitializationError
529         {
530             int parallelSuites = allGroups.get( SUITES );
531             int parallelClasses = allGroups.get( CLASSES );
532             int parallelMethods = allGroups.get( METHODS );
533             int poolSize = totalPoolSize();
534             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
535             master = createMaster( commonPool, poolSize );
536 
537             if ( suiteSuites != null )
538             {
539                 // a scheduler for parallel suites
540                 if ( commonPool != null && parallelSuites > 0 )
541                 {
542                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
543                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
544                 }
545                 else
546                 {
547                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
548                 }
549             }
550 
551             // schedulers for parallel classes
552             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
553             allSuites.addAll( nestedSuites );
554             if ( suiteClasses != null )
555             {
556                 allSuites.add( suiteClasses );
557             }
558             if ( !allSuites.isEmpty() )
559             {
560                 setSchedulers( allSuites, parallelClasses, commonPool );
561             }
562 
563             // schedulers for parallel methods
564             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
565             allClasses.addAll( nestedClasses );
566             if ( !allClasses.isEmpty() )
567             {
568                 setSchedulers( allClasses, parallelMethods, commonPool );
569             }
570 
571             // resulting runner for Computer#getSuite() scheduled by master scheduler
572             ParentRunner all = createFinalRunner( removeNullRunners(
573                 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
574             ) );
575             all.setScheduler( master );
576             return all;
577         }
578 
579         private ParentRunner createFinalRunner( List<Runner> runners )
580             throws InitializationError
581         {
582             return new Suite( null, runners )
583             {
584                 @Override
585                 public void run( RunNotifier notifier )
586                 {
587                     try
588                     {
589                         beforeRunQuietly();
590                         super.run( notifier );
591                     }
592                     finally
593                     {
594                         afterRunQuietly();
595                     }
596                 }
597             };
598         }
599 
600         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
601         {
602             if ( commonPool != null )
603             {
604                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
605                 boolean doParallel = poolSize > 0;
606                 for ( ParentRunner runner : runners )
607                 {
608                     runner.setScheduler(
609                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
610                 }
611             }
612             else
613             {
614                 ExecutorService pool = null;
615                 if ( poolSize == Integer.MAX_VALUE )
616                 {
617                     pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
618                 }
619                 else if ( poolSize > 0 )
620                 {
621                     pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
622                 }
623                 boolean doParallel = pool != null;
624                 for ( ParentRunner runner : runners )
625                 {
626                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
627                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
628                 }
629             }
630         }
631 
632         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
633                                            Balancer concurrency )
634         {
635             SchedulingStrategy strategy =
636                     doParallel & pool != null
637                     ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
638                     : new InvokerStrategy( ParallelComputerBuilder.this.logger );
639             return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
640         }
641 
642         private Scheduler createScheduler( int poolSize )
643         {
644             final SchedulingStrategy strategy;
645             if ( poolSize == Integer.MAX_VALUE )
646             {
647                 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
648             }
649             else if ( poolSize == 0 )
650             {
651                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
652             }
653             else
654             {
655                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
656             }
657             return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
658         }
659 
660         private boolean canSchedule( Runner runner )
661         {
662             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
663         }
664 
665         private boolean isThreadSafe( Runner runner )
666         {
667             return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
668         }
669 
670         private class SuiteFilter
671             extends Filter
672         {
673             // Do NOT use allGroups in SuiteFilter.
674 
675             @Override
676             public boolean shouldRun( Description description )
677             {
678                 return true;
679             }
680 
681             @Override
682             public void apply( Object child )
683                 throws NoTestsRemainException
684             {
685                 super.apply( child );
686                 if ( child instanceof ParentRunner )
687                 {
688                     ParentRunner runner = ( ParentRunner ) child;
689                     if ( !isThreadSafe( runner ) )
690                     {
691                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
692                     }
693                     else if ( child instanceof Suite )
694                     {
695                         nestedSuites.add( (Suite) child );
696                     }
697                     else
698                     {
699                         ParentRunner parentRunner = (ParentRunner) child;
700                         nestedClasses.add( parentRunner );
701                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
702                     }
703                 }
704             }
705 
706             @Override
707             public String describe()
708             {
709                 return "";
710             }
711         }
712     }
713 
714     private static Suite createSuite( Collection<Runner> runners )
715         throws InitializationError
716     {
717         final List<Runner> onlyRunners = removeNullRunners( runners );
718         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
719         {
720         };
721     }
722 
723     private static List<Runner> removeNullRunners( Collection<Runner> runners )
724     {
725         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
726         onlyRunners.removeAll( NULL_SINGLETON );
727         return onlyRunners;
728     }
729 }