1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24 import org.apache.maven.surefire.testset.TestSetFailedException;
25
26 import java.io.DataInputStream;
27 import java.io.EOFException;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.util.Iterator;
31 import java.util.NoSuchElementException;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import static java.util.Objects.requireNonNull;
40 import static java.lang.Thread.State.NEW;
41 import static java.lang.Thread.State.RUNNABLE;
42 import static java.lang.Thread.State.TERMINATED;
43 import static java.lang.StrictMath.max;
44 import static org.apache.maven.surefire.booter.Command.toShutdown;
45 import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
48 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
49 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
50 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
51 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
52 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
53 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
54 import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
55 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
56 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
57
58
59
60
61
62
63
64 public final class CommandReader
65 {
66 private static final String LAST_TEST_SYMBOL = "";
67
68 private static final CommandReader READER = new CommandReader();
69
70 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
71
72 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
73
74 private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
75
76 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
77
78 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
79
80 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
81
82 private volatile Shutdown shutdown;
83
84 private int iteratedCount;
85
86 private volatile ConsoleLogger logger = new NullConsoleLogger();
87
88 private CommandReader()
89 {
90 }
91
92 public static CommandReader getReader()
93 {
94 final CommandReader reader = READER;
95 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
96 {
97 reader.commandThread.start();
98 }
99 return reader;
100 }
101
102 public CommandReader setShutdown( Shutdown shutdown )
103 {
104 this.shutdown = shutdown;
105 return this;
106 }
107
108 public CommandReader setLogger( ConsoleLogger logger )
109 {
110 this.logger = requireNonNull( logger, "null logger" );
111 return this;
112 }
113
114 public boolean awaitStarted()
115 throws TestSetFailedException
116 {
117 if ( state.get() == RUNNABLE )
118 {
119 try
120 {
121 startMonitor.await();
122 return true;
123 }
124 catch ( InterruptedException e )
125 {
126 DumpErrorSingleton.getSingleton().dumpException( e );
127 throw new TestSetFailedException( e.getLocalizedMessage() );
128 }
129 }
130 else
131 {
132 return false;
133 }
134 }
135
136
137
138
139 public void addListener( CommandListener listener )
140 {
141 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
142 }
143
144 public void addTestListener( CommandListener listener )
145 {
146 addListener( RUN_CLASS, listener );
147 }
148
149 public void addTestsFinishedListener( CommandListener listener )
150 {
151 addListener( TEST_SET_FINISHED, listener );
152 }
153
154 public void addSkipNextTestsListener( CommandListener listener )
155 {
156 addListener( SKIP_SINCE_NEXT_TEST, listener );
157 }
158
159 public void addShutdownListener( CommandListener listener )
160 {
161 addListener( SHUTDOWN, listener );
162 }
163
164 public void addNoopListener( CommandListener listener )
165 {
166 addListener( NOOP, listener );
167 }
168
169 public void addByeAckListener( CommandListener listener )
170 {
171 addListener( BYE_ACK, listener );
172 }
173
174 private void addListener( MasterProcessCommand cmd, CommandListener listener )
175 {
176 listeners.add( new BiProperty<>( cmd, listener ) );
177 }
178
179 public void removeListener( CommandListener listener )
180 {
181 for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
182 {
183 BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
184 if ( listener == listenerWrapper.getP2() )
185 {
186 it.remove();
187 }
188 }
189 }
190
191
192
193
194 Iterator<String> iterated()
195 {
196 return testClasses.subList( 0, iteratedCount ).iterator();
197 }
198
199
200
201
202
203
204
205
206 Iterable<String> getIterableClasses( PrintStream originalOutStream )
207 {
208 return new ClassesIterable( originalOutStream );
209 }
210
211 public void stop()
212 {
213 if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
214 {
215 makeQueueFull();
216 listeners.clear();
217 commandThread.interrupt();
218 }
219 }
220
221 private boolean isStopped()
222 {
223 return state.get() == TERMINATED;
224 }
225
226
227
228
229 private boolean isQueueFull()
230 {
231
232
233
234
235
236
237
238
239 int searchFrom = max( 0, testClasses.size() - 1 );
240 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
241 }
242
243 private void makeQueueFull()
244 {
245 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
246 }
247
248 private boolean insertToQueue( String test )
249 {
250 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
251 }
252
253 private final class ClassesIterable
254 implements Iterable<String>
255 {
256 private final PrintStream originalOutStream;
257
258 ClassesIterable( PrintStream originalOutStream )
259 {
260 this.originalOutStream = originalOutStream;
261 }
262
263 @Override
264 public Iterator<String> iterator()
265 {
266 return new ClassesIterator( originalOutStream );
267 }
268 }
269
270 private final class ClassesIterator
271 implements Iterator<String>
272 {
273 private final PrintStream originalOutStream;
274
275 private String clazz;
276
277 private int nextQueueIndex;
278
279 private ClassesIterator( PrintStream originalOutStream )
280 {
281 this.originalOutStream = originalOutStream;
282 }
283
284 @Override
285 public boolean hasNext()
286 {
287 popUnread();
288 return isNotBlank( clazz );
289 }
290
291 @Override
292 public String next()
293 {
294 popUnread();
295 try
296 {
297 if ( isBlank( clazz ) )
298 {
299 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
300 }
301 else
302 {
303 return clazz;
304 }
305 }
306 finally
307 {
308 clazz = null;
309 }
310 }
311
312 @Override
313 public void remove()
314 {
315 throw new UnsupportedOperationException();
316 }
317
318 private void popUnread()
319 {
320 if ( shouldFinish() )
321 {
322 clazz = null;
323 return;
324 }
325
326 if ( isBlank( clazz ) )
327 {
328 requestNextTest();
329 CommandReader.this.awaitNextTest();
330 if ( shouldFinish() )
331 {
332 clazz = null;
333 return;
334 }
335 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
336 CommandReader.this.iteratedCount = nextQueueIndex;
337 }
338
339 if ( CommandReader.this.isStopped() )
340 {
341 clazz = null;
342 }
343 }
344
345 private void requestNextTest()
346 {
347 byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
348 synchronized ( originalOutStream )
349 {
350 originalOutStream.write( encoded, 0, encoded.length );
351 originalOutStream.flush();
352 }
353 }
354
355 private boolean shouldFinish()
356 {
357 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
358 return CommandReader.this.isStopped() || wasLastTestRead;
359 }
360
361 private boolean isEndSymbolAt( int index )
362 {
363 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
364 }
365 }
366
367 private void awaitNextTest()
368 {
369 nextCommandNotifier.acquireUninterruptibly();
370 }
371
372 private void wakeupIterator()
373 {
374 nextCommandNotifier.release();
375 }
376
377 private final class CommandRunnable
378 implements Runnable
379 {
380 @Override
381 public void run()
382 {
383 CommandReader.this.startMonitor.countDown();
384 DataInputStream stdIn = new DataInputStream( System.in );
385 boolean isTestSetFinished = false;
386 try
387 {
388 while ( CommandReader.this.state.get() == RUNNABLE )
389 {
390 Command command = decode( stdIn );
391 if ( command == null )
392 {
393 String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
394 DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
395 logger.error( errorMessage );
396 break;
397 }
398 else
399 {
400 switch ( command.getCommandType() )
401 {
402 case RUN_CLASS:
403 String test = command.getData();
404 boolean inserted = CommandReader.this.insertToQueue( test );
405 if ( inserted )
406 {
407 CommandReader.this.wakeupIterator();
408 insertToListeners( command );
409 }
410 break;
411 case TEST_SET_FINISHED:
412 CommandReader.this.makeQueueFull();
413 isTestSetFinished = true;
414 CommandReader.this.wakeupIterator();
415 insertToListeners( command );
416 break;
417 case SHUTDOWN:
418 CommandReader.this.makeQueueFull();
419 CommandReader.this.wakeupIterator();
420 insertToListeners( command );
421 break;
422 default:
423 insertToListeners( command );
424 break;
425 }
426 }
427 }
428 }
429 catch ( EOFException e )
430 {
431 CommandReader.this.state.set( TERMINATED );
432 if ( !isTestSetFinished )
433 {
434 String msg = "TestSet has not finished before stream error has appeared >> "
435 + "initializing exit by non-null configuration: "
436 + CommandReader.this.shutdown;
437 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
438
439 exitByConfiguration();
440
441 }
442 }
443 catch ( IOException e )
444 {
445 CommandReader.this.state.set( TERMINATED );
446
447 if ( !( e.getCause() instanceof InterruptedException ) )
448 {
449 String msg = "[SUREFIRE] std/in stream corrupted";
450 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
451 logger.error( msg, e );
452 }
453 }
454 finally
455 {
456
457 if ( !isTestSetFinished )
458 {
459 CommandReader.this.makeQueueFull();
460 }
461 CommandReader.this.wakeupIterator();
462 }
463 }
464
465 private void insertToListeners( Command cmd )
466 {
467 MasterProcessCommand expectedCommandType = cmd.getCommandType();
468 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
469 {
470 MasterProcessCommand commandType = listenerWrapper.getP1();
471 CommandListener listener = listenerWrapper.getP2();
472 if ( commandType == null || commandType == expectedCommandType )
473 {
474 listener.update( cmd );
475 }
476 }
477 }
478
479 private void exitByConfiguration()
480 {
481 Shutdown shutdown = CommandReader.this.shutdown;
482 if ( shutdown != null )
483 {
484 CommandReader.this.makeQueueFull();
485 CommandReader.this.wakeupIterator();
486 insertToListeners( toShutdown( shutdown ) );
487 if ( shutdown.isExit() )
488 {
489 System.exit( 1 );
490 }
491 else if ( shutdown.isKill() )
492 {
493 Runtime.getRuntime().halt( 1 );
494 }
495
496 }
497 }
498 }
499
500 }