View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24  import org.apache.maven.surefire.testset.TestSetFailedException;
25  
26  import java.io.DataInputStream;
27  import java.io.EOFException;
28  import java.io.IOException;
29  import java.io.PrintStream;
30  import java.util.Iterator;
31  import java.util.NoSuchElementException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import static java.lang.Thread.State.NEW;
40  import static java.lang.Thread.State.RUNNABLE;
41  import static java.lang.Thread.State.TERMINATED;
42  import static java.lang.StrictMath.max;
43  import static org.apache.maven.surefire.booter.Command.toShutdown;
44  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
45  import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
48  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
49  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
50  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
51  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
52  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53  import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
54  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
55  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
56  import static org.apache.maven.surefire.util.internal.ObjectUtils.requireNonNull;
57  
58  /**
59   * Reader of commands coming from plugin(master) process.
60   *
61   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
62   * @since 2.19
63   */
64  public final class CommandReader
65  {
66      private static final String LAST_TEST_SYMBOL = "";
67  
68      private static final CommandReader READER = new CommandReader();
69  
70      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
71          = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
72  
73      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74  
75      private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
76  
77      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78  
79      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80  
81      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
82  
83      private volatile Shutdown shutdown;
84  
85      private int iteratedCount;
86  
87      private volatile ConsoleLogger logger = new NullConsoleLogger();
88  
89      private CommandReader()
90      {
91      }
92  
93      public static CommandReader getReader()
94      {
95          final CommandReader reader = READER;
96          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
97          {
98              reader.commandThread.start();
99          }
100         return reader;
101     }
102 
103     public CommandReader setShutdown( Shutdown shutdown )
104     {
105         this.shutdown = shutdown;
106         return this;
107     }
108 
109     public CommandReader setLogger( ConsoleLogger logger )
110     {
111         this.logger = requireNonNull( logger, "null logger" );
112         return this;
113     }
114 
115     public boolean awaitStarted()
116         throws TestSetFailedException
117     {
118         if ( state.get() == RUNNABLE )
119         {
120             try
121             {
122                 startMonitor.await();
123                 return true;
124             }
125             catch ( InterruptedException e )
126             {
127                 DumpErrorSingleton.getSingleton().dumpException( e );
128                 throw new TestSetFailedException( e.getLocalizedMessage() );
129             }
130         }
131         else
132         {
133             return false;
134         }
135     }
136 
137     /**
138      * @param listener listener called with <b>Any</b> {@link MasterProcessCommand command type}
139      */
140     public void addListener( CommandListener listener )
141     {
142         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
143     }
144 
145     public void addTestListener( CommandListener listener )
146     {
147         addListener( RUN_CLASS, listener );
148     }
149 
150     public void addTestsFinishedListener( CommandListener listener )
151     {
152         addListener( TEST_SET_FINISHED, listener );
153     }
154 
155     public void addSkipNextTestsListener( CommandListener listener )
156     {
157         addListener( SKIP_SINCE_NEXT_TEST, listener );
158     }
159 
160     public void addShutdownListener( CommandListener listener )
161     {
162         addListener( SHUTDOWN, listener );
163     }
164 
165     public void addNoopListener( CommandListener listener )
166     {
167         addListener( NOOP, listener );
168     }
169 
170     public void addByeAckListener( CommandListener listener )
171     {
172         addListener( BYE_ACK, listener );
173     }
174 
175     private void addListener( MasterProcessCommand cmd, CommandListener listener )
176     {
177         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
178     }
179 
180     public void removeListener( CommandListener listener )
181     {
182         for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
183         {
184             BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
185             if ( listener == listenerWrapper.getP2() )
186             {
187                 it.remove();
188             }
189         }
190     }
191 
192     /**
193      * @return test classes which have been retrieved by {@link CommandReader#getIterableClasses(PrintStream)}.
194      */
195     Iterator<String> iterated()
196     {
197         return testClasses.subList( 0, iteratedCount ).iterator();
198     }
199 
200     /**
201      * The iterator can be used only in one Thread.
202      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
203      *
204      * @param originalOutStream original stream in current JVM process
205      * @return Iterator with test classes lazily loaded as commands from the main process
206      */
207     Iterable<String> getIterableClasses( PrintStream originalOutStream )
208     {
209         return new ClassesIterable( originalOutStream );
210     }
211 
212     public void stop()
213     {
214         if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
215         {
216             makeQueueFull();
217             listeners.clear();
218             commandThread.interrupt();
219         }
220     }
221 
222     private boolean isStopped()
223     {
224         return state.get() == TERMINATED;
225     }
226 
227     /**
228      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
229      */
230     private boolean isQueueFull()
231     {
232         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
233         // and we need both atomic.
234         //
235         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
236         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
237         // The insert operation is concurrent with this method.
238         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
239         // allow adding a string after LAST_TEST_SYMBOL.
240         int searchFrom = max( 0, testClasses.size() - 1 );
241         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
242     }
243 
244     private void makeQueueFull()
245     {
246         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
247     }
248 
249     private boolean insertToQueue( String test )
250     {
251         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
252     }
253 
254     private final class ClassesIterable
255         implements Iterable<String>
256     {
257         private final PrintStream originalOutStream;
258 
259         ClassesIterable( PrintStream originalOutStream )
260         {
261             this.originalOutStream = originalOutStream;
262         }
263 
264         @Override
265         public Iterator<String> iterator()
266         {
267             return new ClassesIterator( originalOutStream );
268         }
269     }
270 
271     private final class ClassesIterator
272         implements Iterator<String>
273     {
274         private final PrintStream originalOutStream;
275 
276         private String clazz;
277 
278         private int nextQueueIndex;
279 
280         private ClassesIterator( PrintStream originalOutStream )
281         {
282             this.originalOutStream = originalOutStream;
283         }
284 
285         @Override
286         public boolean hasNext()
287         {
288             popUnread();
289             return isNotBlank( clazz );
290         }
291 
292         @Override
293         public String next()
294         {
295             popUnread();
296             try
297             {
298                 if ( isBlank( clazz ) )
299                 {
300                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
301                 }
302                 else
303                 {
304                     return clazz;
305                 }
306             }
307             finally
308             {
309                 clazz = null;
310             }
311         }
312 
313         @Override
314         public void remove()
315         {
316             throw new UnsupportedOperationException();
317         }
318 
319         private void popUnread()
320         {
321             if ( shouldFinish() )
322             {
323                 clazz = null;
324                 return;
325             }
326 
327             if ( isBlank( clazz ) )
328             {
329                 requestNextTest();
330                 CommandReader.this.awaitNextTest();
331                 if ( shouldFinish() )
332                 {
333                     clazz = null;
334                     return;
335                 }
336                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
337                 CommandReader.this.iteratedCount = nextQueueIndex;
338             }
339 
340             if ( CommandReader.this.isStopped() )
341             {
342                 clazz = null;
343             }
344         }
345 
346         private void requestNextTest()
347         {
348             byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
349             synchronized ( originalOutStream )
350             {
351                 originalOutStream.write( encoded, 0, encoded.length );
352                 originalOutStream.flush();
353             }
354         }
355 
356         private boolean shouldFinish()
357         {
358             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
359             return CommandReader.this.isStopped() || wasLastTestRead;
360         }
361 
362         private boolean isEndSymbolAt( int index )
363         {
364             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
365         }
366     }
367 
368     private void awaitNextTest()
369     {
370         nextCommandNotifier.acquireUninterruptibly();
371     }
372 
373     private void wakeupIterator()
374     {
375         nextCommandNotifier.release();
376     }
377 
378     private final class CommandRunnable
379         implements Runnable
380     {
381         @Override
382         public void run()
383         {
384             CommandReader.this.startMonitor.countDown();
385             DataInputStream stdIn = new DataInputStream( System.in );
386             boolean isTestSetFinished = false;
387             try
388             {
389                 while ( CommandReader.this.state.get() == RUNNABLE )
390                 {
391                     Command command = decode( stdIn );
392                     if ( command == null )
393                     {
394                         String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
395                         DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
396                         logger.error( errorMessage );
397                         break;
398                     }
399                     else
400                     {
401                         switch ( command.getCommandType() )
402                         {
403                             case RUN_CLASS:
404                                 String test = command.getData();
405                                 boolean inserted = CommandReader.this.insertToQueue( test );
406                                 if ( inserted )
407                                 {
408                                     CommandReader.this.wakeupIterator();
409                                     insertToListeners( command );
410                                 }
411                                 break;
412                             case TEST_SET_FINISHED:
413                                 CommandReader.this.makeQueueFull();
414                                 isTestSetFinished = true;
415                                 CommandReader.this.wakeupIterator();
416                                 insertToListeners( command );
417                                 break;
418                             case SHUTDOWN:
419                                 CommandReader.this.makeQueueFull();
420                                 CommandReader.this.wakeupIterator();
421                                 insertToListeners( command );
422                                 break;
423                             default:
424                                 insertToListeners( command );
425                                 break;
426                         }
427                     }
428                 }
429             }
430             catch ( EOFException e )
431             {
432                 CommandReader.this.state.set( TERMINATED );
433                 if ( !isTestSetFinished )
434                 {
435                     String msg = "TestSet has not finished before stream error has appeared >> "
436                                          + "initializing exit by non-null configuration: "
437                                          + CommandReader.this.shutdown;
438                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
439 
440                     exitByConfiguration();
441                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
442                 }
443             }
444             catch ( IOException e )
445             {
446                 CommandReader.this.state.set( TERMINATED );
447                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
448                 if ( !( e.getCause() instanceof InterruptedException ) )
449                 {
450                     String msg = "[SUREFIRE] std/in stream corrupted";
451                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
452                     logger.error( msg, e );
453                 }
454             }
455             finally
456             {
457                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
458                 if ( !isTestSetFinished )
459                 {
460                     CommandReader.this.makeQueueFull();
461                 }
462                 CommandReader.this.wakeupIterator();
463             }
464         }
465 
466         private void insertToListeners( Command cmd )
467         {
468             MasterProcessCommand expectedCommandType = cmd.getCommandType();
469             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
470             {
471                 MasterProcessCommand commandType = listenerWrapper.getP1();
472                 CommandListener listener = listenerWrapper.getP2();
473                 if ( commandType == null || commandType == expectedCommandType )
474                 {
475                     listener.update( cmd );
476                 }
477             }
478         }
479 
480         private void exitByConfiguration()
481         {
482             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
483             if ( shutdown != null )
484             {
485                 CommandReader.this.makeQueueFull();
486                 CommandReader.this.wakeupIterator();
487                 insertToListeners( toShutdown( shutdown ) );
488                 if ( shutdown.isExit() )
489                 {
490                     System.exit( 1 );
491                 }
492                 else if ( shutdown.isKill() )
493                 {
494                     Runtime.getRuntime().halt( 1 );
495                 }
496                 // else is default: other than Shutdown.DEFAULT should not happen; otherwise you missed enum case
497             }
498         }
499     }
500 
501 }