1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.surefire.api.booter.BiProperty;
24 import org.apache.maven.surefire.api.booter.Command;
25 import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
26 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
27 import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
28 import org.apache.maven.surefire.api.booter.MasterProcessCommand;
29 import org.apache.maven.surefire.api.booter.Shutdown;
30 import org.apache.maven.surefire.api.provider.CommandChainReader;
31 import org.apache.maven.surefire.api.provider.CommandListener;
32 import org.apache.maven.surefire.api.testset.TestSetFailedException;
33
34 import java.io.EOFException;
35 import java.io.IOException;
36 import java.io.InterruptedIOException;
37 import java.nio.channels.ClosedChannelException;
38 import java.util.Iterator;
39 import java.util.NoSuchElementException;
40 import java.util.Queue;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.CopyOnWriteArrayList;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.atomic.AtomicReference;
46
47 import static java.lang.StrictMath.max;
48 import static java.lang.Thread.State.NEW;
49 import static java.lang.Thread.State.RUNNABLE;
50 import static java.lang.Thread.State.TERMINATED;
51 import static java.util.Objects.requireNonNull;
52 import static org.apache.maven.surefire.api.booter.Command.toShutdown;
53 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
54 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
55 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
56 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
57 import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
58 import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
59 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
60
61
62
63
64
65
66
67 public final class CommandReader implements CommandChainReader
68 {
69 private static final String LAST_TEST_SYMBOL = "";
70
71 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
72
73 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74
75 private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
76
77 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78
79 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80
81 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
82
83 private final MasterProcessChannelDecoder decoder;
84
85 private final Shutdown shutdown;
86
87 private final ConsoleLogger logger;
88
89 private int iteratedCount;
90
91 public CommandReader( MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger )
92 {
93 this.decoder = requireNonNull( decoder, "null decoder" );
94 this.shutdown = requireNonNull( shutdown, "null Shutdown config" );
95 this.logger = requireNonNull( logger, "null logger" );
96 state.set( RUNNABLE );
97 commandThread.start();
98 }
99
100 @Override
101 public boolean awaitStarted()
102 throws TestSetFailedException
103 {
104 if ( state.get() == RUNNABLE )
105 {
106 try
107 {
108 startMonitor.await();
109 return true;
110 }
111 catch ( InterruptedException e )
112 {
113 DumpErrorSingleton.getSingleton().dumpException( e );
114 throw new TestSetFailedException( e.getLocalizedMessage() );
115 }
116 }
117 else
118 {
119 return false;
120 }
121 }
122
123 @Override
124 public void addSkipNextTestsListener( CommandListener listener )
125 {
126 addListener( SKIP_SINCE_NEXT_TEST, listener );
127 }
128
129 @Override
130 public void addShutdownListener( CommandListener listener )
131 {
132 addListener( SHUTDOWN, listener );
133 }
134
135 public void addNoopListener( CommandListener listener )
136 {
137 addListener( NOOP, listener );
138 }
139
140 public void addByeAckListener( CommandListener listener )
141 {
142 addListener( BYE_ACK, listener );
143 }
144
145 private void addListener( MasterProcessCommand cmd, CommandListener listener )
146 {
147 listeners.add( new BiProperty<>( cmd, listener ) );
148 }
149
150
151
152
153
154 Iterator<String> iterated()
155 {
156 return testClasses.subList( 0, iteratedCount ).iterator();
157 }
158
159
160
161
162
163
164
165
166 Iterable<String> getIterableClasses( MasterProcessChannelEncoder eventChannel )
167 {
168 return new ClassesIterable( eventChannel );
169 }
170
171 public void stop()
172 {
173 if ( !isStopped() )
174 {
175 state.set( TERMINATED );
176 makeQueueFull();
177 listeners.clear();
178 commandThread.interrupt();
179 }
180 }
181
182 private boolean isStopped()
183 {
184 return state.get() == TERMINATED;
185 }
186
187
188
189
190 private boolean isQueueFull()
191 {
192
193
194
195
196
197
198
199
200 int searchFrom = max( 0, testClasses.size() - 1 );
201 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
202 }
203
204 private void makeQueueFull()
205 {
206 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
207 }
208
209 private boolean insertToQueue( String test )
210 {
211 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
212 }
213
214 private final class ClassesIterable
215 implements Iterable<String>
216 {
217 private final MasterProcessChannelEncoder eventChannel;
218
219 ClassesIterable( MasterProcessChannelEncoder eventChannel )
220 {
221 this.eventChannel = eventChannel;
222 }
223
224 @Override
225 public Iterator<String> iterator()
226 {
227 return new ClassesIterator( eventChannel );
228 }
229 }
230
231 private final class ClassesIterator
232 implements Iterator<String>
233 {
234 private final MasterProcessChannelEncoder eventChannel;
235
236 private String clazz;
237
238 private int nextQueueIndex;
239
240 private ClassesIterator( MasterProcessChannelEncoder eventChannel )
241 {
242 this.eventChannel = eventChannel;
243 }
244
245 @Override
246 public boolean hasNext()
247 {
248 popUnread();
249 return isNotBlank( clazz );
250 }
251
252 @Override
253 public String next()
254 {
255 popUnread();
256 try
257 {
258 if ( isBlank( clazz ) )
259 {
260 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
261 }
262 else
263 {
264 return clazz;
265 }
266 }
267 finally
268 {
269 clazz = null;
270 }
271 }
272
273 @Override
274 public void remove()
275 {
276 throw new UnsupportedOperationException();
277 }
278
279 private void popUnread()
280 {
281 if ( shouldFinish() )
282 {
283 clazz = null;
284 return;
285 }
286
287 if ( isBlank( clazz ) )
288 {
289 requestNextTest();
290 CommandReader.this.awaitNextTest();
291 if ( shouldFinish() )
292 {
293 clazz = null;
294 return;
295 }
296 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
297 CommandReader.this.iteratedCount = nextQueueIndex;
298 }
299
300 if ( CommandReader.this.isStopped() )
301 {
302 clazz = null;
303 }
304 }
305
306 private void requestNextTest()
307 {
308 eventChannel.acquireNextTest();
309 }
310
311 private boolean shouldFinish()
312 {
313 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
314 return CommandReader.this.isStopped() || wasLastTestRead;
315 }
316
317 private boolean isEndSymbolAt( int index )
318 {
319 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
320 }
321 }
322
323 private void awaitNextTest()
324 {
325 nextCommandNotifier.acquireUninterruptibly();
326 }
327
328 private void wakeupIterator()
329 {
330 nextCommandNotifier.release();
331 }
332
333 private final class CommandRunnable
334 implements Runnable
335 {
336 @Override
337 public void run()
338 {
339 CommandReader.this.startMonitor.countDown();
340 boolean isTestSetFinished = false;
341 try ( MasterProcessChannelDecoder commandReader = CommandReader.this.decoder )
342 {
343 while ( CommandReader.this.state.get() == RUNNABLE )
344 {
345 Command command = commandReader.decode();
346 switch ( command.getCommandType() )
347 {
348 case RUN_CLASS:
349 String test = command.getData();
350 boolean inserted = CommandReader.this.insertToQueue( test );
351 if ( inserted )
352 {
353 CommandReader.this.wakeupIterator();
354 callListeners( command );
355 }
356 break;
357 case TEST_SET_FINISHED:
358 CommandReader.this.makeQueueFull();
359 isTestSetFinished = true;
360 CommandReader.this.wakeupIterator();
361 callListeners( command );
362 break;
363 case SHUTDOWN:
364 CommandReader.this.makeQueueFull();
365 CommandReader.this.wakeupIterator();
366 callListeners( command );
367 break;
368 case BYE_ACK:
369 callListeners( command );
370
371
372 CommandReader.this.state.set( TERMINATED );
373 break;
374 default:
375 callListeners( command );
376 break;
377 }
378 }
379 }
380 catch ( EOFException | ClosedChannelException e )
381 {
382 CommandReader.this.state.set( TERMINATED );
383 if ( !isTestSetFinished )
384 {
385 String msg = "TestSet has not finished before stream error has appeared >> "
386 + "initializing exit by non-null configuration: "
387 + CommandReader.this.shutdown;
388 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
389
390 exitByConfiguration();
391
392 }
393 }
394 catch ( IOException e )
395 {
396 CommandReader.this.state.set( TERMINATED );
397
398
399 if ( !( e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException ) )
400 {
401 String msg = "[SUREFIRE] std/in stream corrupted";
402 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
403 CommandReader.this.logger.error( msg, e );
404 }
405 }
406 finally
407 {
408
409 if ( !isTestSetFinished )
410 {
411 CommandReader.this.makeQueueFull();
412 }
413 CommandReader.this.wakeupIterator();
414 }
415 }
416
417 private void callListeners( Command cmd )
418 {
419 MasterProcessCommand expectedCommandType = cmd.getCommandType();
420 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
421 {
422 MasterProcessCommand commandType = listenerWrapper.getP1();
423 CommandListener listener = listenerWrapper.getP2();
424 if ( commandType == null || commandType == expectedCommandType )
425 {
426 listener.update( cmd );
427 }
428 }
429 }
430
431 private void exitByConfiguration()
432 {
433 Shutdown shutdown = CommandReader.this.shutdown;
434 if ( shutdown != null )
435 {
436 CommandReader.this.makeQueueFull();
437 CommandReader.this.wakeupIterator();
438 callListeners( toShutdown( shutdown ) );
439 }
440 }
441 }
442 }