1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.testset.TestSetFailedException;
23
24 import java.io.DataInputStream;
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.io.PrintStream;
28 import java.util.Iterator;
29 import java.util.NoSuchElementException;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import static java.lang.Thread.State.NEW;
36 import static java.lang.Thread.State.RUNNABLE;
37 import static java.lang.Thread.State.TERMINATED;
38 import static java.util.concurrent.locks.LockSupport.park;
39 import static java.util.concurrent.locks.LockSupport.unpark;
40 import static org.apache.maven.surefire.booter.Command.toShutdown;
41 import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
42 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
43 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
44 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
45 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
48 import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
49 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
50 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
51 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
52
53
54
55
56
57
58
59 public final class MasterProcessReader
60 {
61 private static final MasterProcessReader READER = new MasterProcessReader();
62
63 private final Queue<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>> listeners
64 = new ConcurrentLinkedQueue<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>>();
65
66 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
67
68 private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
69
70 private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
71
72 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
73
74 private final Node headTestClassQueue = new Node();
75
76 private volatile Node tailTestClassQueue = headTestClassQueue;
77
78 private volatile Shutdown shutdown;
79
80 private static class Node
81 {
82 final AtomicReference<Node> successor = new AtomicReference<Node>();
83 volatile String item;
84 }
85
86 public static MasterProcessReader getReader()
87 {
88 final MasterProcessReader reader = READER;
89 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
90 {
91 reader.commandThread.start();
92 }
93 return reader;
94 }
95
96 public MasterProcessReader setShutdown( Shutdown shutdown )
97 {
98 this.shutdown = shutdown;
99 return this;
100 }
101
102 public boolean awaitStarted()
103 throws TestSetFailedException
104 {
105 if ( state.get() == RUNNABLE )
106 {
107 try
108 {
109 startMonitor.await();
110 return true;
111 }
112 catch ( InterruptedException e )
113 {
114 throw new TestSetFailedException( e.getLocalizedMessage() );
115 }
116 }
117 else
118 {
119 return false;
120 }
121 }
122
123
124
125
126 public void addListener( MasterProcessListener listener )
127 {
128 listeners.add( new TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>( null, listener ) );
129 }
130
131 public void addTestListener( MasterProcessListener listener )
132 {
133 addListener( RUN_CLASS, listener );
134 }
135
136 public void addTestsFinishedListener( MasterProcessListener listener )
137 {
138 addListener( TEST_SET_FINISHED, listener );
139 }
140
141 public void addSkipNextListener( MasterProcessListener listener )
142 {
143 addListener( SKIP_SINCE_NEXT_TEST, listener );
144 }
145
146 public void addShutdownListener( MasterProcessListener listener )
147 {
148 addListener( SHUTDOWN, listener );
149 }
150
151 public void addNoopListener( MasterProcessListener listener )
152 {
153 addListener( NOOP, listener );
154 }
155
156 private void addListener( MasterProcessCommand cmd, MasterProcessListener listener )
157 {
158 listeners.add( new TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>( cmd, listener ) );
159 }
160
161 public void removeListener( MasterProcessListener listener )
162 {
163 for ( Iterator<TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener>> it = listeners.iterator();
164 it.hasNext(); )
165 {
166 TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener> listenerWrapper = it.next();
167 if ( listener == listenerWrapper.getP2() )
168 {
169 it.remove();
170 }
171 }
172 }
173
174 Iterable<String> getIterableClasses( PrintStream originalOutStream )
175 {
176 return new ClassesIterable( headTestClassQueue, originalOutStream );
177 }
178
179 public void stop()
180 {
181 if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
182 {
183 makeQueueFull();
184 listeners.clear();
185 commandThread.interrupt();
186 }
187 }
188
189 private boolean isStopped()
190 {
191 return state.get() == TERMINATED;
192 }
193
194 private static boolean isLastNode( Node current )
195 {
196 return current.successor.get() == current;
197 }
198
199 private boolean isQueueFull()
200 {
201 return isLastNode( tailTestClassQueue );
202 }
203
204
205
206
207 private boolean addTestClassToQueue( String item )
208 {
209 if ( tailTestClassQueue.item == null )
210 {
211 tailTestClassQueue.item = item;
212 Node newNode = new Node();
213 tailTestClassQueue.successor.set( newNode );
214 tailTestClassQueue = newNode;
215 return true;
216 }
217 else
218 {
219 return false;
220 }
221 }
222
223
224
225
226
227 @SuppressWarnings( { "all", "checkstyle:needbraces", "checkstyle:emptystatement" } )
228 public void makeQueueFull()
229 {
230
231 for ( Node tail = this.tailTestClassQueue;
232 !tail.successor.compareAndSet( null, tail ) && tail.successor.get() != tail;
233 tail = tail.successor.get() );
234 }
235
236
237
238
239 private void insertToQueue( Command cmd )
240 {
241 MasterProcessCommand expectedCommandType = cmd.getCommandType();
242 switch ( expectedCommandType )
243 {
244 case RUN_CLASS:
245 addTestClassToQueue( cmd.getData() );
246 break;
247 case TEST_SET_FINISHED:
248 makeQueueFull();
249 break;
250 default:
251
252 break;
253 }
254 }
255
256 private void insertToListeners( Command cmd )
257 {
258 MasterProcessCommand expectedCommandType = cmd.getCommandType();
259 for ( TwoPropertiesWrapper<MasterProcessCommand, MasterProcessListener> listenerWrapper
260 : MasterProcessReader.this.listeners )
261 {
262 MasterProcessCommand commandType = listenerWrapper.getP1();
263 MasterProcessListener listener = listenerWrapper.getP2();
264 if ( commandType == null || commandType == expectedCommandType )
265 {
266 listener.update( cmd );
267 }
268 }
269 }
270
271
272
273
274 private void insert( Command cmd )
275 {
276 insertToQueue( cmd );
277 insertToListeners( cmd );
278 }
279
280 private final class ClassesIterable
281 implements Iterable<String>
282 {
283 private final Node head;
284 private final PrintStream originalOutStream;
285
286 ClassesIterable( Node head, PrintStream originalOutStream )
287 {
288 this.head = head;
289 this.originalOutStream = originalOutStream;
290 }
291
292 public Iterator<String> iterator()
293 {
294 return new ClassesIterator( head, originalOutStream );
295 }
296 }
297
298 private final class ClassesIterator
299 implements Iterator<String>
300 {
301 private final PrintStream originalOutStream;
302
303 private Node current;
304
305 private String clazz;
306
307 private ClassesIterator( Node current, PrintStream originalOutStream )
308 {
309 this.current = current;
310 this.originalOutStream = originalOutStream;
311 }
312
313 public boolean hasNext()
314 {
315 popUnread();
316 return isNotBlank( clazz );
317 }
318
319 public String next()
320 {
321 popUnread();
322 try
323 {
324 if ( isBlank( clazz ) )
325 {
326 throw new NoSuchElementException();
327 }
328 else
329 {
330 return clazz;
331 }
332 }
333 finally
334 {
335 clazz = null;
336 }
337 }
338
339 public void remove()
340 {
341 throw new UnsupportedOperationException();
342 }
343
344 private void popUnread()
345 {
346 if ( isStopped() )
347 {
348 clazz = null;
349 return;
350 }
351
352 if ( isBlank( clazz ) )
353 {
354 do
355 {
356 requestNextTest();
357 if ( isLastNode( current ) )
358 {
359 clazz = null;
360 }
361 else if ( current.item == null )
362 {
363 do
364 {
365 await();
366
367
368
369
370
371 if ( isStopped() )
372 {
373 clazz = null;
374 return;
375 }
376 } while ( current.item == null && !isLastNode( current ) );
377 clazz = current.item;
378 current = current.successor.get();
379 }
380 else
381 {
382 clazz = current.item;
383 current = current.successor.get();
384 }
385 }
386 while ( tryNullWhiteClass() );
387 }
388
389 if ( isStopped() )
390 {
391 clazz = null;
392 }
393 }
394
395 private boolean tryNullWhiteClass()
396 {
397 if ( clazz != null && isBlank( clazz ) )
398 {
399 clazz = null;
400 return true;
401 }
402 else
403 {
404 return false;
405 }
406 }
407
408 private void requestNextTest()
409 {
410 byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
411 originalOutStream.write( encoded, 0, encoded.length );
412 }
413 }
414
415
416
417
418 private Command read( DataInputStream stdIn )
419 throws IOException
420 {
421 Command command = decode( stdIn );
422 if ( command != null )
423 {
424 insertToQueue( command );
425 }
426 return command;
427 }
428
429 private void await()
430 {
431 final Thread currentThread = Thread.currentThread();
432 try
433 {
434 waiters.add( currentThread );
435 park();
436 }
437 finally
438 {
439 waiters.remove( currentThread );
440 }
441 }
442
443 private void wakeupWaiters()
444 {
445 for ( Thread waiter : waiters )
446 {
447 unpark( waiter );
448 }
449 }
450
451 private final class CommandRunnable
452 implements Runnable
453 {
454 public void run()
455 {
456 MasterProcessReader.this.startMonitor.countDown();
457 DataInputStream stdIn = new DataInputStream( System.in );
458 boolean isTestSetFinished = false;
459 try
460 {
461 while ( MasterProcessReader.this.state.get() == RUNNABLE )
462 {
463 Command command = read( stdIn );
464 if ( command == null )
465 {
466 System.err.println( "[SUREFIRE] std/in stream corrupted: first sequence not recognized" );
467 break;
468 }
469 else
470 {
471 switch ( command.getCommandType() )
472 {
473 case TEST_SET_FINISHED:
474 isTestSetFinished = true;
475 wakeupWaiters();
476 break;
477 case RUN_CLASS:
478 wakeupWaiters();
479 break;
480 case SHUTDOWN:
481 insertToQueue( Command.TEST_SET_FINISHED );
482 wakeupWaiters();
483 break;
484 default:
485
486 break;
487 }
488
489 insertToListeners( command );
490 }
491 }
492 }
493 catch ( EOFException e )
494 {
495 MasterProcessReader.this.state.set( TERMINATED );
496 if ( !isTestSetFinished )
497 {
498 exitByConfiguration();
499
500 }
501 }
502 catch ( IOException e )
503 {
504 MasterProcessReader.this.state.set( TERMINATED );
505
506 if ( !( e.getCause() instanceof InterruptedException ) )
507 {
508 System.err.println( "[SUREFIRE] std/in stream corrupted" );
509 e.printStackTrace();
510 }
511 }
512 finally
513 {
514
515 if ( !isTestSetFinished )
516 {
517 insert( Command.TEST_SET_FINISHED );
518 }
519 wakeupWaiters();
520 }
521 }
522
523
524
525
526 private void exitByConfiguration()
527 {
528 Shutdown shutdown = MasterProcessReader.this.shutdown;
529 if ( shutdown != null )
530 {
531 insert( Command.TEST_SET_FINISHED );
532 wakeupWaiters();
533 insertToListeners( toShutdown( shutdown ) );
534 switch ( shutdown )
535 {
536 case EXIT:
537 System.exit( 1 );
538 case KILL:
539 Runtime.getRuntime().halt( 1 );
540 case DEFAULT:
541 default:
542
543 break;
544 }
545 }
546 }
547 }
548 }