View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.junitcore.pc;
20  
21  import java.lang.annotation.Annotation;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.EnumMap;
27  import java.util.Iterator;
28  import java.util.LinkedHashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.ThreadFactory;
35  
36  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
37  import org.apache.maven.surefire.api.testset.TestSetFailedException;
38  import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
39  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
40  import org.junit.internal.runners.ErrorReportingRunner;
41  import org.junit.runner.Description;
42  import org.junit.runner.Runner;
43  import org.junit.runner.manipulation.Filter;
44  import org.junit.runner.manipulation.NoTestsRemainException;
45  import org.junit.runner.notification.RunNotifier;
46  import org.junit.runners.ParentRunner;
47  import org.junit.runners.Suite;
48  import org.junit.runners.model.InitializationError;
49  import org.junit.runners.model.RunnerBuilder;
50  
51  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
52  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
53  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
54  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
55  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
56  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
57  
58  @SuppressWarnings({"javadoc", "checkstyle:javadoctype"})
59  /*
60   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
61   * the suites and classes can be reused in parallel methods.
62   * <pre>
63   * JUnitCoreParameters parameters = ...;
64   * ParallelComputerBuilder builder = new ParallelComputerBuilder(parameters);
65   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
66   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
67   * Class<?>[] tests = {...};
68   * new JUnitCore().run(computer, tests);
69   * </pre>
70   * Note that the type has always at least one thread even if unspecified. The capacity in
71   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
72   * altogether.
73   * <br>
74   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
75   * {@code true}.
76   * <pre>
77   * computer.describeStopped(true);
78   * </pre>
79   *
80   * @author Tibor Digana (tibor17)
81   * @since 2.16
82   */
83  public final class ParallelComputerBuilder {
84      private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
85  
86      private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
87  
88      private static final Set<Runner> NULL_SINGLETON = Collections.singleton(null);
89  
90      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
91  
92      private final Map<Type, Integer> parallelGroups = new EnumMap<>(Type.class);
93  
94      private final ConsoleLogger logger;
95  
96      private boolean useSeparatePools;
97  
98      private int totalPoolSize;
99  
100     private JUnitCoreParameters parameters;
101 
102     private boolean optimize;
103 
104     private boolean runningInTests;
105 
106     /**
107      * Calling {@link #useSeparatePools()}.
108      * Can be used only in unit tests.
109      * Do NOT call this constructor in production.
110      */
111     ParallelComputerBuilder(ConsoleLogger logger) {
112         this.logger = logger;
113         runningInTests = true;
114         useSeparatePools();
115         parallelGroups.put(SUITES, 0);
116         parallelGroups.put(CLASSES, 0);
117         parallelGroups.put(METHODS, 0);
118     }
119 
120     public ParallelComputerBuilder(ConsoleLogger logger, JUnitCoreParameters parameters) {
121         this(logger);
122         runningInTests = false;
123         this.parameters = parameters;
124     }
125 
126     public ParallelComputer buildComputer() {
127         return new PC();
128     }
129 
130     ParallelComputerBuilder useSeparatePools() {
131         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
132         useSeparatePools = true;
133         return this;
134     }
135 
136     ParallelComputerBuilder useOnePool() {
137         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
138         useSeparatePools = false;
139         return this;
140     }
141 
142     /**
143      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
144      *                      If the <code>totalPoolSize</code> is {@link Integer#MAX_VALUE}, the pool capacity is not
145      *                      limited.
146      * @throws IllegalArgumentException If <code>totalPoolSize</code> is &lt; 1.
147      */
148     ParallelComputerBuilder useOnePool(int totalPoolSize) {
149         if (totalPoolSize < 1) {
150             throw new IllegalArgumentException("Size of common pool is less than 1.");
151         }
152         this.totalPoolSize = totalPoolSize;
153         useSeparatePools = false;
154         return this;
155     }
156 
157     boolean isOptimized() {
158         return optimize;
159     }
160 
161     ParallelComputerBuilder optimize(boolean optimize) {
162         this.optimize = optimize;
163         return this;
164     }
165 
166     ParallelComputerBuilder parallelSuites() {
167         return parallel(SUITES);
168     }
169 
170     ParallelComputerBuilder parallelSuites(int nThreads) {
171         return parallel(nThreads, SUITES);
172     }
173 
174     ParallelComputerBuilder parallelClasses() {
175         return parallel(CLASSES);
176     }
177 
178     ParallelComputerBuilder parallelClasses(int nThreads) {
179         return parallel(nThreads, CLASSES);
180     }
181 
182     ParallelComputerBuilder parallelMethods() {
183         return parallel(METHODS);
184     }
185 
186     ParallelComputerBuilder parallelMethods(int nThreads) {
187         return parallel(nThreads, METHODS);
188     }
189 
190     private ParallelComputerBuilder parallel(int nThreads, Type parallelType) {
191         if (nThreads < 0) {
192             throw new IllegalArgumentException("negative nThreads " + nThreads);
193         }
194 
195         if (parallelType == null) {
196             throw new IllegalArgumentException("null parallelType");
197         }
198 
199         parallelGroups.put(parallelType, nThreads);
200         return this;
201     }
202 
203     private ParallelComputerBuilder parallel(Type parallelType) {
204         return parallel(Integer.MAX_VALUE, parallelType);
205     }
206 
207     private double parallelTestsTimeoutInSeconds() {
208         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
209     }
210 
211     private double parallelTestsTimeoutForcedInSeconds() {
212         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
213     }
214 
215     @SuppressWarnings("unchecked")
216     private static Class<? extends Annotation> loadNotThreadSafeAnnotations() {
217         try {
218             Class c = Class.forName("net.jcip.annotations.NotThreadSafe");
219             return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
220         } catch (ClassNotFoundException e) {
221             return null;
222         }
223     }
224 
225     final class PC extends ParallelComputer {
226         private final SingleThreadScheduler notThreadSafeTests =
227                 new SingleThreadScheduler(ParallelComputerBuilder.this.logger);
228 
229         private final Collection<ParentRunner> suites = new LinkedHashSet<>();
230 
231         private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<>();
232 
233         private final Collection<ParentRunner> classes = new LinkedHashSet<>();
234 
235         private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<>();
236 
237         private final Collection<Runner> notParallelRunners = new LinkedHashSet<>();
238 
239         private int poolCapacity;
240 
241         private boolean splitPool;
242 
243         private final Map<Type, Integer> allGroups;
244 
245         private long nestedClassesChildren;
246 
247         private volatile Scheduler master;
248 
249         private PC() {
250             super(parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds());
251             allGroups = new EnumMap<>(ParallelComputerBuilder.this.parallelGroups);
252             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
253             splitPool = ParallelComputerBuilder.this.useSeparatePools;
254         }
255 
256         Collection<ParentRunner> getSuites() {
257             return suites;
258         }
259 
260         Collection<ParentRunner> getNestedSuites() {
261             return nestedSuites;
262         }
263 
264         Collection<ParentRunner> getClasses() {
265             return classes;
266         }
267 
268         Collection<ParentRunner> getNestedClasses() {
269             return nestedClasses;
270         }
271 
272         Collection<Runner> getNotParallelRunners() {
273             return notParallelRunners;
274         }
275 
276         int getPoolCapacity() {
277             return poolCapacity;
278         }
279 
280         boolean isSplitPool() {
281             return splitPool;
282         }
283 
284         @Override
285         protected ShutdownResult describeStopped(boolean shutdownNow) {
286             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped(shutdownNow);
287             final Scheduler m = master;
288             if (m != null) {
289                 ShutdownResult shutdownResultOfMaster = m.describeStopped(shutdownNow);
290                 shutdownResult.getTriggeredTests().addAll(shutdownResultOfMaster.getTriggeredTests());
291                 shutdownResult.getIncompleteTests().addAll(shutdownResultOfMaster.getIncompleteTests());
292             }
293             return shutdownResult;
294         }
295 
296         @Override
297         protected boolean shutdownThreadPoolsAwaitingKilled() {
298             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
299             final Scheduler m = master;
300             if (m != null) {
301                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
302             }
303             return notInterrupted;
304         }
305 
306         @Override
307         public Runner getSuite(RunnerBuilder builder, Class<?>[] cls) throws InitializationError {
308             try {
309                 super.getSuite(builder, cls);
310                 populateChildrenFromSuites();
311 
312                 WrappedRunners suiteSuites = wrapRunners(suites);
313                 WrappedRunners suiteClasses = wrapRunners(classes);
314 
315                 long suitesCount = suites.size();
316                 long classesCount = classes.size() + nestedClasses.size();
317                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
318                 if (!ParallelComputerBuilder.this.runningInTests) {
319                     determineThreadCounts(suitesCount, classesCount, methodsCount);
320                 }
321 
322                 return setSchedulers(suiteSuites.wrappingSuite, suiteClasses.wrappingSuite);
323             } catch (TestSetFailedException e) {
324                 throw new InitializationError(Collections.<Throwable>singletonList(e));
325             }
326         }
327 
328         @Override
329         protected Runner getRunner(RunnerBuilder builder, Class<?> testClass) throws Throwable {
330             Runner runner = super.getRunner(builder, testClass);
331             if (canSchedule(runner)) {
332                 if (!isThreadSafe(runner)) {
333                     ((ParentRunner) runner).setScheduler(notThreadSafeTests.newRunnerScheduler());
334                     notParallelRunners.add(runner);
335                 } else if (runner instanceof Suite) {
336                     suites.add((Suite) runner);
337                 } else {
338                     classes.add((ParentRunner) runner);
339                 }
340             } else {
341                 notParallelRunners.add(runner);
342             }
343             return runner;
344         }
345 
346         private void determineThreadCounts(long suites, long classes, long methods) throws TestSetFailedException {
347             RunnerCounter counts =
348                     ParallelComputerBuilder.this.optimize ? new RunnerCounter(suites, classes, methods) : null;
349             Concurrency concurrency = resolveConcurrency(ParallelComputerBuilder.this.parameters, counts);
350             allGroups.put(SUITES, concurrency.suites);
351             allGroups.put(CLASSES, concurrency.classes);
352             allGroups.put(METHODS, concurrency.methods);
353             poolCapacity = concurrency.capacity;
354             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
355         }
356 
357         private <T extends Runner> WrappedRunners wrapRunners(Collection<T> runners) throws InitializationError {
358             // Do NOT use allGroups here.
359             long childrenCounter = 0;
360             ArrayList<Runner> runs = new ArrayList<>();
361             for (T runner : runners) {
362                 if (runner != null) {
363                     int children = countChildren(runner);
364                     childrenCounter += children;
365                     runs.add(runner);
366                 }
367             }
368             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners(createSuite(runs), childrenCounter);
369         }
370 
371         private int countChildren(Runner runner) {
372             Description description = runner.getDescription();
373             Collection children = description == null ? null : description.getChildren();
374             return children == null ? 0 : children.size();
375         }
376 
377         private ExecutorService createPool(int poolSize) {
378             return poolSize < Integer.MAX_VALUE
379                     ? Executors.newFixedThreadPool(poolSize, DAEMON_THREAD_FACTORY)
380                     : Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY);
381         }
382 
383         private Scheduler createMaster(ExecutorService pool, int poolSize) {
384             // can be 0, 1, 2 or 3
385             final int finalRunnersCounter = countFinalRunners();
386 
387             final SchedulingStrategy strategy;
388             if (finalRunnersCounter <= 1 || poolSize <= 1) {
389                 strategy = new InvokerStrategy(ParallelComputerBuilder.this.logger);
390             } else if (pool != null && poolSize == Integer.MAX_VALUE) {
391                 strategy = new SharedThreadPoolStrategy(ParallelComputerBuilder.this.logger, pool);
392             } else {
393                 strategy = createParallelStrategy(ParallelComputerBuilder.this.logger, finalRunnersCounter);
394             }
395             return new Scheduler(ParallelComputerBuilder.this.logger, null, strategy);
396         }
397 
398         private int countFinalRunners() {
399             int counter = notParallelRunners.isEmpty() ? 0 : 1;
400 
401             if (!suites.isEmpty() && allGroups.get(SUITES) > 0) {
402                 ++counter;
403             }
404 
405             if (!classes.isEmpty() && allGroups.get(CLASSES) > 0) {
406                 ++counter;
407             }
408 
409             return counter;
410         }
411 
412         private void populateChildrenFromSuites() {
413             // Do NOT use allGroups here.
414             Filter filter = new SuiteFilter();
415             for (Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); ) {
416                 ParentRunner suite = it.next();
417                 try {
418                     suite.filter(filter);
419                 } catch (NoTestsRemainException e) {
420                     it.remove();
421                 }
422             }
423         }
424 
425         private int totalPoolSize() {
426             if (poolCapacity == TOTAL_POOL_SIZE_UNDEFINED) {
427                 int total = 0;
428                 for (int nThreads : allGroups.values()) {
429                     total += nThreads;
430                     if (total < 0) {
431                         total = Integer.MAX_VALUE;
432                         break;
433                     }
434                 }
435                 return total;
436             } else {
437                 return poolCapacity;
438             }
439         }
440 
441         private Runner setSchedulers(ParentRunner suiteSuites, ParentRunner suiteClasses) throws InitializationError {
442             int parallelSuites = allGroups.get(SUITES);
443             int parallelClasses = allGroups.get(CLASSES);
444             int parallelMethods = allGroups.get(METHODS);
445             int poolSize = totalPoolSize();
446             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool(poolSize);
447             master = createMaster(commonPool, poolSize);
448 
449             if (suiteSuites != null) {
450                 // a scheduler for parallel suites
451                 if (commonPool != null && parallelSuites > 0) {
452                     Balancer balancer = BalancerFactory.createBalancerWithFairness(parallelSuites);
453                     suiteSuites.setScheduler(createScheduler(null, commonPool, true, balancer));
454                 } else {
455                     suiteSuites.setScheduler(createScheduler(parallelSuites));
456                 }
457             }
458 
459             // schedulers for parallel classes
460             ArrayList<ParentRunner> allSuites = new ArrayList<>(suites);
461             allSuites.addAll(nestedSuites);
462             if (suiteClasses != null) {
463                 allSuites.add(suiteClasses);
464             }
465             if (!allSuites.isEmpty()) {
466                 setSchedulers(allSuites, parallelClasses, commonPool);
467             }
468 
469             // schedulers for parallel methods
470             ArrayList<ParentRunner> allClasses = new ArrayList<>(classes);
471             allClasses.addAll(nestedClasses);
472             if (!allClasses.isEmpty()) {
473                 setSchedulers(allClasses, parallelMethods, commonPool);
474             }
475 
476             // resulting runner for Computer#getSuite() scheduled by master scheduler
477             ParentRunner all = createFinalRunner(removeNullRunners(
478                     Arrays.<Runner>asList(suiteSuites, suiteClasses, createSuite(notParallelRunners))));
479             all.setScheduler(master);
480             return all;
481         }
482 
483         private ParentRunner createFinalRunner(List<Runner> runners) throws InitializationError {
484             return new Suite(null, runners) {
485                 @Override
486                 public void run(RunNotifier notifier) {
487                     try {
488                         beforeRunQuietly();
489                         super.run(notifier);
490                     } finally {
491                         afterRunQuietly();
492                     }
493                 }
494             };
495         }
496 
497         private void setSchedulers(Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool) {
498             if (commonPool != null) {
499                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness(poolSize);
500                 boolean doParallel = poolSize > 0;
501                 for (ParentRunner runner : runners) {
502                     runner.setScheduler(
503                             createScheduler(runner.getDescription(), commonPool, doParallel, concurrencyLimit));
504                 }
505             } else {
506                 ExecutorService pool = null;
507                 if (poolSize == Integer.MAX_VALUE) {
508                     pool = Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY);
509                 } else if (poolSize > 0) {
510                     pool = Executors.newFixedThreadPool(poolSize, DAEMON_THREAD_FACTORY);
511                 }
512                 boolean doParallel = pool != null;
513                 for (ParentRunner runner : runners) {
514                     runner.setScheduler(createScheduler(
515                             runner.getDescription(),
516                             pool,
517                             doParallel,
518                             BalancerFactory.createInfinitePermitsBalancer()));
519                 }
520             }
521         }
522 
523         private Scheduler createScheduler(
524                 Description desc, ExecutorService pool, boolean doParallel, Balancer concurrency) {
525             SchedulingStrategy strategy = doParallel & pool != null
526                     ? new SharedThreadPoolStrategy(ParallelComputerBuilder.this.logger, pool)
527                     : new InvokerStrategy(ParallelComputerBuilder.this.logger);
528             return new Scheduler(ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency);
529         }
530 
531         private Scheduler createScheduler(int poolSize) {
532             final SchedulingStrategy strategy;
533             if (poolSize == Integer.MAX_VALUE) {
534                 strategy = createParallelStrategyUnbounded(ParallelComputerBuilder.this.logger);
535             } else if (poolSize == 0) {
536                 strategy = new InvokerStrategy(ParallelComputerBuilder.this.logger);
537             } else {
538                 strategy = createParallelStrategy(ParallelComputerBuilder.this.logger, poolSize);
539             }
540             return new Scheduler(ParallelComputerBuilder.this.logger, null, master, strategy);
541         }
542 
543         private boolean canSchedule(Runner runner) {
544             return !(runner instanceof ErrorReportingRunner) && runner instanceof ParentRunner;
545         }
546 
547         private boolean isThreadSafe(Runner runner) {
548             return runner.getDescription().getAnnotation(JCIP_NOT_THREAD_SAFE) == null;
549         }
550 
551         private class SuiteFilter extends Filter {
552             // Do NOT use allGroups in SuiteFilter.
553 
554             @Override
555             public boolean shouldRun(Description description) {
556                 return true;
557             }
558 
559             @Override
560             public void apply(Object child) throws NoTestsRemainException {
561                 super.apply(child);
562                 if (child instanceof ParentRunner) {
563                     ParentRunner runner = (ParentRunner) child;
564                     if (!isThreadSafe(runner)) {
565                         runner.setScheduler(notThreadSafeTests.newRunnerScheduler());
566                     } else if (child instanceof Suite) {
567                         nestedSuites.add((Suite) child);
568                     } else {
569                         ParentRunner parentRunner = (ParentRunner) child;
570                         nestedClasses.add(parentRunner);
571                         nestedClassesChildren +=
572                                 parentRunner.getDescription().getChildren().size();
573                     }
574                 }
575             }
576 
577             @Override
578             public String describe() {
579                 return "";
580             }
581         }
582     }
583 
584     private static Suite createSuite(Collection<Runner> runners) throws InitializationError {
585         final List<Runner> onlyRunners = removeNullRunners(runners);
586         return onlyRunners.isEmpty() ? null : new Suite(null, onlyRunners) {};
587     }
588 
589     private static List<Runner> removeNullRunners(Collection<Runner> runners) {
590         final List<Runner> onlyRunners = new ArrayList<>(runners);
591         onlyRunners.removeAll(NULL_SINGLETON);
592         return onlyRunners;
593     }
594 }