View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.testset.TestSetFailedException;
23  
24  import java.io.DataInputStream;
25  import java.io.EOFException;
26  import java.io.IOException;
27  import java.io.PrintStream;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import static java.lang.Thread.State.NEW;
36  import static java.lang.Thread.State.RUNNABLE;
37  import static java.lang.Thread.State.TERMINATED;
38  import static java.util.concurrent.locks.LockSupport.park;
39  import static java.util.concurrent.locks.LockSupport.unpark;
40  import static org.apache.maven.surefire.booter.Command.toShutdown;
41  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
42  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
43  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
44  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
45  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
48  import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
49  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
50  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
51  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
52  
53  /**
54   * Reader of commands coming from plugin(master) process.
55   *
56   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
57   * @since 2.19
58   */
59  public final class MasterProcessReader
60  {
61      private static final MasterProcessReader READER = new MasterProcessReader();
62  
63      private final Queue<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>> listeners
64          = new ConcurrentLinkedQueue<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>>();
65  
66      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
67  
68      private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
69  
70      private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
71  
72      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
73  
74      private final Node headTestClassQueue = new Node();
75  
76      private volatile Node tailTestClassQueue = headTestClassQueue;
77  
78      private volatile Shutdown shutdown;
79  
80      private static class Node
81      {
82          final AtomicReference<Node> successor = new AtomicReference<Node>();
83          volatile String item;
84      }
85  
86      public static MasterProcessReader getReader()
87      {
88          final MasterProcessReader reader = READER;
89          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
90          {
91              reader.commandThread.start();
92          }
93          return reader;
94      }
95  
96      public MasterProcessReader setShutdown( Shutdown shutdown )
97      {
98          this.shutdown = shutdown;
99          return this;
100     }
101 
102     public boolean awaitStarted()
103         throws TestSetFailedException
104     {
105         if ( state.get() == RUNNABLE )
106         {
107             try
108             {
109                 startMonitor.await();
110                 return true;
111             }
112             catch ( InterruptedException e )
113             {
114                 throw new TestSetFailedException( e.getLocalizedMessage() );
115             }
116         }
117         else
118         {
119             return false;
120         }
121     }
122 
123     /**
124      * @param listener listener called with <em>Any</em> {@link MasterProcessCommand command type}
125      */
126     public void addListener( MasterProcessListener listener )
127     {
128         listeners.add( new TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>( null, listener ) );
129     }
130 
131     public void addTestListener( MasterProcessListener listener )
132     {
133         addListener( RUN_CLASS, listener );
134     }
135 
136     public void addTestsFinishedListener( MasterProcessListener listener )
137     {
138         addListener( TEST_SET_FINISHED, listener );
139     }
140 
141     public void addSkipNextListener( MasterProcessListener listener )
142     {
143         addListener( SKIP_SINCE_NEXT_TEST, listener );
144     }
145 
146     public void addShutdownListener( MasterProcessListener listener )
147     {
148         addListener( SHUTDOWN, listener );
149     }
150 
151     public void addNoopListener( MasterProcessListener listener )
152     {
153         addListener( NOOP, listener );
154     }
155 
156     private void addListener( MasterProcessCommand cmd, MasterProcessListener listener )
157     {
158         listeners.add( new TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>( cmd, listener ) );
159     }
160 
161     public void removeListener( MasterProcessListener listener )
162     {
163         for ( Iterator<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>> it = listeners.iterator();
164             it.hasNext(); )
165         {
166             TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener> listenerWrapper = it.next();
167             if ( listener == listenerWrapper.getP2() )
168             {
169                 it.remove();
170             }
171         }
172     }
173 
174     Iterable<String> getIterableClasses( PrintStream originalOutStream )
175     {
176         return new ClassesIterable( headTestClassQueue, originalOutStream );
177     }
178 
179     public void stop()
180     {
181         if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
182         {
183             makeQueueFull();
184             listeners.clear();
185             commandThread.interrupt();
186         }
187     }
188 
189     private boolean isStopped()
190     {
191         return state.get() == TERMINATED;
192     }
193 
194     private static boolean isLastNode( Node current )
195     {
196         return current.successor.get() == current;
197     }
198 
199     private boolean isQueueFull()
200     {
201         return isLastNode( tailTestClassQueue );
202     }
203 
204     /**
205      * thread-safety: Must be called from single thread like here the reader thread only.
206      */
207     private boolean addTestClassToQueue( String item )
208     {
209         if ( tailTestClassQueue.item == null )
210         {
211             tailTestClassQueue.item = item;
212             Node newNode = new Node();
213             tailTestClassQueue.successor.set( newNode );
214             tailTestClassQueue = newNode;
215             return true;
216         }
217         else
218         {
219             return false;
220         }
221     }
222 
223     /**
224      * After this method returns the queue is closed, new item cannot be added and method
225      * {@link #isQueueFull()} returns true.
226      */
227     @SuppressWarnings( { "all", "checkstyle:needbraces", "checkstyle:emptystatement" } )
228     public void makeQueueFull()
229     {
230         // order between (#compareAndSet, and #get) matters in multithreading
231         for ( Node tail = this.tailTestClassQueue;
232               !tail.successor.compareAndSet( null, tail ) && tail.successor.get() != tail;
233               tail = tail.successor.get() );
234     }
235 
236     /**
237      * thread-safety: Must be called from single thread like here the reader thread only.
238      */
239     private void insertToQueue( Command cmd )
240     {
241         MasterProcessCommand expectedCommandType = cmd.getCommandType();
242         switch ( expectedCommandType )
243         {
244             case RUN_CLASS:
245                 addTestClassToQueue( cmd.getData() );
246                 break;
247             case TEST_SET_FINISHED:
248                 makeQueueFull();
249                 break;
250             default:
251                 // checkstyle noop
252                 break;
253         }
254     }
255 
256     private void insertToListeners( Command cmd )
257     {
258         MasterProcessCommand expectedCommandType = cmd.getCommandType();
259         for ( TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener> listenerWrapper
260             : MasterProcessReader.this.listeners )
261         {
262             MasterProcessCommand commandType = listenerWrapper.getP1();
263             MasterProcessListener listener = listenerWrapper.getP2();
264             if ( commandType == null || commandType == expectedCommandType )
265             {
266                 listener.update( cmd );
267             }
268         }
269     }
270 
271     /**
272      * thread-safety: Must be called from single thread like here the reader thread only.
273      */
274     private void insert( Command cmd )
275     {
276         insertToQueue( cmd );
277         insertToListeners( cmd );
278     }
279 
280     private final class ClassesIterable
281         implements Iterable<String>
282     {
283         private final Node head;
284         private final PrintStream originalOutStream;
285 
286         ClassesIterable( Node head, PrintStream originalOutStream )
287         {
288             this.head = head;
289             this.originalOutStream = originalOutStream;
290         }
291 
292         public Iterator<String> iterator()
293         {
294             return new ClassesIterator( head, originalOutStream );
295         }
296     }
297 
298     private final class ClassesIterator
299         implements Iterator<String>
300     {
301         private final PrintStream originalOutStream;
302 
303         private Node current;
304 
305         private String clazz;
306 
307         private ClassesIterator( Node current, PrintStream originalOutStream )
308         {
309             this.current = current;
310             this.originalOutStream = originalOutStream;
311         }
312 
313         public boolean hasNext()
314         {
315             popUnread();
316             return isNotBlank( clazz );
317         }
318 
319         public String next()
320         {
321             popUnread();
322             try
323             {
324                 if ( isBlank( clazz ) )
325                 {
326                     throw new NoSuchElementException();
327                 }
328                 else
329                 {
330                     return clazz;
331                 }
332             }
333             finally
334             {
335                 clazz = null;
336             }
337         }
338 
339         public void remove()
340         {
341             throw new UnsupportedOperationException();
342         }
343 
344         private void popUnread()
345         {
346             if ( isStopped() )
347             {
348                 clazz = null;
349                 return;
350             }
351 
352             if ( isBlank( clazz ) )
353             {
354                 do
355                 {
356                     requestNextTest();
357                     if ( isLastNode( current ) )
358                     {
359                         clazz = null;
360                     }
361                     else if ( current.item == null )
362                     {
363                         do
364                         {
365                             await();
366                             /**
367                              * {@link java.util.concurrent.locks.LockSupport#park()}
368                              * may spuriously (that is, for no reason) return, therefore the loop here.
369                              * Could be waken up by System.exit or closing the stream.
370                              */
371                             if ( isStopped() )
372                             {
373                                 clazz = null;
374                                 return;
375                             }
376                         } while ( current.item == null && !isLastNode( current ) );
377                         clazz = current.item;
378                         current = current.successor.get();
379                     }
380                     else
381                     {
382                         clazz = current.item;
383                         current = current.successor.get();
384                     }
385                 }
386                 while ( tryNullWhiteClass() );
387             }
388 
389             if ( isStopped() )
390             {
391                 clazz = null;
392             }
393         }
394 
395         private boolean tryNullWhiteClass()
396         {
397             if ( clazz != null && isBlank( clazz ) )
398             {
399                 clazz = null;
400                 return true;
401             }
402             else
403             {
404                 return false;
405             }
406         }
407 
408         private void requestNextTest()
409         {
410             byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
411             originalOutStream.write( encoded, 0, encoded.length );
412         }
413     }
414 
415     /**
416      * thread-safety: Must be called from single thread like here the reader thread only.
417      */
418     private Command read( DataInputStream stdIn )
419         throws IOException
420     {
421         Command command = decode( stdIn );
422         if ( command != null )
423         {
424             insertToQueue( command );
425         }
426         return command;
427     }
428 
429     private void await()
430     {
431         final Thread currentThread = Thread.currentThread();
432         try
433         {
434             waiters.add( currentThread );
435             park();
436         }
437         finally
438         {
439             waiters.remove( currentThread );
440         }
441     }
442 
443     private void wakeupWaiters()
444     {
445         for ( Thread waiter : waiters )
446         {
447             unpark( waiter );
448         }
449     }
450 
451     private final class CommandRunnable
452         implements Runnable
453     {
454         public void run()
455         {
456             MasterProcessReader.this.startMonitor.countDown();
457             DataInputStream stdIn = new DataInputStream( System.in );
458             boolean isTestSetFinished = false;
459             try
460             {
461                 while ( MasterProcessReader.this.state.get() == RUNNABLE )
462                 {
463                     Command command = read( stdIn );
464                     if ( command == null )
465                     {
466                         System.err.println( "[SUREFIRE] std/in stream corrupted: first sequence not recognized" );
467                         break;
468                     }
469                     else
470                     {
471                         switch ( command.getCommandType() )
472                         {
473                             case TEST_SET_FINISHED:
474                                 isTestSetFinished = true;
475                                 wakeupWaiters();
476                                 break;
477                             case RUN_CLASS:
478                                 wakeupWaiters();
479                                 break;
480                             case SHUTDOWN:
481                                 insertToQueue( Command.TEST_SET_FINISHED );
482                                 wakeupWaiters();
483                                 break;
484                             default:
485                                 // checkstyle do nothing
486                                 break;
487                         }
488 
489                         insertToListeners( command );
490                     }
491                 }
492             }
493             catch ( EOFException e )
494             {
495                 MasterProcessReader.this.state.set( TERMINATED );
496                 if ( !isTestSetFinished )
497                 {
498                     exitByConfiguration();
499                     // does not go to finally
500                 }
501             }
502             catch ( IOException e )
503             {
504                 MasterProcessReader.this.state.set( TERMINATED );
505                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
506                 if ( !( e.getCause() instanceof InterruptedException ) )
507                 {
508                     System.err.println( "[SUREFIRE] std/in stream corrupted" );
509                     e.printStackTrace();
510                 }
511             }
512             finally
513             {
514                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
515                 if ( !isTestSetFinished )
516                 {
517                     insert( Command.TEST_SET_FINISHED );
518                 }
519                 wakeupWaiters();
520             }
521         }
522 
523         /**
524          * thread-safety: Must be called from single thread like here the reader thread only.
525          */
526         private void exitByConfiguration()
527         {
528             Shutdown shutdown = MasterProcessReader.this.shutdown; // won't read inconsistent changes through the stack
529             if ( shutdown != null )
530             {
531                 insert( Command.TEST_SET_FINISHED ); // lazily
532                 wakeupWaiters();
533                 insertToListeners( toShutdown( shutdown ) );
534                 switch ( shutdown )
535                 {
536                     case EXIT:
537                         System.exit( 1 );
538                     case KILL:
539                         Runtime.getRuntime().halt( 1 );
540                     case DEFAULT:
541                     default:
542                         // should not happen; otherwise you missed enum case
543                         break;
544                 }
545             }
546         }
547     }
548 }