1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24 import org.apache.maven.surefire.testset.TestSetFailedException;
25
26 import java.io.DataInputStream;
27 import java.io.EOFException;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.util.Iterator;
31 import java.util.NoSuchElementException;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import static java.lang.Thread.State.NEW;
40 import static java.lang.Thread.State.RUNNABLE;
41 import static java.lang.Thread.State.TERMINATED;
42 import static java.lang.StrictMath.max;
43 import static org.apache.maven.surefire.booter.Command.toShutdown;
44 import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
45 import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
48 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
49 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
50 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
51 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
52 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53 import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
54 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
55 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
56 import static org.apache.maven.surefire.util.internal.ObjectUtils.requireNonNull;
57
58
59
60
61
62
63
64 public final class CommandReader
65 {
66 private static final String LAST_TEST_SYMBOL = "";
67
68 private static final CommandReader READER = new CommandReader();
69
70 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
71 = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
72
73 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74
75 private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
76
77 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78
79 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80
81 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
82
83 private volatile Shutdown shutdown;
84
85 private int iteratedCount;
86
87 private volatile ConsoleLogger logger = new NullConsoleLogger();
88
89 private CommandReader()
90 {
91 }
92
93 public static CommandReader getReader()
94 {
95 final CommandReader reader = READER;
96 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
97 {
98 reader.commandThread.start();
99 }
100 return reader;
101 }
102
103 public CommandReader setShutdown( Shutdown shutdown )
104 {
105 this.shutdown = shutdown;
106 return this;
107 }
108
109 public CommandReader setLogger( ConsoleLogger logger )
110 {
111 this.logger = requireNonNull( logger, "null logger" );
112 return this;
113 }
114
115 public boolean awaitStarted()
116 throws TestSetFailedException
117 {
118 if ( state.get() == RUNNABLE )
119 {
120 try
121 {
122 startMonitor.await();
123 return true;
124 }
125 catch ( InterruptedException e )
126 {
127 DumpErrorSingleton.getSingleton().dumpException( e );
128 throw new TestSetFailedException( e.getLocalizedMessage() );
129 }
130 }
131 else
132 {
133 return false;
134 }
135 }
136
137
138
139
140 public void addListener( CommandListener listener )
141 {
142 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
143 }
144
145 public void addTestListener( CommandListener listener )
146 {
147 addListener( RUN_CLASS, listener );
148 }
149
150 public void addTestsFinishedListener( CommandListener listener )
151 {
152 addListener( TEST_SET_FINISHED, listener );
153 }
154
155 public void addSkipNextTestsListener( CommandListener listener )
156 {
157 addListener( SKIP_SINCE_NEXT_TEST, listener );
158 }
159
160 public void addShutdownListener( CommandListener listener )
161 {
162 addListener( SHUTDOWN, listener );
163 }
164
165 public void addNoopListener( CommandListener listener )
166 {
167 addListener( NOOP, listener );
168 }
169
170 public void addByeAckListener( CommandListener listener )
171 {
172 addListener( BYE_ACK, listener );
173 }
174
175 private void addListener( MasterProcessCommand cmd, CommandListener listener )
176 {
177 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
178 }
179
180 public void removeListener( CommandListener listener )
181 {
182 for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
183 {
184 BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
185 if ( listener == listenerWrapper.getP2() )
186 {
187 it.remove();
188 }
189 }
190 }
191
192
193
194
195 Iterator<String> iterated()
196 {
197 return testClasses.subList( 0, iteratedCount ).iterator();
198 }
199
200
201
202
203
204
205
206
207 Iterable<String> getIterableClasses( PrintStream originalOutStream )
208 {
209 return new ClassesIterable( originalOutStream );
210 }
211
212 public void stop()
213 {
214 if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
215 {
216 makeQueueFull();
217 listeners.clear();
218 commandThread.interrupt();
219 }
220 }
221
222 private boolean isStopped()
223 {
224 return state.get() == TERMINATED;
225 }
226
227
228
229
230 private boolean isQueueFull()
231 {
232
233
234
235
236
237
238
239
240 int searchFrom = max( 0, testClasses.size() - 1 );
241 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
242 }
243
244 private void makeQueueFull()
245 {
246 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
247 }
248
249 private boolean insertToQueue( String test )
250 {
251 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
252 }
253
254 private final class ClassesIterable
255 implements Iterable<String>
256 {
257 private final PrintStream originalOutStream;
258
259 ClassesIterable( PrintStream originalOutStream )
260 {
261 this.originalOutStream = originalOutStream;
262 }
263
264 public Iterator<String> iterator()
265 {
266 return new ClassesIterator( originalOutStream );
267 }
268 }
269
270 private final class ClassesIterator
271 implements Iterator<String>
272 {
273 private final PrintStream originalOutStream;
274
275 private String clazz;
276
277 private int nextQueueIndex;
278
279 private ClassesIterator( PrintStream originalOutStream )
280 {
281 this.originalOutStream = originalOutStream;
282 }
283
284 public boolean hasNext()
285 {
286 popUnread();
287 return isNotBlank( clazz );
288 }
289
290 public String next()
291 {
292 popUnread();
293 try
294 {
295 if ( isBlank( clazz ) )
296 {
297 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
298 }
299 else
300 {
301 return clazz;
302 }
303 }
304 finally
305 {
306 clazz = null;
307 }
308 }
309
310 public void remove()
311 {
312 throw new UnsupportedOperationException();
313 }
314
315 private void popUnread()
316 {
317 if ( shouldFinish() )
318 {
319 clazz = null;
320 return;
321 }
322
323 if ( isBlank( clazz ) )
324 {
325 requestNextTest();
326 CommandReader.this.awaitNextTest();
327 if ( shouldFinish() )
328 {
329 clazz = null;
330 return;
331 }
332 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
333 CommandReader.this.iteratedCount = nextQueueIndex;
334 }
335
336 if ( CommandReader.this.isStopped() )
337 {
338 clazz = null;
339 }
340 }
341
342 private void requestNextTest()
343 {
344 byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
345 synchronized ( originalOutStream )
346 {
347 originalOutStream.write( encoded, 0, encoded.length );
348 originalOutStream.flush();
349 }
350 }
351
352 private boolean shouldFinish()
353 {
354 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
355 return CommandReader.this.isStopped() || wasLastTestRead;
356 }
357
358 private boolean isEndSymbolAt( int index )
359 {
360 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
361 }
362 }
363
364 private void awaitNextTest()
365 {
366 nextCommandNotifier.acquireUninterruptibly();
367 }
368
369 private void wakeupIterator()
370 {
371 nextCommandNotifier.release();
372 }
373
374 private final class CommandRunnable
375 implements Runnable
376 {
377 public void run()
378 {
379 CommandReader.this.startMonitor.countDown();
380 DataInputStream stdIn = new DataInputStream( System.in );
381 boolean isTestSetFinished = false;
382 try
383 {
384 while ( CommandReader.this.state.get() == RUNNABLE )
385 {
386 Command command = decode( stdIn );
387 if ( command == null )
388 {
389 String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
390 DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
391 logger.error( errorMessage );
392 break;
393 }
394 else
395 {
396 switch ( command.getCommandType() )
397 {
398 case RUN_CLASS:
399 String test = command.getData();
400 boolean inserted = CommandReader.this.insertToQueue( test );
401 if ( inserted )
402 {
403 CommandReader.this.wakeupIterator();
404 insertToListeners( command );
405 }
406 break;
407 case TEST_SET_FINISHED:
408 CommandReader.this.makeQueueFull();
409 isTestSetFinished = true;
410 CommandReader.this.wakeupIterator();
411 insertToListeners( command );
412 break;
413 case SHUTDOWN:
414 CommandReader.this.makeQueueFull();
415 CommandReader.this.wakeupIterator();
416 insertToListeners( command );
417 break;
418 default:
419 insertToListeners( command );
420 break;
421 }
422 }
423 }
424 }
425 catch ( EOFException e )
426 {
427 CommandReader.this.state.set( TERMINATED );
428 if ( !isTestSetFinished )
429 {
430 String msg = "TestSet has not finished before stream error has appeared >> "
431 + "initializing exit by non-null configuration: "
432 + CommandReader.this.shutdown;
433 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
434
435 exitByConfiguration();
436
437 }
438 }
439 catch ( IOException e )
440 {
441 CommandReader.this.state.set( TERMINATED );
442
443 if ( !( e.getCause() instanceof InterruptedException ) )
444 {
445 String msg = "[SUREFIRE] std/in stream corrupted";
446 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
447 logger.error( msg, e );
448 }
449 }
450 finally
451 {
452
453 if ( !isTestSetFinished )
454 {
455 CommandReader.this.makeQueueFull();
456 }
457 CommandReader.this.wakeupIterator();
458 }
459 }
460
461 private void insertToListeners( Command cmd )
462 {
463 MasterProcessCommand expectedCommandType = cmd.getCommandType();
464 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
465 {
466 MasterProcessCommand commandType = listenerWrapper.getP1();
467 CommandListener listener = listenerWrapper.getP2();
468 if ( commandType == null || commandType == expectedCommandType )
469 {
470 listener.update( cmd );
471 }
472 }
473 }
474
475 private void exitByConfiguration()
476 {
477 Shutdown shutdown = CommandReader.this.shutdown;
478 if ( shutdown != null )
479 {
480 CommandReader.this.makeQueueFull();
481 CommandReader.this.wakeupIterator();
482 insertToListeners( toShutdown( shutdown ) );
483 if ( shutdown.isExit() )
484 {
485 System.exit( 1 );
486 }
487 else if ( shutdown.isKill() )
488 {
489 Runtime.getRuntime().halt( 1 );
490 }
491
492 }
493 }
494 }
495
496 }