View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.EnumMap;
27  import java.util.Iterator;
28  import java.util.LinkedHashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import net.jcip.annotations.NotThreadSafe;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.report.ConsoleLogger;
39  import org.apache.maven.surefire.testset.TestSetFailedException;
40  import org.junit.internal.runners.ErrorReportingRunner;
41  import org.junit.runner.Description;
42  import org.junit.runner.Runner;
43  import org.junit.runner.manipulation.Filter;
44  import org.junit.runner.manipulation.NoTestsRemainException;
45  import org.junit.runner.notification.RunNotifier;
46  import org.junit.runners.ParentRunner;
47  import org.junit.runners.Suite;
48  import org.junit.runners.model.InitializationError;
49  import org.junit.runners.model.RunnerBuilder;
50  
51  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
52  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
53  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
54  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
55  
56  /**
57   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
58   * the suites and classes can be reused in parallel methods.
59   * <pre>
60   * JUnitCoreParameters parameters = ...;
61   * ParallelComputerBuilder builder = new ParallelComputerBuilder(logger, parameters);
62   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
63   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
64   * Class<?>[] tests = {...};
65   * new JUnitCore().run(computer, tests);
66   * </pre>
67   * Note that the type has always at least one thread even if unspecified. The capacity in
68   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
69   * altogether.
70   * <p/>
71   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
72   * <tt>true</tt>.
73   * <pre>
74   * computer.describeStopped(true);
75   * </pre>
76   *
77   * @author Tibor Digana (tibor17)
78   * @since 2.16
79   */
80  public final class ParallelComputerBuilder
81  {
82      private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
83  
84      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
85  
86      private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
87  
88      private final ConsoleLogger logger;
89  
90      private boolean useSeparatePools;
91  
92      private int totalPoolSize;
93  
94      private JUnitCoreParameters parameters;
95  
96      private boolean optimize;
97  
98      private boolean runningInTests;
99  
100     /**
101      * Calling {@link #useSeparatePools()}.
102      * Can be used only in unit tests.
103      * Do NOT call this constructor in production.
104      */
105     ParallelComputerBuilder( ConsoleLogger logger )
106     {
107         this.logger = logger;
108         runningInTests = true;
109         useSeparatePools();
110         parallelGroups.put( SUITES, 0 );
111         parallelGroups.put( CLASSES, 0 );
112         parallelGroups.put( METHODS, 0 );
113     }
114 
115     public ParallelComputerBuilder( ConsoleLogger logger, JUnitCoreParameters parameters )
116     {
117         this( logger );
118         runningInTests = false;
119         this.parameters = parameters;
120     }
121 
122     public ParallelComputer buildComputer()
123     {
124         return new PC();
125     }
126 
127     ParallelComputerBuilder useSeparatePools()
128     {
129         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
130         useSeparatePools = true;
131         return this;
132     }
133 
134     ParallelComputerBuilder useOnePool()
135     {
136         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
137         useSeparatePools = false;
138         return this;
139     }
140 
141     /**
142      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
143      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
144      *                      limited.
145      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
146      */
147     ParallelComputerBuilder useOnePool( int totalPoolSize )
148     {
149         if ( totalPoolSize < 1 )
150         {
151             throw new IllegalArgumentException( "Size of common pool is less than 1." );
152         }
153         this.totalPoolSize = totalPoolSize;
154         useSeparatePools = false;
155         return this;
156     }
157 
158     boolean isOptimized()
159     {
160         return optimize;
161     }
162 
163     ParallelComputerBuilder optimize( boolean optimize )
164     {
165         this.optimize = optimize;
166         return this;
167     }
168 
169     ParallelComputerBuilder parallelSuites()
170     {
171         return parallel( SUITES );
172     }
173 
174     ParallelComputerBuilder parallelSuites( int nThreads )
175     {
176         return parallel( nThreads, SUITES );
177     }
178 
179     ParallelComputerBuilder parallelClasses()
180     {
181         return parallel( CLASSES );
182     }
183 
184     ParallelComputerBuilder parallelClasses( int nThreads )
185     {
186         return parallel( nThreads, CLASSES );
187     }
188 
189     ParallelComputerBuilder parallelMethods()
190     {
191         return parallel( METHODS );
192     }
193 
194     ParallelComputerBuilder parallelMethods( int nThreads )
195     {
196         return parallel( nThreads, METHODS );
197     }
198 
199     private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
200     {
201         if ( nThreads < 0 )
202         {
203             throw new IllegalArgumentException( "negative nThreads " + nThreads );
204         }
205 
206         if ( parallelType == null )
207         {
208             throw new NullPointerException( "null parallelType" );
209         }
210 
211         parallelGroups.put( parallelType, nThreads );
212         return this;
213     }
214 
215     private ParallelComputerBuilder parallel( Type parallelType )
216     {
217         return parallel( Integer.MAX_VALUE, parallelType );
218     }
219 
220     private double parallelTestsTimeoutInSeconds()
221     {
222         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
223     }
224 
225     private double parallelTestsTimeoutForcedInSeconds()
226     {
227         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
228     }
229 
230     final class PC
231         extends ParallelComputer
232     {
233         private final SingleThreadScheduler notThreadSafeTests =
234             new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
235 
236         final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
237 
238         final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
239 
240         final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
241 
242         final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
243 
244         final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
245 
246         int poolCapacity;
247 
248         boolean splitPool;
249 
250         private final Map<Type, Integer> allGroups;
251 
252         private long nestedClassesChildren;
253 
254         private volatile Scheduler master;
255 
256         private PC()
257         {
258             super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
259             allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
260             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
261             splitPool = ParallelComputerBuilder.this.useSeparatePools;
262         }
263 
264         @Override
265         protected ShutdownResult describeStopped( boolean shutdownNow )
266         {
267             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
268             final Scheduler m = master;
269             if ( m != null )
270             {
271                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
272                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
273                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
274             }
275             return shutdownResult;
276         }
277 
278         @Override
279         boolean shutdownThreadPoolsAwaitingKilled()
280         {
281             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
282             final Scheduler m = master;
283             if ( m != null )
284             {
285                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
286             }
287             return notInterrupted;
288         }
289 
290         @Override
291         public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
292             throws InitializationError
293         {
294             try
295             {
296                 super.getSuite( builder, cls );
297                 populateChildrenFromSuites();
298 
299                 WrappedRunners suiteSuites = wrapRunners( suites );
300                 WrappedRunners suiteClasses = wrapRunners( classes );
301 
302                 long suitesCount = suites.size();
303                 long classesCount = classes.size() + nestedClasses.size();
304                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
305                 if ( !ParallelComputerBuilder.this.runningInTests )
306                 {
307                     determineThreadCounts( suitesCount, classesCount, methodsCount );
308                 }
309 
310                 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
311             }
312             catch ( TestSetFailedException e )
313             {
314                 throw new InitializationError( e );
315             }
316         }
317 
318         @Override
319         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
320             throws Throwable
321         {
322             Runner runner = super.getRunner( builder, testClass );
323             if ( canSchedule( runner ) )
324             {
325                 if ( !isThreadSafe( runner ) )
326                 {
327                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
328                     notParallelRunners.add( runner );
329                 }
330                 else if ( runner instanceof Suite )
331                 {
332                     suites.add( (Suite) runner );
333                 }
334                 else
335                 {
336                     classes.add( (ParentRunner) runner );
337                 }
338             }
339             else
340             {
341                 notParallelRunners.add( runner );
342             }
343             return runner;
344         }
345 
346         private void determineThreadCounts( long suites, long classes, long methods )
347             throws TestSetFailedException
348         {
349             final JUnitCoreParameters parameters = ParallelComputerBuilder.this.parameters;
350             final boolean optimize = ParallelComputerBuilder.this.optimize;
351             RunnerCounter counts = new RunnerCounter( suites, classes, methods );
352             Concurrency concurrency = resolveConcurrency( parameters, optimize ? counts : null );
353             allGroups.put( SUITES, concurrency.suites );
354             allGroups.put( CLASSES, concurrency.classes );
355             allGroups.put( METHODS, concurrency.methods );
356             poolCapacity = concurrency.capacity;
357             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
358         }
359 
360         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
361             throws InitializationError
362         {
363             // Do NOT use allGroups here.
364             long childrenCounter = 0;
365             ArrayList<Runner> runs = new ArrayList<Runner>();
366             for ( T runner : runners )
367             {
368                 if ( runner != null )
369                 {
370                     int children = countChildren( runner );
371                     childrenCounter += children;
372                     if ( children != 0 )
373                     {
374                         runs.add( runner );
375                     }
376                 }
377             }
378             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
379         }
380 
381         private int countChildren( Runner runner )
382         {
383             Description description = runner.getDescription();
384             Collection children = description == null ? null : description.getChildren();
385             return children == null ? 0 : children.size();
386         }
387 
388         private ExecutorService createPool( int poolSize )
389         {
390             return poolSize < Integer.MAX_VALUE
391                 ? Executors.newFixedThreadPool( poolSize )
392                 : Executors.newCachedThreadPool();
393         }
394 
395         private Scheduler createMaster( ExecutorService pool, int poolSize )
396         {
397             final int finalRunnersCounter = countFinalRunners(); // can be 0, 1, 2 or 3
398             final SchedulingStrategy strategy;
399             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
400             {
401                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
402             }
403             else if ( pool != null && poolSize == Integer.MAX_VALUE )
404             {
405                 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
406             }
407             else
408             {
409                 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger,
410                                                                         finalRunnersCounter );
411             }
412             return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
413         }
414 
415         private int countFinalRunners()
416         {
417             int counter = notParallelRunners.isEmpty() ? 0 : 1;
418 
419             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
420             {
421                 ++counter;
422             }
423 
424             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
425             {
426                 ++counter;
427             }
428 
429             return counter;
430         }
431 
432         private void populateChildrenFromSuites()
433         {
434             // Do NOT use allGroups here.
435             Filter filter = new SuiteFilter();
436             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
437             {
438                 ParentRunner suite = it.next();
439                 try
440                 {
441                     suite.filter( filter );
442                 }
443                 catch ( NoTestsRemainException e )
444                 {
445                     it.remove();
446                 }
447             }
448         }
449 
450         private int totalPoolSize()
451         {
452             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
453             {
454                 int total = 0;
455                 for ( int nThreads : allGroups.values() )
456                 {
457                     total += nThreads;
458                     if ( total < 0 )
459                     {
460                         total = Integer.MAX_VALUE;
461                         break;
462                     }
463                 }
464                 return total;
465             }
466             else
467             {
468                 return poolCapacity;
469             }
470         }
471 
472         private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
473             throws InitializationError
474         {
475             int parallelSuites = allGroups.get( SUITES );
476             int parallelClasses = allGroups.get( CLASSES );
477             int parallelMethods = allGroups.get( METHODS );
478             int poolSize = totalPoolSize();
479             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
480             master = createMaster( commonPool, poolSize );
481 
482             if ( suiteSuites != null )
483             {
484                 // a scheduler for parallel suites
485                 if ( commonPool != null && parallelSuites > 0 )
486                 {
487                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
488                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
489                 }
490                 else
491                 {
492                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
493                 }
494             }
495 
496             // schedulers for parallel classes
497             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
498             allSuites.addAll( nestedSuites );
499             if ( suiteClasses != null )
500             {
501                 allSuites.add( suiteClasses );
502             }
503             if ( !allSuites.isEmpty() )
504             {
505                 setSchedulers( allSuites, parallelClasses, commonPool );
506             }
507 
508             // schedulers for parallel methods
509             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
510             allClasses.addAll( nestedClasses );
511             if ( !allClasses.isEmpty() )
512             {
513                 setSchedulers( allClasses, parallelMethods, commonPool );
514             }
515 
516             // resulting runner for Computer#getSuite() scheduled by master scheduler
517             ParentRunner all = createFinalRunner( removeNullRunners(
518                 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
519             ) );
520             all.setScheduler( master );
521             return all;
522         }
523 
524         private ParentRunner createFinalRunner( List<Runner> runners )
525             throws InitializationError
526         {
527             return new Suite( null, runners )
528             {
529                 @Override
530                 public void run( RunNotifier notifier )
531                 {
532                     try
533                     {
534                         beforeRunQuietly();
535                         super.run( notifier );
536                     }
537                     finally
538                     {
539                         afterRunQuietly();
540                     }
541                 }
542             };
543         }
544 
545         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
546         {
547             if ( commonPool != null )
548             {
549                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
550                 boolean doParallel = poolSize > 0;
551                 for ( ParentRunner runner : runners )
552                 {
553                     runner.setScheduler(
554                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
555                 }
556             }
557             else
558             {
559                 ExecutorService pool = null;
560                 if ( poolSize == Integer.MAX_VALUE )
561                 {
562                     pool = Executors.newCachedThreadPool();
563                 }
564                 else if ( poolSize > 0 )
565                 {
566                     pool = Executors.newFixedThreadPool( poolSize );
567                 }
568                 boolean doParallel = pool != null;
569                 for ( ParentRunner runner : runners )
570                 {
571                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
572                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
573                 }
574             }
575         }
576 
577         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
578                                            Balancer concurrency )
579         {
580             doParallel &= pool != null;
581             SchedulingStrategy strategy = doParallel
582                     ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
583                     : new InvokerStrategy( ParallelComputerBuilder.this.logger );
584             return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
585         }
586 
587         private Scheduler createScheduler( int poolSize )
588         {
589             final SchedulingStrategy strategy;
590             if ( poolSize == Integer.MAX_VALUE )
591             {
592                 strategy = SchedulingStrategies.createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
593             }
594             else if ( poolSize == 0 )
595             {
596                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
597             }
598             else
599             {
600                 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
601             }
602             return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
603         }
604 
605         private boolean canSchedule( Runner runner )
606         {
607             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
608         }
609 
610         private boolean isThreadSafe( Runner runner )
611         {
612             return runner.getDescription().getAnnotation( NotThreadSafe.class ) == null;
613         }
614 
615         private class SuiteFilter
616             extends Filter
617         {
618             // Do NOT use allGroups in SuiteFilter.
619 
620             @Override
621             public boolean shouldRun( Description description )
622             {
623                 return true;
624             }
625 
626             @Override
627             public void apply( Object child )
628                 throws NoTestsRemainException
629             {
630                 super.apply( child );
631                 if ( child instanceof ParentRunner )
632                 {
633                     ParentRunner runner = ( ParentRunner ) child;
634                     if ( !isThreadSafe( runner ) )
635                     {
636                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
637                     }
638                     else if ( child instanceof Suite )
639                     {
640                         nestedSuites.add( (Suite) child );
641                     }
642                     else
643                     {
644                         ParentRunner parentRunner = (ParentRunner) child;
645                         nestedClasses.add( parentRunner );
646                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
647                     }
648                 }
649             }
650 
651             @Override
652             public String describe()
653             {
654                 return "";
655             }
656         }
657     }
658 
659     private static Suite createSuite( Collection<Runner> runners )
660         throws InitializationError
661     {
662         final List<Runner> onlyRunners = removeNullRunners( runners );
663         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
664         {
665         };
666     }
667 
668     private static List<Runner> removeNullRunners( Collection<Runner> runners )
669     {
670         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
671         onlyRunners.removeAll( NULL_SINGLETON );
672         return onlyRunners;
673     }
674 }