View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.surefire.api.booter.BiProperty;
24  import org.apache.maven.surefire.api.booter.Command;
25  import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
26  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
27  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
28  import org.apache.maven.surefire.api.booter.MasterProcessCommand;
29  import org.apache.maven.surefire.api.booter.Shutdown;
30  import org.apache.maven.surefire.api.provider.CommandChainReader;
31  import org.apache.maven.surefire.api.provider.CommandListener;
32  import org.apache.maven.surefire.api.testset.TestSetFailedException;
33  
34  import java.io.EOFException;
35  import java.io.IOException;
36  import java.io.InterruptedIOException;
37  import java.nio.channels.ClosedChannelException;
38  import java.util.Iterator;
39  import java.util.NoSuchElementException;
40  import java.util.Queue;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.CopyOnWriteArrayList;
43  import java.util.concurrent.CountDownLatch;
44  import java.util.concurrent.Semaphore;
45  import java.util.concurrent.atomic.AtomicReference;
46  
47  import static java.lang.StrictMath.max;
48  import static java.lang.Thread.State.NEW;
49  import static java.lang.Thread.State.RUNNABLE;
50  import static java.lang.Thread.State.TERMINATED;
51  import static java.util.Objects.requireNonNull;
52  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
53  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
54  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
55  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
56  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
57  import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
58  import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
59  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
60  
61  /**
62   * Reader of commands coming from plugin(master) process.
63   *
64   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
65   * @since 2.19
66   */
67  public final class CommandReader implements CommandChainReader
68  {
69      private static final String LAST_TEST_SYMBOL = "";
70  
71      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
72  
73      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74  
75      private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
76  
77      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78  
79      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80  
81      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
82  
83      private final MasterProcessChannelDecoder decoder;
84  
85      private final Shutdown shutdown;
86  
87      private final ConsoleLogger logger;
88  
89      private int iteratedCount;
90  
91      public CommandReader( MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger )
92      {
93          this.decoder = requireNonNull( decoder, "null decoder" );
94          this.shutdown = requireNonNull( shutdown, "null Shutdown config" );
95          this.logger = requireNonNull( logger, "null logger" );
96          state.set( RUNNABLE );
97          commandThread.start();
98      }
99  
100     @Override
101     public boolean awaitStarted()
102         throws TestSetFailedException
103     {
104         if ( state.get() == RUNNABLE )
105         {
106             try
107             {
108                 startMonitor.await();
109                 return true;
110             }
111             catch ( InterruptedException e )
112             {
113                 DumpErrorSingleton.getSingleton().dumpException( e );
114                 throw new TestSetFailedException( e.getLocalizedMessage() );
115             }
116         }
117         else
118         {
119             return false;
120         }
121     }
122 
123     @Override
124     public void addSkipNextTestsListener( CommandListener listener )
125     {
126         addListener( SKIP_SINCE_NEXT_TEST, listener );
127     }
128 
129     @Override
130     public void addShutdownListener( CommandListener listener )
131     {
132         addListener( SHUTDOWN, listener );
133     }
134 
135     public void addNoopListener( CommandListener listener )
136     {
137         addListener( NOOP, listener );
138     }
139 
140     public void addByeAckListener( CommandListener listener )
141     {
142         addListener( BYE_ACK, listener );
143     }
144 
145     private void addListener( MasterProcessCommand cmd, CommandListener listener )
146     {
147         listeners.add( new BiProperty<>( cmd, listener ) );
148     }
149 
150     /**
151      * @return test classes which have been retrieved by
152      * {@link CommandReader#getIterableClasses(MasterProcessChannelEncoder)}.
153      */
154     Iterator<String> iterated()
155     {
156         return testClasses.subList( 0, iteratedCount ).iterator();
157     }
158 
159     /**
160      * The iterator can be used only in one Thread.
161      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
162      *
163      * @param eventChannel original stream in current JVM process
164      * @return Iterator with test classes lazily loaded as commands from the main process
165      */
166     Iterable<String> getIterableClasses( MasterProcessChannelEncoder eventChannel )
167     {
168         return new ClassesIterable( eventChannel );
169     }
170 
171     public void stop()
172     {
173         if ( !isStopped() )
174         {
175             state.set( TERMINATED );
176             makeQueueFull();
177             listeners.clear();
178             commandThread.interrupt();
179         }
180     }
181 
182     private boolean isStopped()
183     {
184         return state.get() == TERMINATED;
185     }
186 
187     /**
188      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
189      */
190     private boolean isQueueFull()
191     {
192         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
193         // and we need both atomic.
194         //
195         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
196         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
197         // The insert operation is concurrent with this method.
198         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
199         // allow adding a string after LAST_TEST_SYMBOL.
200         int searchFrom = max( 0, testClasses.size() - 1 );
201         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
202     }
203 
204     private void makeQueueFull()
205     {
206         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
207     }
208 
209     private boolean insertToQueue( String test )
210     {
211         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
212     }
213 
214     private final class ClassesIterable
215         implements Iterable<String>
216     {
217         private final MasterProcessChannelEncoder eventChannel;
218 
219         ClassesIterable( MasterProcessChannelEncoder eventChannel )
220         {
221             this.eventChannel = eventChannel;
222         }
223 
224         @Override
225         public Iterator<String> iterator()
226         {
227             return new ClassesIterator( eventChannel );
228         }
229     }
230 
231     private final class ClassesIterator
232         implements Iterator<String>
233     {
234         private final MasterProcessChannelEncoder eventChannel;
235 
236         private String clazz;
237 
238         private int nextQueueIndex;
239 
240         private ClassesIterator( MasterProcessChannelEncoder eventChannel )
241         {
242             this.eventChannel = eventChannel;
243         }
244 
245         @Override
246         public boolean hasNext()
247         {
248             popUnread();
249             return isNotBlank( clazz );
250         }
251 
252         @Override
253         public String next()
254         {
255             popUnread();
256             try
257             {
258                 if ( isBlank( clazz ) )
259                 {
260                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
261                 }
262                 else
263                 {
264                     return clazz;
265                 }
266             }
267             finally
268             {
269                 clazz = null;
270             }
271         }
272 
273         @Override
274         public void remove()
275         {
276             throw new UnsupportedOperationException();
277         }
278 
279         private void popUnread()
280         {
281             if ( shouldFinish() )
282             {
283                 clazz = null;
284                 return;
285             }
286 
287             if ( isBlank( clazz ) )
288             {
289                 requestNextTest();
290                 CommandReader.this.awaitNextTest();
291                 if ( shouldFinish() )
292                 {
293                     clazz = null;
294                     return;
295                 }
296                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
297                 CommandReader.this.iteratedCount = nextQueueIndex;
298             }
299 
300             if ( CommandReader.this.isStopped() )
301             {
302                 clazz = null;
303             }
304         }
305 
306         private void requestNextTest()
307         {
308             eventChannel.acquireNextTest();
309         }
310 
311         private boolean shouldFinish()
312         {
313             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
314             return CommandReader.this.isStopped() || wasLastTestRead;
315         }
316 
317         private boolean isEndSymbolAt( int index )
318         {
319             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
320         }
321     }
322 
323     private void awaitNextTest()
324     {
325         nextCommandNotifier.acquireUninterruptibly();
326     }
327 
328     private void wakeupIterator()
329     {
330         nextCommandNotifier.release();
331     }
332 
333     private final class CommandRunnable
334         implements Runnable
335     {
336         @Override
337         public void run()
338         {
339             CommandReader.this.startMonitor.countDown();
340             boolean isTestSetFinished = false;
341             try ( MasterProcessChannelDecoder commandReader = CommandReader.this.decoder )
342             {
343                 while ( CommandReader.this.state.get() == RUNNABLE )
344                 {
345                     Command command = commandReader.decode();
346                     switch ( command.getCommandType() )
347                     {
348                         case RUN_CLASS:
349                             String test = command.getData();
350                             boolean inserted = CommandReader.this.insertToQueue( test );
351                             if ( inserted )
352                             {
353                                 CommandReader.this.wakeupIterator();
354                                 callListeners( command );
355                             }
356                             break;
357                         case TEST_SET_FINISHED:
358                             CommandReader.this.makeQueueFull();
359                             isTestSetFinished = true;
360                             CommandReader.this.wakeupIterator();
361                             callListeners( command );
362                             break;
363                         case SHUTDOWN:
364                             CommandReader.this.makeQueueFull();
365                             CommandReader.this.wakeupIterator();
366                             callListeners( command );
367                                 break;
368                         case BYE_ACK:
369                             callListeners( command );
370                             // After SHUTDOWN no more commands can come.
371                             // Hence, do NOT go back to blocking in I/O.
372                             CommandReader.this.state.set( TERMINATED );
373                             break;
374                         default:
375                             callListeners( command );
376                             break;
377                     }
378                 }
379             }
380             catch ( EOFException | ClosedChannelException e )
381             {
382                 CommandReader.this.state.set( TERMINATED );
383                 if ( !isTestSetFinished )
384                 {
385                     String msg = "TestSet has not finished before stream error has appeared >> "
386                                          + "initializing exit by non-null configuration: "
387                                          + CommandReader.this.shutdown;
388                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
389 
390                     exitByConfiguration();
391                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
392                 }
393             }
394             catch ( IOException e )
395             {
396                 CommandReader.this.state.set( TERMINATED );
397                 // If #stop() method is called, reader thread is interrupted
398                 // and exception is InterruptedIOException or its cause is InterruptedException.
399                 if ( !( e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException ) )
400                 {
401                     String msg = "[SUREFIRE] std/in stream corrupted";
402                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
403                     CommandReader.this.logger.error( msg, e );
404                 }
405             }
406             finally
407             {
408                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
409                 if ( !isTestSetFinished )
410                 {
411                     CommandReader.this.makeQueueFull();
412                 }
413                 CommandReader.this.wakeupIterator();
414             }
415         }
416 
417         private void callListeners( Command cmd )
418         {
419             MasterProcessCommand expectedCommandType = cmd.getCommandType();
420             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
421             {
422                 MasterProcessCommand commandType = listenerWrapper.getP1();
423                 CommandListener listener = listenerWrapper.getP2();
424                 if ( commandType == null || commandType == expectedCommandType )
425                 {
426                     listener.update( cmd );
427                 }
428             }
429         }
430 
431         private void exitByConfiguration()
432         {
433             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
434             if ( shutdown != null )
435             {
436                 CommandReader.this.makeQueueFull();
437                 CommandReader.this.wakeupIterator();
438                 callListeners( toShutdown( shutdown ) );
439             }
440         }
441     }
442 }