1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
24 import org.apache.maven.surefire.testset.TestSetFailedException;
25
26 import java.io.DataInputStream;
27 import java.io.EOFException;
28 import java.io.IOException;
29 import java.util.Iterator;
30 import java.util.NoSuchElementException;
31 import java.util.Queue;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import static java.util.Objects.requireNonNull;
39 import static java.lang.Thread.State.NEW;
40 import static java.lang.Thread.State.RUNNABLE;
41 import static java.lang.Thread.State.TERMINATED;
42 import static java.lang.StrictMath.max;
43 import static org.apache.maven.surefire.booter.Command.toShutdown;
44 import static org.apache.maven.surefire.booter.MasterProcessCommand.BYE_ACK;
45 import static org.apache.maven.surefire.booter.MasterProcessCommand.NOOP;
46 import static org.apache.maven.surefire.booter.MasterProcessCommand.RUN_CLASS;
47 import static org.apache.maven.surefire.booter.MasterProcessCommand.SHUTDOWN;
48 import static org.apache.maven.surefire.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
49 import static org.apache.maven.surefire.booter.MasterProcessCommand.TEST_SET_FINISHED;
50 import static org.apache.maven.surefire.booter.MasterProcessCommand.decode;
51 import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
52 import static org.apache.maven.surefire.util.internal.StringUtils.isBlank;
53 import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
54
55
56
57
58
59
60
61 public final class CommandReader
62 {
63 private static final String LAST_TEST_SYMBOL = "";
64
65 private static final CommandReader READER = new CommandReader();
66
67 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
68
69 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
70
71 private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
72
73 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
74
75 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
76
77 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
78
79 private volatile Shutdown shutdown;
80
81 private int iteratedCount;
82
83 private volatile ConsoleLogger logger = new NullConsoleLogger();
84
85 private CommandReader()
86 {
87 }
88
89 public static CommandReader getReader()
90 {
91 final CommandReader reader = READER;
92 if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
93 {
94 reader.commandThread.start();
95 }
96 return reader;
97 }
98
99 public CommandReader setShutdown( Shutdown shutdown )
100 {
101 this.shutdown = shutdown;
102 return this;
103 }
104
105 public CommandReader setLogger( ConsoleLogger logger )
106 {
107 this.logger = requireNonNull( logger, "null logger" );
108 return this;
109 }
110
111 public boolean awaitStarted()
112 throws TestSetFailedException
113 {
114 if ( state.get() == RUNNABLE )
115 {
116 try
117 {
118 startMonitor.await();
119 return true;
120 }
121 catch ( InterruptedException e )
122 {
123 DumpErrorSingleton.getSingleton().dumpException( e );
124 throw new TestSetFailedException( e.getLocalizedMessage() );
125 }
126 }
127 else
128 {
129 return false;
130 }
131 }
132
133
134
135
136 public void addListener( CommandListener listener )
137 {
138 listeners.add( new BiProperty<MasterProcessCommand, CommandListener>( null, listener ) );
139 }
140
141 public void addTestListener( CommandListener listener )
142 {
143 addListener( RUN_CLASS, listener );
144 }
145
146 public void addTestsFinishedListener( CommandListener listener )
147 {
148 addListener( TEST_SET_FINISHED, listener );
149 }
150
151 public void addSkipNextTestsListener( CommandListener listener )
152 {
153 addListener( SKIP_SINCE_NEXT_TEST, listener );
154 }
155
156 public void addShutdownListener( CommandListener listener )
157 {
158 addListener( SHUTDOWN, listener );
159 }
160
161 public void addNoopListener( CommandListener listener )
162 {
163 addListener( NOOP, listener );
164 }
165
166 public void addByeAckListener( CommandListener listener )
167 {
168 addListener( BYE_ACK, listener );
169 }
170
171 private void addListener( MasterProcessCommand cmd, CommandListener listener )
172 {
173 listeners.add( new BiProperty<>( cmd, listener ) );
174 }
175
176 public void removeListener( CommandListener listener )
177 {
178 for ( Iterator<BiProperty<MasterProcessCommand, CommandListener>> it = listeners.iterator(); it.hasNext(); )
179 {
180 BiProperty<MasterProcessCommand, CommandListener> listenerWrapper = it.next();
181 if ( listener == listenerWrapper.getP2() )
182 {
183 it.remove();
184 }
185 }
186 }
187
188
189
190
191 Iterator<String> iterated()
192 {
193 return testClasses.subList( 0, iteratedCount ).iterator();
194 }
195
196
197
198
199
200
201
202
203 Iterable<String> getIterableClasses( ForkedChannelEncoder eventChannel )
204 {
205 return new ClassesIterable( eventChannel );
206 }
207
208 public void stop()
209 {
210 if ( !isStopped() )
211 {
212 state.set( TERMINATED );
213 makeQueueFull();
214 listeners.clear();
215 commandThread.interrupt();
216 }
217 }
218
219 private boolean isStopped()
220 {
221 return state.get() == TERMINATED;
222 }
223
224
225
226
227 private boolean isQueueFull()
228 {
229
230
231
232
233
234
235
236
237 int searchFrom = max( 0, testClasses.size() - 1 );
238 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
239 }
240
241 private void makeQueueFull()
242 {
243 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
244 }
245
246 private boolean insertToQueue( String test )
247 {
248 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
249 }
250
251 private final class ClassesIterable
252 implements Iterable<String>
253 {
254 private final ForkedChannelEncoder eventChannel;
255
256 ClassesIterable( ForkedChannelEncoder eventChannel )
257 {
258 this.eventChannel = eventChannel;
259 }
260
261 @Override
262 public Iterator<String> iterator()
263 {
264 return new ClassesIterator( eventChannel );
265 }
266 }
267
268 private final class ClassesIterator
269 implements Iterator<String>
270 {
271 private final ForkedChannelEncoder eventChannel;
272
273 private String clazz;
274
275 private int nextQueueIndex;
276
277 private ClassesIterator( ForkedChannelEncoder eventChannel )
278 {
279 this.eventChannel = eventChannel;
280 }
281
282 @Override
283 public boolean hasNext()
284 {
285 popUnread();
286 return isNotBlank( clazz );
287 }
288
289 @Override
290 public String next()
291 {
292 popUnread();
293 try
294 {
295 if ( isBlank( clazz ) )
296 {
297 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
298 }
299 else
300 {
301 return clazz;
302 }
303 }
304 finally
305 {
306 clazz = null;
307 }
308 }
309
310 @Override
311 public void remove()
312 {
313 throw new UnsupportedOperationException();
314 }
315
316 private void popUnread()
317 {
318 if ( shouldFinish() )
319 {
320 clazz = null;
321 return;
322 }
323
324 if ( isBlank( clazz ) )
325 {
326 requestNextTest();
327 CommandReader.this.awaitNextTest();
328 if ( shouldFinish() )
329 {
330 clazz = null;
331 return;
332 }
333 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
334 CommandReader.this.iteratedCount = nextQueueIndex;
335 }
336
337 if ( CommandReader.this.isStopped() )
338 {
339 clazz = null;
340 }
341 }
342
343 private void requestNextTest()
344 {
345 eventChannel.acquireNextTest();
346 }
347
348 private boolean shouldFinish()
349 {
350 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
351 return CommandReader.this.isStopped() || wasLastTestRead;
352 }
353
354 private boolean isEndSymbolAt( int index )
355 {
356 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
357 }
358 }
359
360 private void awaitNextTest()
361 {
362 nextCommandNotifier.acquireUninterruptibly();
363 }
364
365 private void wakeupIterator()
366 {
367 nextCommandNotifier.release();
368 }
369
370 private final class CommandRunnable
371 implements Runnable
372 {
373 @Override
374 public void run()
375 {
376 CommandReader.this.startMonitor.countDown();
377 DataInputStream stdIn = new DataInputStream( System.in );
378 boolean isTestSetFinished = false;
379 try
380 {
381 while ( CommandReader.this.state.get() == RUNNABLE )
382 {
383 Command command = decode( stdIn );
384 if ( command == null )
385 {
386 String errorMessage = "[SUREFIRE] std/in stream corrupted: first sequence not recognized";
387 DumpErrorSingleton.getSingleton().dumpStreamText( errorMessage );
388 logger.error( errorMessage );
389 break;
390 }
391 else
392 {
393 switch ( command.getCommandType() )
394 {
395 case RUN_CLASS:
396 String test = command.getData();
397 boolean inserted = CommandReader.this.insertToQueue( test );
398 if ( inserted )
399 {
400 CommandReader.this.wakeupIterator();
401 callListeners( command );
402 }
403 break;
404 case TEST_SET_FINISHED:
405 CommandReader.this.makeQueueFull();
406 isTestSetFinished = true;
407 CommandReader.this.wakeupIterator();
408 callListeners( command );
409 break;
410 case SHUTDOWN:
411 CommandReader.this.makeQueueFull();
412 CommandReader.this.wakeupIterator();
413 callListeners( command );
414 break;
415 default:
416 callListeners( command );
417 break;
418 }
419 }
420 }
421 }
422 catch ( EOFException e )
423 {
424 CommandReader.this.state.set( TERMINATED );
425 if ( !isTestSetFinished )
426 {
427 String msg = "TestSet has not finished before stream error has appeared >> "
428 + "initializing exit by non-null configuration: "
429 + CommandReader.this.shutdown;
430 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
431
432 exitByConfiguration();
433
434 }
435 }
436 catch ( IOException e )
437 {
438 CommandReader.this.state.set( TERMINATED );
439
440 if ( !( e.getCause() instanceof InterruptedException ) )
441 {
442 String msg = "[SUREFIRE] std/in stream corrupted";
443 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
444 logger.error( msg, e );
445 }
446 }
447 finally
448 {
449
450 if ( !isTestSetFinished )
451 {
452 CommandReader.this.makeQueueFull();
453 }
454 CommandReader.this.wakeupIterator();
455 }
456 }
457
458 private void callListeners( Command cmd )
459 {
460 MasterProcessCommand expectedCommandType = cmd.getCommandType();
461 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
462 {
463 MasterProcessCommand commandType = listenerWrapper.getP1();
464 CommandListener listener = listenerWrapper.getP2();
465 if ( commandType == null || commandType == expectedCommandType )
466 {
467 listener.update( cmd );
468 }
469 }
470 }
471
472 private void exitByConfiguration()
473 {
474 Shutdown shutdown = CommandReader.this.shutdown;
475 if ( shutdown != null )
476 {
477 CommandReader.this.makeQueueFull();
478 CommandReader.this.wakeupIterator();
479 callListeners( toShutdown( shutdown ) );
480 }
481 }
482 }
483 }