View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24  import org.apache.maven.surefire.testset.TestSetFailedException;
25  
26  import java.io.DataInputStream;
27  import java.io.EOFException;
28  import java.io.IOException;
29  import java.io.PrintStream;
30  import java.util.Iterator;
31  import java.util.NoSuchElementException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import static java.util.Objects.requireNonNull;
40  import static java.lang.Thread.State.NEW;
41  import static java.lang.Thread.State.RUNNABLE;
42  import static java.lang.Thread.State.TERMINATED;
43  import static java.lang.StrictMath.max;
44  import static org.apache.maven.surefire.booter.Command.toShutdown;
45  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
48  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
49  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
50  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
51  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
52  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
53  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
54  import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
55  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
56  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
57  
58  /**
59   * Reader of commands coming from plugin(master) process.
60   *
61   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
62   * @since 2.19
63   */
64  public final class CommandReader
65  {
66      private static final String LAST_TEST_SYMBOL = "";
67  
68      private static final CommandReader READER = new CommandReader();
69  
70      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
71  
72      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
73  
74      private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
75  
76      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
77  
78      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
79  
80      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
81  
82      private volatile Shutdown shutdown;
83  
84      private int iteratedCount;
85  
86      private volatile ConsoleLogger logger = new NullConsoleLogger();
87  
88      private CommandReader()
89      {
90      }
91  
92      public static CommandReader getReader()
93      {
94          final CommandReader reader = READER;
95          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
96          {
97              reader.commandThread.start();
98          }
99          return reader;
100     }
101 
102     public CommandReader setShutdown( Shutdown shutdown )
103     {
104         this.shutdown = shutdown;
105         return this;
106     }
107 
108     public CommandReader setLogger( ConsoleLogger logger )
109     {
110         this.logger = requireNonNull( logger, "null logger" );
111         return this;
112     }
113 
114     public boolean awaitStarted()
115         throws TestSetFailedException
116     {
117         if ( state.get() == RUNNABLE )
118         {
119             try
120             {
121                 startMonitor.await();
122                 return true;
123             }
124             catch ( InterruptedException e )
125             {
126                 DumpErrorSingleton.getSingleton().dumpException( e );
127                 throw new TestSetFailedException( e.getLocalizedMessage() );
128             }
129         }
130         else
131         {
132             return false;
133         }
134     }
135 
136     /**
137      * @param listener listener called with <b>Any</b> {@link MasterProcessCommand command type}
138      */
139     public void addListener( CommandListener listener )
140     {
141         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
142     }
143 
144     public void addTestListener( CommandListener listener )
145     {
146         addListener( RUN_CLASS, listener );
147     }
148 
149     public void addTestsFinishedListener( CommandListener listener )
150     {
151         addListener( TEST_SET_FINISHED, listener );
152     }
153 
154     public void addSkipNextTestsListener( CommandListener listener )
155     {
156         addListener( SKIP_SINCE_NEXT_TEST, listener );
157     }
158 
159     public void addShutdownListener( CommandListener listener )
160     {
161         addListener( SHUTDOWN, listener );
162     }
163 
164     public void addNoopListener( CommandListener listener )
165     {
166         addListener( NOOP, listener );
167     }
168 
169     public void addByeAckListener( CommandListener listener )
170     {
171         addListener( BYE_ACK, listener );
172     }
173 
174     private void addListener( MasterProcessCommand cmd, CommandListener listener )
175     {
176         listeners.add( new BiProperty<>( cmd, listener ) );
177     }
178 
179     public void removeListener( CommandListener listener )
180     {
181         for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
182         {
183             BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
184             if ( listener == listenerWrapper.getP2() )
185             {
186                 it.remove();
187             }
188         }
189     }
190 
191     /**
192      * @return test classes which have been retrieved by {@link CommandReader#getIterableClasses(PrintStream)}.
193      */
194     Iterator<String> iterated()
195     {
196         return testClasses.subList( 0, iteratedCount ).iterator();
197     }
198 
199     /**
200      * The iterator can be used only in one Thread.
201      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
202      *
203      * @param originalOutStream original stream in current JVM process
204      * @return Iterator with test classes lazily loaded as commands from the main process
205      */
206     Iterable<String> getIterableClasses( PrintStream originalOutStream )
207     {
208         return new ClassesIterable( originalOutStream );
209     }
210 
211     public void stop()
212     {
213         if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
214         {
215             makeQueueFull();
216             listeners.clear();
217             commandThread.interrupt();
218         }
219     }
220 
221     private boolean isStopped()
222     {
223         return state.get() == TERMINATED;
224     }
225 
226     /**
227      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
228      */
229     private boolean isQueueFull()
230     {
231         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
232         // and we need both atomic.
233         //
234         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
235         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
236         // The insert operation is concurrent with this method.
237         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
238         // allow adding a string after LAST_TEST_SYMBOL.
239         int searchFrom = max( 0, testClasses.size() - 1 );
240         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
241     }
242 
243     private void makeQueueFull()
244     {
245         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
246     }
247 
248     private boolean insertToQueue( String test )
249     {
250         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
251     }
252 
253     private final class ClassesIterable
254         implements Iterable<String>
255     {
256         private final PrintStream originalOutStream;
257 
258         ClassesIterable( PrintStream originalOutStream )
259         {
260             this.originalOutStream = originalOutStream;
261         }
262 
263         @Override
264         public Iterator<String> iterator()
265         {
266             return new ClassesIterator( originalOutStream );
267         }
268     }
269 
270     private final class ClassesIterator
271         implements Iterator<String>
272     {
273         private final PrintStream originalOutStream;
274 
275         private String clazz;
276 
277         private int nextQueueIndex;
278 
279         private ClassesIterator( PrintStream originalOutStream )
280         {
281             this.originalOutStream = originalOutStream;
282         }
283 
284         @Override
285         public boolean hasNext()
286         {
287             popUnread();
288             return isNotBlank( clazz );
289         }
290 
291         @Override
292         public String next()
293         {
294             popUnread();
295             try
296             {
297                 if ( isBlank( clazz ) )
298                 {
299                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
300                 }
301                 else
302                 {
303                     return clazz;
304                 }
305             }
306             finally
307             {
308                 clazz = null;
309             }
310         }
311 
312         @Override
313         public void remove()
314         {
315             throw new UnsupportedOperationException();
316         }
317 
318         private void popUnread()
319         {
320             if ( shouldFinish() )
321             {
322                 clazz = null;
323                 return;
324             }
325 
326             if ( isBlank( clazz ) )
327             {
328                 requestNextTest();
329                 CommandReader.this.awaitNextTest();
330                 if ( shouldFinish() )
331                 {
332                     clazz = null;
333                     return;
334                 }
335                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
336                 CommandReader.this.iteratedCount = nextQueueIndex;
337             }
338 
339             if ( CommandReader.this.isStopped() )
340             {
341                 clazz = null;
342             }
343         }
344 
345         private void requestNextTest()
346         {
347             byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
348             synchronized ( originalOutStream )
349             {
350                 originalOutStream.write( encoded, 0, encoded.length );
351                 originalOutStream.flush();
352             }
353         }
354 
355         private boolean shouldFinish()
356         {
357             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
358             return CommandReader.this.isStopped() || wasLastTestRead;
359         }
360 
361         private boolean isEndSymbolAt( int index )
362         {
363             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
364         }
365     }
366 
367     private void awaitNextTest()
368     {
369         nextCommandNotifier.acquireUninterruptibly();
370     }
371 
372     private void wakeupIterator()
373     {
374         nextCommandNotifier.release();
375     }
376 
377     private final class CommandRunnable
378         implements Runnable
379     {
380         @Override
381         public void run()
382         {
383             CommandReader.this.startMonitor.countDown();
384             DataInputStream stdIn = new DataInputStream( System.in );
385             boolean isTestSetFinished = false;
386             try
387             {
388                 while ( CommandReader.this.state.get() == RUNNABLE )
389                 {
390                     Command command = decode( stdIn );
391                     if ( command == null )
392                     {
393                         String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
394                         DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
395                         logger.error( errorMessage );
396                         break;
397                     }
398                     else
399                     {
400                         switch ( command.getCommandType() )
401                         {
402                             case RUN_CLASS:
403                                 String test = command.getData();
404                                 boolean inserted = CommandReader.this.insertToQueue( test );
405                                 if ( inserted )
406                                 {
407                                     CommandReader.this.wakeupIterator();
408                                     insertToListeners( command );
409                                 }
410                                 break;
411                             case TEST_SET_FINISHED:
412                                 CommandReader.this.makeQueueFull();
413                                 isTestSetFinished = true;
414                                 CommandReader.this.wakeupIterator();
415                                 insertToListeners( command );
416                                 break;
417                             case SHUTDOWN:
418                                 CommandReader.this.makeQueueFull();
419                                 CommandReader.this.wakeupIterator();
420                                 insertToListeners( command );
421                                 break;
422                             default:
423                                 insertToListeners( command );
424                                 break;
425                         }
426                     }
427                 }
428             }
429             catch ( EOFException e )
430             {
431                 CommandReader.this.state.set( TERMINATED );
432                 if ( !isTestSetFinished )
433                 {
434                     String msg = "TestSet has not finished before stream error has appeared >> "
435                                          + "initializing exit by non-null configuration: "
436                                          + CommandReader.this.shutdown;
437                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
438 
439                     exitByConfiguration();
440                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
441                 }
442             }
443             catch ( IOException e )
444             {
445                 CommandReader.this.state.set( TERMINATED );
446                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
447                 if ( !( e.getCause() instanceof InterruptedException ) )
448                 {
449                     String msg = "[SUREFIRE] std/in stream corrupted";
450                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
451                     logger.error( msg, e );
452                 }
453             }
454             finally
455             {
456                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
457                 if ( !isTestSetFinished )
458                 {
459                     CommandReader.this.makeQueueFull();
460                 }
461                 CommandReader.this.wakeupIterator();
462             }
463         }
464 
465         private void insertToListeners( Command cmd )
466         {
467             MasterProcessCommand expectedCommandType = cmd.getCommandType();
468             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
469             {
470                 MasterProcessCommand commandType = listenerWrapper.getP1();
471                 CommandListener listener = listenerWrapper.getP2();
472                 if ( commandType == null || commandType == expectedCommandType )
473                 {
474                     listener.update( cmd );
475                 }
476             }
477         }
478 
479         private void exitByConfiguration()
480         {
481             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
482             if ( shutdown != null )
483             {
484                 CommandReader.this.makeQueueFull();
485                 CommandReader.this.wakeupIterator();
486                 insertToListeners( toShutdown( shutdown ) );
487                 if ( shutdown.isExit() )
488                 {
489                     System.exit( 1 );
490                 }
491                 else if ( shutdown.isKill() )
492                 {
493                     Runtime.getRuntime().halt( 1 );
494                 }
495                 // else is default: other than Shutdown.DEFAULT should not happen; otherwise you missed enum case
496             }
497         }
498     }
499 
500 }