View Javadoc

1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.junit.runner.Description;
23  import org.junit.runners.model.RunnerScheduler;
24  
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Set;
28  import java.util.concurrent.CopyOnWriteArraySet;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadPoolExecutor;
32  
33  /**
34   *
35   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
36   * a master scheduler can shutdown slaves.
37   * <p>
38   * The scheduler objects should be first created (and wired) and set in runners
39   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
40   * <p>
41   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
42   *
43   * @author Tibor Digana (tibor17)
44   * @since 2.16
45   */
46  public class Scheduler implements RunnerScheduler {
47      private final Balancer balancer;
48      private final SchedulingStrategy strategy;
49      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
50      private final Description description;
51      private volatile boolean shutdown = false;
52      private volatile boolean started = false;
53      private volatile Controller masterController;
54  
55      /**
56       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
57       * <p>
58       * You can use it with one infinite thread pool shared in strategies across all
59       * suites, class runners, etc.
60       */
61      public Scheduler(Description description, SchedulingStrategy strategy) {
62          this(description, strategy, -1);
63      }
64  
65      /**
66       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
67       * <p>
68       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
69       * {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy)}
70       * or {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
71       *
72       * @param description description of current runner
73       * @param strategy scheduling strategy with a shared thread pool
74       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
75       * @throws NullPointerException if null <tt>strategy</tt>
76       */
77      public Scheduler(Description description, SchedulingStrategy strategy, int concurrency) {
78          this(description, strategy, BalancerFactory.createBalancer(concurrency));
79      }
80  
81      /**
82       * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
83       * against other groups of schedulers. The schedulers share one pool.
84       * <p>
85       * Unlike in {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} which was limiting
86       * the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em> <tt>balancer</tt>
87       * is limiting the concurrency of all children in runners having schedulers created by this constructor.
88       *
89       * @param description description of current runner
90       * @param strategy scheduling strategy which may share threads with other strategy
91       * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
92       * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
93       */
94      public Scheduler(Description description, SchedulingStrategy strategy, Balancer balancer) {
95          strategy.setDefaultShutdownHandler(newShutdownHandler());
96          this.description = description;
97          this.strategy = strategy;
98          this.balancer = balancer;
99          masterController = null;
100     }
101     /**
102      * Can be used by e.g. a runner having parallel classes in use case with parallel
103      * suites, classes and methods sharing the same thread pool.
104      *
105      * @param description description of current runner
106      * @param masterScheduler scheduler sharing own threads with this slave
107      * @param strategy scheduling strategy for this scheduler
108      * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
109      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
110      */
111     public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy, Balancer balancer) {
112         this(description, strategy, balancer);
113         strategy.setDefaultShutdownHandler(newShutdownHandler());
114         masterScheduler.register(this);
115     }
116 
117     /**
118      * @param masterScheduler a reference to {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)}
119      *                        or {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy)}
120      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy)
121      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)
122      */
123     public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy, int concurrency) {
124         this(description, strategy, concurrency);
125         strategy.setDefaultShutdownHandler(newShutdownHandler());
126         masterScheduler.register(this);
127     }
128 
129     /**
130      * Should be used with individual pools on suites, classes and methods, see
131      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
132      * <p>
133      * Cached thread pool is infinite and can be always shared.
134      */
135     public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy) {
136         this(description, masterScheduler, strategy, 0);
137     }
138 
139     private void setController(Controller masterController) {
140         if (masterController == null) {
141             throw new NullPointerException("null ExecutionController");
142         }
143         this.masterController = masterController;
144     }
145 
146     /**
147      * @param slave a slave scheduler to register
148      * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
149      */
150     private boolean register(Scheduler slave) {
151         boolean canRegister = slave != null && slave != this;
152         if (canRegister) {
153             Controller controller = new Controller(slave);
154             canRegister = !slaves.contains(controller);
155             if (canRegister) {
156                 slaves.add(controller);
157                 slave.setController(controller);
158             }
159         }
160         return canRegister;
161     }
162 
163     /**
164      * @return <tt>true</tt> if new tasks can be scheduled.
165      */
166     private boolean canSchedule() {
167         return !shutdown && (masterController == null || masterController.canSchedule());
168     }
169 
170     protected void logQuietly(Throwable t) {
171         t.printStackTrace(System.err);
172     }
173 
174     protected void logQuietly(String msg) {
175         System.err.println(msg);
176     }
177 
178     /**
179      * Attempts to stop all actively executing tasks and immediately returns a collection
180      * of descriptions of those tasks which have started prior to this call.
181      * <p>
182      * This scheduler and other registered schedulers will shutdown, see {@link #register(Scheduler)}.
183      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
184      *
185      * @param shutdownNow if <tt>true</tt> interrupts waiting methods
186      * @return collection of descriptions started before shutting down
187      */
188     public Collection<Description> shutdown(boolean shutdownNow) {
189         shutdown = true;
190         ArrayList<Description> activeChildren = new ArrayList<Description>();
191 
192         if (started && description != null) {
193             activeChildren.add(description);
194         }
195 
196         for (Controller slave : slaves) {
197             try {
198                 activeChildren.addAll(slave.shutdown(shutdownNow));
199             } catch (Throwable t) {
200                 logQuietly(t);
201             }
202         }
203 
204         try {
205             balancer.releaseAllPermits();
206         } finally {
207             if (shutdownNow) {
208                 strategy.stopNow();
209             } else {
210                 strategy.stop();
211             }
212         }
213 
214         return activeChildren;
215     }
216 
217     protected void beforeExecute() {
218     }
219 
220     protected void afterExecute() {
221     }
222 
223     public void schedule(Runnable childStatement) {
224         if (childStatement == null) {
225             logQuietly("cannot schedule null");
226         } else if (canSchedule() && strategy.canSchedule()) {
227             try {
228                 balancer.acquirePermit();
229                 Runnable task = wrapTask(childStatement);
230                 strategy.schedule(task);
231                 started = true;
232             } catch (RejectedExecutionException e) {
233                 shutdown(false);
234             } catch (Throwable t) {
235                 balancer.releasePermit();
236                 logQuietly(t);
237             }
238         }
239     }
240 
241     public void finished() {
242         try {
243             strategy.finished();
244         } catch (InterruptedException e) {
245             logQuietly(e);
246         } finally {
247             for (Controller slave : slaves) {
248                 slave.awaitFinishedQuietly();
249             }
250         }
251     }
252 
253     private Runnable wrapTask(final Runnable task) {
254         return new Runnable() {
255             public void run() {
256                 try {
257                     beforeExecute();
258                     task.run();
259                 } finally {
260                     try {
261                         afterExecute();
262                     } finally {
263                         balancer.releasePermit();
264                     }
265                 }
266             }
267         };
268     }
269 
270     protected ShutdownHandler newShutdownHandler() {
271         return new ShutdownHandler();
272     }
273 
274     /**
275      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
276      */
277     private final class Controller {
278         private final Scheduler slave;
279 
280         private Controller(Scheduler slave) {
281             this.slave = slave;
282         }
283 
284         /**
285          * @return <tt>true</tt> if new children can be scheduled.
286          */
287         boolean canSchedule() {
288             return Scheduler.this.canSchedule();
289         }
290 
291         void awaitFinishedQuietly() {
292             try {
293                 slave.finished();
294             } catch(Throwable t) {
295                 slave.logQuietly(t);
296             }
297         }
298 
299         Collection<Description> shutdown(boolean shutdownNow) {
300             return slave.shutdown(shutdownNow);
301         }
302 
303         @Override
304         public int hashCode() {
305             return slave.hashCode();
306         }
307 
308         @Override
309         public boolean equals(Object o) {
310             return o == this || (o instanceof Controller) && slave.equals(((Controller) o).slave);
311         }
312     }
313 
314     public class ShutdownHandler implements RejectedExecutionHandler {
315         private volatile RejectedExecutionHandler poolHandler;
316 
317         protected ShutdownHandler() {
318             poolHandler = null;
319         }
320 
321         public void setRejectedExecutionHandler(RejectedExecutionHandler poolHandler) {
322             this.poolHandler = poolHandler;
323         }
324 
325         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
326             if (executor.isShutdown()) {
327                 shutdown(false);
328             }
329             final RejectedExecutionHandler poolHandler = this.poolHandler;
330             if (poolHandler != null) {
331                 poolHandler.rejectedExecution(r, executor);
332             }
333         }
334     }
335 }