1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24 import org.apache.maven.surefire.testset.TestSetFailedException;
25
26 import java.io.DataInputStream;
27 import java.io.EOFException;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.util.Iterator;
31 import java.util.NoSuchElementException;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import static java.lang.Thread.State.NEW;
40 import static java.lang.Thread.State.RUNNABLE;
41 import static java.lang.Thread.State.TERMINATED;
42 import static java.lang.StrictMath.max;
43 import static org.apache.maven.surefire.booter.Command.toShutdown;
44 import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
45 import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
48 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
49 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
50 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
51 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
52 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53 import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
54 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
55 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
56 import static org.apache.maven.surefire.util.internal.ObjectUtils.requireNonNull;
57
58
59
60
61
62
63
64 public final class CommandReader
65 {
66 private static final String LAST_TEST_SYMBOL = "";
67
68 private static final CommandReader READER = new CommandReader();
69
70 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
71 = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
72
73 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
74
75 private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
76
77 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
78
79 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
80
81 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
82
83 private volatile Shutdown shutdown;
84
85 private int iteratedCount;
86
87 private volatile ConsoleLogger logger = new NullConsoleLogger();
88
89 private CommandReader()
90 {
91 }
92
93 public static CommandReader getReader()
94 {
95 final CommandReader reader = READER;
96 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
97 {
98 reader.commandThread.start();
99 }
100 return reader;
101 }
102
103 public CommandReader setShutdown( Shutdown shutdown )
104 {
105 this.shutdown = shutdown;
106 return this;
107 }
108
109 public CommandReader setLogger( ConsoleLogger logger )
110 {
111 this.logger = requireNonNull( logger, "null logger" );
112 return this;
113 }
114
115 public boolean awaitStarted()
116 throws TestSetFailedException
117 {
118 if ( state.get() == RUNNABLE )
119 {
120 try
121 {
122 startMonitor.await();
123 return true;
124 }
125 catch ( InterruptedException e )
126 {
127 DumpErrorSingleton.getSingleton().dumpException( e );
128 throw new TestSetFailedException( e.getLocalizedMessage() );
129 }
130 }
131 else
132 {
133 return false;
134 }
135 }
136
137
138
139
140 public void addListener( CommandListener listener )
141 {
142 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
143 }
144
145 public void addTestListener( CommandListener listener )
146 {
147 addListener( RUN_CLASS, listener );
148 }
149
150 public void addTestsFinishedListener( CommandListener listener )
151 {
152 addListener( TEST_SET_FINISHED, listener );
153 }
154
155 public void addSkipNextTestsListener( CommandListener listener )
156 {
157 addListener( SKIP_SINCE_NEXT_TEST, listener );
158 }
159
160 public void addShutdownListener( CommandListener listener )
161 {
162 addListener( SHUTDOWN, listener );
163 }
164
165 public void addNoopListener( CommandListener listener )
166 {
167 addListener( NOOP, listener );
168 }
169
170 public void addByeAckListener( CommandListener listener )
171 {
172 addListener( BYE_ACK, listener );
173 }
174
175 private void addListener( MasterProcessCommand cmd, CommandListener listener )
176 {
177 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
178 }
179
180 public void removeListener( CommandListener listener )
181 {
182 for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
183 {
184 BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
185 if ( listener == listenerWrapper.getP2() )
186 {
187 it.remove();
188 }
189 }
190 }
191
192
193
194
195 Iterator<String> iterated()
196 {
197 return testClasses.subList( 0, iteratedCount ).iterator();
198 }
199
200
201
202
203
204
205
206
207 Iterable<String> getIterableClasses( PrintStream originalOutStream )
208 {
209 return new ClassesIterable( originalOutStream );
210 }
211
212 public void stop()
213 {
214 if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
215 {
216 makeQueueFull();
217 listeners.clear();
218 commandThread.interrupt();
219 }
220 }
221
222 private boolean isStopped()
223 {
224 return state.get() == TERMINATED;
225 }
226
227
228
229
230 private boolean isQueueFull()
231 {
232
233
234
235
236
237
238
239
240 int searchFrom = max( 0, testClasses.size() - 1 );
241 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
242 }
243
244 private void makeQueueFull()
245 {
246 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
247 }
248
249 private boolean insertToQueue( String test )
250 {
251 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
252 }
253
254 private final class ClassesIterable
255 implements Iterable<String>
256 {
257 private final PrintStream originalOutStream;
258
259 ClassesIterable( PrintStream originalOutStream )
260 {
261 this.originalOutStream = originalOutStream;
262 }
263
264 @Override
265 public Iterator<String> iterator()
266 {
267 return new ClassesIterator( originalOutStream );
268 }
269 }
270
271 private final class ClassesIterator
272 implements Iterator<String>
273 {
274 private final PrintStream originalOutStream;
275
276 private String clazz;
277
278 private int nextQueueIndex;
279
280 private ClassesIterator( PrintStream originalOutStream )
281 {
282 this.originalOutStream = originalOutStream;
283 }
284
285 @Override
286 public boolean hasNext()
287 {
288 popUnread();
289 return isNotBlank( clazz );
290 }
291
292 @Override
293 public String next()
294 {
295 popUnread();
296 try
297 {
298 if ( isBlank( clazz ) )
299 {
300 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
301 }
302 else
303 {
304 return clazz;
305 }
306 }
307 finally
308 {
309 clazz = null;
310 }
311 }
312
313 @Override
314 public void remove()
315 {
316 throw new UnsupportedOperationException();
317 }
318
319 private void popUnread()
320 {
321 if ( shouldFinish() )
322 {
323 clazz = null;
324 return;
325 }
326
327 if ( isBlank( clazz ) )
328 {
329 requestNextTest();
330 CommandReader.this.awaitNextTest();
331 if ( shouldFinish() )
332 {
333 clazz = null;
334 return;
335 }
336 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
337 CommandReader.this.iteratedCount = nextQueueIndex;
338 }
339
340 if ( CommandReader.this.isStopped() )
341 {
342 clazz = null;
343 }
344 }
345
346 private void requestNextTest()
347 {
348 byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
349 synchronized ( originalOutStream )
350 {
351 originalOutStream.write( encoded, 0, encoded.length );
352 originalOutStream.flush();
353 }
354 }
355
356 private boolean shouldFinish()
357 {
358 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
359 return CommandReader.this.isStopped() || wasLastTestRead;
360 }
361
362 private boolean isEndSymbolAt( int index )
363 {
364 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
365 }
366 }
367
368 private void awaitNextTest()
369 {
370 nextCommandNotifier.acquireUninterruptibly();
371 }
372
373 private void wakeupIterator()
374 {
375 nextCommandNotifier.release();
376 }
377
378 private final class CommandRunnable
379 implements Runnable
380 {
381 @Override
382 public void run()
383 {
384 CommandReader.this.startMonitor.countDown();
385 DataInputStream stdIn = new DataInputStream( System.in );
386 boolean isTestSetFinished = false;
387 try
388 {
389 while ( CommandReader.this.state.get() == RUNNABLE )
390 {
391 Command command = decode( stdIn );
392 if ( command == null )
393 {
394 String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
395 DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
396 logger.error( errorMessage );
397 break;
398 }
399 else
400 {
401 switch ( command.getCommandType() )
402 {
403 case RUN_CLASS:
404 String test = command.getData();
405 boolean inserted = CommandReader.this.insertToQueue( test );
406 if ( inserted )
407 {
408 CommandReader.this.wakeupIterator();
409 insertToListeners( command );
410 }
411 break;
412 case TEST_SET_FINISHED:
413 CommandReader.this.makeQueueFull();
414 isTestSetFinished = true;
415 CommandReader.this.wakeupIterator();
416 insertToListeners( command );
417 break;
418 case SHUTDOWN:
419 CommandReader.this.makeQueueFull();
420 CommandReader.this.wakeupIterator();
421 insertToListeners( command );
422 break;
423 default:
424 insertToListeners( command );
425 break;
426 }
427 }
428 }
429 }
430 catch ( EOFException e )
431 {
432 CommandReader.this.state.set( TERMINATED );
433 if ( !isTestSetFinished )
434 {
435 String msg = "TestSet has not finished before stream error has appeared >> "
436 + "initializing exit by non-null configuration: "
437 + CommandReader.this.shutdown;
438 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
439
440 exitByConfiguration();
441
442 }
443 }
444 catch ( IOException e )
445 {
446 CommandReader.this.state.set( TERMINATED );
447
448 if ( !( e.getCause() instanceof InterruptedException ) )
449 {
450 String msg = "[SUREFIRE] std/in stream corrupted";
451 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
452 logger.error( msg, e );
453 }
454 }
455 finally
456 {
457
458 if ( !isTestSetFinished )
459 {
460 CommandReader.this.makeQueueFull();
461 }
462 CommandReader.this.wakeupIterator();
463 }
464 }
465
466 private void insertToListeners( Command cmd )
467 {
468 MasterProcessCommand expectedCommandType = cmd.getCommandType();
469 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
470 {
471 MasterProcessCommand commandType = listenerWrapper.getP1();
472 CommandListener listener = listenerWrapper.getP2();
473 if ( commandType == null || commandType == expectedCommandType )
474 {
475 listener.update( cmd );
476 }
477 }
478 }
479
480 private void exitByConfiguration()
481 {
482 Shutdown shutdown = CommandReader.this.shutdown;
483 if ( shutdown != null )
484 {
485 CommandReader.this.makeQueueFull();
486 CommandReader.this.wakeupIterator();
487 insertToListeners( toShutdown( shutdown ) );
488 if ( shutdown.isExit() )
489 {
490 System.exit( 1 );
491 }
492 else if ( shutdown.isKill() )
493 {
494 Runtime.getRuntime().halt( 1 );
495 }
496
497 }
498 }
499 }
500
501 }