View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.EnumMap;
27  import java.util.Iterator;
28  import java.util.LinkedHashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import net.jcip.annotations.NotThreadSafe;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.testset.TestSetFailedException;
39  import org.junit.internal.runners.ErrorReportingRunner;
40  import org.junit.runner.Description;
41  import org.junit.runner.Runner;
42  import org.junit.runner.manipulation.Filter;
43  import org.junit.runner.manipulation.NoTestsRemainException;
44  import org.junit.runner.notification.RunNotifier;
45  import org.junit.runners.ParentRunner;
46  import org.junit.runners.Suite;
47  import org.junit.runners.model.InitializationError;
48  import org.junit.runners.model.RunnerBuilder;
49  
50  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
51  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
52  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
53  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
54  
55  /**
56   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
57   * the suites and classes can be reused in parallel methods.
58   * <pre>
59   * JUnitCoreParameters parameters = ...;
60   * ParallelComputerBuilder builder = new ParallelComputerBuilder(parameters);
61   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
62   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
63   * Class<?>[] tests = {...};
64   * new JUnitCore().run(computer, tests);
65   * </pre>
66   * Note that the type has always at least one thread even if unspecified. The capacity in
67   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
68   * altogether.
69   * <p/>
70   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
71   * <tt>true</tt>.
72   * <pre>
73   * computer.describeStopped(true);
74   * </pre>
75   *
76   * @author Tibor Digana (tibor17)
77   * @since 2.16
78   */
79  public final class ParallelComputerBuilder
80  {
81      private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
82  
83      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
84  
85      private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
86  
87      private boolean useSeparatePools;
88  
89      private int totalPoolSize;
90  
91      private JUnitCoreParameters parameters;
92  
93      private boolean optimize;
94  
95      private boolean runningInTests;
96  
97      /**
98       * Calling {@link #useSeparatePools()}.
99       * Can be used only in unit tests.
100      * Do NOT call this constructor in production.
101      */
102     ParallelComputerBuilder()
103     {
104         runningInTests = true;
105         useSeparatePools();
106         parallelGroups.put( SUITES, 0 );
107         parallelGroups.put( CLASSES, 0 );
108         parallelGroups.put( METHODS, 0 );
109     }
110 
111     public ParallelComputerBuilder( JUnitCoreParameters parameters )
112     {
113         this();
114         runningInTests = false;
115         this.parameters = parameters;
116     }
117 
118     public ParallelComputer buildComputer()
119     {
120         return new PC();
121     }
122 
123     ParallelComputerBuilder useSeparatePools()
124     {
125         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
126         useSeparatePools = true;
127         return this;
128     }
129 
130     ParallelComputerBuilder useOnePool()
131     {
132         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
133         useSeparatePools = false;
134         return this;
135     }
136 
137     /**
138      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
139      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
140      *                      limited.
141      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
142      */
143     ParallelComputerBuilder useOnePool( int totalPoolSize )
144     {
145         if ( totalPoolSize < 1 )
146         {
147             throw new IllegalArgumentException( "Size of common pool is less than 1." );
148         }
149         this.totalPoolSize = totalPoolSize;
150         useSeparatePools = false;
151         return this;
152     }
153 
154     boolean isOptimized()
155     {
156         return optimize;
157     }
158 
159     ParallelComputerBuilder optimize( boolean optimize )
160     {
161         this.optimize = optimize;
162         return this;
163     }
164 
165     ParallelComputerBuilder parallelSuites()
166     {
167         return parallel( SUITES );
168     }
169 
170     ParallelComputerBuilder parallelSuites( int nThreads )
171     {
172         return parallel( nThreads, SUITES );
173     }
174 
175     ParallelComputerBuilder parallelClasses()
176     {
177         return parallel( CLASSES );
178     }
179 
180     ParallelComputerBuilder parallelClasses( int nThreads )
181     {
182         return parallel( nThreads, CLASSES );
183     }
184 
185     ParallelComputerBuilder parallelMethods()
186     {
187         return parallel( METHODS );
188     }
189 
190     ParallelComputerBuilder parallelMethods( int nThreads )
191     {
192         return parallel( nThreads, METHODS );
193     }
194 
195     private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
196     {
197         if ( nThreads < 0 )
198         {
199             throw new IllegalArgumentException( "negative nThreads " + nThreads );
200         }
201 
202         if ( parallelType == null )
203         {
204             throw new NullPointerException( "null parallelType" );
205         }
206 
207         parallelGroups.put( parallelType, nThreads );
208         return this;
209     }
210 
211     private ParallelComputerBuilder parallel( Type parallelType )
212     {
213         return parallel( Integer.MAX_VALUE, parallelType );
214     }
215 
216     private double parallelTestsTimeoutInSeconds()
217     {
218         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
219     }
220 
221     private double parallelTestsTimeoutForcedInSeconds()
222     {
223         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
224     }
225 
226     final class PC
227         extends ParallelComputer
228     {
229         private final SingleThreadScheduler notThreadSafeTests = new SingleThreadScheduler();
230 
231         final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
232 
233         final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
234 
235         final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
236 
237         final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
238 
239         final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
240 
241         int poolCapacity;
242 
243         boolean splitPool;
244 
245         private final Map<Type, Integer> allGroups;
246 
247         private long nestedClassesChildren;
248 
249         private volatile Scheduler master;
250 
251         private PC()
252         {
253             super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
254             allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
255             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
256             splitPool = ParallelComputerBuilder.this.useSeparatePools;
257         }
258 
259         @Override
260         protected ShutdownResult describeStopped( boolean shutdownNow )
261         {
262             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
263             final Scheduler m = master;
264             if ( m != null )
265             {
266                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
267                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
268                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
269             }
270             return shutdownResult;
271         }
272 
273         @Override
274         boolean shutdownThreadPoolsAwaitingKilled()
275         {
276             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
277             final Scheduler m = master;
278             if ( m != null )
279             {
280                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
281             }
282             return notInterrupted;
283         }
284 
285         @Override
286         public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
287             throws InitializationError
288         {
289             try
290             {
291                 super.getSuite( builder, cls );
292                 populateChildrenFromSuites();
293 
294                 WrappedRunners suiteSuites = wrapRunners( suites );
295                 WrappedRunners suiteClasses = wrapRunners( classes );
296 
297                 long suitesCount = suites.size();
298                 long classesCount = classes.size() + nestedClasses.size();
299                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
300                 if ( !ParallelComputerBuilder.this.runningInTests )
301                 {
302                     determineThreadCounts( suitesCount, classesCount, methodsCount );
303                 }
304 
305                 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
306             }
307             catch ( TestSetFailedException e )
308             {
309                 throw new InitializationError( e );
310             }
311         }
312 
313         @Override
314         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
315             throws Throwable
316         {
317             Runner runner = super.getRunner( builder, testClass );
318             if ( canSchedule( runner ) )
319             {
320                 if ( !isThreadSafe( runner ) )
321                 {
322                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
323                     notParallelRunners.add( runner );
324                 }
325                 else if ( runner instanceof Suite )
326                 {
327                     suites.add( (Suite) runner );
328                 }
329                 else
330                 {
331                     classes.add( (ParentRunner) runner );
332                 }
333             }
334             else
335             {
336                 notParallelRunners.add( runner );
337             }
338             return runner;
339         }
340 
341         private void determineThreadCounts( long suites, long classes, long methods )
342             throws TestSetFailedException
343         {
344             final JUnitCoreParameters parameters = ParallelComputerBuilder.this.parameters;
345             final boolean optimize = ParallelComputerBuilder.this.optimize;
346             RunnerCounter counts = new RunnerCounter( suites, classes, methods );
347             Concurrency concurrency = resolveConcurrency( parameters, optimize ? counts : null );
348             allGroups.put( SUITES, concurrency.suites );
349             allGroups.put( CLASSES, concurrency.classes );
350             allGroups.put( METHODS, concurrency.methods );
351             poolCapacity = concurrency.capacity;
352             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
353         }
354 
355         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
356             throws InitializationError
357         {
358             // Do NOT use allGroups here.
359             long childrenCounter = 0;
360             ArrayList<Runner> runs = new ArrayList<Runner>();
361             for ( T runner : runners )
362             {
363                 if ( runner != null )
364                 {
365                     int children = countChildren( runner );
366                     childrenCounter += children;
367                     if ( children != 0 )
368                     {
369                         runs.add( runner );
370                     }
371                 }
372             }
373             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
374         }
375 
376         private int countChildren( Runner runner )
377         {
378             Description description = runner.getDescription();
379             Collection children = description == null ? null : description.getChildren();
380             return children == null ? 0 : children.size();
381         }
382 
383         private ExecutorService createPool( int poolSize )
384         {
385             return poolSize < Integer.MAX_VALUE
386                 ? Executors.newFixedThreadPool( poolSize )
387                 : Executors.newCachedThreadPool();
388         }
389 
390         private Scheduler createMaster( ExecutorService pool, int poolSize )
391         {
392             final int finalRunnersCounter = countFinalRunners(); // can be 0, 1, 2 or 3
393             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
394             {
395                 return new Scheduler( null, new InvokerStrategy() );
396             }
397             else if ( pool != null && poolSize == Integer.MAX_VALUE )
398             {
399                 return new Scheduler( null, new SharedThreadPoolStrategy( pool ) );
400             }
401             else
402             {
403                 return new Scheduler( null, SchedulingStrategies.createParallelStrategy( finalRunnersCounter ) );
404             }
405         }
406 
407         private int countFinalRunners()
408         {
409             int counter = notParallelRunners.isEmpty() ? 0 : 1;
410 
411             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
412             {
413                 ++counter;
414             }
415 
416             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
417             {
418                 ++counter;
419             }
420 
421             return counter;
422         }
423 
424         private void populateChildrenFromSuites()
425         {
426             // Do NOT use allGroups here.
427             Filter filter = new SuiteFilter();
428             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
429             {
430                 ParentRunner suite = it.next();
431                 try
432                 {
433                     suite.filter( filter );
434                 }
435                 catch ( NoTestsRemainException e )
436                 {
437                     it.remove();
438                 }
439             }
440         }
441 
442         private int totalPoolSize()
443         {
444             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
445             {
446                 int total = 0;
447                 for ( int nThreads : allGroups.values() )
448                 {
449                     total += nThreads;
450                     if ( total < 0 )
451                     {
452                         total = Integer.MAX_VALUE;
453                         break;
454                     }
455                 }
456                 return total;
457             }
458             else
459             {
460                 return poolCapacity;
461             }
462         }
463 
464         private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
465             throws InitializationError
466         {
467             int parallelSuites = allGroups.get( SUITES );
468             int parallelClasses = allGroups.get( CLASSES );
469             int parallelMethods = allGroups.get( METHODS );
470             int poolSize = totalPoolSize();
471             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
472             master = createMaster( commonPool, poolSize );
473 
474             if ( suiteSuites != null )
475             {
476                 // a scheduler for parallel suites
477                 if ( commonPool != null && parallelSuites > 0 )
478                 {
479                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
480                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
481                 }
482                 else
483                 {
484                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
485                 }
486             }
487 
488             // schedulers for parallel classes
489             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
490             allSuites.addAll( nestedSuites );
491             if ( suiteClasses != null )
492             {
493                 allSuites.add( suiteClasses );
494             }
495             if ( !allSuites.isEmpty() )
496             {
497                 setSchedulers( allSuites, parallelClasses, commonPool );
498             }
499 
500             // schedulers for parallel methods
501             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
502             allClasses.addAll( nestedClasses );
503             if ( !allClasses.isEmpty() )
504             {
505                 setSchedulers( allClasses, parallelMethods, commonPool );
506             }
507 
508             // resulting runner for Computer#getSuite() scheduled by master scheduler
509             ParentRunner all = createFinalRunner( removeNullRunners(
510                 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
511             ) );
512             all.setScheduler( master );
513             return all;
514         }
515 
516         private ParentRunner createFinalRunner( List<Runner> runners )
517             throws InitializationError
518         {
519             return new Suite( null, runners )
520             {
521                 @Override
522                 public void run( RunNotifier notifier )
523                 {
524                     try
525                     {
526                         beforeRunQuietly();
527                         super.run( notifier );
528                     }
529                     finally
530                     {
531                         afterRunQuietly();
532                     }
533                 }
534             };
535         }
536 
537         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
538         {
539             if ( commonPool != null )
540             {
541                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
542                 boolean doParallel = poolSize > 0;
543                 for ( ParentRunner runner : runners )
544                 {
545                     runner.setScheduler(
546                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
547                 }
548             }
549             else
550             {
551                 ExecutorService pool = null;
552                 if ( poolSize == Integer.MAX_VALUE )
553                 {
554                     pool = Executors.newCachedThreadPool();
555                 }
556                 else if ( poolSize > 0 )
557                 {
558                     pool = Executors.newFixedThreadPool( poolSize );
559                 }
560                 boolean doParallel = pool != null;
561                 for ( ParentRunner runner : runners )
562                 {
563                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
564                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
565                 }
566             }
567         }
568 
569         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
570                                            Balancer concurrency )
571         {
572             doParallel &= pool != null;
573             SchedulingStrategy strategy = doParallel ? new SharedThreadPoolStrategy( pool ) : new InvokerStrategy();
574             return new Scheduler( desc, master, strategy, concurrency );
575         }
576 
577         private Scheduler createScheduler( int poolSize )
578         {
579             if ( poolSize == Integer.MAX_VALUE )
580             {
581                 return new Scheduler( null, master, SchedulingStrategies.createParallelStrategyUnbounded() );
582             }
583             else if ( poolSize == 0 )
584             {
585                 return new Scheduler( null, master, new InvokerStrategy() );
586             }
587             else
588             {
589                 return new Scheduler( null, master, SchedulingStrategies.createParallelStrategy( poolSize ) );
590             }
591         }
592 
593         private boolean canSchedule( Runner runner )
594         {
595             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
596         }
597 
598         private boolean isThreadSafe( Runner runner )
599         {
600             return runner.getDescription().getAnnotation( NotThreadSafe.class ) == null;
601         }
602 
603         private class SuiteFilter
604             extends Filter
605         {
606             // Do NOT use allGroups in SuiteFilter.
607 
608             @Override
609             public boolean shouldRun( Description description )
610             {
611                 return true;
612             }
613 
614             @Override
615             public void apply( Object child )
616                 throws NoTestsRemainException
617             {
618                 super.apply( child );
619                 if ( child instanceof ParentRunner )
620                 {
621                     ParentRunner runner = ( ParentRunner ) child;
622                     if ( !isThreadSafe( runner ) )
623                     {
624                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
625                     }
626                     else if ( child instanceof Suite )
627                     {
628                         nestedSuites.add( (Suite) child );
629                     }
630                     else
631                     {
632                         ParentRunner parentRunner = (ParentRunner) child;
633                         nestedClasses.add( parentRunner );
634                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
635                     }
636                 }
637             }
638 
639             @Override
640             public String describe()
641             {
642                 return "";
643             }
644         }
645     }
646 
647     private static Suite createSuite( Collection<Runner> runners )
648         throws InitializationError
649     {
650         final List<Runner> onlyRunners = removeNullRunners( runners );
651         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
652         {
653         };
654     }
655 
656     private static List<Runner> removeNullRunners( Collection<Runner> runners )
657     {
658         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
659         onlyRunners.removeAll( NULL_SINGLETON );
660         return onlyRunners;
661     }
662 }