View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24  import org.apache.maven.surefire.testset.TestSetFailedException;
25  
26  import java.io.DataInputStream;
27  import java.io.EOFException;
28  import java.io.IOException;
29  import java.util.Iterator;
30  import java.util.NoSuchElementException;
31  import java.util.Queue;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.CopyOnWriteArrayList;
34  import java.util.concurrent.CountDownLatch;
35  import java.util.concurrent.Semaphore;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import static java.util.Objects.requireNonNull;
39  import static java.lang.Thread.State.NEW;
40  import static java.lang.Thread.State.RUNNABLE;
41  import static java.lang.Thread.State.TERMINATED;
42  import static java.lang.StrictMath.max;
43  import static org.apache.maven.surefire.booter.Command.toShutdown;
44  import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
45  import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
46  import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
47  import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
48  import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
49  import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
50  import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
51  import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
52  import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
53  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
54  
55  /**
56   * Reader of commands coming from plugin(master) process.
57   *
58   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
59   * @since 2.19
60   */
61  public final class CommandReader
62  {
63      private static final String LAST_TEST_SYMBOL = "";
64  
65      private static final CommandReader READER = new CommandReader();
66  
67      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
68  
69      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
70  
71      private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
72  
73      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
74  
75      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
76  
77      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
78  
79      private volatile Shutdown shutdown;
80  
81      private int iteratedCount;
82  
83      private volatile ConsoleLogger logger = new NullConsoleLogger();
84  
85      private CommandReader()
86      {
87      }
88  
89      public static CommandReader getReader()
90      {
91          final CommandReader reader = READER;
92          if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
93          {
94              reader.commandThread.start();
95          }
96          return reader;
97      }
98  
99      public CommandReader setShutdown( Shutdown shutdown )
100     {
101         this.shutdown = shutdown;
102         return this;
103     }
104 
105     public CommandReader setLogger( ConsoleLogger logger )
106     {
107         this.logger = requireNonNull( logger, "null logger" );
108         return this;
109     }
110 
111     public boolean awaitStarted()
112         throws TestSetFailedException
113     {
114         if ( state.get() == RUNNABLE )
115         {
116             try
117             {
118                 startMonitor.await();
119                 return true;
120             }
121             catch ( InterruptedException e )
122             {
123                 DumpErrorSingleton.getSingleton().dumpException( e );
124                 throw new TestSetFailedException( e.getLocalizedMessage() );
125             }
126         }
127         else
128         {
129             return false;
130         }
131     }
132 
133     /**
134      * @param listener listener called with <b>Any</b> {@link MasterProcessCommand command type}
135      */
136     public void addListener( CommandListener listener )
137     {
138         listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
139     }
140 
141     public void addTestListener( CommandListener listener )
142     {
143         addListener( RUN_CLASS, listener );
144     }
145 
146     public void addTestsFinishedListener( CommandListener listener )
147     {
148         addListener( TEST_SET_FINISHED, listener );
149     }
150 
151     public void addSkipNextTestsListener( CommandListener listener )
152     {
153         addListener( SKIP_SINCE_NEXT_TEST, listener );
154     }
155 
156     public void addShutdownListener( CommandListener listener )
157     {
158         addListener( SHUTDOWN, listener );
159     }
160 
161     public void addNoopListener( CommandListener listener )
162     {
163         addListener( NOOP, listener );
164     }
165 
166     public void addByeAckListener( CommandListener listener )
167     {
168         addListener( BYE_ACK, listener );
169     }
170 
171     private void addListener( MasterProcessCommand cmd, CommandListener listener )
172     {
173         listeners.add( new BiProperty<>( cmd, listener ) );
174     }
175 
176     public void removeListener( CommandListener listener )
177     {
178         for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
179         {
180             BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
181             if ( listener == listenerWrapper.getP2() )
182             {
183                 it.remove();
184             }
185         }
186     }
187 
188     /**
189      * @return test classes which have been retrieved by {@link CommandReader#getIterableClasses(ForkedChannelEncoder)}.
190      */
191     Iterator<String> iterated()
192     {
193         return testClasses.subList( 0, iteratedCount ).iterator();
194     }
195 
196     /**
197      * The iterator can be used only in one Thread.
198      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
199      *
200      * @param eventChannel original stream in current JVM process
201      * @return Iterator with test classes lazily loaded as commands from the main process
202      */
203     Iterable<String> getIterableClasses( ForkedChannelEncoder eventChannel )
204     {
205         return new ClassesIterable( eventChannel );
206     }
207 
208     public void stop()
209     {
210         if ( !isStopped() )
211         {
212             state.set( TERMINATED );
213             makeQueueFull();
214             listeners.clear();
215             commandThread.interrupt();
216         }
217     }
218 
219     private boolean isStopped()
220     {
221         return state.get() == TERMINATED;
222     }
223 
224     /**
225      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
226      */
227     private boolean isQueueFull()
228     {
229         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
230         // and we need both atomic.
231         //
232         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
233         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
234         // The insert operation is concurrent with this method.
235         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
236         // allow adding a string after LAST_TEST_SYMBOL.
237         int searchFrom = max( 0, testClasses.size() - 1 );
238         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
239     }
240 
241     private void makeQueueFull()
242     {
243         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
244     }
245 
246     private boolean insertToQueue( String test )
247     {
248         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
249     }
250 
251     private final class ClassesIterable
252         implements Iterable<String>
253     {
254         private final ForkedChannelEncoder eventChannel;
255 
256         ClassesIterable( ForkedChannelEncoder eventChannel )
257         {
258             this.eventChannel = eventChannel;
259         }
260 
261         @Override
262         public Iterator<String> iterator()
263         {
264             return new ClassesIterator( eventChannel );
265         }
266     }
267 
268     private final class ClassesIterator
269         implements Iterator<String>
270     {
271         private final ForkedChannelEncoder eventChannel;
272 
273         private String clazz;
274 
275         private int nextQueueIndex;
276 
277         private ClassesIterator( ForkedChannelEncoder eventChannel )
278         {
279             this.eventChannel = eventChannel;
280         }
281 
282         @Override
283         public boolean hasNext()
284         {
285             popUnread();
286             return isNotBlank( clazz );
287         }
288 
289         @Override
290         public String next()
291         {
292             popUnread();
293             try
294             {
295                 if ( isBlank( clazz ) )
296                 {
297                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
298                 }
299                 else
300                 {
301                     return clazz;
302                 }
303             }
304             finally
305             {
306                 clazz = null;
307             }
308         }
309 
310         @Override
311         public void remove()
312         {
313             throw new UnsupportedOperationException();
314         }
315 
316         private void popUnread()
317         {
318             if ( shouldFinish() )
319             {
320                 clazz = null;
321                 return;
322             }
323 
324             if ( isBlank( clazz ) )
325             {
326                 requestNextTest();
327                 CommandReader.this.awaitNextTest();
328                 if ( shouldFinish() )
329                 {
330                     clazz = null;
331                     return;
332                 }
333                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
334                 CommandReader.this.iteratedCount = nextQueueIndex;
335             }
336 
337             if ( CommandReader.this.isStopped() )
338             {
339                 clazz = null;
340             }
341         }
342 
343         private void requestNextTest()
344         {
345             eventChannel.acquireNextTest();
346         }
347 
348         private boolean shouldFinish()
349         {
350             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
351             return CommandReader.this.isStopped() || wasLastTestRead;
352         }
353 
354         private boolean isEndSymbolAt( int index )
355         {
356             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
357         }
358     }
359 
360     private void awaitNextTest()
361     {
362         nextCommandNotifier.acquireUninterruptibly();
363     }
364 
365     private void wakeupIterator()
366     {
367         nextCommandNotifier.release();
368     }
369 
370     private final class CommandRunnable
371         implements Runnable
372     {
373         @Override
374         public void run()
375         {
376             CommandReader.this.startMonitor.countDown();
377             DataInputStream stdIn = new DataInputStream( System.in );
378             boolean isTestSetFinished = false;
379             try
380             {
381                 while ( CommandReader.this.state.get() == RUNNABLE )
382                 {
383                     Command command = decode( stdIn );
384                     if ( command == null )
385                     {
386                         String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
387                         DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
388                         logger.error( errorMessage );
389                         break;
390                     }
391                     else
392                     {
393                         switch ( command.getCommandType() )
394                         {
395                             case RUN_CLASS:
396                                 String test = command.getData();
397                                 boolean inserted = CommandReader.this.insertToQueue( test );
398                                 if ( inserted )
399                                 {
400                                     CommandReader.this.wakeupIterator();
401                                     callListeners( command );
402                                 }
403                                 break;
404                             case TEST_SET_FINISHED:
405                                 CommandReader.this.makeQueueFull();
406                                 isTestSetFinished = true;
407                                 CommandReader.this.wakeupIterator();
408                                 callListeners( command );
409                                 break;
410                             case SHUTDOWN:
411                                 CommandReader.this.makeQueueFull();
412                                 CommandReader.this.wakeupIterator();
413                                 callListeners( command );
414                                 break;
415                             default:
416                                 callListeners( command );
417                                 break;
418                         }
419                     }
420                 }
421             }
422             catch ( EOFException e )
423             {
424                 CommandReader.this.state.set( TERMINATED );
425                 if ( !isTestSetFinished )
426                 {
427                     String msg = "TestSet has not finished before stream error has appeared >> "
428                                          + "initializing exit by non-null configuration: "
429                                          + CommandReader.this.shutdown;
430                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
431 
432                     exitByConfiguration();
433                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
434                 }
435             }
436             catch ( IOException e )
437             {
438                 CommandReader.this.state.set( TERMINATED );
439                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
440                 if ( !( e.getCause() instanceof InterruptedException ) )
441                 {
442                     String msg = "[SUREFIRE] std/in stream corrupted";
443                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
444                     logger.error( msg, e );
445                 }
446             }
447             finally
448             {
449                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
450                 if ( !isTestSetFinished )
451                 {
452                     CommandReader.this.makeQueueFull();
453                 }
454                 CommandReader.this.wakeupIterator();
455             }
456         }
457 
458         private void callListeners( Command cmd )
459         {
460             MasterProcessCommand expectedCommandType = cmd.getCommandType();
461             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
462             {
463                 MasterProcessCommand commandType = listenerWrapper.getP1();
464                 CommandListener listener = listenerWrapper.getP2();
465                 if ( commandType == null || commandType == expectedCommandType )
466                 {
467                     listener.update( cmd );
468                 }
469             }
470         }
471 
472         private void exitByConfiguration()
473         {
474             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
475             if ( shutdown != null )
476             {
477                 CommandReader.this.makeQueueFull();
478                 CommandReader.this.wakeupIterator();
479                 callListeners( toShutdown( shutdown ) );
480             }
481         }
482     }
483 }