1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.testset.TestSetFailedException;
23
24 import java.io.DataInputStream;
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.io.PrintStream;
28 import java.util.Iterator;
29 import java.util.NoSuchElementException;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Semaphore;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import static java.lang.Thread.State.NEW;
38 import static java.lang.Thread.State.RUNNABLE;
39 import static java.lang.Thread.State.TERMINATED;
40 import static java.lang.StrictMath.max;
41 import static org.apache.maven.surefire.booter.Command.toShutdown;
42 import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
43 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
44 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
45 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
48 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
49 import static org.apache.maven.surefire.util.internal.StringUtils.encodeStringForForkCommunication;
50 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
51 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
52 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
53
54
55
56
57
58
59
60 public final class CommandReader
61 {
62 private static final String LAST_TEST_SYMBOL = "";
63
64 private static final CommandReader READER = new CommandReader();
65
66 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners
67 = new ConcurrentLinkedQueue<BiProperty<MasterProcessCommand, CommandListener>>();
68
69 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
70
71 private final AtomicReference<Thread.State> state = new AtomicReference<Thread.State>( NEW );
72
73 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
74
75 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
76
77 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<String>();
78
79 private volatile Shutdown shutdown;
80
81 private int iteratedCount;
82
83 public static CommandReader getReader()
84 {
85 final CommandReader reader = READER;
86 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
87 {
88 reader.commandThread.start();
89 }
90 return reader;
91 }
92
93 public CommandReader setShutdown( Shutdown shutdown )
94 {
95 this.shutdown = shutdown;
96 return this;
97 }
98
99 public boolean awaitStarted()
100 throws TestSetFailedException
101 {
102 if ( state.get() == RUNNABLE )
103 {
104 try
105 {
106 startMonitor.await();
107 return true;
108 }
109 catch ( InterruptedException e )
110 {
111 throw new TestSetFailedException( e.getLocalizedMessage() );
112 }
113 }
114 else
115 {
116 return false;
117 }
118 }
119
120
121
122
123 public void addListener( CommandListener listener )
124 {
125 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
126 }
127
128 public void addTestListener( CommandListener listener )
129 {
130 addListener( RUN_CLASS, listener );
131 }
132
133 public void addTestsFinishedListener( CommandListener listener )
134 {
135 addListener( TEST_SET_FINISHED, listener );
136 }
137
138 public void addSkipNextTestsListener( CommandListener listener )
139 {
140 addListener( SKIP_SINCE_NEXT_TEST, listener );
141 }
142
143 public void addShutdownListener( CommandListener listener )
144 {
145 addListener( SHUTDOWN, listener );
146 }
147
148 public void addNoopListener( CommandListener listener )
149 {
150 addListener( NOOP, listener );
151 }
152
153 private void addListener( MasterProcessCommand cmd, CommandListener listener )
154 {
155 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( cmd, listener ) );
156 }
157
158 public void removeListener( CommandListener listener )
159 {
160 for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
161 {
162 BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
163 if ( listener == listenerWrapper.getP2() )
164 {
165 it.remove();
166 }
167 }
168 }
169
170
171
172
173 Iterator<String> iterated()
174 {
175 return testClasses.subList( 0, iteratedCount ).iterator();
176 }
177
178
179
180
181
182
183
184
185 Iterable<String> getIterableClasses( PrintStream originalOutStream )
186 {
187 return new ClassesIterable( originalOutStream );
188 }
189
190 public void stop()
191 {
192 if ( state.compareAndSet( NEW, TERMINATED ) || state.compareAndSet( RUNNABLE, TERMINATED ) )
193 {
194 makeQueueFull();
195 listeners.clear();
196 commandThread.interrupt();
197 }
198 }
199
200 private boolean isStopped()
201 {
202 return state.get() == TERMINATED;
203 }
204
205
206
207
208 private boolean isQueueFull()
209 {
210
211
212
213
214
215
216
217
218 int searchFrom = max( 0, testClasses.size() - 1 );
219 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
220 }
221
222 private void makeQueueFull()
223 {
224 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
225 }
226
227 private boolean insertToQueue( String test )
228 {
229 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
230 }
231
232 private final class ClassesIterable
233 implements Iterable<String>
234 {
235 private final PrintStream originalOutStream;
236
237 ClassesIterable( PrintStream originalOutStream )
238 {
239 this.originalOutStream = originalOutStream;
240 }
241
242 public Iterator<String> iterator()
243 {
244 return new ClassesIterator( originalOutStream );
245 }
246 }
247
248 private final class ClassesIterator
249 implements Iterator<String>
250 {
251 private final PrintStream originalOutStream;
252
253 private String clazz;
254
255 private int nextQueueIndex;
256
257 private ClassesIterator( PrintStream originalOutStream )
258 {
259 this.originalOutStream = originalOutStream;
260 }
261
262 public boolean hasNext()
263 {
264 popUnread();
265 return isNotBlank( clazz );
266 }
267
268 public String next()
269 {
270 popUnread();
271 try
272 {
273 if ( isBlank( clazz ) )
274 {
275 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
276 }
277 else
278 {
279 return clazz;
280 }
281 }
282 finally
283 {
284 clazz = null;
285 }
286 }
287
288 public void remove()
289 {
290 throw new UnsupportedOperationException();
291 }
292
293 private void popUnread()
294 {
295 if ( shouldFinish() )
296 {
297 clazz = null;
298 return;
299 }
300
301 if ( isBlank( clazz ) )
302 {
303 requestNextTest();
304 CommandReader.this.awaitNextTest();
305 if ( shouldFinish() )
306 {
307 clazz = null;
308 return;
309 }
310 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
311 CommandReader.this.iteratedCount = nextQueueIndex;
312 }
313
314 if ( CommandReader.this.isStopped() )
315 {
316 clazz = null;
317 }
318 }
319
320 private void requestNextTest()
321 {
322 byte[] encoded = encodeStringForForkCommunication( ( (char) BOOTERCODE_NEXT_TEST ) + ",0,want more!\n" );
323 originalOutStream.write( encoded, 0, encoded.length );
324 }
325
326 private boolean shouldFinish()
327 {
328 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
329 return CommandReader.this.isStopped() || wasLastTestRead;
330 }
331
332 private boolean isEndSymbolAt( int index )
333 {
334 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
335 }
336 }
337
338 private void awaitNextTest()
339 {
340 nextCommandNotifier.acquireUninterruptibly();
341 }
342
343 private void wakeupIterator()
344 {
345 nextCommandNotifier.release();
346 }
347
348 private final class CommandRunnable
349 implements Runnable
350 {
351 public void run()
352 {
353 CommandReader.this.startMonitor.countDown();
354 DataInputStream stdIn = new DataInputStream( System.in );
355 boolean isTestSetFinished = false;
356 try
357 {
358 while ( CommandReader.this.state.get() == RUNNABLE )
359 {
360 Command command = decode( stdIn );
361 if ( command == null )
362 {
363 System.err.println( "[SUREFIRE] std/in stream corrupted: first sequence not recognized" );
364 break;
365 }
366 else
367 {
368 switch ( command.getCommandType() )
369 {
370 case RUN_CLASS:
371 String test = command.getData();
372 boolean inserted = CommandReader.this.insertToQueue( test );
373 if ( inserted )
374 {
375 CommandReader.this.wakeupIterator();
376 insertToListeners( command );
377 }
378 break;
379 case TEST_SET_FINISHED:
380 CommandReader.this.makeQueueFull();
381 isTestSetFinished = true;
382 CommandReader.this.wakeupIterator();
383 insertToListeners( command );
384 break;
385 case SHUTDOWN:
386 CommandReader.this.makeQueueFull();
387 CommandReader.this.wakeupIterator();
388 insertToListeners( command );
389 break;
390 default:
391 insertToListeners( command );
392 break;
393 }
394 }
395 }
396 }
397 catch ( EOFException e )
398 {
399 CommandReader.this.state.set( TERMINATED );
400 if ( !isTestSetFinished )
401 {
402 exitByConfiguration();
403
404 }
405 }
406 catch ( IOException e )
407 {
408 CommandReader.this.state.set( TERMINATED );
409
410 if ( !( e.getCause() instanceof InterruptedException ) )
411 {
412 System.err.println( "[SUREFIRE] std/in stream corrupted" );
413 e.printStackTrace();
414 }
415 }
416 finally
417 {
418
419 if ( !isTestSetFinished )
420 {
421 CommandReader.this.makeQueueFull();
422 }
423 CommandReader.this.wakeupIterator();
424 }
425 }
426
427 private void insertToListeners( Command cmd )
428 {
429 MasterProcessCommand expectedCommandType = cmd.getCommandType();
430 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
431 {
432 MasterProcessCommand commandType = listenerWrapper.getP1();
433 CommandListener listener = listenerWrapper.getP2();
434 if ( commandType == null || commandType == expectedCommandType )
435 {
436 listener.update( cmd );
437 }
438 }
439 }
440
441 private void exitByConfiguration()
442 {
443 Shutdown shutdown = CommandReader.this.shutdown;
444 if ( shutdown != null )
445 {
446 CommandReader.this.makeQueueFull();
447 CommandReader.this.wakeupIterator();
448 insertToListeners( toShutdown( shutdown ) );
449 switch ( shutdown )
450 {
451 case EXIT:
452 System.exit( 1 );
453 case KILL:
454 Runtime.getRuntime().halt( 1 );
455 case DEFAULT:
456 default:
457
458 break;
459 }
460 }
461 }
462 }
463
464 }