View Javadoc

1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.junit.internal.runners.ErrorReportingRunner;
23  import org.junit.runner.Description;
24  import org.junit.runner.Runner;
25  import org.junit.runner.manipulation.Filter;
26  import org.junit.runner.manipulation.NoTestsRemainException;
27  import org.junit.runner.notification.RunNotifier;
28  import org.junit.runners.ParentRunner;
29  import org.junit.runners.Suite;
30  import org.junit.runners.model.InitializationError;
31  import org.junit.runners.model.RunnerBuilder;
32  
33  import java.util.ArrayList;
34  import java.util.Collection;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedHashSet;
39  import java.util.Map;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  
43  /**
44   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
45   * the suites and classes can be reused in parallel methods.
46   * <pre>
47   * ParallelComputerBuilder builder = new ParallelComputerBuilder();
48   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
49   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
50   * Class<?>[] tests = {...};
51   * new JUnitCore().run(computer, tests);
52   * </pre>
53   * Note that the type has always at least one thread even if unspecified. The capacity in
54   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes altogether.
55   * <p>
56   * The Computer can be shutdown in a separate thread. Pending tests will be interrupted if the argument is <tt>true</tt>.
57   * <pre>
58   * computer.shutdown(true);
59   * </pre>
60   *
61   * @author Tibor Digana (tibor17)
62   * @since 2.16
63   */
64  public class ParallelComputerBuilder {
65      private static enum Type {
66          SUITES, CLASSES, METHODS
67      }
68  
69      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
70      private final Map<Type, Integer> parallelGroups = new HashMap<Type, Integer>(3);
71      private boolean useSeparatePools;
72      private int totalPoolSize;
73  
74      /**
75       * Calling {@link #useSeparatePools()}.
76       */
77      public ParallelComputerBuilder() {
78          useSeparatePools();
79          parallelGroups.put(Type.SUITES, 0);
80          parallelGroups.put(Type.CLASSES, 0);
81          parallelGroups.put(Type.METHODS, 0);
82      }
83  
84      public ParallelComputerBuilder useSeparatePools() {
85          totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
86          useSeparatePools = true;
87          return this;
88      }
89  
90      public ParallelComputerBuilder useOnePool() {
91          totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
92          useSeparatePools = false;
93          return this;
94      }
95  
96      /**
97       * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
98       *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not limited.
99       * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
100      */
101     public ParallelComputerBuilder useOnePool(int totalPoolSize) {
102         if (totalPoolSize < 1) {
103             throw new IllegalArgumentException("Size of common pool is less than 1.");
104         }
105         this.totalPoolSize = totalPoolSize;
106         useSeparatePools = false;
107         return this;
108     }
109 
110     public ParallelComputerBuilder parallelSuites() {
111         return parallel(Type.SUITES);
112     }
113 
114     public ParallelComputerBuilder parallelSuites(int nThreads) {
115         return parallel(nThreads, Type.SUITES);
116     }
117 
118     public ParallelComputerBuilder parallelClasses() {
119         return parallel(Type.CLASSES);
120     }
121 
122     public ParallelComputerBuilder parallelClasses(int nThreads) {
123         return parallel(nThreads, Type.CLASSES);
124     }
125 
126     public ParallelComputerBuilder parallelMethods() {
127         return parallel(Type.METHODS);
128     }
129 
130     public ParallelComputerBuilder parallelMethods(int nThreads) {
131         return parallel(nThreads, Type.METHODS);
132     }
133 
134     private ParallelComputerBuilder parallel(int nThreads, Type parallelType) {
135         if (nThreads < 0) {
136             throw new IllegalArgumentException("negative nThreads " + nThreads);
137         }
138 
139         if (parallelType == null) {
140             throw new NullPointerException("null parallelType");
141         }
142 
143         parallelGroups.put(parallelType, nThreads);
144         return this;
145     }
146 
147     private ParallelComputerBuilder parallel(Type parallelType) {
148         return parallel(Integer.MAX_VALUE, parallelType);
149     }
150 
151     public ParallelComputer buildComputer() {
152         return new PC();
153     }
154 
155     final class PC extends ParallelComputer
156     {
157         final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
158         final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
159         final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
160         final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
161         final Collection<Runner> unscheduledRunners = new LinkedHashSet<Runner>();
162         final int poolCapacity;
163         final boolean splitPool;
164         private final Map<Type, Integer> allGroups;
165         private volatile Scheduler master;
166 
167         private PC() {
168             allGroups = new HashMap<Type, Integer>(ParallelComputerBuilder.this.parallelGroups);
169             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
170             splitPool = ParallelComputerBuilder.this.useSeparatePools;
171         }
172 
173         @Override
174         public Collection<Description> shutdown(boolean shutdownNow) {
175             final Scheduler master = this.master;
176             return master == null ? Collections.<Description>emptyList() : master.shutdown(shutdownNow);
177         }
178 
179         @Override
180         public Runner getSuite(RunnerBuilder builder, Class<?>[] cls) throws InitializationError {
181             super.getSuite(builder, cls);
182             populateChildrenFromSuites();
183             return setSchedulers();
184         }
185 
186         @Override
187         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass ) throws Throwable
188         {
189             Runner runner = super.getRunner( builder, testClass );
190             if ( canSchedule(runner) )
191             {
192                 if ( runner instanceof Suite )
193                 {
194                     suites.add( (Suite) runner );
195                 }
196                 else
197                 {
198                     classes.add( (ParentRunner) runner );
199                 }
200             }
201             else
202             {
203                 unscheduledRunners.add( runner );
204             }
205             return runner;
206         }
207 
208         private class SuiteFilter extends Filter {
209             @Override
210             public boolean shouldRun(Description description) {
211                 return true;
212             }
213 
214             @Override
215             public void apply(Object child) throws NoTestsRemainException {
216                 super.apply(child);
217                 if (child instanceof Suite) {
218                     nestedSuites.add((Suite) child);
219                 } else if (child instanceof ParentRunner) {
220                     nestedClasses.add((ParentRunner) child);
221                 }
222             }
223 
224             @Override
225             public String describe() {
226                 return "";
227             }
228         }
229 
230         private <T extends Runner> ParentRunner wrapRunners( Collection<T> runners ) throws InitializationError {
231             ArrayList<Runner> runs = new ArrayList<Runner>();
232             for ( T runner : runners )
233             {
234                 if ( runner != null && hasChildren( runner ) )
235                 {
236                     runs.add( runner );
237                 }
238             }
239 
240             return runs.isEmpty() ? null : new Suite( null, runs ) {};
241         }
242 
243         private boolean hasChildren( Runner runner )
244         {
245             Description description = runner.getDescription();
246             Collection children = description == null ? null : description.getChildren();
247             return children != null && !children.isEmpty();
248         }
249 
250         private ExecutorService createPool(int poolSize) {
251             return poolSize < Integer.MAX_VALUE ? Executors.newFixedThreadPool(poolSize) : Executors.newCachedThreadPool();
252         }
253 
254         private Scheduler createMaster(ExecutorService pool, int poolSize) {
255             if (!areSuitesAndClassesParallel() || poolSize <= 1) {
256                 return new Scheduler(null, new InvokerStrategy());
257             } else if (pool != null && poolSize == Integer.MAX_VALUE) {
258                 return new Scheduler(null, new SharedThreadPoolStrategy(pool));
259             } else {
260                 return new Scheduler(null, SchedulingStrategies.createParallelStrategy(2));
261             }
262         }
263 
264         private boolean areSuitesAndClassesParallel() {
265             return !suites.isEmpty() && allGroups.get(Type.SUITES) > 0 && !classes.isEmpty() && allGroups.get(Type.CLASSES) > 0;
266         }
267 
268         private void populateChildrenFromSuites() {
269             Filter filter = new SuiteFilter();
270             for (Iterator<ParentRunner> it = suites.iterator(); it.hasNext();) {
271                 ParentRunner suite = it.next();
272                 try {
273                     suite.filter(filter);
274                 } catch (NoTestsRemainException e) {
275                     it.remove();
276                 }
277             }
278         }
279 
280         private int totalPoolSize() {
281             if (poolCapacity == TOTAL_POOL_SIZE_UNDEFINED) {
282                 int total = 0;
283                 for (int nThreads : allGroups.values()) {
284                     total += nThreads;
285                     if (total < 0) {
286                         total = Integer.MAX_VALUE;
287                         break;
288                     }
289                 }
290                 return total;
291             } else {
292                 return poolCapacity;
293             }
294         }
295 
296         private Runner setSchedulers() throws InitializationError {
297             int parallelSuites = allGroups.get(Type.SUITES);
298             int parallelClasses = allGroups.get(Type.CLASSES);
299             int parallelMethods = allGroups.get(Type.METHODS);
300             int poolSize = totalPoolSize();
301             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool(poolSize);
302             master = createMaster(commonPool, poolSize);
303 
304             ParentRunner suiteSuites = wrapRunners( suites );
305             if ( suiteSuites != null )
306             {
307                 // a scheduler for parallel suites
308                 if ( commonPool != null && parallelSuites > 0 )
309                 {
310                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
311                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
312                 }
313                 else
314                 {
315                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
316                 }
317             }
318 
319             // schedulers for parallel classes
320             ParentRunner suiteClasses = wrapRunners( classes );
321             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
322             allSuites.addAll( nestedSuites );
323             if ( suiteClasses != null )
324             {
325                 allSuites.add( suiteClasses );
326             }
327             if ( !allSuites.isEmpty() )
328             {
329                 setSchedulers( allSuites, parallelClasses, commonPool );
330             }
331 
332             // schedulers for parallel methods
333             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
334             allClasses.addAll( nestedClasses );
335             if ( !allClasses.isEmpty() )
336             {
337                 setSchedulers( allClasses, parallelMethods, commonPool );
338             }
339 
340             // resulting runner for Computer#getSuite() scheduled by master scheduler
341             ParentRunner all = createFinalRunner( suiteSuites, suiteClasses );
342             all.setScheduler( master );
343             return all;
344         }
345 
346         private ParentRunner createFinalRunner( Runner... runners ) throws InitializationError
347         {
348             ArrayList<Runner> all = new ArrayList<Runner>( unscheduledRunners );
349             for ( Runner runner : runners )
350             {
351                 if ( runner != null )
352                 {
353                     all.add( runner );
354                 }
355             }
356 
357             return new Suite( null, all )
358             {
359                 @Override
360                 public void run( RunNotifier notifier )
361                 {
362                     try
363                     {
364                         super.run( notifier );
365                     }
366                     finally
367                     {
368                         afterRunQuietly();
369                     }
370                 }
371             };
372         }
373 
374         private void setSchedulers(Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool) {
375             if (commonPool != null) {
376                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness(poolSize);
377                 boolean doParallel = poolSize > 0;
378                 for (ParentRunner runner : runners) {
379                     runner.setScheduler(createScheduler(runner.getDescription(), commonPool, doParallel, concurrencyLimit));
380                 }
381             } else {
382                 ExecutorService pool = null;
383                 if (poolSize == Integer.MAX_VALUE) {
384                     pool = Executors.newCachedThreadPool();
385                 } else if (poolSize > 0) {
386                     pool = Executors.newFixedThreadPool(poolSize);
387                 }
388                 boolean doParallel = pool != null;
389                 for (ParentRunner runner : runners) {
390                     runner.setScheduler(createScheduler(runner.getDescription(), pool, doParallel,
391                             BalancerFactory.createInfinitePermitsBalancer()));
392                 }
393             }
394         }
395 
396         private Scheduler createScheduler(Description desc, ExecutorService pool, boolean doParallel, Balancer concurrency) {
397             doParallel &= pool != null;
398             SchedulingStrategy strategy = doParallel ? new SharedThreadPoolStrategy(pool) : new InvokerStrategy();
399             return new Scheduler(desc, master, strategy, concurrency);
400         }
401 
402         private Scheduler createScheduler(int poolSize) {
403             if (poolSize == Integer.MAX_VALUE) {
404                 return new Scheduler(null, master, SchedulingStrategies.createParallelStrategyUnbounded());
405             } else if (poolSize == 0) {
406                 return new Scheduler(null, master, new InvokerStrategy());
407             } else {
408                 return new Scheduler(null, master, SchedulingStrategies.createParallelStrategy(poolSize));
409             }
410         }
411 
412         private boolean canSchedule(Runner runner) {
413             return !(runner instanceof ErrorReportingRunner) && runner instanceof ParentRunner;
414         }
415     }
416 }