1 package org.apache.maven.plugin.surefire.booterclient;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.logging.Log;
23 import org.apache.maven.plugin.surefire.CommonReflector;
24 import org.apache.maven.plugin.surefire.StartupReportConfiguration;
25 import org.apache.maven.plugin.surefire.SurefireProperties;
26 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.AbstractForkInputStream;
27 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.NotifiableTestStream;
28 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.OutputStreamFlushableCommandline;
29 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream;
30 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStream;
31 import org.apache.maven.plugin.surefire.booterclient.output.ForkClient;
32 import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
33 import org.apache.maven.plugin.surefire.report.DefaultReporterFactory;
34 import org.apache.maven.shared.utils.cli.CommandLineCallable;
35 import org.apache.maven.shared.utils.cli.CommandLineException;
36 import org.apache.maven.surefire.booter.Classpath;
37 import org.apache.maven.surefire.booter.ClasspathConfiguration;
38 import org.apache.maven.surefire.booter.KeyValueSource;
39 import org.apache.maven.surefire.booter.PropertiesWrapper;
40 import org.apache.maven.surefire.booter.ProviderConfiguration;
41 import org.apache.maven.surefire.booter.ProviderFactory;
42 import org.apache.maven.surefire.booter.Shutdown;
43 import org.apache.maven.surefire.booter.StartupConfiguration;
44 import org.apache.maven.surefire.booter.SurefireBooterForkException;
45 import org.apache.maven.surefire.booter.SurefireExecutionException;
46 import org.apache.maven.surefire.providerapi.SurefireProvider;
47 import org.apache.maven.surefire.report.StackTraceWriter;
48 import org.apache.maven.surefire.suite.RunResult;
49 import org.apache.maven.surefire.testset.TestRequest;
50 import org.apache.maven.surefire.util.DefaultScanResult;
51
52 import java.io.File;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.nio.charset.Charset;
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.Map;
59 import java.util.Properties;
60 import java.util.Queue;
61 import java.util.concurrent.ArrayBlockingQueue;
62 import java.util.concurrent.Callable;
63 import java.util.concurrent.ConcurrentLinkedQueue;
64 import java.util.concurrent.ExecutionException;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.concurrent.Future;
68 import java.util.concurrent.ScheduledExecutorService;
69 import java.util.concurrent.ScheduledFuture;
70 import java.util.concurrent.ThreadFactory;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.LinkedBlockingQueue;
73 import java.util.concurrent.ThreadPoolExecutor;
74 import java.util.concurrent.atomic.AtomicInteger;
75 import java.util.concurrent.atomic.AtomicReference;
76
77 import static java.util.concurrent.TimeUnit.MILLISECONDS;
78 import static java.util.concurrent.TimeUnit.SECONDS;
79 import static org.apache.maven.shared.utils.cli.CommandLineUtils.executeCommandLineAsCallable;
80 import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.addShutDownHook;
81 import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.removeShutdownHook;
82 import static org.apache.maven.surefire.util.internal.StringUtils.FORK_STREAM_CHARSET_NAME;
83 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
84 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
85 import static org.apache.maven.plugin.surefire.AbstractSurefireMojo.createCopyAndReplaceForkNumPlaceholder;
86 import static org.apache.maven.plugin.surefire.booterclient.lazytestprovider.
87 TestLessInputStream.TestLessInputStreamBuilder;
88 import static org.apache.maven.surefire.util.internal.ConcurrencyUtils.countDownToZero;
89 import static org.apache.maven.surefire.booter.Classpath.join;
90 import static org.apache.maven.surefire.booter.SystemPropertyManager.writePropertiesFile;
91 import static org.apache.maven.surefire.suite.RunResult.timeout;
92 import static org.apache.maven.surefire.suite.RunResult.failure;
93 import static org.apache.maven.surefire.suite.RunResult.SUCCESS;
94 import static java.lang.StrictMath.min;
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public class ForkStarter
111 {
/** Period, in seconds, of the scheduled tasks that send noop() to the fork command streams. */
private static final long PING_IN_SECONDS = 10;

/** Period, in milliseconds, of the scheduled task that calls tryToTimeout() on the running fork clients. */
private static final int TIMEOUT_CHECK_PERIOD_MILLIS = 100;

/** Thread factory installed on the executors that run the fork tasks. */
private static final ThreadFactory FORKED_JVM_DAEMON_THREAD_FACTORY
    = newDaemonThreadFactory( "surefire-fork-starter" );

/** Thread factory used to create the shutdown-hook threads that send the shutdown command to the forks. */
private static final ThreadFactory SHUTDOWN_HOOK_THREAD_FACTORY
    = newDaemonThreadFactory( "surefire-jvm-killer-shutdownhook" );

/** Supplies the numeric suffix of the "surefire_" name passed to writePropertiesFile() for each fork. */
private static final AtomicInteger SYSTEM_PROPERTIES_FILE_COUNTER = new AtomicInteger();

/** Scheduler running the periodic noop (ping) tasks; shut down at the end of run(SurefireProperties, DefaultScanResult). */
private final ScheduledExecutorService pingThreadScheduler = createPingScheduler();

/** Scheduler created in the constructor by createTimeoutCheckScheduler(); shut down at the end of run(SurefireProperties, DefaultScanResult). */
private final ScheduledExecutorService timeoutCheckScheduler;

/** Fork clients of forks in progress: added just before the command line is called, removed afterwards, polled by the timeout check. */
private final Queue<ForkClient> currentForkClients;

/** Timeout value handed to ForkClient.tryToTimeout() by the timeout check. */
private final int forkedProcessTimeoutInSeconds;

/** Provider configuration: source of provider properties, shutdown type and skip-after-failure count. */
private final ProviderConfiguration providerConfiguration;

/** Startup configuration: source of the classpath configuration used for forking and suite discovery. */
private final StartupConfiguration startupConfiguration;

/** Fork configuration: fork count, fork reuse, temp directory, debug flag and command line creation. */
private final ForkConfiguration forkConfiguration;

/** Report configuration used to create every DefaultReporterFactory in this class. */
private final StartupReportConfiguration startupReportConfiguration;

/** Logger; used for the debug output of the boot classpath. */
private final Log log;

/** Reporter factory started in the constructor; the per-fork factories are merged into it at the end of run(). */
private final DefaultReporterFactory defaultReporterFactory;

/** Reporter factories created for the individual forks. */
private final Collection<DefaultReporterFactory> defaultReporterFactories;
145
146
147
148
149 private static class InputStreamCloser
150 implements Runnable
151 {
152 private final AtomicReference<InputStream> testProvidingInputStream;
153
154 public InputStreamCloser( InputStream testProvidingInputStream )
155 {
156 this.testProvidingInputStream = new AtomicReference<InputStream>( testProvidingInputStream );
157 }
158
159 public void run()
160 {
161 InputStream stream = testProvidingInputStream.getAndSet( null );
162 if ( stream != null )
163 {
164 try
165 {
166 stream.close();
167 }
168 catch ( IOException e )
169 {
170
171 }
172 }
173 }
174 }
175
176 public ForkStarter( ProviderConfiguration providerConfiguration, StartupConfiguration startupConfiguration,
177 ForkConfiguration forkConfiguration, int forkedProcessTimeoutInSeconds,
178 StartupReportConfiguration startupReportConfiguration, Log log )
179 {
180 this.forkConfiguration = forkConfiguration;
181 this.providerConfiguration = providerConfiguration;
182 this.forkedProcessTimeoutInSeconds = forkedProcessTimeoutInSeconds;
183 this.startupConfiguration = startupConfiguration;
184 this.startupReportConfiguration = startupReportConfiguration;
185 this.log = log;
186 defaultReporterFactory = new DefaultReporterFactory( startupReportConfiguration );
187 defaultReporterFactory.runStarting();
188 defaultReporterFactories = new ConcurrentLinkedQueue<DefaultReporterFactory>();
189 currentForkClients = new ConcurrentLinkedQueue<ForkClient>();
190 timeoutCheckScheduler = createTimeoutCheckScheduler();
191 triggerTimeoutCheck();
192 }
193
194 public RunResult run( SurefireProperties effectiveSystemProperties, DefaultScanResult scanResult )
195 throws SurefireBooterForkException, SurefireExecutionException
196 {
197 try
198 {
199 Map<String, String> providerProperties = providerConfiguration.getProviderProperties();
200 scanResult.writeTo( providerProperties );
201 return isForkOnce()
202 ? run( effectiveSystemProperties, providerProperties )
203 : run( effectiveSystemProperties );
204 }
205 finally
206 {
207 defaultReporterFactory.mergeFromOtherFactories( defaultReporterFactories );
208 defaultReporterFactory.close();
209 pingThreadScheduler.shutdownNow();
210 timeoutCheckScheduler.shutdownNow();
211 }
212 }
213
214 private RunResult run( SurefireProperties effectiveSystemProperties, Map<String, String> providerProperties )
215 throws SurefireBooterForkException
216 {
217 DefaultReporterFactory forkedReporterFactory = new DefaultReporterFactory( startupReportConfiguration );
218 defaultReporterFactories.add( forkedReporterFactory );
219 TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
220 PropertiesWrapper props = new PropertiesWrapper( providerProperties );
221 TestLessInputStream stream = builder.build();
222 ForkClient forkClient =
223 new ForkClient( forkedReporterFactory, startupReportConfiguration.getTestVmSystemProperties(), stream );
224 Thread shutdown = createImmediateShutdownHookThread( builder, providerConfiguration.getShutdown() );
225 ScheduledFuture<?> ping = triggerPingTimerForShutdown( builder );
226 try
227 {
228 addShutDownHook( shutdown );
229 return fork( null, props, forkClient, effectiveSystemProperties, stream, false );
230 }
231 finally
232 {
233 removeShutdownHook( shutdown );
234 ping.cancel( true );
235 builder.removeStream( stream );
236 }
237 }
238
239 private RunResult run( SurefireProperties effectiveSystemProperties )
240 throws SurefireBooterForkException
241 {
242 return forkConfiguration.isReuseForks()
243 ? runSuitesForkOnceMultiple( effectiveSystemProperties, forkConfiguration.getForkCount() )
244 : runSuitesForkPerTestSet( effectiveSystemProperties, forkConfiguration.getForkCount() );
245 }
246
247 private boolean isForkOnce()
248 {
249 return forkConfiguration.isReuseForks() && ( forkConfiguration.getForkCount() == 1 || hasSuiteXmlFiles() );
250 }
251
252 private boolean hasSuiteXmlFiles()
253 {
254 TestRequest testSuiteDefinition = providerConfiguration.getTestSuiteDefinition();
255 return testSuiteDefinition != null && testSuiteDefinition.getSuiteXmlFiles() != null
256 && !testSuiteDefinition.getSuiteXmlFiles().isEmpty();
257 }
258
259 @SuppressWarnings( "checkstyle:magicnumber" )
260 private RunResult runSuitesForkOnceMultiple( final SurefireProperties effectiveSystemProperties, int forkCount )
261 throws SurefireBooterForkException
262 {
263 ThreadPoolExecutor executorService = new ThreadPoolExecutor( forkCount, forkCount, 60, TimeUnit.SECONDS,
264 new ArrayBlockingQueue<Runnable>( forkCount ) );
265 executorService.setThreadFactory( FORKED_JVM_DAEMON_THREAD_FACTORY );
266
267 final Queue<String> tests = new ConcurrentLinkedQueue<String>();
268
269 for ( Class<?> clazz : getSuitesIterator() )
270 {
271 tests.add( clazz.getName() );
272 }
273
274 final Queue<TestProvidingInputStream> testStreams = new ConcurrentLinkedQueue<TestProvidingInputStream>();
275
276 for ( int forkNum = 0, total = min( forkCount, tests.size() ); forkNum < total; forkNum++ )
277 {
278 testStreams.add( new TestProvidingInputStream( tests ) );
279 }
280
281 ScheduledFuture<?> ping = triggerPingTimerForShutdown( testStreams );
282 Thread shutdown = createShutdownHookThread( testStreams, providerConfiguration.getShutdown() );
283
284 try
285 {
286 addShutDownHook( shutdown );
287 int failFastCount = providerConfiguration.getSkipAfterFailureCount();
288 final AtomicInteger notifyStreamsToSkipTestsJustNow = new AtomicInteger( failFastCount );
289 Collection<Future<RunResult>> results = new ArrayList<Future<RunResult>>( forkCount );
290 for ( final TestProvidingInputStream testProvidingInputStream : testStreams )
291 {
292 Callable<RunResult> pf = new Callable<RunResult>()
293 {
294 public RunResult call()
295 throws Exception
296 {
297 DefaultReporterFactory reporter = new DefaultReporterFactory( startupReportConfiguration );
298 defaultReporterFactories.add( reporter );
299
300 Properties vmProps = startupReportConfiguration.getTestVmSystemProperties();
301
302 ForkClient forkClient = new ForkClient( reporter, vmProps, testProvidingInputStream )
303 {
304 @Override
305 protected void stopOnNextTest()
306 {
307 if ( countDownToZero( notifyStreamsToSkipTestsJustNow ) )
308 {
309 notifyStreamsToSkipTests( testStreams );
310 }
311 }
312 };
313
314 return fork( null, new PropertiesWrapper( providerConfiguration.getProviderProperties() ),
315 forkClient, effectiveSystemProperties, testProvidingInputStream, true );
316 }
317 };
318 results.add( executorService.submit( pf ) );
319 }
320 return awaitResultsDone( results, executorService );
321 }
322 finally
323 {
324 removeShutdownHook( shutdown );
325 ping.cancel( true );
326 closeExecutor( executorService );
327 }
328 }
329
330 private static void notifyStreamsToSkipTests( Collection<? extends NotifiableTestStream> notifiableTestStreams )
331 {
332 for ( NotifiableTestStream notifiableTestStream : notifiableTestStreams )
333 {
334 notifiableTestStream.skipSinceNextTest();
335 }
336 }
337
338 @SuppressWarnings( "checkstyle:magicnumber" )
339 private RunResult runSuitesForkPerTestSet( final SurefireProperties effectiveSystemProperties, int forkCount )
340 throws SurefireBooterForkException
341 {
342 ArrayList<Future<RunResult>> results = new ArrayList<Future<RunResult>>( 500 );
343 ThreadPoolExecutor executorService =
344 new ThreadPoolExecutor( forkCount, forkCount, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
345 executorService.setThreadFactory( FORKED_JVM_DAEMON_THREAD_FACTORY );
346 final TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
347 ScheduledFuture<?> ping = triggerPingTimerForShutdown( builder );
348 Thread shutdown = createCachableShutdownHookThread( builder, providerConfiguration.getShutdown() );
349 try
350 {
351 addShutDownHook( shutdown );
352 int failFastCount = providerConfiguration.getSkipAfterFailureCount();
353 final AtomicInteger notifyStreamsToSkipTestsJustNow = new AtomicInteger( failFastCount );
354 for ( final Object testSet : getSuitesIterator() )
355 {
356 Callable<RunResult> pf = new Callable<RunResult>()
357 {
358 public RunResult call()
359 throws Exception
360 {
361 DefaultReporterFactory forkedReporterFactory =
362 new DefaultReporterFactory( startupReportConfiguration );
363 defaultReporterFactories.add( forkedReporterFactory );
364 Properties vmProps = startupReportConfiguration.getTestVmSystemProperties();
365 ForkClient forkClient = new ForkClient( forkedReporterFactory, vmProps,
366 builder.getImmediateCommands() )
367 {
368 @Override
369 protected void stopOnNextTest()
370 {
371 if ( countDownToZero( notifyStreamsToSkipTestsJustNow ) )
372 {
373 builder.getCachableCommands().skipSinceNextTest();
374 }
375 }
376 };
377 TestLessInputStream stream = builder.build();
378 try
379 {
380 return fork( testSet,
381 new PropertiesWrapper( providerConfiguration.getProviderProperties() ),
382 forkClient, effectiveSystemProperties, stream, false );
383 }
384 finally
385 {
386 builder.removeStream( stream );
387 }
388 }
389 };
390 results.add( executorService.submit( pf ) );
391 }
392 return awaitResultsDone( results, executorService );
393 }
394 finally
395 {
396 removeShutdownHook( shutdown );
397 ping.cancel( true );
398 closeExecutor( executorService );
399 }
400 }
401
402 private static RunResult awaitResultsDone( Collection<Future<RunResult>> results, ExecutorService executorService )
403 throws SurefireBooterForkException
404 {
405 RunResult globalResult = new RunResult( 0, 0, 0, 0 );
406 for ( Future<RunResult> result : results )
407 {
408 try
409 {
410 RunResult cur = result.get();
411 if ( cur != null )
412 {
413 globalResult = globalResult.aggregate( cur );
414 }
415 else
416 {
417 throw new SurefireBooterForkException( "No results for " + result.toString() );
418 }
419 }
420 catch ( InterruptedException e )
421 {
422 executorService.shutdownNow();
423 Thread.currentThread().interrupt();
424 throw new SurefireBooterForkException( "Interrupted", e );
425 }
426 catch ( ExecutionException e )
427 {
428 Throwable realException = e.getCause();
429 String error = realException == null ? "" : realException.getLocalizedMessage();
430 throw new SurefireBooterForkException( "ExecutionException " + error, realException );
431 }
432 }
433 return globalResult;
434 }
435
436 @SuppressWarnings( "checkstyle:magicnumber" )
437 private void closeExecutor( ExecutorService executorService )
438 throws SurefireBooterForkException
439 {
440 executorService.shutdown();
441 try
442 {
443
444 executorService.awaitTermination( 60 * 60, TimeUnit.SECONDS );
445 }
446 catch ( InterruptedException e )
447 {
448 Thread.currentThread().interrupt();
449 throw new SurefireBooterForkException( "Interrupted", e );
450 }
451 }
452
453 private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkClient forkClient,
454 SurefireProperties effectiveSystemProperties,
455 AbstractForkInputStream testProvidingInputStream, boolean readTestsFromInStream )
456 throws SurefireBooterForkException
457 {
458 int forkNumber = ForkNumberBucket.drawNumber();
459 try
460 {
461 return fork( testSet, providerProperties, forkClient, effectiveSystemProperties, forkNumber,
462 testProvidingInputStream, readTestsFromInStream );
463 }
464 finally
465 {
466 ForkNumberBucket.returnNumber( forkNumber );
467 }
468 }
469
470 private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkClient forkClient,
471 SurefireProperties effectiveSystemProperties, int forkNumber,
472 AbstractForkInputStream testProvidingInputStream, boolean readTestsFromInStream )
473 throws SurefireBooterForkException
474 {
475 final File surefireProperties;
476 final File systPropsFile;
477 try
478 {
479 BooterSerializer booterSerializer = new BooterSerializer( forkConfiguration );
480
481 surefireProperties = booterSerializer.serialize( providerProperties, providerConfiguration,
482 startupConfiguration, testSet, readTestsFromInStream );
483
484 if ( effectiveSystemProperties != null )
485 {
486 SurefireProperties filteredProperties =
487 createCopyAndReplaceForkNumPlaceholder( effectiveSystemProperties, forkNumber );
488
489 systPropsFile = writePropertiesFile( filteredProperties, forkConfiguration.getTempDirectory(),
490 "surefire_" + SYSTEM_PROPERTIES_FILE_COUNTER.getAndIncrement(),
491 forkConfiguration.isDebug() );
492 }
493 else
494 {
495 systPropsFile = null;
496 }
497 }
498 catch ( IOException e )
499 {
500 throw new SurefireBooterForkException( "Error creating properties files for forking", e );
501 }
502
503
504 final Classpath bootClasspathConfiguration = startupConfiguration.isProviderMainClass()
505 ? startupConfiguration.getClasspathConfiguration().getProviderClasspath()
506 : forkConfiguration.getBootClasspath();
507
508 Classpath bootClasspath = join(
509 join( bootClasspathConfiguration, startupConfiguration.getClasspathConfiguration().getTestClasspath() ),
510 startupConfiguration.getClasspathConfiguration().getProviderClasspath() );
511
512 if ( log.isDebugEnabled() )
513 {
514 log.debug( bootClasspath.getLogMessage( "boot" ) );
515 log.debug( bootClasspath.getCompactLogMessage( "boot(compact)" ) );
516 }
517
518 OutputStreamFlushableCommandline cli =
519 forkConfiguration.createCommandLine( bootClasspath.getClassPath(), startupConfiguration, forkNumber );
520
521 InputStreamCloser inputStreamCloser = new InputStreamCloser( testProvidingInputStream );
522 Thread inputStreamCloserHook = newDaemonThread( inputStreamCloser, "input-stream-closer" );
523 testProvidingInputStream.setFlushReceiverProvider( cli );
524 addShutDownHook( inputStreamCloserHook );
525
526 cli.createArg().setFile( surefireProperties );
527
528 if ( systPropsFile != null )
529 {
530 cli.createArg().setFile( systPropsFile );
531 }
532
533 ThreadedStreamConsumer threadedStreamConsumer = new ThreadedStreamConsumer( forkClient );
534
535 if ( forkConfiguration.isDebug() )
536 {
537 System.out.println( "Forking command line: " + cli );
538 }
539
540 RunResult runResult = null;
541
542 try
543 {
544 CommandLineCallable future =
545 executeCommandLineAsCallable( cli, testProvidingInputStream, threadedStreamConsumer,
546 threadedStreamConsumer, 0, inputStreamCloser,
547 Charset.forName( FORK_STREAM_CHARSET_NAME ) );
548
549 currentForkClients.add( forkClient );
550
551 int result = future.call();
552
553 if ( forkClient.hadTimeout() )
554 {
555 runResult = timeout( forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult() );
556 }
557 else if ( result != SUCCESS )
558 {
559 throw new SurefireBooterForkException( "Error occurred in starting fork, check output in log" );
560 }
561 }
562 catch ( CommandLineException e )
563 {
564 runResult = failure( forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult(), e );
565 throw new SurefireBooterForkException( "Error while executing forked tests.", e.getCause() );
566 }
567 finally
568 {
569 currentForkClients.remove( forkClient );
570 threadedStreamConsumer.close();
571 inputStreamCloser.run();
572 removeShutdownHook( inputStreamCloserHook );
573
574 if ( runResult == null )
575 {
576 runResult = forkClient.getDefaultReporterFactory().getGlobalRunStatistics().getRunResult();
577 }
578
579 if ( !runResult.isTimeout() )
580 {
581 StackTraceWriter errorInFork = forkClient.getErrorInFork();
582 if ( errorInFork != null )
583 {
584
585 throw new RuntimeException(
586 "There was an error in the forked process\n" + errorInFork.writeTraceToString() );
587 }
588 if ( !forkClient.isSaidGoodBye() )
589 {
590
591 throw new RuntimeException(
592 "The forked VM terminated without properly saying goodbye. VM crash or System.exit called?"
593 + "\nCommand was " + cli.toString() );
594 }
595 }
596 forkClient.close( runResult.isTimeout() );
597 }
598
599 return runResult;
600 }
601
602 private Iterable<Class<?>> getSuitesIterator()
603 throws SurefireBooterForkException
604 {
605 try
606 {
607 final ClasspathConfiguration classpathConfiguration = startupConfiguration.getClasspathConfiguration();
608 ClassLoader unifiedClassLoader = classpathConfiguration.createMergedClassLoader();
609
610 CommonReflector commonReflector = new CommonReflector( unifiedClassLoader );
611 Object reporterFactory = commonReflector.createReportingReporterFactory( startupReportConfiguration );
612
613 ProviderFactory providerFactory =
614 new ProviderFactory( startupConfiguration, providerConfiguration, unifiedClassLoader, reporterFactory );
615 SurefireProvider surefireProvider = providerFactory.createProvider( false );
616 return surefireProvider.getSuites();
617 }
618 catch ( SurefireExecutionException e )
619 {
620 throw new SurefireBooterForkException( "Unable to create classloader to find test suites", e );
621 }
622 }
623
624 private static Thread createImmediateShutdownHookThread( final TestLessInputStreamBuilder builder,
625 final Shutdown shutdownType )
626 {
627 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
628 {
629 public void run()
630 {
631 builder.getImmediateCommands().shutdown( shutdownType );
632 }
633 } );
634 }
635
636 private static Thread createCachableShutdownHookThread( final TestLessInputStreamBuilder builder,
637 final Shutdown shutdownType )
638 {
639 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
640 {
641 public void run()
642 {
643 builder.getCachableCommands().shutdown( shutdownType );
644 }
645 } );
646 }
647
648 private static Thread createShutdownHookThread( final Iterable<TestProvidingInputStream> streams,
649 final Shutdown shutdownType )
650 {
651 return SHUTDOWN_HOOK_THREAD_FACTORY.newThread( new Runnable()
652 {
653 public void run()
654 {
655 for ( TestProvidingInputStream stream : streams )
656 {
657 stream.shutdown( shutdownType );
658 }
659 }
660 } );
661 }
662
663 private static ScheduledExecutorService createPingScheduler()
664 {
665 ThreadFactory threadFactory = newDaemonThreadFactory( "ping-timer-" + PING_IN_SECONDS + "sec" );
666 return Executors.newScheduledThreadPool( 1, threadFactory );
667 }
668
669 private static ScheduledExecutorService createTimeoutCheckScheduler()
670 {
671 ThreadFactory threadFactory = newDaemonThreadFactory( "timeout-check-timer" );
672 return Executors.newScheduledThreadPool( 1, threadFactory );
673 }
674
675 private ScheduledFuture<?> triggerPingTimerForShutdown( final TestLessInputStreamBuilder builder )
676 {
677 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
678 {
679 public void run()
680 {
681 builder.getImmediateCommands().noop();
682 }
683 }, 0, PING_IN_SECONDS, SECONDS );
684 }
685
686 private ScheduledFuture<?> triggerPingTimerForShutdown( final Iterable<TestProvidingInputStream> streams )
687 {
688 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
689 {
690 public void run()
691 {
692 for ( TestProvidingInputStream stream : streams )
693 {
694 stream.noop();
695 }
696 }
697 }, 0, PING_IN_SECONDS, SECONDS );
698 }
699
700 private ScheduledFuture<?> triggerTimeoutCheck()
701 {
702 return pingThreadScheduler.scheduleAtFixedRate( new Runnable()
703 {
704 public void run()
705 {
706 long systemTime = System.currentTimeMillis();
707 for ( ForkClient forkClient : currentForkClients )
708 {
709 forkClient.tryToTimeout( systemTime, forkedProcessTimeoutInSeconds );
710 }
711 }
712 }, 0, TIMEOUT_CHECK_PERIOD_MILLIS, MILLISECONDS );
713 }
714 }