View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.testset.TestSetFailedException;
23  
24  import java.io.DataInputStream;
25  import java.io.EOFException;
26  import java.io.IOException;
27  import java.io.PrintStream;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.CopyOnWriteArrayList;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.Semaphore;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import static java.lang.Thread.State.NEW;
38  import static java.lang.Thread.State.RUNNABLE;
39  import static java.lang.Thread.State.TERMINATED;
40  import static java.lang.StrictMath.max;
41  import static org.apache.maven.surefire.booter.Command.toShutdown;
42  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
43  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
44  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
45  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
48  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
49  import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
50  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
51  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
52  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53  
54  /**
55   * Reader of commands coming from plugin(master) process.
56   *
57   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
58   * @since 2.19
59   */
60  public final class CommandReader
61  {
62      private static final String LAST_TEST_SYMBOL = "";
63  
64      private static final CommandReader READER = new CommandReader();
65  
66      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
67          = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
68  
69      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
70  
71      private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
72  
73      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
74  
75      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
76  
77      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
78  
79      private volatile Shutdown shutdown;
80  
81      private int iteratedCount;
82  
83      public static CommandReader getReader()
84      {
85          final CommandReader reader = READER;
86          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
87          {
88              reader.commandThread.start();
89          }
90          return reader;
91      }
92  
93      public CommandReader setShutdown( Shutdown shutdown )
94      {
95          this.shutdown = shutdown;
96          return this;
97      }
98  
99      public boolean awaitStarted()
100         throws TestSetFailedException
101     {
102         if ( state.get() == RUNNABLE )
103         {
104             try
105             {
106                 startMonitor.await();
107                 return true;
108             }
109             catch ( InterruptedException e )
110             {
111                 throw new TestSetFailedException( e.getLocalizedMessage() );
112             }
113         }
114         else
115         {
116             return false;
117         }
118     }
119 
120     /**
121      * @param listener listener called with <em>Any</em> {@link MasterProcessCommand command type}
122      */
123     public void addListener( CommandListener listener )
124     {
125         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
126     }
127 
128     public void addTestListener( CommandListener listener )
129     {
130         addListener( RUN_CLASS, listener );
131     }
132 
133     public void addTestsFinishedListener( CommandListener listener )
134     {
135         addListener( TEST_SET_FINISHED, listener );
136     }
137 
138     public void addSkipNextTestsListener( CommandListener listener )
139     {
140         addListener( SKIP_SINCE_NEXT_TEST, listener );
141     }
142 
143     public void addShutdownListener( CommandListener listener )
144     {
145         addListener( SHUTDOWN, listener );
146     }
147 
148     public void addNoopListener( CommandListener listener )
149     {
150         addListener( NOOP, listener );
151     }
152 
153     private void addListener( MasterProcessCommand cmd, CommandListener listener )
154     {
155         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
156     }
157 
158     public void removeListener( CommandListener listener )
159     {
160         for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
161         {
162             BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
163             if ( listener == listenerWrapper.getP2() )
164             {
165                 it.remove();
166             }
167         }
168     }
169 
170     /**
171      * @return test classes which have been retrieved by {@link CommandReader#getIterableClasses(PrintStream)}.
172      */
173     Iterator<String> iterated()
174     {
175         return testClasses.subList( 0, iteratedCount ).iterator();
176     }
177 
178     /**
179      * The iterator can be used only in one Thread.
180      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
181      *
182      * @param originalOutStream original stream in current JVM process
183      * @return Iterator with test classes lazily loaded as commands from the main process
184      */
185     Iterable<String> getIterableClasses( PrintStream originalOutStream )
186     {
187         return new ClassesIterable( originalOutStream );
188     }
189 
190     public void stop()
191     {
192         if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
193         {
194             makeQueueFull();
195             listeners.clear();
196             commandThread.interrupt();
197         }
198     }
199 
200     private boolean isStopped()
201     {
202         return state.get() == TERMINATED;
203     }
204 
205     /**
206      * @return <tt>true</tt> if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
207      */
208     private boolean isQueueFull()
209     {
210         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
211         // and we need both atomic.
212         //
213         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
214         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
215         // The insert operation is concurrent with this method.
216         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
217         // allow adding a string after LAST_TEST_SYMBOL.
218         int searchFrom = max( 0, testClasses.size() - 1 );
219         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
220     }
221 
222     private void makeQueueFull()
223     {
224         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
225     }
226 
227     private boolean insertToQueue( String test )
228     {
229         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
230     }
231 
232     private final class ClassesIterable
233         implements Iterable<String>
234     {
235         private final PrintStream originalOutStream;
236 
237         ClassesIterable( PrintStream originalOutStream )
238         {
239             this.originalOutStream = originalOutStream;
240         }
241 
242         public Iterator<String> iterator()
243         {
244             return new ClassesIterator( originalOutStream );
245         }
246     }
247 
248     private final class ClassesIterator
249         implements Iterator<String>
250     {
251         private final PrintStream originalOutStream;
252 
253         private String clazz;
254 
255         private int nextQueueIndex;
256 
257         private ClassesIterator( PrintStream originalOutStream )
258         {
259             this.originalOutStream = originalOutStream;
260         }
261 
262         public boolean hasNext()
263         {
264             popUnread();
265             return isNotBlank( clazz );
266         }
267 
268         public String next()
269         {
270             popUnread();
271             try
272             {
273                 if ( isBlank( clazz ) )
274                 {
275                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
276                 }
277                 else
278                 {
279                     return clazz;
280                 }
281             }
282             finally
283             {
284                 clazz = null;
285             }
286         }
287 
288         public void remove()
289         {
290             throw new UnsupportedOperationException();
291         }
292 
293         private void popUnread()
294         {
295             if ( shouldFinish() )
296             {
297                 clazz = null;
298                 return;
299             }
300 
301             if ( isBlank( clazz ) )
302             {
303                 requestNextTest();
304                 CommandReader.this.awaitNextTest();
305                 if ( shouldFinish() )
306                 {
307                     clazz = null;
308                     return;
309                 }
310                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
311                 CommandReader.this.iteratedCount = nextQueueIndex;
312             }
313 
314             if ( CommandReader.this.isStopped() )
315             {
316                 clazz = null;
317             }
318         }
319 
320         private void requestNextTest()
321         {
322             byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
323             originalOutStream.write( encoded, 0, encoded.length );
324         }
325 
326         private boolean shouldFinish()
327         {
328             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
329             return CommandReader.this.isStopped() || wasLastTestRead;
330         }
331 
332         private boolean isEndSymbolAt( int index )
333         {
334             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
335         }
336     }
337 
338     private void awaitNextTest()
339     {
340         nextCommandNotifier.acquireUninterruptibly();
341     }
342 
343     private void wakeupIterator()
344     {
345         nextCommandNotifier.release();
346     }
347 
348     private final class CommandRunnable
349         implements Runnable
350     {
351         public void run()
352         {
353             CommandReader.this.startMonitor.countDown();
354             DataInputStream stdIn = new DataInputStream( System.in );
355             boolean isTestSetFinished = false;
356             try
357             {
358                 while ( CommandReader.this.state.get() == RUNNABLE )
359                 {
360                     Command command = decode( stdIn );
361                     if ( command == null )
362                     {
363                         System.err.println( "[SUREFIRE] std/in stream corrupted: first sequence not recognized" );
364                         break;
365                     }
366                     else
367                     {
368                         switch ( command.getCommandType() )
369                         {
370                             case RUN_CLASS:
371                                 String test = command.getData();
372                                 boolean inserted = CommandReader.this.insertToQueue( test );
373                                 if ( inserted )
374                                 {
375                                     CommandReader.this.wakeupIterator();
376                                     insertToListeners( command );
377                                 }
378                                 break;
379                             case TEST_SET_FINISHED:
380                                 CommandReader.this.makeQueueFull();
381                                 isTestSetFinished = true;
382                                 CommandReader.this.wakeupIterator();
383                                 insertToListeners( command );
384                                 break;
385                             case SHUTDOWN:
386                                 CommandReader.this.makeQueueFull();
387                                 CommandReader.this.wakeupIterator();
388                                 insertToListeners( command );
389                                 break;
390                             default:
391                                 insertToListeners( command );
392                                 break;
393                         }
394                     }
395                 }
396             }
397             catch ( EOFException e )
398             {
399                 CommandReader.this.state.set( TERMINATED );
400                 if ( !isTestSetFinished )
401                 {
402                     exitByConfiguration();
403                     // does not go to finally
404                 }
405             }
406             catch ( IOException e )
407             {
408                 CommandReader.this.state.set( TERMINATED );
409                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
410                 if ( !( e.getCause() instanceof InterruptedException ) )
411                 {
412                     System.err.println( "[SUREFIRE] std/in stream corrupted" );
413                     e.printStackTrace();
414                 }
415             }
416             finally
417             {
418                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
419                 if ( !isTestSetFinished )
420                 {
421                     CommandReader.this.makeQueueFull();
422                 }
423                 CommandReader.this.wakeupIterator();
424             }
425         }
426 
427         private void insertToListeners( Command cmd )
428         {
429             MasterProcessCommand expectedCommandType = cmd.getCommandType();
430             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
431             {
432                 MasterProcessCommand commandType = listenerWrapper.getP1();
433                 CommandListener listener = listenerWrapper.getP2();
434                 if ( commandType == null || commandType == expectedCommandType )
435                 {
436                     listener.update( cmd );
437                 }
438             }
439         }
440 
441         private void exitByConfiguration()
442         {
443             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
444             if ( shutdown != null )
445             {
446                 CommandReader.this.makeQueueFull();
447                 CommandReader.this.wakeupIterator();
448                 insertToListeners( toShutdown( shutdown ) );
449                 switch ( shutdown )
450                 {
451                     case EXIT:
452                         System.exit( 1 );
453                     case KILL:
454                         Runtime.getRuntime().halt( 1 );
455                     case DEFAULT:
456                     default:
457                         // should not happen; otherwise you missed enum case
458                         break;
459                 }
460             }
461         }
462     }
463 
464 }