1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.annotation.Annotation;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.EnumMap;
28 import java.util.Iterator;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38 import org.apache.maven.surefire.report.ConsoleStream;
39 import org.apache.maven.surefire.testset.TestSetFailedException;
40 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41 import org.junit.internal.runners.ErrorReportingRunner;
42 import org.junit.runner.Description;
43 import org.junit.runner.Runner;
44 import org.junit.runner.manipulation.Filter;
45 import org.junit.runner.manipulation.NoTestsRemainException;
46 import org.junit.runner.notification.RunNotifier;
47 import org.junit.runners.ParentRunner;
48 import org.junit.runners.Suite;
49 import org.junit.runners.model.InitializationError;
50 import org.junit.runners.model.RunnerBuilder;
51
52 import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55 import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56 import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57 import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58
59 @SuppressWarnings( { "javadoc", "checkstyle:javadoctype" } )
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public final class ParallelComputerBuilder
85 {
/** Thread factory obtained from {@link DaemonThreadFactory}; handed to every executor created in this file. */
86 private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
87
/** The {@code net.jcip.annotations.NotThreadSafe} annotation type, or {@code null} when it cannot be loaded. */
88 private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
89
/** Set holding a single {@code null}; used by {@code removeNullRunners} to strip null entries in one call. */
90 private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
91
/** Value of {@link #totalPoolSize} meaning that no explicit common pool size was configured. */
92 static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
93
/**
 * Requested thread count per parallel {@link Type}. The constructor seeds every group with 0;
 * the no-arg {@code parallelXxx()} methods store {@link Integer#MAX_VALUE}.
 */
94 private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
95
/** Console logger passed on to every scheduler and scheduling strategy created by the computer. */
96 private final ConsoleStream logger;
97
/** {@code true} after {@code useSeparatePools()}, {@code false} after either {@code useOnePool} variant. */
98 private boolean useSeparatePools;
99
/** Size of the common pool, or {@link #TOTAL_POOL_SIZE_UNDEFINED}. */
100 private int totalPoolSize;
101
/** Surefire parameters; stays {@code null} when the single-argument constructor is used. */
102 private JUnitCoreParameters parameters;
103
/** Flag set via {@code optimize(boolean)}; when set, runner counts are passed to {@code resolveConcurrency}. */
104 private boolean optimize;
105
/** {@code true} for builders created with the single-argument constructor; such computers skip thread-count resolution. */
106 private boolean runningInTests;
107
108
109
110
111
112
/**
 * Package-private constructor. It flags the builder as {@code runningInTests}, so computers
 * built from it keep the thread counts configured on this builder instead of resolving them
 * from {@link JUnitCoreParameters}.
 *
 * @param logger console stream handed to the schedulers created later
 */
ParallelComputerBuilder( ConsoleStream logger )
{
    this.logger = logger;
    this.runningInTests = true;

    // Same state that useSeparatePools() establishes: separate pools, no common pool size.
    this.totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
    this.useSeparatePools = true;

    // Every parallel group starts with zero threads, i.e. not parallel.
    for ( Type group : new Type[] { SUITES, CLASSES, METHODS } )
    {
        parallelGroups.put( group, 0 );
    }
}
122
/**
 * Creates a builder whose computers resolve their thread counts from the given parameters.
 *
 * @param logger     console stream handed to the schedulers created later
 * @param parameters Surefire JUnit-core parameters consulted when the computer is assembled
 */
public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
{
    this( logger );
    this.parameters = parameters;
    // The delegated constructor marks the builder as test-only; undo that here.
    this.runningInTests = false;
}
129
130 public ParallelComputer buildComputer()
131 {
132 return new PC();
133 }
134
135 ParallelComputerBuilder useSeparatePools()
136 {
137 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138 useSeparatePools = true;
139 return this;
140 }
141
142 ParallelComputerBuilder useOnePool()
143 {
144 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
145 useSeparatePools = false;
146 return this;
147 }
148
149
150
151
152
153
154
155 ParallelComputerBuilder useOnePool( int totalPoolSize )
156 {
157 if ( totalPoolSize < 1 )
158 {
159 throw new IllegalArgumentException( "Size of common pool is less than 1." );
160 }
161 this.totalPoolSize = totalPoolSize;
162 useSeparatePools = false;
163 return this;
164 }
165
166 boolean isOptimized()
167 {
168 return optimize;
169 }
170
171 ParallelComputerBuilder optimize( boolean optimize )
172 {
173 this.optimize = optimize;
174 return this;
175 }
176
177 ParallelComputerBuilder parallelSuites()
178 {
179 return parallel( SUITES );
180 }
181
182 ParallelComputerBuilder parallelSuites( int nThreads )
183 {
184 return parallel( nThreads, SUITES );
185 }
186
187 ParallelComputerBuilder parallelClasses()
188 {
189 return parallel( CLASSES );
190 }
191
192 ParallelComputerBuilder parallelClasses( int nThreads )
193 {
194 return parallel( nThreads, CLASSES );
195 }
196
197 ParallelComputerBuilder parallelMethods()
198 {
199 return parallel( METHODS );
200 }
201
202 ParallelComputerBuilder parallelMethods( int nThreads )
203 {
204 return parallel( nThreads, METHODS );
205 }
206
207 private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
208 {
209 if ( nThreads < 0 )
210 {
211 throw new IllegalArgumentException( "negative nThreads " + nThreads );
212 }
213
214 if ( parallelType == null )
215 {
216 throw new IllegalArgumentException( "null parallelType" );
217 }
218
219 parallelGroups.put( parallelType, nThreads );
220 return this;
221 }
222
223 private ParallelComputerBuilder parallel( Type parallelType )
224 {
225 return parallel( Integer.MAX_VALUE, parallelType );
226 }
227
228 private double parallelTestsTimeoutInSeconds()
229 {
230 return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
231 }
232
233 private double parallelTestsTimeoutForcedInSeconds()
234 {
235 return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
236 }
237
238 @SuppressWarnings( "unchecked" )
239 private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
240 {
241 try
242 {
243 Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
244 return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
245 }
246 catch ( ClassNotFoundException e )
247 {
248 return null;
249 }
250 }
251
252 final class PC
253 extends ParallelComputer
254 {
/** Scheduler whose runner schedulers are assigned to runners that {@code isThreadSafe} rejects. */
255 private final SingleThreadScheduler notThreadSafeTests =
256 new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
257
/** Top-level {@link Suite} runners registered by {@code getRunner}; emptied suites are removed later. */
258 private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
259
/** {@link Suite} children collected by {@code SuiteFilter.apply}. */
260 private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
261
/** Top-level non-suite {@link ParentRunner}s registered by {@code getRunner}. */
262 private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
263
/** Non-suite {@link ParentRunner} children collected by {@code SuiteFilter.apply}. */
264 private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
265
/** Top-level runners that cannot be scheduled or are marked as not thread-safe. */
266 private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
267
/** Common pool size; copied from the builder and overwritten by {@code determineThreadCounts}. */
268 private int poolCapacity;
269
/** Whether separate pools are used; copied from the builder, may be cleared by {@code determineThreadCounts}. */
270 private boolean splitPool;
271
/** Per-group thread counts; starts as a copy of the builder's map. */
272 private final Map<Type, Integer> allGroups;
273
/** Sum of the description child counts of all runners added to {@link #nestedClasses}. */
274 private long nestedClassesChildren;
275
/** Master scheduler; {@code null} until {@code setSchedulers} assigns it. Read by the shutdown methods. */
276 private volatile Scheduler master;
277
278 private PC()
279 {
280 super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
281 allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
282 poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
283 splitPool = ParallelComputerBuilder.this.useSeparatePools;
284 }
285
286 Collection<ParentRunner> getSuites()
287 {
288 return suites;
289 }
290
291 Collection<ParentRunner> getNestedSuites()
292 {
293 return nestedSuites;
294 }
295
296 Collection<ParentRunner> getClasses()
297 {
298 return classes;
299 }
300
301 Collection<ParentRunner> getNestedClasses()
302 {
303 return nestedClasses;
304 }
305
306 Collection<Runner> getNotParallelRunners()
307 {
308 return notParallelRunners;
309 }
310
311 int getPoolCapacity()
312 {
313 return poolCapacity;
314 }
315
316 boolean isSplitPool()
317 {
318 return splitPool;
319 }
320
321 @Override
322 protected ShutdownResult describeStopped( boolean shutdownNow )
323 {
324 ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
325 final Scheduler m = master;
326 if ( m != null )
327 {
328 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
329 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
330 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
331 }
332 return shutdownResult;
333 }
334
335 @Override
336 protected boolean shutdownThreadPoolsAwaitingKilled()
337 {
338 boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
339 final Scheduler m = master;
340 if ( m != null )
341 {
342 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
343 }
344 return notInterrupted;
345 }
346
347 @Override
348 public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
349 throws InitializationError
350 {
351 try
352 {
353 super.getSuite( builder, cls );
354 populateChildrenFromSuites();
355
356 WrappedRunners suiteSuites = wrapRunners( suites );
357 WrappedRunners suiteClasses = wrapRunners( classes );
358
359 long suitesCount = suites.size();
360 long classesCount = classes.size() + nestedClasses.size();
361 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
362 if ( !ParallelComputerBuilder.this.runningInTests )
363 {
364 determineThreadCounts( suitesCount, classesCount, methodsCount );
365 }
366
367 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
368 }
369 catch ( TestSetFailedException e )
370 {
371 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
372 }
373 }
374
375 @Override
376 protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
377 throws Throwable
378 {
379 Runner runner = super.getRunner( builder, testClass );
380 if ( canSchedule( runner ) )
381 {
382 if ( !isThreadSafe( runner ) )
383 {
384 ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
385 notParallelRunners.add( runner );
386 }
387 else if ( runner instanceof Suite )
388 {
389 suites.add( (Suite) runner );
390 }
391 else
392 {
393 classes.add( (ParentRunner) runner );
394 }
395 }
396 else
397 {
398 notParallelRunners.add( runner );
399 }
400 return runner;
401 }
402
403 private void determineThreadCounts( long suites, long classes, long methods )
404 throws TestSetFailedException
405 {
406 RunnerCounter counts = null;
407 if ( ParallelComputerBuilder.this.optimize )
408 {
409 counts = new RunnerCounter( suites, classes, methods );
410 }
411 Concurrency concurrency =
412 resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
413 allGroups.put( SUITES, concurrency.suites );
414 allGroups.put( CLASSES, concurrency.classes );
415 allGroups.put( METHODS, concurrency.methods );
416 poolCapacity = concurrency.capacity;
417 splitPool &= concurrency.capacity <= 0;
418 }
419
420 private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
421 throws InitializationError
422 {
423
424 long childrenCounter = 0;
425 ArrayList<Runner> runs = new ArrayList<Runner>();
426 for ( T runner : runners )
427 {
428 if ( runner != null )
429 {
430 int children = countChildren( runner );
431 childrenCounter += children;
432 runs.add( runner );
433 }
434 }
435 return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
436 }
437
438 private int countChildren( Runner runner )
439 {
440 Description description = runner.getDescription();
441 Collection children = description == null ? null : description.getChildren();
442 return children == null ? 0 : children.size();
443 }
444
445 private ExecutorService createPool( int poolSize )
446 {
447 return poolSize < Integer.MAX_VALUE
448 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
449 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
450 }
451
452 private Scheduler createMaster( ExecutorService pool, int poolSize )
453 {
454
455 final int finalRunnersCounter = countFinalRunners();
456
457 final SchedulingStrategy strategy;
458 if ( finalRunnersCounter <= 1 || poolSize <= 1 )
459 {
460 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
461 }
462 else if ( pool != null && poolSize == Integer.MAX_VALUE )
463 {
464 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
465 }
466 else
467 {
468 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
469 }
470 return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
471 }
472
473 private int countFinalRunners()
474 {
475 int counter = notParallelRunners.isEmpty() ? 0 : 1;
476
477 if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
478 {
479 ++counter;
480 }
481
482 if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
483 {
484 ++counter;
485 }
486
487 return counter;
488 }
489
490 private void populateChildrenFromSuites()
491 {
492
493 Filter filter = new SuiteFilter();
494 for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
495 {
496 ParentRunner suite = it.next();
497 try
498 {
499 suite.filter( filter );
500 }
501 catch ( NoTestsRemainException e )
502 {
503 it.remove();
504 }
505 }
506 }
507
508 private int totalPoolSize()
509 {
510 if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
511 {
512 int total = 0;
513 for ( int nThreads : allGroups.values() )
514 {
515 total += nThreads;
516 if ( total < 0 )
517 {
518 total = Integer.MAX_VALUE;
519 break;
520 }
521 }
522 return total;
523 }
524 else
525 {
526 return poolCapacity;
527 }
528 }
529
530 private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
531 throws InitializationError
532 {
533 int parallelSuites = allGroups.get( SUITES );
534 int parallelClasses = allGroups.get( CLASSES );
535 int parallelMethods = allGroups.get( METHODS );
536 int poolSize = totalPoolSize();
537 ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
538 master = createMaster( commonPool, poolSize );
539
540 if ( suiteSuites != null )
541 {
542
543 if ( commonPool != null && parallelSuites > 0 )
544 {
545 Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
546 suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
547 }
548 else
549 {
550 suiteSuites.setScheduler( createScheduler( parallelSuites ) );
551 }
552 }
553
554
555 ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
556 allSuites.addAll( nestedSuites );
557 if ( suiteClasses != null )
558 {
559 allSuites.add( suiteClasses );
560 }
561 if ( !allSuites.isEmpty() )
562 {
563 setSchedulers( allSuites, parallelClasses, commonPool );
564 }
565
566
567 ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
568 allClasses.addAll( nestedClasses );
569 if ( !allClasses.isEmpty() )
570 {
571 setSchedulers( allClasses, parallelMethods, commonPool );
572 }
573
574
575 ParentRunner all = createFinalRunner( removeNullRunners(
576 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
577 ) );
578 all.setScheduler( master );
579 return all;
580 }
581
582 private ParentRunner createFinalRunner( List<Runner> runners )
583 throws InitializationError
584 {
585 return new Suite( null, runners )
586 {
587 @Override
588 public void run( RunNotifier notifier )
589 {
590 try
591 {
592 beforeRunQuietly();
593 super.run( notifier );
594 }
595 finally
596 {
597 afterRunQuietly();
598 }
599 }
600 };
601 }
602
603 private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
604 {
605 if ( commonPool != null )
606 {
607 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
608 boolean doParallel = poolSize > 0;
609 for ( ParentRunner runner : runners )
610 {
611 runner.setScheduler(
612 createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
613 }
614 }
615 else
616 {
617 ExecutorService pool = null;
618 if ( poolSize == Integer.MAX_VALUE )
619 {
620 pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
621 }
622 else if ( poolSize > 0 )
623 {
624 pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
625 }
626 boolean doParallel = pool != null;
627 for ( ParentRunner runner : runners )
628 {
629 runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
630 BalancerFactory.createInfinitePermitsBalancer() ) );
631 }
632 }
633 }
634
635 private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
636 Balancer concurrency )
637 {
638 SchedulingStrategy strategy =
639 doParallel & pool != null
640 ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
641 : new InvokerStrategy( ParallelComputerBuilder.this.logger );
642 return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
643 }
644
645 private Scheduler createScheduler( int poolSize )
646 {
647 final SchedulingStrategy strategy;
648 if ( poolSize == Integer.MAX_VALUE )
649 {
650 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
651 }
652 else if ( poolSize == 0 )
653 {
654 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
655 }
656 else
657 {
658 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
659 }
660 return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
661 }
662
663 private boolean canSchedule( Runner runner )
664 {
665 return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
666 }
667
668 private boolean isThreadSafe( Runner runner )
669 {
670 return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
671 }
672
673 private class SuiteFilter
674 extends Filter
675 {
676
677
678 @Override
679 public boolean shouldRun( Description description )
680 {
681 return true;
682 }
683
684 @Override
685 public void apply( Object child )
686 throws NoTestsRemainException
687 {
688 super.apply( child );
689 if ( child instanceof ParentRunner )
690 {
691 ParentRunner runner = ( ParentRunner ) child;
692 if ( !isThreadSafe( runner ) )
693 {
694 runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
695 }
696 else if ( child instanceof Suite )
697 {
698 nestedSuites.add( (Suite) child );
699 }
700 else
701 {
702 ParentRunner parentRunner = (ParentRunner) child;
703 nestedClasses.add( parentRunner );
704 nestedClassesChildren += parentRunner.getDescription().getChildren().size();
705 }
706 }
707 }
708
709 @Override
710 public String describe()
711 {
712 return "";
713 }
714 }
715 }
716
717 private static Suite createSuite( Collection<Runner> runners )
718 throws InitializationError
719 {
720 final List<Runner> onlyRunners = removeNullRunners( runners );
721 return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
722 {
723 };
724 }
725
726 private static List<Runner> removeNullRunners( Collection<Runner> runners )
727 {
728 final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
729 onlyRunners.removeAll( NULL_SINGLETON );
730 return onlyRunners;
731 }
732 }