1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.annotation.Annotation;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.EnumMap;
28 import java.util.Iterator;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38 import org.apache.maven.surefire.report.ConsoleStream;
39 import org.apache.maven.surefire.testset.TestSetFailedException;
40 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41 import org.junit.internal.runners.ErrorReportingRunner;
42 import org.junit.runner.Description;
43 import org.junit.runner.Runner;
44 import org.junit.runner.manipulation.Filter;
45 import org.junit.runner.manipulation.NoTestsRemainException;
46 import org.junit.runner.notification.RunNotifier;
47 import org.junit.runners.ParentRunner;
48 import org.junit.runners.Suite;
49 import org.junit.runners.model.InitializationError;
50 import org.junit.runners.model.RunnerBuilder;
51
52 import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55 import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56 import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57 import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58
59 @SuppressWarnings( { "javadoc", "checkstyle:javadoctype" } )
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public final class ParallelComputerBuilder
85 {
86 private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
87
88 private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
89
90 private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
91
92 static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
93
94 private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
95
96 private final ConsoleStream logger;
97
98 private boolean useSeparatePools;
99
100 private int totalPoolSize;
101
102 private JUnitCoreParameters parameters;
103
104 private boolean optimize;
105
106 private boolean runningInTests;
107
108
109
110
111
112
113 ParallelComputerBuilder( ConsoleStream logger )
114 {
115 this.logger = logger;
116 runningInTests = true;
117 useSeparatePools();
118 parallelGroups.put( SUITES, 0 );
119 parallelGroups.put( CLASSES, 0 );
120 parallelGroups.put( METHODS, 0 );
121 }
122
123 public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
124 {
125 this( logger );
126 runningInTests = false;
127 this.parameters = parameters;
128 }
129
130 public ParallelComputer buildComputer()
131 {
132 return new PC();
133 }
134
135 ParallelComputerBuilder useSeparatePools()
136 {
137 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138 useSeparatePools = true;
139 return this;
140 }
141
142 ParallelComputerBuilder useOnePool()
143 {
144 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
145 useSeparatePools = false;
146 return this;
147 }
148
149
150
151
152
153
154
155 ParallelComputerBuilder useOnePool( int totalPoolSize )
156 {
157 if ( totalPoolSize < 1 )
158 {
159 throw new IllegalArgumentException( "Size of common pool is less than 1." );
160 }
161 this.totalPoolSize = totalPoolSize;
162 useSeparatePools = false;
163 return this;
164 }
165
166 boolean isOptimized()
167 {
168 return optimize;
169 }
170
171 ParallelComputerBuilder optimize( boolean optimize )
172 {
173 this.optimize = optimize;
174 return this;
175 }
176
177 ParallelComputerBuilder parallelSuites()
178 {
179 return parallel( SUITES );
180 }
181
182 ParallelComputerBuilder parallelSuites( int nThreads )
183 {
184 return parallel( nThreads, SUITES );
185 }
186
187 ParallelComputerBuilder parallelClasses()
188 {
189 return parallel( CLASSES );
190 }
191
192 ParallelComputerBuilder parallelClasses( int nThreads )
193 {
194 return parallel( nThreads, CLASSES );
195 }
196
197 ParallelComputerBuilder parallelMethods()
198 {
199 return parallel( METHODS );
200 }
201
202 ParallelComputerBuilder parallelMethods( int nThreads )
203 {
204 return parallel( nThreads, METHODS );
205 }
206
207 private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
208 {
209 if ( nThreads < 0 )
210 {
211 throw new IllegalArgumentException( "negative nThreads " + nThreads );
212 }
213
214 if ( parallelType == null )
215 {
216 throw new IllegalArgumentException( "null parallelType" );
217 }
218
219 parallelGroups.put( parallelType, nThreads );
220 return this;
221 }
222
223 private ParallelComputerBuilder parallel( Type parallelType )
224 {
225 return parallel( Integer.MAX_VALUE, parallelType );
226 }
227
228 private double parallelTestsTimeoutInSeconds()
229 {
230 return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
231 }
232
233 private double parallelTestsTimeoutForcedInSeconds()
234 {
235 return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
236 }
237
238 @SuppressWarnings( "unchecked" )
239 private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
240 {
241 try
242 {
243 Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
244 return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
245 }
246 catch ( ClassNotFoundException e )
247 {
248 return null;
249 }
250 }
251
252 final class PC
253 extends ParallelComputer
254 {
255 private final SingleThreadScheduler notThreadSafeTests =
256 new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
257
258 private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
259
260 private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
261
262 private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
263
264 private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
265
266 private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
267
268 private int poolCapacity;
269
270 private boolean splitPool;
271
272 private final Map<Type, Integer> allGroups;
273
274 private long nestedClassesChildren;
275
276 private volatile Scheduler master;
277
278 private PC()
279 {
280 super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
281 allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
282 poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
283 splitPool = ParallelComputerBuilder.this.useSeparatePools;
284 }
285
286 Collection<ParentRunner> getSuites()
287 {
288 return suites;
289 }
290
291 Collection<ParentRunner> getNestedSuites()
292 {
293 return nestedSuites;
294 }
295
296 Collection<ParentRunner> getClasses()
297 {
298 return classes;
299 }
300
301 Collection<ParentRunner> getNestedClasses()
302 {
303 return nestedClasses;
304 }
305
306 Collection<Runner> getNotParallelRunners()
307 {
308 return notParallelRunners;
309 }
310
311 int getPoolCapacity()
312 {
313 return poolCapacity;
314 }
315
316 boolean isSplitPool()
317 {
318 return splitPool;
319 }
320
321 @Override
322 protected ShutdownResult describeStopped( boolean shutdownNow )
323 {
324 ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
325 final Scheduler m = master;
326 if ( m != null )
327 {
328 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
329 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
330 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
331 }
332 return shutdownResult;
333 }
334
335 @Override
336 protected boolean shutdownThreadPoolsAwaitingKilled()
337 {
338 boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
339 final Scheduler m = master;
340 if ( m != null )
341 {
342 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
343 }
344 return notInterrupted;
345 }
346
347 @Override
348 public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
349 throws InitializationError
350 {
351 try
352 {
353 super.getSuite( builder, cls );
354 populateChildrenFromSuites();
355
356 WrappedRunners suiteSuites = wrapRunners( suites );
357 WrappedRunners suiteClasses = wrapRunners( classes );
358
359 long suitesCount = suites.size();
360 long classesCount = classes.size() + nestedClasses.size();
361 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
362 if ( !ParallelComputerBuilder.this.runningInTests )
363 {
364 determineThreadCounts( suitesCount, classesCount, methodsCount );
365 }
366
367 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
368 }
369 catch ( TestSetFailedException e )
370 {
371 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
372 }
373 }
374
375 @Override
376 protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
377 throws Throwable
378 {
379 Runner runner = super.getRunner( builder, testClass );
380 if ( canSchedule( runner ) )
381 {
382 if ( !isThreadSafe( runner ) )
383 {
384 ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
385 notParallelRunners.add( runner );
386 }
387 else if ( runner instanceof Suite )
388 {
389 suites.add( (Suite) runner );
390 }
391 else
392 {
393 classes.add( (ParentRunner) runner );
394 }
395 }
396 else
397 {
398 notParallelRunners.add( runner );
399 }
400 return runner;
401 }
402
403 private void determineThreadCounts( long suites, long classes, long methods )
404 throws TestSetFailedException
405 {
406 RunnerCounter counts =
407 ParallelComputerBuilder.this.optimize ? new RunnerCounter( suites, classes, methods ) : null;
408 Concurrency concurrency =
409 resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
410 allGroups.put( SUITES, concurrency.suites );
411 allGroups.put( CLASSES, concurrency.classes );
412 allGroups.put( METHODS, concurrency.methods );
413 poolCapacity = concurrency.capacity;
414 splitPool &= concurrency.capacity <= 0;
415 }
416
417 private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
418 throws InitializationError
419 {
420
421 long childrenCounter = 0;
422 ArrayList<Runner> runs = new ArrayList<Runner>();
423 for ( T runner : runners )
424 {
425 if ( runner != null )
426 {
427 int children = countChildren( runner );
428 childrenCounter += children;
429 runs.add( runner );
430 }
431 }
432 return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
433 }
434
435 private int countChildren( Runner runner )
436 {
437 Description description = runner.getDescription();
438 Collection children = description == null ? null : description.getChildren();
439 return children == null ? 0 : children.size();
440 }
441
442 private ExecutorService createPool( int poolSize )
443 {
444 return poolSize < Integer.MAX_VALUE
445 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
446 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
447 }
448
449 private Scheduler createMaster( ExecutorService pool, int poolSize )
450 {
451
452 final int finalRunnersCounter = countFinalRunners();
453
454 final SchedulingStrategy strategy;
455 if ( finalRunnersCounter <= 1 || poolSize <= 1 )
456 {
457 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
458 }
459 else if ( pool != null && poolSize == Integer.MAX_VALUE )
460 {
461 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
462 }
463 else
464 {
465 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
466 }
467 return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
468 }
469
470 private int countFinalRunners()
471 {
472 int counter = notParallelRunners.isEmpty() ? 0 : 1;
473
474 if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
475 {
476 ++counter;
477 }
478
479 if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
480 {
481 ++counter;
482 }
483
484 return counter;
485 }
486
487 private void populateChildrenFromSuites()
488 {
489
490 Filter filter = new SuiteFilter();
491 for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
492 {
493 ParentRunner suite = it.next();
494 try
495 {
496 suite.filter( filter );
497 }
498 catch ( NoTestsRemainException e )
499 {
500 it.remove();
501 }
502 }
503 }
504
505 private int totalPoolSize()
506 {
507 if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
508 {
509 int total = 0;
510 for ( int nThreads : allGroups.values() )
511 {
512 total += nThreads;
513 if ( total < 0 )
514 {
515 total = Integer.MAX_VALUE;
516 break;
517 }
518 }
519 return total;
520 }
521 else
522 {
523 return poolCapacity;
524 }
525 }
526
527 private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
528 throws InitializationError
529 {
530 int parallelSuites = allGroups.get( SUITES );
531 int parallelClasses = allGroups.get( CLASSES );
532 int parallelMethods = allGroups.get( METHODS );
533 int poolSize = totalPoolSize();
534 ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
535 master = createMaster( commonPool, poolSize );
536
537 if ( suiteSuites != null )
538 {
539
540 if ( commonPool != null && parallelSuites > 0 )
541 {
542 Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
543 suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
544 }
545 else
546 {
547 suiteSuites.setScheduler( createScheduler( parallelSuites ) );
548 }
549 }
550
551
552 ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
553 allSuites.addAll( nestedSuites );
554 if ( suiteClasses != null )
555 {
556 allSuites.add( suiteClasses );
557 }
558 if ( !allSuites.isEmpty() )
559 {
560 setSchedulers( allSuites, parallelClasses, commonPool );
561 }
562
563
564 ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
565 allClasses.addAll( nestedClasses );
566 if ( !allClasses.isEmpty() )
567 {
568 setSchedulers( allClasses, parallelMethods, commonPool );
569 }
570
571
572 ParentRunner all = createFinalRunner( removeNullRunners(
573 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
574 ) );
575 all.setScheduler( master );
576 return all;
577 }
578
579 private ParentRunner createFinalRunner( List<Runner> runners )
580 throws InitializationError
581 {
582 return new Suite( null, runners )
583 {
584 @Override
585 public void run( RunNotifier notifier )
586 {
587 try
588 {
589 beforeRunQuietly();
590 super.run( notifier );
591 }
592 finally
593 {
594 afterRunQuietly();
595 }
596 }
597 };
598 }
599
600 private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
601 {
602 if ( commonPool != null )
603 {
604 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
605 boolean doParallel = poolSize > 0;
606 for ( ParentRunner runner : runners )
607 {
608 runner.setScheduler(
609 createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
610 }
611 }
612 else
613 {
614 ExecutorService pool = null;
615 if ( poolSize == Integer.MAX_VALUE )
616 {
617 pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
618 }
619 else if ( poolSize > 0 )
620 {
621 pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
622 }
623 boolean doParallel = pool != null;
624 for ( ParentRunner runner : runners )
625 {
626 runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
627 BalancerFactory.createInfinitePermitsBalancer() ) );
628 }
629 }
630 }
631
632 private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
633 Balancer concurrency )
634 {
635 SchedulingStrategy strategy =
636 doParallel & pool != null
637 ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
638 : new InvokerStrategy( ParallelComputerBuilder.this.logger );
639 return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
640 }
641
642 private Scheduler createScheduler( int poolSize )
643 {
644 final SchedulingStrategy strategy;
645 if ( poolSize == Integer.MAX_VALUE )
646 {
647 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
648 }
649 else if ( poolSize == 0 )
650 {
651 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
652 }
653 else
654 {
655 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
656 }
657 return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
658 }
659
660 private boolean canSchedule( Runner runner )
661 {
662 return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
663 }
664
665 private boolean isThreadSafe( Runner runner )
666 {
667 return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
668 }
669
670 private class SuiteFilter
671 extends Filter
672 {
673
674
675 @Override
676 public boolean shouldRun( Description description )
677 {
678 return true;
679 }
680
681 @Override
682 public void apply( Object child )
683 throws NoTestsRemainException
684 {
685 super.apply( child );
686 if ( child instanceof ParentRunner )
687 {
688 ParentRunner runner = ( ParentRunner ) child;
689 if ( !isThreadSafe( runner ) )
690 {
691 runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
692 }
693 else if ( child instanceof Suite )
694 {
695 nestedSuites.add( (Suite) child );
696 }
697 else
698 {
699 ParentRunner parentRunner = (ParentRunner) child;
700 nestedClasses.add( parentRunner );
701 nestedClassesChildren += parentRunner.getDescription().getChildren().size();
702 }
703 }
704 }
705
706 @Override
707 public String describe()
708 {
709 return "";
710 }
711 }
712 }
713
714 private static Suite createSuite( Collection<Runner> runners )
715 throws InitializationError
716 {
717 final List<Runner> onlyRunners = removeNullRunners( runners );
718 return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
719 {
720 };
721 }
722
723 private static List<Runner> removeNullRunners( Collection<Runner> runners )
724 {
725 final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
726 onlyRunners.removeAll( NULL_SINGLETON );
727 return onlyRunners;
728 }
729 }