1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.booter.Command;
23 import org.apache.maven.surefire.booter.Shutdown;
24
25 import java.io.IOException;
26 import java.util.Iterator;
27 import java.util.NoSuchElementException;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Semaphore;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33 import java.util.concurrent.locks.Lock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock;
35
36 import static org.apache.maven.surefire.booter.Command.BYE_ACK;
37 import static org.apache.maven.surefire.booter.Command.NOOP;
38 import static org.apache.maven.surefire.booter.Command.SKIP_SINCE_NEXT_TEST;
39 import static org.apache.maven.surefire.booter.Command.toShutdown;
40
41
42
43
44
45
46
47 public final class TestLessInputStream
48 extends AbstractCommandStream
49 {
50 private final Semaphore barrier = new Semaphore( 0 );
51
52 private final AtomicBoolean closed = new AtomicBoolean();
53
54 private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
55
56 private final TestLessInputStreamBuilder builder;
57
58 private Iterator<Command> cachableCommands;
59
60 private TestLessInputStream( TestLessInputStreamBuilder builder )
61 {
62 this.builder = builder;
63 }
64
65 @Override
66 public void provideNewTest()
67 {
68 }
69
70 @Override
71 public void skipSinceNextTest()
72 {
73 if ( canContinue() )
74 {
75 immediateCommands.add( SKIP_SINCE_NEXT_TEST );
76 barrier.release();
77 }
78 }
79
80 @Override
81 public void shutdown( Shutdown shutdownType )
82 {
83 if ( canContinue() )
84 {
85 immediateCommands.add( toShutdown( shutdownType ) );
86 barrier.release();
87 }
88 }
89
90 @Override
91 public void noop()
92 {
93 if ( canContinue() )
94 {
95 immediateCommands.add( NOOP );
96 barrier.release();
97 }
98 }
99
100 @Override
101 public void acknowledgeByeEventReceived()
102 {
103 if ( canContinue() )
104 {
105 immediateCommands.add( BYE_ACK );
106 barrier.release();
107 }
108 }
109
110 @Override
111 protected boolean isClosed()
112 {
113 return closed.get();
114 }
115
116 @Override
117 protected Command nextCommand()
118 {
119 Command cmd = immediateCommands.poll();
120 if ( cmd == null )
121 {
122 if ( cachableCommands == null )
123 {
124 cachableCommands = builder.getIterableCachable().iterator();
125 }
126
127 cmd = cachableCommands.next();
128 }
129 return cmd;
130 }
131
132 @Override
133 protected void beforeNextCommand()
134 throws IOException
135 {
136 awaitNextCommand();
137 }
138
139 @Override
140 public void close()
141 {
142 if ( closed.compareAndSet( false, true ) )
143 {
144 invalidateInternalBuffer();
145 barrier.drainPermits();
146 barrier.release();
147 }
148 }
149
150
151
152
153
154
155 int availablePermits()
156 {
157 return barrier.availablePermits();
158 }
159
160 private void awaitNextCommand()
161 throws IOException
162 {
163 try
164 {
165 barrier.acquire();
166 }
167 catch ( InterruptedException e )
168 {
169
170 invalidateInternalBuffer();
171 throw new IOException( e.getLocalizedMessage() );
172 }
173 }
174
175
176
177
178
179
180 public static final class TestLessInputStreamBuilder
181 {
182 private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
183 private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
184 private final ImmediateCommands immediateCommands = new ImmediateCommands();
185 private final CachableCommands cachableCommands = new CachableCommands();
186 private final Node head = new Node( null );
187 private final Iterable<Command> iterableCachable;
188
189 public TestLessInputStreamBuilder()
190 {
191 iterableCachable = new Iterable<Command>()
192 {
193 @Override
194 public Iterator<Command> iterator()
195 {
196 return new CIt();
197 }
198 };
199 }
200
201 public TestLessInputStream build()
202 {
203 Lock lock = rwLock.writeLock();
204 lock.lock();
205 try
206 {
207 TestLessInputStream is = new TestLessInputStream( this );
208 aliveStreams.offer( is );
209 return is;
210 }
211 finally
212 {
213 lock.unlock();
214 }
215 }
216
217 public void removeStream( TestLessInputStream is )
218 {
219 Lock lock = rwLock.writeLock();
220 lock.lock();
221 try
222 {
223 aliveStreams.remove( is );
224 }
225 finally
226 {
227 lock.unlock();
228 }
229 }
230
231 public NotifiableTestStream getImmediateCommands()
232 {
233 return immediateCommands;
234 }
235
236 public NotifiableTestStream getCachableCommands()
237 {
238 return cachableCommands;
239 }
240
241
242
243
244 Iterable<Command> getIterableCachable()
245 {
246 return iterableCachable;
247 }
248
249 @SuppressWarnings( "checkstyle:innerassignment" )
250 private boolean addTailNodeIfAbsent( Command command )
251 {
252 Node newTail = new Node( command );
253 Node currentTail = head;
254 do
255 {
256 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
257 {
258 currentTail = successor;
259 if ( command.equals( currentTail.command ) )
260 {
261 return false;
262 }
263 }
264 } while ( !currentTail.next.compareAndSet( null, newTail ) );
265 return true;
266 }
267
268 private static Node nextCachedNode( Node current )
269 {
270 return current.next.get();
271 }
272
273 private final class CIt
274 implements Iterator<Command>
275 {
276 private Node node = TestLessInputStreamBuilder.this.head;
277
278 @Override
279 public boolean hasNext()
280 {
281 return examineNext( false ) != null;
282 }
283
284 @Override
285 public Command next()
286 {
287 Command command = examineNext( true );
288 if ( command == null )
289 {
290 throw new NoSuchElementException();
291 }
292 return command;
293 }
294
295 @Override
296 public void remove()
297 {
298 throw new UnsupportedOperationException();
299 }
300
301 private Command examineNext( boolean store )
302 {
303 Node next = nextCachedNode( node );
304 if ( store && next != null )
305 {
306 node = next;
307 }
308 return next == null ? null : next.command;
309 }
310 }
311
312
313
314
315 private final class ImmediateCommands
316 implements NotifiableTestStream
317 {
318 @Override
319 public void provideNewTest()
320 {
321 }
322
323 @Override
324 public void skipSinceNextTest()
325 {
326 Lock lock = rwLock.readLock();
327 lock.lock();
328 try
329 {
330 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
331 {
332 aliveStream.skipSinceNextTest();
333 }
334 }
335 finally
336 {
337 lock.unlock();
338 }
339 }
340
341 @Override
342 public void shutdown( Shutdown shutdownType )
343 {
344 Lock lock = rwLock.readLock();
345 lock.lock();
346 try
347 {
348 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
349 {
350 aliveStream.shutdown( shutdownType );
351 }
352 }
353 finally
354 {
355 lock.unlock();
356 }
357 }
358
359 @Override
360 public void noop()
361 {
362 Lock lock = rwLock.readLock();
363 lock.lock();
364 try
365 {
366 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
367 {
368 aliveStream.noop();
369 }
370 }
371 finally
372 {
373 lock.unlock();
374 }
375 }
376
377 @Override
378 public void acknowledgeByeEventReceived()
379 {
380 Lock lock = rwLock.readLock();
381 lock.lock();
382 try
383 {
384 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
385 {
386 aliveStream.acknowledgeByeEventReceived();
387 }
388 }
389 finally
390 {
391 lock.unlock();
392 }
393 }
394 }
395
396
397
398
399 private final class CachableCommands
400 implements NotifiableTestStream
401 {
402 @Override
403 public void provideNewTest()
404 {
405 }
406
407 @Override
408 public void skipSinceNextTest()
409 {
410 Lock lock = rwLock.readLock();
411 lock.lock();
412 try
413 {
414 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
415 {
416 release();
417 }
418 }
419 finally
420 {
421 lock.unlock();
422 }
423 }
424
425 @Override
426 public void shutdown( Shutdown shutdownType )
427 {
428 Lock lock = rwLock.readLock();
429 lock.lock();
430 try
431 {
432 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
433 {
434 release();
435 }
436 }
437 finally
438 {
439 lock.unlock();
440 }
441 }
442
443 @Override
444 public void noop()
445 {
446 Lock lock = rwLock.readLock();
447 lock.lock();
448 try
449 {
450 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( NOOP ) )
451 {
452 release();
453 }
454 }
455 finally
456 {
457 lock.unlock();
458 }
459 }
460
461 @Override
462 public void acknowledgeByeEventReceived()
463 {
464 Lock lock = rwLock.readLock();
465 lock.lock();
466 try
467 {
468 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( BYE_ACK ) )
469 {
470 release();
471 }
472 }
473 finally
474 {
475 lock.unlock();
476 }
477 }
478
479 private void release()
480 {
481 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
482 {
483 aliveStream.barrier.release();
484 }
485 }
486 }
487
488 private static class Node
489 {
490 private final AtomicReference<Node> next = new AtomicReference<>();
491 private final Command command;
492
493 Node( Command command )
494 {
495 this.command = command;
496 }
497 }
498 }
499 }