View Javadoc

1   package org.apache.maven.surefire.junitcore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Future;
30  
31  import org.apache.maven.surefire.util.NestedRuntimeException;
32  import org.junit.runner.Computer;
33  import org.junit.runner.Runner;
34  import org.junit.runners.ParentRunner;
35  import org.junit.runners.model.InitializationError;
36  import org.junit.runners.model.RunnerBuilder;
37  import org.junit.runners.model.RunnerScheduler;
38  
39  /*
40   * @author Kristian Rosenvold
41   */
42  public class ConfigurableParallelComputer
43      extends Computer
44  {
45      private final boolean fClasses;
46  
47      private final boolean fMethods;
48  
49      private final boolean fixedPool;
50  
51      private final ExecutorService fService;
52  
53      private final List<AsynchronousRunner> nonBlockers =
54          Collections.synchronizedList( new ArrayList<AsynchronousRunner>() );
55  
56  
57      public ConfigurableParallelComputer()
58      {
59          this( true, true, Executors.newCachedThreadPool(), false );
60      }
61  
62      public ConfigurableParallelComputer( boolean fClasses, boolean fMethods )
63      {
64          this( fClasses, fMethods, Executors.newCachedThreadPool(), false );
65      }
66  
67      public ConfigurableParallelComputer( boolean fClasses, boolean fMethods, Integer numberOfThreads, boolean perCore )
68      {
69          this( fClasses, fMethods, Executors.newFixedThreadPool(
70              numberOfThreads * ( perCore ? Runtime.getRuntime().availableProcessors() : 1 ) ), true );
71      }
72  
73      private ConfigurableParallelComputer( boolean fClasses, boolean fMethods, ExecutorService executorService,
74                                            boolean fixedPool )
75      {
76          this.fClasses = fClasses;
77          this.fMethods = fMethods;
78          fService = executorService;
79          this.fixedPool = fixedPool;
80      }
81  
82      @SuppressWarnings( { "UnusedDeclaration" } )
83      public void close()
84          throws ExecutionException
85      {
86          for ( AsynchronousRunner nonBlocker : nonBlockers )
87          {
88              nonBlocker.waitForCompletion();
89          }
90  
91          fService.shutdown();
92          try
93          {
94              fService.awaitTermination( 10, java.util.concurrent.TimeUnit.SECONDS );
95          }
96          catch ( InterruptedException e )
97          {
98              throw new NestedRuntimeException( e );
99          }
100     }
101 
102     private Runner parallelize( Runner runner, RunnerScheduler runnerInterceptor )
103     {
104         if ( runner instanceof ParentRunner<?> )
105         {
106             ( (ParentRunner<?>) runner ).setScheduler( runnerInterceptor );
107         }
108         return runner;
109     }
110 
111     private RunnerScheduler getMethodInterceptor()
112     {
113         if ( fClasses && fMethods )
114         {
115             final AsynchronousRunner blockingAsynchronousRunner = new AsynchronousRunner( fService );
116             nonBlockers.add( blockingAsynchronousRunner );
117             return blockingAsynchronousRunner;
118         }
119         return fMethods ? new AsynchronousRunner( fService ) : new SynchronousRunner();
120     }
121 
122     private RunnerScheduler getClassInterceptor()
123     {
124         if ( fClasses )
125         {
126             return fMethods ? new SynchronousRunner() : new AsynchronousRunner( fService );
127         }
128         return new SynchronousRunner();
129     }
130 
131     @Override
132     public Runner getSuite( RunnerBuilder builder, java.lang.Class<?>[] classes )
133         throws InitializationError
134     {
135         Runner suite = super.getSuite( builder, classes );
136         return fClasses ? parallelize( suite, getClassInterceptor() ) : suite;
137     }
138 
139     @Override
140     protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
141         throws Throwable
142     {
143         Runner runner = super.getRunner( builder, testClass );
144         return fMethods ? parallelize( runner, getMethodInterceptor() ) : runner;
145     }
146 
147     @Override
148     public String toString()
149     {
150         return "ConfigurableParallelComputer{" + "classes=" + fClasses + ", methods=" + fMethods + ", fixedPool="
151             + fixedPool + '}';
152     }
153 
154     private class SynchronousRunner
155         implements RunnerScheduler
156     {
157         public void schedule( final Runnable childStatement )
158         {
159             childStatement.run();
160         }
161 
162         public void finished()
163         {
164         }
165     }
166 
167     public class AsynchronousRunner
168         implements RunnerScheduler
169     {
170         private final List<Future<Object>> futures = Collections.synchronizedList( new ArrayList<Future<Object>>() );
171 
172         private final ExecutorService fService;
173 
174         public AsynchronousRunner( ExecutorService fService )
175         {
176             this.fService = fService;
177         }
178 
179         public void schedule( final Runnable childStatement )
180         {
181             final Callable<Object> objectCallable = new Callable<Object>()
182             {
183                 public Object call()
184                     throws Exception
185                 {
186                     childStatement.run();
187                     return null;
188                 }
189             };
190             futures.add( fService.submit( objectCallable ) );
191         }
192 
193 
194         public void finished()
195         {
196             try
197             {
198                 waitForCompletion();
199             }
200             catch ( ExecutionException e )
201             {
202                 throw new NestedRuntimeException( e );
203             }
204         }
205 
206         public void waitForCompletion()
207             throws ExecutionException
208         {
209             for ( Future<Object> each : futures )
210             {
211                 try
212                 {
213                     each.get();
214                 }
215                 catch ( InterruptedException e )
216                 {
217                     throw new NestedRuntimeException( e );
218                 }
219             }
220         }
221     }
222 }