View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.NotifiableTestStream;
23  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
24  import org.apache.maven.plugin.surefire.report.DefaultReporterFactory;
25  import org.apache.maven.shared.utils.cli.StreamConsumer;
26  import org.apache.maven.surefire.report.ConsoleOutputReceiver;
27  import org.apache.maven.surefire.report.ReportEntry;
28  import org.apache.maven.surefire.report.RunListener;
29  import org.apache.maven.surefire.report.RunMode;
30  import org.apache.maven.surefire.report.StackTraceWriter;
31  import org.apache.maven.surefire.report.TestSetReportEntry;
32  
33  import java.io.BufferedReader;
34  import java.io.File;
35  import java.io.IOException;
36  import java.io.StringReader;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.TreeSet;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicLong;
45  
46  import static java.lang.System.currentTimeMillis;
47  import static java.util.Collections.unmodifiableMap;
48  import static org.apache.maven.surefire.booter.Shutdown.KILL;
49  import static org.apache.maven.surefire.report.CategorizedReportEntry.reportEntry;
50  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
51  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
52  
53  // todo move to the same package with ForkStarter
54  
55  /**
56   * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
57   *
58   * @author Kristian Rosenvold
59   */
60  public class ForkClient
61       implements StreamConsumer
62  {
63      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
64      private static final long START_TIME_ZERO = 0L;
65      private static final long START_TIME_NEGATIVE_TIMEOUT = -1L;
66  
67      private final DefaultReporterFactory defaultReporterFactory;
68  
69      private final Map<String, String> testVmSystemProperties = new ConcurrentHashMap<>();
70  
71      private final NotifiableTestStream notifiableTestStream;
72  
73      private final Queue<String> testsInProgress = new ConcurrentLinkedQueue<>();
74  
75      /**
76       * <em>testSetStartedAt</em> is set to non-zero after received
77       * {@link org.apache.maven.surefire.booter.ForkedChannelEncoder#testSetStarting(ReportEntry, boolean)}.
78       */
79      private final AtomicLong testSetStartedAt = new AtomicLong( START_TIME_ZERO );
80  
81      private final ForkedChannelDecoder decoder = new ForkedChannelDecoder();
82  
83      private final ConsoleLogger log;
84  
85      /**
86       * prevents from printing same warning
87       */
88      private final AtomicBoolean printedErrorStream;
89  
90      private final int forkNumber;
91  
92      /**
93       * Used by single Thread started by {@link ThreadedStreamConsumer} and therefore does not need to be volatile.
94       */
95      private final ForkedChannelDecoderErrorHandler errorHandler;
96  
97      private RunListener testSetReporter;
98  
99      /**
100      * Written by one Thread and read by another: Main Thread and ForkStarter's Thread.
101      */
102     private volatile boolean saidGoodBye;
103 
104     private volatile StackTraceWriter errorInFork;
105 
106     public ForkClient( DefaultReporterFactory defaultReporterFactory, NotifiableTestStream notifiableTestStream,
107                        ConsoleLogger log, AtomicBoolean printedErrorStream, int forkNumber )
108     {
109         this.defaultReporterFactory = defaultReporterFactory;
110         this.notifiableTestStream = notifiableTestStream;
111         this.log = log;
112         this.printedErrorStream = printedErrorStream;
113         this.forkNumber = forkNumber;
114         decoder.setTestSetStartingListener( new TestSetStartingListener() );
115         decoder.setTestSetCompletedListener( new TestSetCompletedListener() );
116         decoder.setTestStartingListener( new TestStartingListener() );
117         decoder.setTestSucceededListener( new TestSucceededListener() );
118         decoder.setTestFailedListener( new TestFailedListener() );
119         decoder.setTestSkippedListener( new TestSkippedListener() );
120         decoder.setTestErrorListener( new TestErrorListener() );
121         decoder.setTestAssumptionFailureListener( new TestAssumptionFailureListener() );
122         decoder.setSystemPropertiesListener( new SystemPropertiesListener() );
123         decoder.setStdOutListener( new StdOutListener() );
124         decoder.setStdErrListener( new StdErrListener() );
125         decoder.setConsoleInfoListener( new ConsoleListener() );
126         decoder.setAcquireNextTestListener( new AcquireNextTestListener() );
127         decoder.setConsoleErrorListener( new ErrorListener() );
128         decoder.setByeListener( new ByeListener() );
129         decoder.setStopOnNextTestListener( new StopOnNextTestListener() );
130         decoder.setConsoleDebugListener( new DebugListener() );
131         decoder.setConsoleWarningListener( new WarningListener() );
132         errorHandler = new ErrorHandler();
133     }
134 
135     private final class ErrorHandler implements ForkedChannelDecoderErrorHandler
136     {
137         @Override
138         public void handledError( String line, Throwable e )
139         {
140             logStreamWarning( line, e );
141         }
142     }
143 
144     private final class TestSetStartingListener
145             implements ForkedProcessReportEventListener<TestSetReportEntry>
146     {
147         @Override
148         public void handle( RunMode runMode, TestSetReportEntry reportEntry )
149         {
150             getTestSetReporter().testSetStarting( reportEntry );
151             setCurrentStartTime();
152         }
153     }
154 
155     private final class TestSetCompletedListener
156             implements ForkedProcessReportEventListener<TestSetReportEntry>
157     {
158         @Override
159         public void handle( RunMode runMode, TestSetReportEntry reportEntry )
160         {
161             testsInProgress.clear();
162             TestSetReportEntry entry = reportEntry( reportEntry.getSourceName(), reportEntry.getSourceText(),
163                     reportEntry.getName(), reportEntry.getNameText(),
164                     reportEntry.getGroup(), reportEntry.getStackTraceWriter(), reportEntry.getElapsed(),
165                     reportEntry.getMessage(), getTestVmSystemProperties() );
166             getTestSetReporter().testSetCompleted( entry );
167         }
168     }
169 
170     private final class TestStartingListener implements ForkedProcessReportEventListener
171     {
172         @Override
173         public void handle( RunMode runMode, ReportEntry reportEntry )
174         {
175             testsInProgress.offer( reportEntry.getSourceName() );
176             getTestSetReporter().testStarting( reportEntry );
177         }
178     }
179 
180     private final class TestSucceededListener implements ForkedProcessReportEventListener
181     {
182         @Override
183         public void handle( RunMode runMode, ReportEntry reportEntry )
184         {
185             testsInProgress.remove( reportEntry.getSourceName() );
186             getTestSetReporter().testSucceeded( reportEntry );
187         }
188     }
189 
190     private final class TestFailedListener implements ForkedProcessReportEventListener
191     {
192         @Override
193         public void handle( RunMode runMode, ReportEntry reportEntry )
194         {
195             testsInProgress.remove( reportEntry.getSourceName() );
196             getTestSetReporter().testFailed( reportEntry );
197         }
198     }
199 
200     private final class TestSkippedListener implements ForkedProcessReportEventListener
201     {
202         @Override
203         public void handle( RunMode runMode, ReportEntry reportEntry )
204         {
205             testsInProgress.remove( reportEntry.getSourceName() );
206             getTestSetReporter().testSkipped( reportEntry );
207         }
208     }
209 
210     private final class TestErrorListener implements ForkedProcessReportEventListener
211     {
212         @Override
213         public void handle( RunMode runMode, ReportEntry reportEntry )
214         {
215             testsInProgress.remove( reportEntry.getSourceName() );
216             getTestSetReporter().testError( reportEntry );
217         }
218     }
219 
220     private final class TestAssumptionFailureListener implements ForkedProcessReportEventListener
221     {
222         @Override
223         public void handle( RunMode runMode, ReportEntry reportEntry )
224         {
225             testsInProgress.remove( reportEntry.getSourceName() );
226             getTestSetReporter().testAssumptionFailure( reportEntry );
227         }
228     }
229 
230     private final class SystemPropertiesListener implements ForkedProcessPropertyEventListener
231     {
232         @Override
233         public void handle( RunMode runMode, String key, String value )
234         {
235             testVmSystemProperties.put( key, value );
236         }
237     }
238 
239     private final class StdOutListener implements ForkedProcessStandardOutErrEventListener
240     {
241         @Override
242         public void handle( RunMode runMode, String output, boolean newLine )
243         {
244             writeTestOutput( output, newLine, true );
245         }
246     }
247 
248     private final class StdErrListener implements ForkedProcessStandardOutErrEventListener
249     {
250         @Override
251         public void handle( RunMode runMode, String output, boolean newLine )
252         {
253             writeTestOutput( output, newLine, false );
254         }
255     }
256 
257     private final class ConsoleListener implements ForkedProcessStringEventListener
258     {
259         @Override
260         public void handle( String msg )
261         {
262             getOrCreateConsoleLogger()
263                     .info( msg );
264         }
265     }
266 
267     private final class AcquireNextTestListener implements ForkedProcessEventListener
268     {
269         @Override
270         public void handle()
271         {
272             notifiableTestStream.provideNewTest();
273         }
274     }
275 
276     private class ErrorListener implements ForkedProcessStackTraceEventListener
277     {
278         @Override
279         public void handle( String msg, String smartStackTrace, String stackTrace )
280         {
281             if ( errorInFork == null )
282             {
283                 errorInFork = deserializeStackTraceWriter( msg, smartStackTrace, stackTrace );
284                 if ( msg != null )
285                 {
286                     getOrCreateConsoleLogger()
287                             .error( msg );
288                 }
289             }
290             dumpToLoFile( msg, null );
291         }
292     }
293 
294     private final class ByeListener implements ForkedProcessEventListener
295     {
296         @Override
297         public void handle()
298         {
299             saidGoodBye = true;
300             notifiableTestStream.acknowledgeByeEventReceived();
301         }
302     }
303 
304     private final class StopOnNextTestListener implements ForkedProcessEventListener
305     {
306         @Override
307         public void handle()
308         {
309             stopOnNextTest();
310         }
311     }
312 
313     private final class DebugListener implements ForkedProcessStringEventListener
314     {
315         @Override
316         public void handle( String msg )
317         {
318             getOrCreateConsoleLogger()
319                     .debug( msg );
320         }
321     }
322 
323     private final class WarningListener implements ForkedProcessStringEventListener
324     {
325         @Override
326         public void handle( String msg )
327         {
328             getOrCreateConsoleLogger()
329                     .warning( msg );
330         }
331     }
332 
333     /**
334      * Overridden by a subclass, see {@link org.apache.maven.plugin.surefire.booterclient.ForkStarter}.
335      */
336     protected void stopOnNextTest()
337     {
338     }
339 
340     public void kill()
341     {
342         if ( !saidGoodBye )
343         {
344             notifiableTestStream.shutdown( KILL );
345         }
346     }
347 
348     /**
349      * Called in concurrent Thread.
350      * Will shutdown if timeout was reached.
351      *
352      * @param currentTimeMillis    current time in millis seconds
353      * @param forkedProcessTimeoutInSeconds timeout in seconds given by MOJO
354      */
355     public final void tryToTimeout( long currentTimeMillis, int forkedProcessTimeoutInSeconds )
356     {
357         if ( forkedProcessTimeoutInSeconds > 0 )
358         {
359             final long forkedProcessTimeoutInMillis = 1000 * forkedProcessTimeoutInSeconds;
360             final long startedAt = testSetStartedAt.get();
361             if ( startedAt > START_TIME_ZERO && currentTimeMillis - startedAt >= forkedProcessTimeoutInMillis )
362             {
363                 testSetStartedAt.set( START_TIME_NEGATIVE_TIMEOUT );
364                 notifiableTestStream.shutdown( KILL );
365             }
366         }
367     }
368 
369     public final DefaultReporterFactory getDefaultReporterFactory()
370     {
371         return defaultReporterFactory;
372     }
373 
374     @Override
375     public final void consumeLine( String s )
376     {
377         if ( isNotBlank( s ) )
378         {
379             processLine( s );
380         }
381     }
382 
383     private void setCurrentStartTime()
384     {
385         if ( testSetStartedAt.get() == START_TIME_ZERO ) // JIT can optimize <= no JNI call
386         {
387             // Not necessary to call JNI library library #currentTimeMillis
388             // which may waste 10 - 30 machine cycles in callback. Callbacks should be fast.
389             testSetStartedAt.compareAndSet( START_TIME_ZERO, currentTimeMillis() );
390         }
391     }
392 
393     public final boolean hadTimeout()
394     {
395         return testSetStartedAt.get() == START_TIME_NEGATIVE_TIMEOUT;
396     }
397 
398     private RunListener getTestSetReporter()
399     {
400         if ( testSetReporter == null )
401         {
402             testSetReporter = defaultReporterFactory.createReporter();
403         }
404         return testSetReporter;
405     }
406 
407     private void processLine( String event )
408     {
409         decoder.handleEvent( event, errorHandler );
410     }
411 
412     private File dumpToLoFile( String msg, Throwable e )
413     {
414         File reportsDir = defaultReporterFactory.getReportsDirectory();
415         InPluginProcessDumpSingleton util = InPluginProcessDumpSingleton.getSingleton();
416         return e == null
417                 ? util.dumpStreamText( msg, reportsDir, forkNumber )
418                 : util.dumpStreamException( e, msg, reportsDir, forkNumber );
419     }
420 
421     private void logStreamWarning( String event, Throwable e )
422     {
423         if ( event == null || !event.contains( PRINTABLE_JVM_NATIVE_STREAM ) )
424         {
425             String msg = "Corrupted STDOUT by directly writing to native stream in forked JVM " + forkNumber + ".";
426             File dump = dumpToLoFile( msg + " Stream '" + event + "'.", e );
427 
428             if ( printedErrorStream.compareAndSet( false, true ) )
429             {
430                 log.warning( msg + " See FAQ web page and the dump file " + dump.getAbsolutePath() );
431             }
432 
433             if ( log.isDebugEnabled() && event != null )
434             {
435                 log.debug( event );
436             }
437         }
438         else
439         {
440             if ( log.isDebugEnabled() )
441             {
442                 log.debug( event );
443             }
444             else if ( log.isInfoEnabled() )
445             {
446                 log.info( event );
447             }
448             else
449             {
450                 // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
451                 System.out.println( event );
452             }
453         }
454     }
455 
456     private void writeTestOutput( String output, boolean newLine, boolean isStdout )
457     {
458         getOrCreateConsoleOutputReceiver()
459                 .writeTestOutput( output, newLine, isStdout );
460     }
461 
462     public final void consumeMultiLineContent( String s )
463             throws IOException
464     {
465         if ( isBlank( s ) )
466         {
467             logStreamWarning( s, null );
468         }
469         else
470         {
471             BufferedReader stringReader = new BufferedReader( new StringReader( s ) );
472             for ( String s1 = stringReader.readLine(); s1 != null; s1 = stringReader.readLine() )
473             {
474                 consumeLine( s1 );
475             }
476         }
477     }
478 
479     public final Map<String, String> getTestVmSystemProperties()
480     {
481         return unmodifiableMap( testVmSystemProperties );
482     }
483 
484     /**
485      * Used when getting reporters on the plugin side of a fork.
486      * Used by testing purposes only. May not be volatile variable.
487      *
488      * @return A mock provider reporter
489      */
490     public final RunListener getReporter()
491     {
492         return getTestSetReporter();
493     }
494 
495     private ConsoleOutputReceiver getOrCreateConsoleOutputReceiver()
496     {
497         return (ConsoleOutputReceiver) getTestSetReporter();
498     }
499 
500     private ConsoleLogger getOrCreateConsoleLogger()
501     {
502         return (ConsoleLogger) getTestSetReporter();
503     }
504 
505     public void close( boolean hadTimeout )
506     {
507         // no op
508     }
509 
510     public final boolean isSaidGoodBye()
511     {
512         return saidGoodBye;
513     }
514 
515     public final StackTraceWriter getErrorInFork()
516     {
517         return errorInFork;
518     }
519 
520     public final boolean isErrorInFork()
521     {
522         return errorInFork != null;
523     }
524 
525     public Set<String> testsInProgress()
526     {
527         return new TreeSet<>( testsInProgress );
528     }
529 
530     public boolean hasTestsInProgress()
531     {
532         return !testsInProgress.isEmpty();
533     }
534 
535     private StackTraceWriter deserializeStackTraceWriter( String stackTraceMessage,
536                                                           String smartStackTrace, String stackTrace )
537     {
538         boolean hasTrace = stackTrace != null;
539         return hasTrace ? new DeserializedStacktraceWriter( stackTraceMessage, smartStackTrace, stackTrace ) : null;
540     }
541 }