1 package org.apache.maven.plugin.surefire.booterclient;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.logging.Log;
23 import org.apache.maven.plugin.surefire.CommonReflector;
24 import org.apache.maven.plugin.surefire.StartupReportConfiguration;
25 import org.apache.maven.plugin.surefire.SurefireProperties;
26 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.AbstractForkInputStream;
27 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.NotifiableTestStream;
28 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.OutputStreamFlushableCommandline;
29 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream;
30 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStream;
31 import org.apache.maven.plugin.surefire.booterclient.output.ForkClient;
32 import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
33 import org.apache.maven.plugin.surefire.report.DefaultReporterFactory;
34 import org.apache.maven.shared.utils.cli.CommandLineCallable;
35 import org.apache.maven.shared.utils.cli.CommandLineException;
36 import org.apache.maven.surefire.booter.Classpath;
37 import org.apache.maven.surefire.booter.ClasspathConfiguration;
38 import org.apache.maven.surefire.booter.KeyValueSource;
39 import org.apache.maven.surefire.booter.PropertiesWrapper;
40 import org.apache.maven.surefire.booter.ProviderConfiguration;
41 import org.apache.maven.surefire.booter.ProviderFactory;
42 import org.apache.maven.surefire.booter.Shutdown;
43 import org.apache.maven.surefire.booter.StartupConfiguration;
44 import org.apache.maven.surefire.booter.SurefireBooterForkException;
45 import org.apache.maven.surefire.booter.SurefireExecutionException;
46 import org.apache.maven.surefire.providerapi.SurefireProvider;
47 import org.apache.maven.surefire.report.StackTraceWriter;
48 import org.apache.maven.surefire.suite.RunResult;
49 import org.apache.maven.surefire.testset.TestRequest;
50 import org.apache.maven.surefire.util.DefaultScanResult;
51
52 import java.io.File;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.nio.charset.Charset;
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.Map;
59 import java.util.Properties;
60 import java.util.Queue;
61 import java.util.concurrent.ArrayBlockingQueue;
62 import java.util.concurrent.Callable;
63 import java.util.concurrent.ConcurrentLinkedQueue;
64 import java.util.concurrent.ExecutionException;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.concurrent.Future;
68 import java.util.concurrent.ScheduledExecutorService;
69 import java.util.concurrent.ScheduledFuture;
70 import java.util.concurrent.ThreadFactory;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.LinkedBlockingQueue;
73 import java.util.concurrent.ThreadPoolExecutor;
74 import java.util.concurrent.atomic.AtomicInteger;
75 import java.util.concurrent.atomic.AtomicReference;
76
77 import static java.util.concurrent.TimeUnit.MILLISECONDS;
78 import static java.util.concurrent.TimeUnit.SECONDS;
79 import static org.apache.maven.shared.utils.cli.CommandLineUtils.executeCommandLineAsCallable;
80 import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.addShutDownHook;
81 import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.removeShutdownHook;
82 import static org.apache.maven.surefire.util.internal.StringUtils.FORK_STREAM_CHARSET_NAME;
83 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
84 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
85 import static org.apache.maven.plugin.surefire.AbstractSurefireMojo.createCopyAndReplaceForkNumPlaceholder;
86 import static org.apache.maven.plugin.surefire.booterclient.lazytestprovider.
87 TestLessInputStream.TestLessInputStreamBuilder;
88 import static org.apache.maven.surefire.util.internal.ConcurrencyUtils.countDownToZero;
89 import static org.apache.maven.surefire.booter.Classpath.join;
90 import static org.apache.maven.surefire.booter.SystemPropertyManager.writePropertiesFile;
91 import static org.apache.maven.surefire.suite.RunResult.timeout;
92 import static org.apache.maven.surefire.suite.RunResult.failure;
93 import static org.apache.maven.surefire.suite.RunResult.SUCCESS;
94 import static java.lang.StrictMath.min;
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public class ForkStarter
111 {
112 private static final long PING_IN_SECONDS = 10;
113
114 private static final int TIMEOUT_CHECK_PERIOD_MILLIS = 100;
115
116 private static final ThreadFactory FORKED_JVM_DAEMON_THREAD_FACTORY
117 = newDaemonThreadFactory( "surefire-fork-starter" );
118
119 private static final ThreadFactory SHUTDOWN_HOOK_THREAD_FACTORY
120 = newDaemonThreadFactory( "surefire-jvm-killer-shutdownhook" );
121
122 private static final AtomicInteger SYSTEM_PROPERTIES_FILE_COUNTER = new AtomicInteger();
123
124 private final ScheduledExecutorService pingThreadScheduler = createPingScheduler();
125
126 private final ScheduledExecutorService timeoutCheckScheduler;
127
128 private final Queue<ForkClient> currentForkClients;
129
130 private final int forkedProcessTimeoutInSeconds;
131
132 private final ProviderConfiguration providerConfiguration;
133
134 private final StartupConfiguration startupConfiguration;
135
136 private final ForkConfiguration forkConfiguration;
137
138 private final StartupReportConfiguration startupReportConfiguration;
139
140 private final Log log;
141
142 private final DefaultReporterFactory defaultReporterFactory;
143
144 private final Collection<DefaultReporterFactory> defaultReporterFactories;
145
146
147
148
149 private static class InputStreamCloser
150 implements Runnable
151 {
152 private final AtomicReference<InputStream> testProvidingInputStream;
153
154 public InputStreamCloser( InputStream testProvidingInputStream )
155 {
156 this.testProvidingInputStream = new AtomicReference<InputStream>( testProvidingInputStream );
157 }
158
159 public void run()
160 {
161 InputStream stream = testProvidingInputStream.getAndSet( null );
162 if ( stream != null )
163 {
164 try
165 {
166 stream.close();
167 }
168 catch ( IOException e )
169 {
170
171 }
172 }
173 }
174 }
175
176 public ForkStarter( ProviderConfiguration providerConfiguration, StartupConfiguration startupConfiguration,
177 ForkConfiguration forkConfiguration, int forkedProcessTimeoutInSeconds,
178 StartupReportConfiguration startupReportConfiguration, Log log )
179 {
180 this.forkConfiguration = forkConfiguration;
181 this.providerConfiguration = providerConfiguration;
182 this.forkedProcessTimeoutInSeconds = forkedProcessTimeoutInSeconds;
183 this.startupConfiguration = startupConfiguration;
184 this.startupReportConfiguration = startupReportConfiguration;
185 this.log = log;
186 defaultReporterFactory = new DefaultReporterFactory( startupReportConfiguration );
187 defaultReporterFactory.runStarting();
188 defaultReporterFactories = new ConcurrentLinkedQueue<DefaultReporterFactory>();
189 currentForkClients = new ConcurrentLinkedQueue<ForkClient>();
190 timeoutCheckScheduler = createTimeoutCheckScheduler();
191 triggerTimeoutCheck();
192 }
193
194 public RunResult run( SurefireProperties effectiveSystemProperties, DefaultScanResult scanResult )
195 throws SurefireBooterForkException, SurefireExecutionException
196 {
197 try
198 {
199 Map<String, String> providerProperties = providerConfiguration.getProviderProperties();
200 scanResult.writeTo( providerProperties );
201 return isForkOnce()
202 ? run( effectiveSystemProperties, providerProperties )
203 : run( effectiveSystemProperties );
204 }
205 finally
206 {
207 defaultReporterFactory.mergeFromOtherFactories( defaultReporterFactories );
208 defaultReporterFactory.close();
209 pingThreadScheduler.shutdownNow();
210 timeoutCheckScheduler.shutdownNow();
211 }
212 }
213
214 private RunResult run( SurefireProperties effectiveSystemProperties, Map<String, String> providerProperties )
215 throws SurefireBooterForkException
216 {
217 DefaultReporterFactory forkedReporterFactory = new DefaultReporterFactory( startupReportConfiguration );
218 defaultReporterFactories.add( forkedReporterFactory );
219 TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
220 PropertiesWrapper props = new PropertiesWrapper( providerProperties );
221 TestLessInputStream stream = builder.build();
222 ForkClient forkClient =
223 new ForkClient( forkedReporterFactory, startupReportConfiguration.getTestVmSystemProperties(), stream );
224 Thread shutdown = createImmediateShutdownHookThread( builder, providerConfiguration.getShutdown() );
225 ScheduledFuture<?> ping = triggerPingTimerForShutdown( builder );
226 try
227 {
228 addShutDownHook( shutdown );
229 return fork( null, props, forkClient, effectiveSystemProperties, stream, false );
230 }
231 finally
232 {
233 removeShutdownHook( shutdown );
234 ping.cancel( true );
235 builder.removeStream( stream );
236 }
237 }
238
239 private RunResult run( SurefireProperties effectiveSystemProperties )
240 throws SurefireBooterForkException
241 {
242 return forkConfiguration.isReuseForks()
243 ? runSuitesForkOnceMultiple( effectiveSystemProperties, forkConfiguration.getForkCount() )
244 : runSuitesForkPerTestSet( effectiveSystemProperties, forkConfiguration.getForkCount() );
245 }
246
247 private boolean isForkOnce()
248 {
249 return forkConfiguration.isReuseForks() && ( forkConfiguration.getForkCount() == 1 || hasSuiteXmlFiles() );
250 }
251
252 private boolean hasSuiteXmlFiles()
253 {
254 TestRequest testSuiteDefinition = providerConfiguration.getTestSuiteDefinition();
255 return testSuiteDefinition != null && !testSuiteDefinition.getSuiteXmlFiles().isEmpty();
256 }
257
258 @SuppressWarnings( "checkstyle:magicnumber" )
259 private RunResult runSuitesForkOnceMultiple( final SurefireProperties effectiveSystemProperties, int forkCount )
260 throws SurefireBooterForkException
261 {
262 ThreadPoolExecutor executorService = new ThreadPoolExecutor( forkCount, forkCount, 60, TimeUnit.SECONDS,
263 new ArrayBlockingQueue<Runnable>( forkCount ) );
264 executorService.setThreadFactory( FORKED_JVM_DAEMON_THREAD_FACTORY );
265
266 final Queue<String> tests = new ConcurrentLinkedQueue<String>();
267
268 for ( Class<?> clazz : getSuitesIterator() )
269 {
270 tests.add( clazz.getName() );
271 }
272
273 final Queue<TestProvidingInputStream> testStreams = new ConcurrentLinkedQueue<TestProvidingInputStream>();
274
275 for ( int forkNum = 0, total = min( forkCount, tests.size() ); forkNum < total; forkNum++ )
276 {
277 testStreams.add( new TestProvidingInputStream( tests ) );
278 }
279
280 ScheduledFuture<?> ping = triggerPingTimerForShutdown( testStreams );
281 Thread shutdown = createShutdownHookThread( testStreams, providerConfiguration.getShutdown() );
282
283 try
284 {
285 addShutDownHook( shutdown );
286 int failFastCount = providerConfiguration.getSkipAfterFailureCount();
287 final AtomicInteger notifyStreamsToSkipTestsJustNow = new AtomicInteger( failFastCount );
288 Collection<Future<RunResult>> results = new ArrayList<Future<RunResult>>( forkCount );
289 for ( final TestProvidingInputStream testProvidingInputStream : testStreams )
290 {
291 Callable<RunResult> pf = new Callable<RunResult>()
292 {
293 public RunResult call()
294 throws Exception
295 {
296 DefaultReporterFactory reporter = new DefaultReporterFactory( startupReportConfiguration );
297 defaultReporterFactories.add( reporter );
298
299 Properties vmProps = startupReportConfiguration.getTestVmSystemProperties();
300
301 ForkClient forkClient = new ForkClient( reporter, vmProps, testProvidingInputStream )
302 {
303 @Override
304 protected void stopOnNextTest()
305 {
306 if ( countDownToZero( notifyStreamsToSkipTestsJustNow ) )
307 {
308 notifyStreamsToSkipTests( testStreams );
309 }
310 }
311 };
312
313 return fork( null, new PropertiesWrapper( providerConfiguration.getProviderProperties() ),
314 forkClient, effectiveSystemProperties, testProvidingInputStream, true );
315 }
316 };
317 results.add( executorService.submit( pf ) );
318 }
319 return awaitResultsDone( results, executorService );
320 }
321 finally
322 {
323 removeShutdownHook( shutdown );
324 ping.cancel( true );
325 closeExecutor( executorService );
326 }
327 }
328
329 private static void notifyStreamsToSkipTests( Collection<? extends NotifiableTestStream> notifiableTestStreams )
330 {
331 for ( NotifiableTestStream notifiableTestStream : notifiableTestStreams )
332 {
333 notifiableTestStream.skipSinceNextTest();
334 }
335 }
336
337 @SuppressWarnings( "checkstyle:magicnumber" )
338 private RunResult runSuitesForkPerTestSet( final SurefireProperties effectiveSystemProperties, int forkCount )
339 throws SurefireBooterForkException
340 {
341 ArrayList<Future<RunResult>> results = new ArrayList<Future<RunResult>>( 500 );
342 ThreadPoolExecutor executorService =
343 new ThreadPoolExecutor( forkCount, forkCount, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
344 executorService.setThreadFactory( FORKED_JVM_DAEMON_THREAD_FACTORY );
345 final TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
346 ScheduledFuture<?> ping = triggerPingTimerForShutdown( builder );
347 Thread shutdown = createCachableShutdownHookThread( builder, providerConfiguration.getShutdown() );
348 try
349 {
350 addShutDownHook( shutdown );
351 int failFastCount = providerConfiguration.getSkipAfterFailureCount();
352 final AtomicInteger notifyStreamsToSkipTestsJustNow = new AtomicInteger( failFastCount );
353 for ( final Object testSet : getSuitesIterator() )
354 {
355 Callable<RunResult> pf = new Callable<RunResult>()
356 {
357 public RunResult call()
358 throws Exception
359 {
360 DefaultReporterFactory forkedReporterFactory =
361 new DefaultReporterFactory( startupReportConfiguration );
362 defaultReporterFactories.add( forkedReporterFactory );
363 Properties vmProps = startupReportConfiguration.getTestVmSystemProperties();
364 ForkClient forkClient = new ForkClient( forkedReporterFactory, vmProps,
365 builder.getImmediateCommands() )
366 {
367 @Override
368 protected void stopOnNextTest()
369 {
370 if ( countDownToZero( notifyStreamsToSkipTestsJustNow ) )
371 {
372 builder.getCachableCommands().skipSinceNextTest();
373 }
374 }
375 };
376 TestLessInputStream stream = builder.build();
377 try
378 {
379 return fork( testSet,
380 new PropertiesWrapper( providerConfiguration.getProviderProperties() ),
381 forkClient, effectiveSystemProperties, stream, false );
382 }
383 finally
384 {
385 builder.removeStream( stream );
386 }
387 }
388 };
389 results.add( executorService.submit( pf ) );
390 }
391 return awaitResultsDone( results, executorService );
392 }
393 finally
394 {
395 removeShutdownHook( shutdown );
396 ping.cancel( true );
397 closeExecutor( executorService );
398 }
399 }
400
401 private static RunResult awaitResultsDone( Collection<Future<RunResult>> results, ExecutorService executorService )
402 throws SurefireBooterForkException
403 {
404 RunResult globalResult = new RunResult( 0, 0, 0, 0 );
405 for ( Future<RunResult> result : results )
406 {
407 try
408 {
409 RunResult cur = result.get();
410 if ( cur != null )
411 {
412 globalResult = globalResult.aggregate( cur );
413 }
414 else
415 {
416 throw new SurefireBooterForkException( "No results for " + result.toString() );
417 }
418 }
419 catch ( InterruptedException e )
420 {
421 executorService.shutdownNow();
422 Thread.currentThread().interrupt();
423 throw new SurefireBooterForkException( "Interrupted", e );
424 }
425 catch ( ExecutionException e )
426 {
427 Throwable realException = e.getCause();
428 String error = realException == null ? "" : realException.getLocalizedMessage();
429 throw new SurefireBooterForkException( "ExecutionException " + error, realException );
430 }
431 }
432 return globalResult;
433 }
434
435 @SuppressWarnings( "checkstyle:magicnumber" )
436 private void closeExecutor( ExecutorService executorService )
437 throws SurefireBooterForkException
438 {
439 executorService.shutdown();
440 try
441 {
442
443 executorService.awaitTermination( 60 * 60, TimeUnit.SECONDS );
444 }
445 catch ( InterruptedException e )
446 {
447 Thread.currentThread().interrupt();
448 throw new SurefireBooterForkException( "Interrupted", e );
449 }
450 }
451
452 private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkClient forkClient,
453 SurefireProperties effectiveSystemProperties,
454 AbstractForkInputStream testProvidingInputStream, boolean readTestsFromInStream )
455 throws SurefireBooterForkException
456 {
457 int forkNumber = ForkNumberBucket.drawNumber();
458 try
459 {
460 return fork( testSet, providerProperties, forkClient, effectiveSystemProperties, forkNumber,
461 testProvidingInputStream, readTestsFromInStream );
462 }
463 finally
464 {
465 ForkNumberBucket.returnNumber( forkNumber );
466 }
467 }
468
469 private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkClient forkClient,
470 SurefireProperties effectiveSystemProperties, int forkNumber,
471 AbstractForkInputStream testProvidingInputStream, boolean readTestsFromInStream )
472 throws SurefireBooterForkException
473 {
474 final File surefireProperties;
475 final File systPropsFile;
476 try
477 {
478 BooterSerializer booterSerializer = new BooterSerializer( forkConfiguration );
479
480 surefireProperties = booterSerializer.serialize( providerProperties, providerConfiguration,
481 startupConfiguration, testSet, readTestsFromInStream );
482
483 if ( effectiveSystemProperties != null )
484 {
485 SurefireProperties filteredProperties =
486 createCopyAndReplaceForkNumPlaceholder( effectiveSystemProperties, forkNumber );
487
488 systPropsFile = writePropertiesFile( filteredProperties, forkConfiguration.getTempDirectory(),
489 "surefire_" + SYSTEM_PROPERTIES_FILE_COUNTER.getAndIncrement(),
490 forkConfiguration.isDebug() );
491 }
492 else
493 {
494 systPropsFile = null;
495 }
496 }
497 catch ( IOException e )
498 {
499 throw new SurefireBooterForkException( "Error creating properties files for forking", e );
500 }
501
502
503 final Classpath bootClasspathConfiguration = startupConfiguration.isProviderMainClass()
504 ? startupConfiguration.getClasspathConfiguration().getProviderClasspath()
505 : forkConfiguration.getBootClasspath();
506
507 Classpath bootClasspath = join(
508 join( bootClasspathConfiguration, startupConfiguration.getClasspathConfiguration().getTestClasspath() ),
509 startupConfiguration.getClasspathConfiguration().getProviderClasspath() );
510
511 if ( log.isDebugEnabled() )
512 {
513 log.debug( bootClasspath.getLogMessage( "boot" ) );
514 log.debug( bootClasspath.getCompactLogMessage( "boot(compact)" ) );
515 }
516
517 OutputStreamFlushableCommandline cli =
518 forkConfiguration.createCommandLine( bootClasspath.getClassPath(), startupConfiguration, forkNumber );
519
520 InputStreamCloser inputStreamCloser = new InputStreamCloser( testProvidingInputStream );
521 Thread inputStreamCloserHook = newDaemonThread( inputStreamCloser, "input-stream-closer" );
522 testProvidingInputStream.setFlushReceiverProvider( cli );
523 addShutDownHook( inputStreamCloserHook );
524
525 cli.createArg().setFile( surefireProperties );
526
527 if ( systPropsFile != null )
528 {
529 cli.createArg().setFile( systPropsFile );
530 }
531
532 ThreadedStreamConsumer threadedStreamConsumer = new ThreadedStreamConsumer( forkClient );
533
534 if ( forkConfiguration.isDebug() )
535 {
536 System.out.println( "Forking command line: " + cli );
537 }
538
539 RunResult runResult = null;
540
541 try
542 {
543 CommandLineCallable future =
544 executeCommandLineAsCallable( cli, testProvidingInputStream, threadedStreamConsumer,
545 threadedStreamConsumer, 0, inputStreamCloser,
546 Charset.forName( FORK_STREAM_CHARSET_NAME ) );
547
548 currentForkClients.add( forkClient );
549
550 int result = future.call();
551
552 if ( forkClient.hadTimeout() )
553 {
554 runResult = timeout( forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult() );
555 }
556 else if ( result != SUCCESS )
557 {
558 throw new SurefireBooterForkException( "Error occurred in starting fork, check output in log" );
559 }
560 }
561 catch ( CommandLineException e )
562 {
563 runResult = failure( forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult(), e );
564 throw new SurefireBooterForkException( "Error while executing forked tests.", e.getCause() );
565 }
566 finally
567 {
568 currentForkClients.remove( forkClient );
569 threadedStreamConsumer.close();
570 inputStreamCloser.run();
571 removeShutdownHook( inputStreamCloserHook );
572
573 if ( runResult == null )
574 {
575 runResult = forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult();
576 }
577
578 if ( !runResult.isTimeout() )
579 {
580 StackTraceWriter errorInFork = forkClient.getErrorInFork();
581 if ( errorInFork != null )
582 {
583
584 throw new RuntimeException(
585 "There was an error in the forked process\n" + errorInFork.writeTraceToString() );
586 }
587 if ( !forkClient.isSaidGoodBye() )
588 {
589
590 throw new RuntimeException(
591 "The forked VM terminated without properly saying goodbye. VM crash or System.exit called?"
592 + "\nCommand was " + cli.toString() );
593 }
594 }
595 forkClient.close( runResult.isTimeout() );
596 }
597
598 return runResult;
599 }
600
601 private Iterable<Class<?>> getSuitesIterator()
602 throws SurefireBooterForkException
603 {
604 try
605 {
606 final ClasspathConfiguration classpathConfiguration = startupConfiguration.getClasspathConfiguration();
607 ClassLoader unifiedClassLoader = classpathConfiguration.createMergedClassLoader();
608
609 CommonReflector commonReflector = new CommonReflector( unifiedClassLoader );
610 Object reporterFactory = commonReflector.createReportingReporterFactory( startupReportConfiguration );
611
612 ProviderFactory providerFactory =
613 new ProviderFactory( startupConfiguration, providerConfiguration, unifiedClassLoader, reporterFactory );
614 SurefireProvider surefireProvider = providerFactory.createProvider( false );
615 return surefireProvider.getSuites();
616 }
617 catch ( SurefireExecutionException e )
618 {
619 throw new SurefireBooterForkException( "Unable to create classloader to find test suites", e );
620 }
621 }
622
623 private static Thread createImmediateShutdownHookThread( final TestLessInputStreamBuilder builder,
624 final Shutdown shutdownType )
625 {
626 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
627 {
628 public void run()
629 {
630 builder.getImmediateCommands().shutdown( shutdownType );
631 }
632 } );
633 }
634
635 private static Thread createCachableShutdownHookThread( final TestLessInputStreamBuilder builder,
636 final Shutdown shutdownType )
637 {
638 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
639 {
640 public void run()
641 {
642 builder.getCachableCommands().shutdown( shutdownType );
643 }
644 } );
645 }
646
647 private static Thread createShutdownHookThread( final Iterable<TestProvidingInputStream> streams,
648 final Shutdown shutdownType )
649 {
650 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
651 {
652 public void run()
653 {
654 for ( TestProvidingInputStream stream : streams )
655 {
656 stream.shutdown( shutdownType );
657 }
658 }
659 } );
660 }
661
662 private static ScheduledExecutorService createPingScheduler()
663 {
664 ThreadFactory threadFactory = newDaemonThreadFactory( "ping-timer-" + PING_IN_SECONDS + "sec" );
665 return Executors.newScheduledThreadPool( 1, threadFactory );
666 }
667
668 private static ScheduledExecutorService createTimeoutCheckScheduler()
669 {
670 ThreadFactory threadFactory = newDaemonThreadFactory( "timeout-check-timer" );
671 return Executors.newScheduledThreadPool( 1, threadFactory );
672 }
673
674 private ScheduledFuture<?> triggerPingTimerForShutdown( final TestLessInputStreamBuilder builder )
675 {
676 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
677 {
678 public void run()
679 {
680 builder.getImmediateCommands().noop();
681 }
682 }, 0, PING_IN_SECONDS, SECONDS );
683 }
684
685 private ScheduledFuture<?> triggerPingTimerForShutdown( final Iterable<TestProvidingInputStream> streams )
686 {
687 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
688 {
689 public void run()
690 {
691 for ( TestProvidingInputStream stream : streams )
692 {
693 stream.noop();
694 }
695 }
696 }, 0, PING_IN_SECONDS, SECONDS );
697 }
698
699 private ScheduledFuture<?> triggerTimeoutCheck()
700 {
701 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
702 {
703 public void run()
704 {
705 long systemTime = System.currentTimeMillis();
706 for ( ForkClient forkClient : currentForkClients )
707 {
708 forkClient.tryToTimeout( systemTime, forkedProcessTimeoutInSeconds );
709 }
710 }
711 }, 0, TIMEOUT_CHECK_PERIOD_MILLIS, MILLISECONDS );
712 }
713 }