1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.report.ConsoleStream;
23 import org.junit.runner.Description;
24 import org.junit.runners.model.RunnerScheduler;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.PrintStream;
28 import java.util.Collection;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.CopyOnWriteArraySet;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.RejectedExecutionHandler;
34 import java.util.concurrent.ThreadPoolExecutor;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class Scheduler
49 implements RunnerScheduler
50 {
51 private final Balancer balancer;
52
53 private final SchedulingStrategy strategy;
54
55 private final Set<Controller> slaves = new CopyOnWriteArraySet<>();
56
57 private final Description description;
58
59 private final ConsoleStream logger;
60
61 private volatile boolean shutdown = false;
62
63 private volatile boolean started = false;
64
65 private volatile boolean finished = false;
66
67 private volatile Controller masterController;
68
69
70
71
72
73
74
75
76
77
78
79 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy )
80 {
81 this( logger, description, strategy, -1 );
82 }
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, int concurrency )
98 {
99 this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, Balancer balancer )
118 {
119 strategy.setDefaultShutdownHandler( newShutdownHandler() );
120 this.logger = logger;
121 this.description = description;
122 this.strategy = strategy;
123 this.balancer = balancer;
124 masterController = null;
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
139 SchedulingStrategy strategy, Balancer balancer )
140 {
141 this( logger, description, strategy, balancer );
142 strategy.setDefaultShutdownHandler( newShutdownHandler() );
143 masterScheduler.register( this );
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157
158 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
159 SchedulingStrategy strategy, int concurrency )
160 {
161 this( logger, description, strategy, concurrency );
162 strategy.setDefaultShutdownHandler( newShutdownHandler() );
163 masterScheduler.register( this );
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177 public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
178 SchedulingStrategy strategy )
179 {
180 this( logger, description, masterScheduler, strategy, 0 );
181 }
182
183 private void setController( Controller masterController )
184 {
185 if ( masterController == null )
186 {
187 throw new NullPointerException( "null ExecutionController" );
188 }
189 this.masterController = masterController;
190 }
191
192
193
194
195
196 private boolean register( Scheduler slave )
197 {
198 boolean canRegister = slave != null && slave != this;
199 if ( canRegister )
200 {
201 Controller controller = new Controller( slave );
202 canRegister = !slaves.contains( controller );
203 if ( canRegister )
204 {
205 slaves.add( controller );
206 slave.setController( controller );
207 }
208 }
209 return canRegister;
210 }
211
212
213
214
215 private boolean canSchedule()
216 {
217 return !shutdown && ( masterController == null || masterController.canSchedule() );
218 }
219
220 protected void logQuietly( Throwable t )
221 {
222 ByteArrayOutputStream out = new ByteArrayOutputStream();
223 try ( PrintStream stream = new PrintStream( out ) )
224 {
225 t.printStackTrace( stream );
226 }
227 logger.println( out.toString() );
228 }
229
230 protected void logQuietly( String msg )
231 {
232 logger.println( msg );
233 }
234
235
236
237
238
239
240
241
242
243
244
245 protected ShutdownResult describeStopped( boolean stopNow )
246 {
247 Collection<Description> executedTests = new ConcurrentLinkedQueue<>();
248 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<>();
249 stop( executedTests, incompleteTests, false, stopNow );
250 return new ShutdownResult( executedTests, incompleteTests );
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
270 boolean tryCancelFutures, boolean stopNow )
271 {
272 shutdown = true;
273 try
274 {
275 if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
276 {
277 if ( executedTests != null )
278 {
279 executedTests.add( description );
280 }
281
282 if ( incompleteTests != null && !finished )
283 {
284 incompleteTests.add( description );
285 }
286 }
287
288 for ( Controller slave : slaves )
289 {
290 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
291 }
292 }
293 finally
294 {
295 try
296 {
297 balancer.releaseAllPermits();
298 }
299 finally
300 {
301 if ( stopNow )
302 {
303 strategy.stopNow();
304 }
305 else if ( tryCancelFutures )
306 {
307 strategy.stop();
308 }
309 else
310 {
311 strategy.disable();
312 }
313 }
314 }
315 }
316
317 protected boolean shutdownThreadPoolsAwaitingKilled()
318 {
319 if ( masterController == null )
320 {
321 stop( null, null, true, false );
322 boolean isNotInterrupted = true;
323 if ( strategy != null )
324 {
325 isNotInterrupted = strategy.destroy();
326 }
327 for ( Controller slave : slaves )
328 {
329 isNotInterrupted &= slave.destroy();
330 }
331 return isNotInterrupted;
332 }
333 else
334 {
335 throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
336 }
337 }
338
339 protected void beforeExecute()
340 {
341 }
342
343 protected void afterExecute()
344 {
345 }
346
347 @Override
348 public void schedule( Runnable childStatement )
349 {
350 if ( childStatement == null )
351 {
352 logQuietly( "cannot schedule null" );
353 }
354 else if ( canSchedule() && strategy.canSchedule() )
355 {
356 try
357 {
358 boolean isNotInterrupted = balancer.acquirePermit();
359 if ( isNotInterrupted && !shutdown )
360 {
361 Runnable task = wrapTask( childStatement );
362 strategy.schedule( task );
363 started = true;
364 }
365 }
366 catch ( RejectedExecutionException e )
367 {
368 stop( null, null, true, false );
369 }
370 catch ( Throwable t )
371 {
372 balancer.releasePermit();
373 logQuietly( t );
374 }
375 }
376 }
377
378 @Override
379 public void finished()
380 {
381 try
382 {
383 strategy.finished();
384 }
385 catch ( InterruptedException e )
386 {
387 logQuietly( e );
388 }
389 finally
390 {
391 finished = true;
392 }
393 }
394
395 private Runnable wrapTask( final Runnable task )
396 {
397 return new Runnable()
398 {
399 @Override
400 public void run()
401 {
402 try
403 {
404 beforeExecute();
405 task.run();
406 }
407 finally
408 {
409 try
410 {
411 afterExecute();
412 }
413 finally
414 {
415 balancer.releasePermit();
416 }
417 }
418 }
419 };
420 }
421
422 protected ShutdownHandler newShutdownHandler()
423 {
424 return new ShutdownHandler();
425 }
426
427
428
429
430 private final class Controller
431 {
432 private final Scheduler slave;
433
434 private Controller( Scheduler slave )
435 {
436 this.slave = slave;
437 }
438
439
440
441
442 boolean canSchedule()
443 {
444 return Scheduler.this.canSchedule();
445 }
446
447 void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
448 boolean tryCancelFutures, boolean shutdownNow )
449 {
450 slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
451 }
452
453
454
455
456 boolean destroy()
457 {
458 return slave.strategy.destroy();
459 }
460
461 @Override
462 public int hashCode()
463 {
464 return slave.hashCode();
465 }
466
467 @Override
468 public boolean equals( Object o )
469 {
470 return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
471 }
472 }
473
474
475
476
477
478
479
480
481
482 public class ShutdownHandler
483 implements RejectedExecutionHandler
484 {
485 private volatile RejectedExecutionHandler poolHandler;
486
487 protected ShutdownHandler()
488 {
489 poolHandler = null;
490 }
491
492 public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
493 {
494 this.poolHandler = poolHandler;
495 }
496
497 @Override
498 public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
499 {
500 if ( executor.isShutdown() )
501 {
502 Scheduler.this.stop( null, null, true, false );
503 }
504 final RejectedExecutionHandler poolHandler = this.poolHandler;
505 if ( poolHandler != null )
506 {
507 poolHandler.rejectedExecution( r, executor );
508 }
509 }
510 }
511 }