1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.junitcore.pc;
20
21 import java.lang.annotation.Annotation;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.EnumMap;
27 import java.util.Iterator;
28 import java.util.LinkedHashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ThreadFactory;
35
36 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
37 import org.apache.maven.surefire.api.testset.TestSetFailedException;
38 import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
39 import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
40 import org.junit.internal.runners.ErrorReportingRunner;
41 import org.junit.runner.Description;
42 import org.junit.runner.Runner;
43 import org.junit.runner.manipulation.Filter;
44 import org.junit.runner.manipulation.NoTestsRemainException;
45 import org.junit.runner.notification.RunNotifier;
46 import org.junit.runners.ParentRunner;
47 import org.junit.runners.Suite;
48 import org.junit.runners.model.InitializationError;
49 import org.junit.runners.model.RunnerBuilder;
50
51 import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
52 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
53 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
54 import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
55 import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
56 import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
57
58 @SuppressWarnings({"javadoc", "checkstyle:javadoctype"})
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public final class ParallelComputerBuilder {
84 private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
85
86 private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
87
88 private static final Set<Runner> NULL_SINGLETON = Collections.singleton(null);
89
90 static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
91
92 private final Map<Type, Integer> parallelGroups = new EnumMap<>(Type.class);
93
94 private final ConsoleLogger logger;
95
96 private boolean useSeparatePools;
97
98 private int totalPoolSize;
99
100 private JUnitCoreParameters parameters;
101
102 private boolean optimize;
103
104 private boolean runningInTests;
105
106
107
108
109
110
111 ParallelComputerBuilder(ConsoleLogger logger) {
112 this.logger = logger;
113 runningInTests = true;
114 useSeparatePools();
115 parallelGroups.put(SUITES, 0);
116 parallelGroups.put(CLASSES, 0);
117 parallelGroups.put(METHODS, 0);
118 }
119
120 public ParallelComputerBuilder(ConsoleLogger logger, JUnitCoreParameters parameters) {
121 this(logger);
122 runningInTests = false;
123 this.parameters = parameters;
124 }
125
126 public ParallelComputer buildComputer() {
127 return new PC();
128 }
129
130 ParallelComputerBuilder useSeparatePools() {
131 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
132 useSeparatePools = true;
133 return this;
134 }
135
136 ParallelComputerBuilder useOnePool() {
137 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138 useSeparatePools = false;
139 return this;
140 }
141
142
143
144
145
146
147
148 ParallelComputerBuilder useOnePool(int totalPoolSize) {
149 if (totalPoolSize < 1) {
150 throw new IllegalArgumentException("Size of common pool is less than 1.");
151 }
152 this.totalPoolSize = totalPoolSize;
153 useSeparatePools = false;
154 return this;
155 }
156
157 boolean isOptimized() {
158 return optimize;
159 }
160
161 ParallelComputerBuilder optimize(boolean optimize) {
162 this.optimize = optimize;
163 return this;
164 }
165
166 ParallelComputerBuilder parallelSuites() {
167 return parallel(SUITES);
168 }
169
170 ParallelComputerBuilder parallelSuites(int nThreads) {
171 return parallel(nThreads, SUITES);
172 }
173
174 ParallelComputerBuilder parallelClasses() {
175 return parallel(CLASSES);
176 }
177
178 ParallelComputerBuilder parallelClasses(int nThreads) {
179 return parallel(nThreads, CLASSES);
180 }
181
182 ParallelComputerBuilder parallelMethods() {
183 return parallel(METHODS);
184 }
185
186 ParallelComputerBuilder parallelMethods(int nThreads) {
187 return parallel(nThreads, METHODS);
188 }
189
190 private ParallelComputerBuilder parallel(int nThreads, Type parallelType) {
191 if (nThreads < 0) {
192 throw new IllegalArgumentException("negative nThreads " + nThreads);
193 }
194
195 if (parallelType == null) {
196 throw new IllegalArgumentException("null parallelType");
197 }
198
199 parallelGroups.put(parallelType, nThreads);
200 return this;
201 }
202
203 private ParallelComputerBuilder parallel(Type parallelType) {
204 return parallel(Integer.MAX_VALUE, parallelType);
205 }
206
207 private double parallelTestsTimeoutInSeconds() {
208 return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
209 }
210
211 private double parallelTestsTimeoutForcedInSeconds() {
212 return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
213 }
214
215 @SuppressWarnings("unchecked")
216 private static Class<? extends Annotation> loadNotThreadSafeAnnotations() {
217 try {
218 Class c = Class.forName("net.jcip.annotations.NotThreadSafe");
219 return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
220 } catch (ClassNotFoundException e) {
221 return null;
222 }
223 }
224
225 final class PC extends ParallelComputer {
226 private final SingleThreadScheduler notThreadSafeTests =
227 new SingleThreadScheduler(ParallelComputerBuilder.this.logger);
228
229 private final Collection<ParentRunner> suites = new LinkedHashSet<>();
230
231 private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<>();
232
233 private final Collection<ParentRunner> classes = new LinkedHashSet<>();
234
235 private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<>();
236
237 private final Collection<Runner> notParallelRunners = new LinkedHashSet<>();
238
239 private int poolCapacity;
240
241 private boolean splitPool;
242
243 private final Map<Type, Integer> allGroups;
244
245 private long nestedClassesChildren;
246
247 private volatile Scheduler master;
248
249 private PC() {
250 super(parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds());
251 allGroups = new EnumMap<>(ParallelComputerBuilder.this.parallelGroups);
252 poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
253 splitPool = ParallelComputerBuilder.this.useSeparatePools;
254 }
255
256 Collection<ParentRunner> getSuites() {
257 return suites;
258 }
259
260 Collection<ParentRunner> getNestedSuites() {
261 return nestedSuites;
262 }
263
264 Collection<ParentRunner> getClasses() {
265 return classes;
266 }
267
268 Collection<ParentRunner> getNestedClasses() {
269 return nestedClasses;
270 }
271
272 Collection<Runner> getNotParallelRunners() {
273 return notParallelRunners;
274 }
275
276 int getPoolCapacity() {
277 return poolCapacity;
278 }
279
280 boolean isSplitPool() {
281 return splitPool;
282 }
283
284 @Override
285 protected ShutdownResult describeStopped(boolean shutdownNow) {
286 ShutdownResult shutdownResult = notThreadSafeTests.describeStopped(shutdownNow);
287 final Scheduler m = master;
288 if (m != null) {
289 ShutdownResult shutdownResultOfMaster = m.describeStopped(shutdownNow);
290 shutdownResult.getTriggeredTests().addAll(shutdownResultOfMaster.getTriggeredTests());
291 shutdownResult.getIncompleteTests().addAll(shutdownResultOfMaster.getIncompleteTests());
292 }
293 return shutdownResult;
294 }
295
296 @Override
297 protected boolean shutdownThreadPoolsAwaitingKilled() {
298 boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
299 final Scheduler m = master;
300 if (m != null) {
301 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
302 }
303 return notInterrupted;
304 }
305
306 @Override
307 public Runner getSuite(RunnerBuilder builder, Class<?>[] cls) throws InitializationError {
308 try {
309 super.getSuite(builder, cls);
310 populateChildrenFromSuites();
311
312 WrappedRunners suiteSuites = wrapRunners(suites);
313 WrappedRunners suiteClasses = wrapRunners(classes);
314
315 long suitesCount = suites.size();
316 long classesCount = classes.size() + nestedClasses.size();
317 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
318 if (!ParallelComputerBuilder.this.runningInTests) {
319 determineThreadCounts(suitesCount, classesCount, methodsCount);
320 }
321
322 return setSchedulers(suiteSuites.wrappingSuite, suiteClasses.wrappingSuite);
323 } catch (TestSetFailedException e) {
324 throw new InitializationError(Collections.<Throwable>singletonList(e));
325 }
326 }
327
328 @Override
329 protected Runner getRunner(RunnerBuilder builder, Class<?> testClass) throws Throwable {
330 Runner runner = super.getRunner(builder, testClass);
331 if (canSchedule(runner)) {
332 if (!isThreadSafe(runner)) {
333 ((ParentRunner) runner).setScheduler(notThreadSafeTests.newRunnerScheduler());
334 notParallelRunners.add(runner);
335 } else if (runner instanceof Suite) {
336 suites.add((Suite) runner);
337 } else {
338 classes.add((ParentRunner) runner);
339 }
340 } else {
341 notParallelRunners.add(runner);
342 }
343 return runner;
344 }
345
346 private void determineThreadCounts(long suites, long classes, long methods) throws TestSetFailedException {
347 RunnerCounter counts =
348 ParallelComputerBuilder.this.optimize ? new RunnerCounter(suites, classes, methods) : null;
349 Concurrency concurrency = resolveConcurrency(ParallelComputerBuilder.this.parameters, counts);
350 allGroups.put(SUITES, concurrency.suites);
351 allGroups.put(CLASSES, concurrency.classes);
352 allGroups.put(METHODS, concurrency.methods);
353 poolCapacity = concurrency.capacity;
354 splitPool &= concurrency.capacity <= 0;
355 }
356
357 private <T extends Runner> WrappedRunners wrapRunners(Collection<T> runners) throws InitializationError {
358
359 long childrenCounter = 0;
360 ArrayList<Runner> runs = new ArrayList<>();
361 for (T runner : runners) {
362 if (runner != null) {
363 int children = countChildren(runner);
364 childrenCounter += children;
365 runs.add(runner);
366 }
367 }
368 return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners(createSuite(runs), childrenCounter);
369 }
370
371 private int countChildren(Runner runner) {
372 Description description = runner.getDescription();
373 Collection children = description == null ? null : description.getChildren();
374 return children == null ? 0 : children.size();
375 }
376
377 private ExecutorService createPool(int poolSize) {
378 return poolSize < Integer.MAX_VALUE
379 ? Executors.newFixedThreadPool(poolSize, DAEMON_THREAD_FACTORY)
380 : Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY);
381 }
382
383 private Scheduler createMaster(ExecutorService pool, int poolSize) {
384
385 final int finalRunnersCounter = countFinalRunners();
386
387 final SchedulingStrategy strategy;
388 if (finalRunnersCounter <= 1 || poolSize <= 1) {
389 strategy = new InvokerStrategy(ParallelComputerBuilder.this.logger);
390 } else if (pool != null && poolSize == Integer.MAX_VALUE) {
391 strategy = new SharedThreadPoolStrategy(ParallelComputerBuilder.this.logger, pool);
392 } else {
393 strategy = createParallelStrategy(ParallelComputerBuilder.this.logger, finalRunnersCounter);
394 }
395 return new Scheduler(ParallelComputerBuilder.this.logger, null, strategy);
396 }
397
398 private int countFinalRunners() {
399 int counter = notParallelRunners.isEmpty() ? 0 : 1;
400
401 if (!suites.isEmpty() && allGroups.get(SUITES) > 0) {
402 ++counter;
403 }
404
405 if (!classes.isEmpty() && allGroups.get(CLASSES) > 0) {
406 ++counter;
407 }
408
409 return counter;
410 }
411
412 private void populateChildrenFromSuites() {
413
414 Filter filter = new SuiteFilter();
415 for (Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); ) {
416 ParentRunner suite = it.next();
417 try {
418 suite.filter(filter);
419 } catch (NoTestsRemainException e) {
420 it.remove();
421 }
422 }
423 }
424
425 private int totalPoolSize() {
426 if (poolCapacity == TOTAL_POOL_SIZE_UNDEFINED) {
427 int total = 0;
428 for (int nThreads : allGroups.values()) {
429 total += nThreads;
430 if (total < 0) {
431 total = Integer.MAX_VALUE;
432 break;
433 }
434 }
435 return total;
436 } else {
437 return poolCapacity;
438 }
439 }
440
441 private Runner setSchedulers(ParentRunner suiteSuites, ParentRunner suiteClasses) throws InitializationError {
442 int parallelSuites = allGroups.get(SUITES);
443 int parallelClasses = allGroups.get(CLASSES);
444 int parallelMethods = allGroups.get(METHODS);
445 int poolSize = totalPoolSize();
446 ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool(poolSize);
447 master = createMaster(commonPool, poolSize);
448
449 if (suiteSuites != null) {
450
451 if (commonPool != null && parallelSuites > 0) {
452 Balancer balancer = BalancerFactory.createBalancerWithFairness(parallelSuites);
453 suiteSuites.setScheduler(createScheduler(null, commonPool, true, balancer));
454 } else {
455 suiteSuites.setScheduler(createScheduler(parallelSuites));
456 }
457 }
458
459
460 ArrayList<ParentRunner> allSuites = new ArrayList<>(suites);
461 allSuites.addAll(nestedSuites);
462 if (suiteClasses != null) {
463 allSuites.add(suiteClasses);
464 }
465 if (!allSuites.isEmpty()) {
466 setSchedulers(allSuites, parallelClasses, commonPool);
467 }
468
469
470 ArrayList<ParentRunner> allClasses = new ArrayList<>(classes);
471 allClasses.addAll(nestedClasses);
472 if (!allClasses.isEmpty()) {
473 setSchedulers(allClasses, parallelMethods, commonPool);
474 }
475
476
477 ParentRunner all = createFinalRunner(removeNullRunners(
478 Arrays.<Runner>asList(suiteSuites, suiteClasses, createSuite(notParallelRunners))));
479 all.setScheduler(master);
480 return all;
481 }
482
483 private ParentRunner createFinalRunner(List<Runner> runners) throws InitializationError {
484 return new Suite(null, runners) {
485 @Override
486 public void run(RunNotifier notifier) {
487 try {
488 beforeRunQuietly();
489 super.run(notifier);
490 } finally {
491 afterRunQuietly();
492 }
493 }
494 };
495 }
496
497 private void setSchedulers(Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool) {
498 if (commonPool != null) {
499 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness(poolSize);
500 boolean doParallel = poolSize > 0;
501 for (ParentRunner runner : runners) {
502 runner.setScheduler(
503 createScheduler(runner.getDescription(), commonPool, doParallel, concurrencyLimit));
504 }
505 } else {
506 ExecutorService pool = null;
507 if (poolSize == Integer.MAX_VALUE) {
508 pool = Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY);
509 } else if (poolSize > 0) {
510 pool = Executors.newFixedThreadPool(poolSize, DAEMON_THREAD_FACTORY);
511 }
512 boolean doParallel = pool != null;
513 for (ParentRunner runner : runners) {
514 runner.setScheduler(createScheduler(
515 runner.getDescription(),
516 pool,
517 doParallel,
518 BalancerFactory.createInfinitePermitsBalancer()));
519 }
520 }
521 }
522
523 private Scheduler createScheduler(
524 Description desc, ExecutorService pool, boolean doParallel, Balancer concurrency) {
525 SchedulingStrategy strategy = doParallel & pool != null
526 ? new SharedThreadPoolStrategy(ParallelComputerBuilder.this.logger, pool)
527 : new InvokerStrategy(ParallelComputerBuilder.this.logger);
528 return new Scheduler(ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency);
529 }
530
531 private Scheduler createScheduler(int poolSize) {
532 final SchedulingStrategy strategy;
533 if (poolSize == Integer.MAX_VALUE) {
534 strategy = createParallelStrategyUnbounded(ParallelComputerBuilder.this.logger);
535 } else if (poolSize == 0) {
536 strategy = new InvokerStrategy(ParallelComputerBuilder.this.logger);
537 } else {
538 strategy = createParallelStrategy(ParallelComputerBuilder.this.logger, poolSize);
539 }
540 return new Scheduler(ParallelComputerBuilder.this.logger, null, master, strategy);
541 }
542
543 private boolean canSchedule(Runner runner) {
544 return !(runner instanceof ErrorReportingRunner) && runner instanceof ParentRunner;
545 }
546
547 private boolean isThreadSafe(Runner runner) {
548 return runner.getDescription().getAnnotation(JCIP_NOT_THREAD_SAFE) == null;
549 }
550
551 private class SuiteFilter extends Filter {
552
553
554 @Override
555 public boolean shouldRun(Description description) {
556 return true;
557 }
558
559 @Override
560 public void apply(Object child) throws NoTestsRemainException {
561 super.apply(child);
562 if (child instanceof ParentRunner) {
563 ParentRunner runner = (ParentRunner) child;
564 if (!isThreadSafe(runner)) {
565 runner.setScheduler(notThreadSafeTests.newRunnerScheduler());
566 } else if (child instanceof Suite) {
567 nestedSuites.add((Suite) child);
568 } else {
569 ParentRunner parentRunner = (ParentRunner) child;
570 nestedClasses.add(parentRunner);
571 nestedClassesChildren +=
572 parentRunner.getDescription().getChildren().size();
573 }
574 }
575 }
576
577 @Override
578 public String describe() {
579 return "";
580 }
581 }
582 }
583
584 private static Suite createSuite(Collection<Runner> runners) throws InitializationError {
585 final List<Runner> onlyRunners = removeNullRunners(runners);
586 return onlyRunners.isEmpty() ? null : new Suite(null, onlyRunners) {};
587 }
588
589 private static List<Runner> removeNullRunners(Collection<Runner> runners) {
590 final List<Runner> onlyRunners = new ArrayList<>(runners);
591 onlyRunners.removeAll(NULL_SINGLETON);
592 return onlyRunners;
593 }
594 }