View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24  import org.apache.maven.surefire.testset.TestSetFailedException;
25  
26  import java.io.DataInputStream;
27  import java.io.EOFException;
28  import java.io.IOException;
29  import java.io.PrintStream;
30  import java.util.Iterator;
31  import java.util.NoSuchElementException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import static java.lang.Thread.State.NEW;
40  import static java.lang.Thread.State.RUNNABLE;
41  import static java.lang.Thread.State.TERMINATED;
42  import static java.lang.StrictMath.max;
43  import static org.apache.maven.surefire.booter.Command.toShutdown;
44  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
45  import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
48  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
49  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
50  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
51  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
52  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53  import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
54  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
55  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
56  import static org.apache.maven.surefire.util.internal.ObjectUtils.requireNonNull;
57  
58  /**
59   * Reader of commands coming from plugin(master) process.
60   *
61   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
62   * @since 2.19
63   */
64  public final class CommandReader
65  {
66      private static final String LAST_TEST_SYMBOL = "";
67  
68      private static final CommandReader READER = new CommandReader();
69  
70      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
71          = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
72  
73      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74  
75      private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
76  
77      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78  
79      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80  
81      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
82  
83      private volatile Shutdown shutdown;
84  
85      private int iteratedCount;
86  
87      private volatile ConsoleLogger logger = new NullConsoleLogger();
88  
89      private CommandReader()
90      {
91      }
92  
93      public static CommandReader getReader()
94      {
95          final CommandReader reader = READER;
96          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
97          {
98              reader.commandThread.start();
99          }
100         return reader;
101     }
102 
103     public CommandReader setShutdown( Shutdown shutdown )
104     {
105         this.shutdown = shutdown;
106         return this;
107     }
108 
109     public CommandReader setLogger( ConsoleLogger logger )
110     {
111         this.logger = requireNonNull( logger, "null logger" );
112         return this;
113     }
114 
115     public boolean awaitStarted()
116         throws TestSetFailedException
117     {
118         if ( state.get() == RUNNABLE )
119         {
120             try
121             {
122                 startMonitor.await();
123                 return true;
124             }
125             catch ( InterruptedException e )
126             {
127                 DumpErrorSingleton.getSingleton().dumpException( e );
128                 throw new TestSetFailedException( e.getLocalizedMessage() );
129             }
130         }
131         else
132         {
133             return false;
134         }
135     }
136 
137     /**
138      * @param listener listener called with <em>Any</em> {@link MasterProcessCommand command type}
139      */
140     public void addListener( CommandListener listener )
141     {
142         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
143     }
144 
145     public void addTestListener( CommandListener listener )
146     {
147         addListener( RUN_CLASS, listener );
148     }
149 
150     public void addTestsFinishedListener( CommandListener listener )
151     {
152         addListener( TEST_SET_FINISHED, listener );
153     }
154 
155     public void addSkipNextTestsListener( CommandListener listener )
156     {
157         addListener( SKIP_SINCE_NEXT_TEST, listener );
158     }
159 
160     public void addShutdownListener( CommandListener listener )
161     {
162         addListener( SHUTDOWN, listener );
163     }
164 
165     public void addNoopListener( CommandListener listener )
166     {
167         addListener( NOOP, listener );
168     }
169 
170     public void addByeAckListener( CommandListener listener )
171     {
172         addListener( BYE_ACK, listener );
173     }
174 
175     private void addListener( MasterProcessCommand cmd, CommandListener listener )
176     {
177         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
178     }
179 
180     public void removeListener( CommandListener listener )
181     {
182         for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
183         {
184             BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
185             if ( listener == listenerWrapper.getP2() )
186             {
187                 it.remove();
188             }
189         }
190     }
191 
192     /**
193      * @return test classes which have been retrieved by {@link CommandReader#getIterableClasses(PrintStream)}.
194      */
195     Iterator<String> iterated()
196     {
197         return testClasses.subList( 0, iteratedCount ).iterator();
198     }
199 
200     /**
201      * The iterator can be used only in one Thread.
202      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
203      *
204      * @param originalOutStream original stream in current JVM process
205      * @return Iterator with test classes lazily loaded as commands from the main process
206      */
207     Iterable<String> getIterableClasses( PrintStream originalOutStream )
208     {
209         return new ClassesIterable( originalOutStream );
210     }
211 
212     public void stop()
213     {
214         if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
215         {
216             makeQueueFull();
217             listeners.clear();
218             commandThread.interrupt();
219         }
220     }
221 
222     private boolean isStopped()
223     {
224         return state.get() == TERMINATED;
225     }
226 
227     /**
228      * @return <tt>true</tt> if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
229      */
230     private boolean isQueueFull()
231     {
232         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
233         // and we need both atomic.
234         //
235         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
236         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
237         // The insert operation is concurrent with this method.
238         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
239         // allow adding a string after LAST_TEST_SYMBOL.
240         int searchFrom = max( 0, testClasses.size() - 1 );
241         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
242     }
243 
244     private void makeQueueFull()
245     {
246         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
247     }
248 
249     private boolean insertToQueue( String test )
250     {
251         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
252     }
253 
254     private final class ClassesIterable
255         implements Iterable<String>
256     {
257         private final PrintStream originalOutStream;
258 
259         ClassesIterable( PrintStream originalOutStream )
260         {
261             this.originalOutStream = originalOutStream;
262         }
263 
264         public Iterator<String> iterator()
265         {
266             return new ClassesIterator( originalOutStream );
267         }
268     }
269 
270     private final class ClassesIterator
271         implements Iterator<String>
272     {
273         private final PrintStream originalOutStream;
274 
275         private String clazz;
276 
277         private int nextQueueIndex;
278 
279         private ClassesIterator( PrintStream originalOutStream )
280         {
281             this.originalOutStream = originalOutStream;
282         }
283 
284         public boolean hasNext()
285         {
286             popUnread();
287             return isNotBlank( clazz );
288         }
289 
290         public String next()
291         {
292             popUnread();
293             try
294             {
295                 if ( isBlank( clazz ) )
296                 {
297                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
298                 }
299                 else
300                 {
301                     return clazz;
302                 }
303             }
304             finally
305             {
306                 clazz = null;
307             }
308         }
309 
310         public void remove()
311         {
312             throw new UnsupportedOperationException();
313         }
314 
315         private void popUnread()
316         {
317             if ( shouldFinish() )
318             {
319                 clazz = null;
320                 return;
321             }
322 
323             if ( isBlank( clazz ) )
324             {
325                 requestNextTest();
326                 CommandReader.this.awaitNextTest();
327                 if ( shouldFinish() )
328                 {
329                     clazz = null;
330                     return;
331                 }
332                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
333                 CommandReader.this.iteratedCount = nextQueueIndex;
334             }
335 
336             if ( CommandReader.this.isStopped() )
337             {
338                 clazz = null;
339             }
340         }
341 
342         private void requestNextTest()
343         {
344             byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
345             synchronized ( originalOutStream )
346             {
347                 originalOutStream.write( encoded, 0, encoded.length );
348                 originalOutStream.flush();
349             }
350         }
351 
352         private boolean shouldFinish()
353         {
354             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
355             return CommandReader.this.isStopped() || wasLastTestRead;
356         }
357 
358         private boolean isEndSymbolAt( int index )
359         {
360             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
361         }
362     }
363 
364     private void awaitNextTest()
365     {
366         nextCommandNotifier.acquireUninterruptibly();
367     }
368 
369     private void wakeupIterator()
370     {
371         nextCommandNotifier.release();
372     }
373 
374     private final class CommandRunnable
375         implements Runnable
376     {
377         public void run()
378         {
379             CommandReader.this.startMonitor.countDown();
380             DataInputStream stdIn = new DataInputStream( System.in );
381             boolean isTestSetFinished = false;
382             try
383             {
384                 while ( CommandReader.this.state.get() == RUNNABLE )
385                 {
386                     Command command = decode( stdIn );
387                     if ( command == null )
388                     {
389                         String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
390                         DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
391                         logger.error( errorMessage );
392                         break;
393                     }
394                     else
395                     {
396                         switch ( command.getCommandType() )
397                         {
398                             case RUN_CLASS:
399                                 String test = command.getData();
400                                 boolean inserted = CommandReader.this.insertToQueue( test );
401                                 if ( inserted )
402                                 {
403                                     CommandReader.this.wakeupIterator();
404                                     insertToListeners( command );
405                                 }
406                                 break;
407                             case TEST_SET_FINISHED:
408                                 CommandReader.this.makeQueueFull();
409                                 isTestSetFinished = true;
410                                 CommandReader.this.wakeupIterator();
411                                 insertToListeners( command );
412                                 break;
413                             case SHUTDOWN:
414                                 CommandReader.this.makeQueueFull();
415                                 CommandReader.this.wakeupIterator();
416                                 insertToListeners( command );
417                                 break;
418                             default:
419                                 insertToListeners( command );
420                                 break;
421                         }
422                     }
423                 }
424             }
425             catch ( EOFException e )
426             {
427                 CommandReader.this.state.set( TERMINATED );
428                 if ( !isTestSetFinished )
429                 {
430                     String msg = "TestSet has not finished before stream error has appeared >> "
431                                          + "initializing exit by non-null configuration: "
432                                          + CommandReader.this.shutdown;
433                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
434 
435                     exitByConfiguration();
436                     // does not go to finally
437                 }
438             }
439             catch ( IOException e )
440             {
441                 CommandReader.this.state.set( TERMINATED );
442                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
443                 if ( !( e.getCause() instanceof InterruptedException ) )
444                 {
445                     String msg = "[SUREFIRE] std/in stream corrupted";
446                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
447                     logger.error( msg, e );
448                 }
449             }
450             finally
451             {
452                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
453                 if ( !isTestSetFinished )
454                 {
455                     CommandReader.this.makeQueueFull();
456                 }
457                 CommandReader.this.wakeupIterator();
458             }
459         }
460 
461         private void insertToListeners( Command cmd )
462         {
463             MasterProcessCommand expectedCommandType = cmd.getCommandType();
464             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
465             {
466                 MasterProcessCommand commandType = listenerWrapper.getP1();
467                 CommandListener listener = listenerWrapper.getP2();
468                 if ( commandType == null || commandType == expectedCommandType )
469                 {
470                     listener.update( cmd );
471                 }
472             }
473         }
474 
475         private void exitByConfiguration()
476         {
477             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
478             if ( shutdown != null )
479             {
480                 CommandReader.this.makeQueueFull();
481                 CommandReader.this.wakeupIterator();
482                 insertToListeners( toShutdown( shutdown ) );
483                 if ( shutdown.isExit() )
484                 {
485                     System.exit( 1 );
486                 }
487                 else if ( shutdown.isKill() )
488                 {
489                     Runtime.getRuntime().halt( 1 );
490                 }
491                 // else is default: should not happen; otherwise you missed enum case
492             }
493         }
494     }
495 
496 }