1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.report.ConsoleStream;
23 import org.junit.runner.Description;
24 import org.junit.runners.model.RunnerScheduler;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.PrintStream;
28 import java.util.Collection;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.CopyOnWriteArraySet;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.RejectedExecutionHandler;
34 import java.util.concurrent.ThreadPoolExecutor;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class Scheduler
49 implements RunnerScheduler
50 {
51 private final Balancer balancer;
52
53 private final SchedulingStrategy strategy;
54
55 private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56
57 private final Description description;
58
59 private final ConsoleStream logger;
60
61 private volatile boolean shutdown = false;
62
63 private volatile boolean started = false;
64
65 private volatile boolean finished = false;
66
67 private volatile Controller masterController;
68
69
70
71
72
73
74
75
76
77
78
79 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy )
80 {
81 this( logger, description, strategy, -1 );
82 }
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, int concurrency )
98 {
99 this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, Balancer balancer )
118 {
119 strategy.setDefaultShutdownHandler( newShutdownHandler() );
120 this.logger = logger;
121 this.description = description;
122 this.strategy = strategy;
123 this.balancer = balancer;
124 masterController = null;
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
139 SchedulingStrategy strategy, Balancer balancer )
140 {
141 this( logger, description, strategy, balancer );
142 strategy.setDefaultShutdownHandler( newShutdownHandler() );
143 masterScheduler.register( this );
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157
158 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
159 SchedulingStrategy strategy, int concurrency )
160 {
161 this( logger, description, strategy, concurrency );
162 strategy.setDefaultShutdownHandler( newShutdownHandler() );
163 masterScheduler.register( this );
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
178 SchedulingStrategy strategy )
179 {
180 this( logger, description, masterScheduler, strategy, 0 );
181 }
182
183 private void setController( Controller masterController )
184 {
185 if ( masterController == null )
186 {
187 throw new NullPointerException( "null ExecutionController" );
188 }
189 this.masterController = masterController;
190 }
191
192
193
194
195
196 private boolean register( Scheduler slave )
197 {
198 boolean canRegister = slave != null && slave != this;
199 if ( canRegister )
200 {
201 Controller controller = new Controller( slave );
202 canRegister = !slaves.contains( controller );
203 if ( canRegister )
204 {
205 slaves.add( controller );
206 slave.setController( controller );
207 }
208 }
209 return canRegister;
210 }
211
212
213
214
215 private boolean canSchedule()
216 {
217 return !shutdown && ( masterController == null || masterController.canSchedule() );
218 }
219
220 protected void logQuietly( Throwable t )
221 {
222 ByteArrayOutputStream out = new ByteArrayOutputStream();
223 PrintStream stream = new PrintStream( out );
224 try
225 {
226 t.printStackTrace( stream );
227 }
228 finally
229 {
230 stream.close();
231 }
232 logger.println( out.toString() );
233 }
234
235 protected void logQuietly( String msg )
236 {
237 logger.println( msg );
238 }
239
240
241
242
243
244
245
246
247
248
249
250 protected ShutdownResult describeStopped( boolean stopNow )
251 {
252 Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
253 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
254 stop( executedTests, incompleteTests, false, stopNow );
255 return new ShutdownResult( executedTests, incompleteTests );
256 }
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274 private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
275 boolean tryCancelFutures, boolean stopNow )
276 {
277 shutdown = true;
278 try
279 {
280 if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
281 {
282 if ( executedTests != null )
283 {
284 executedTests.add( description );
285 }
286
287 if ( incompleteTests != null && !finished )
288 {
289 incompleteTests.add( description );
290 }
291 }
292
293 for ( Controller slave : slaves )
294 {
295 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
296 }
297 }
298 finally
299 {
300 try
301 {
302 balancer.releaseAllPermits();
303 }
304 finally
305 {
306 if ( stopNow )
307 {
308 strategy.stopNow();
309 }
310 else if ( tryCancelFutures )
311 {
312 strategy.stop();
313 }
314 else
315 {
316 strategy.disable();
317 }
318 }
319 }
320 }
321
322 protected boolean shutdownThreadPoolsAwaitingKilled()
323 {
324 if ( masterController == null )
325 {
326 stop( null, null, true, false );
327 boolean isNotInterrupted = true;
328 if ( strategy != null )
329 {
330 isNotInterrupted = strategy.destroy();
331 }
332 for ( Controller slave : slaves )
333 {
334 isNotInterrupted &= slave.destroy();
335 }
336 return isNotInterrupted;
337 }
338 else
339 {
340 throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
341 }
342 }
343
344 protected void beforeExecute()
345 {
346 }
347
348 protected void afterExecute()
349 {
350 }
351
352 @Override
353 public void schedule( Runnable childStatement )
354 {
355 if ( childStatement == null )
356 {
357 logQuietly( "cannot schedule null" );
358 }
359 else if ( canSchedule() && strategy.canSchedule() )
360 {
361 try
362 {
363 boolean isNotInterrupted = balancer.acquirePermit();
364 if ( isNotInterrupted && !shutdown )
365 {
366 Runnable task = wrapTask( childStatement );
367 strategy.schedule( task );
368 started = true;
369 }
370 }
371 catch ( RejectedExecutionException e )
372 {
373 stop( null, null, true, false );
374 }
375 catch ( Throwable t )
376 {
377 balancer.releasePermit();
378 logQuietly( t );
379 }
380 }
381 }
382
383 @Override
384 public void finished()
385 {
386 try
387 {
388 strategy.finished();
389 }
390 catch ( InterruptedException e )
391 {
392 logQuietly( e );
393 }
394 finally
395 {
396 finished = true;
397 }
398 }
399
400 private Runnable wrapTask( final Runnable task )
401 {
402 return new Runnable()
403 {
404 @Override
405 public void run()
406 {
407 try
408 {
409 beforeExecute();
410 task.run();
411 }
412 finally
413 {
414 try
415 {
416 afterExecute();
417 }
418 finally
419 {
420 balancer.releasePermit();
421 }
422 }
423 }
424 };
425 }
426
427 protected ShutdownHandler newShutdownHandler()
428 {
429 return new ShutdownHandler();
430 }
431
432
433
434
435 private final class Controller
436 {
437 private final Scheduler slave;
438
439 private Controller( Scheduler slave )
440 {
441 this.slave = slave;
442 }
443
444
445
446
447 boolean canSchedule()
448 {
449 return Scheduler.this.canSchedule();
450 }
451
452 void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
453 boolean tryCancelFutures, boolean shutdownNow )
454 {
455 slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
456 }
457
458
459
460
461 boolean destroy()
462 {
463 return slave.strategy.destroy();
464 }
465
466 @Override
467 public int hashCode()
468 {
469 return slave.hashCode();
470 }
471
472 @Override
473 public boolean equals( Object o )
474 {
475 return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
476 }
477 }
478
479
480
481
482
483
484
485
486
487 public class ShutdownHandler
488 implements RejectedExecutionHandler
489 {
490 private volatile RejectedExecutionHandler poolHandler;
491
492 protected ShutdownHandler()
493 {
494 poolHandler = null;
495 }
496
497 public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
498 {
499 this.poolHandler = poolHandler;
500 }
501
502 @Override
503 public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
504 {
505 if ( executor.isShutdown() )
506 {
507 Scheduler.this.stop( null, null, true, false );
508 }
509 final RejectedExecutionHandler poolHandler = this.poolHandler;
510 if ( poolHandler != null )
511 {
512 poolHandler.rejectedExecution( r, executor );
513 }
514 }
515 }
516 }