1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.booter.Command;
23 import org.apache.maven.surefire.booter.Shutdown;
24
25 import java.io.IOException;
26 import java.util.Iterator;
27 import java.util.NoSuchElementException;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Semaphore;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33 import java.util.concurrent.locks.Lock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock;
35
36 import static org.apache.maven.surefire.booter.Command.BYE_ACK;
37 import static org.apache.maven.surefire.booter.Command.NOOP;
38 import static org.apache.maven.surefire.booter.Command.SKIP_SINCE_NEXT_TEST;
39 import static org.apache.maven.surefire.booter.Command.toShutdown;
40
41
42
43
44
45
46
47 public final class TestLessInputStream
48 extends AbstractCommandStream
49 {
50 private final Semaphore barrier = new Semaphore( 0 );
51
52 private final AtomicBoolean closed = new AtomicBoolean();
53
54 private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<Command>();
55
56 private final TestLessInputStreamBuilder builder;
57
58 private Iterator<Command> cachableCommands;
59
60 private TestLessInputStream( TestLessInputStreamBuilder builder )
61 {
62 this.builder = builder;
63 }
64
65 public void provideNewTest()
66 {
67 }
68
69 public void skipSinceNextTest()
70 {
71 if ( canContinue() )
72 {
73 immediateCommands.add( SKIP_SINCE_NEXT_TEST );
74 barrier.release();
75 }
76 }
77
78 public void shutdown( Shutdown shutdownType )
79 {
80 if ( canContinue() )
81 {
82 immediateCommands.add( toShutdown( shutdownType ) );
83 barrier.release();
84 }
85 }
86
87 public void noop()
88 {
89 if ( canContinue() )
90 {
91 immediateCommands.add( NOOP );
92 barrier.release();
93 }
94 }
95
96 @Override
97 public void acknowledgeByeEventReceived()
98 {
99 if ( canContinue() )
100 {
101 immediateCommands.add( BYE_ACK );
102 barrier.release();
103 }
104 }
105
106 @Override
107 protected boolean isClosed()
108 {
109 return closed.get();
110 }
111
112 @Override
113 protected Command nextCommand()
114 {
115 Command cmd = immediateCommands.poll();
116 if ( cmd == null )
117 {
118 if ( cachableCommands == null )
119 {
120 cachableCommands = builder.getIterableCachable().iterator();
121 }
122
123 cmd = cachableCommands.next();
124 }
125 return cmd;
126 }
127
128 @Override
129 protected void beforeNextCommand()
130 throws IOException
131 {
132 awaitNextCommand();
133 }
134
135 @Override
136 public void close()
137 {
138 if ( closed.compareAndSet( false, true ) )
139 {
140 invalidateInternalBuffer();
141 barrier.drainPermits();
142 barrier.release();
143 }
144 }
145
146
147
148
149
150
151 int availablePermits()
152 {
153 return barrier.availablePermits();
154 }
155
156 private void awaitNextCommand()
157 throws IOException
158 {
159 try
160 {
161 barrier.acquire();
162 }
163 catch ( InterruptedException e )
164 {
165
166 invalidateInternalBuffer();
167 throw new IOException( e.getLocalizedMessage() );
168 }
169 }
170
171
172
173
174
175
176 public static final class TestLessInputStreamBuilder
177 {
178 private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
179 private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<TestLessInputStream>();
180 private final ImmediateCommands immediateCommands = new ImmediateCommands();
181 private final CachableCommands cachableCommands = new CachableCommands();
182 private final Node head = new Node( null );
183 private final Iterable<Command> iterableCachable;
184
185 public TestLessInputStreamBuilder()
186 {
187 iterableCachable = new Iterable<Command>()
188 {
189 public Iterator<Command> iterator()
190 {
191 return new CIt();
192 }
193 };
194 }
195
196 public TestLessInputStream build()
197 {
198 Lock lock = rwLock.writeLock();
199 lock.lock();
200 try
201 {
202 TestLessInputStream is = new TestLessInputStream( this );
203 aliveStreams.offer( is );
204 return is;
205 }
206 finally
207 {
208 lock.unlock();
209 }
210 }
211
212 public void removeStream( TestLessInputStream is )
213 {
214 Lock lock = rwLock.writeLock();
215 lock.lock();
216 try
217 {
218 aliveStreams.remove( is );
219 }
220 finally
221 {
222 lock.unlock();
223 }
224 }
225
226 public NotifiableTestStream getImmediateCommands()
227 {
228 return immediateCommands;
229 }
230
231 public NotifiableTestStream getCachableCommands()
232 {
233 return cachableCommands;
234 }
235
236
237
238
239 Iterable<Command> getIterableCachable()
240 {
241 return iterableCachable;
242 }
243
244 @SuppressWarnings( "checkstyle:innerassignment" )
245 private boolean addTailNodeIfAbsent( Command command )
246 {
247 Node newTail = new Node( command );
248 Node currentTail = head;
249 do
250 {
251 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
252 {
253 currentTail = successor;
254 if ( command.equals( currentTail.command ) )
255 {
256 return false;
257 }
258 }
259 } while ( !currentTail.next.compareAndSet( null, newTail ) );
260 return true;
261 }
262
263 private static Node nextCachedNode( Node current )
264 {
265 return current.next.get();
266 }
267
268 private final class CIt
269 implements Iterator<Command>
270 {
271 private Node node = TestLessInputStreamBuilder.this.head;
272
273 public boolean hasNext()
274 {
275 return examineNext( false ) != null;
276 }
277
278 public Command next()
279 {
280 Command command = examineNext( true );
281 if ( command == null )
282 {
283 throw new NoSuchElementException();
284 }
285 return command;
286 }
287
288 public void remove()
289 {
290 throw new UnsupportedOperationException();
291 }
292
293 private Command examineNext( boolean store )
294 {
295 Node next = nextCachedNode( node );
296 if ( store && next != null )
297 {
298 node = next;
299 }
300 return next == null ? null : next.command;
301 }
302 }
303
304
305
306
307 private final class ImmediateCommands
308 implements NotifiableTestStream
309 {
310 public void provideNewTest()
311 {
312 }
313
314 public void skipSinceNextTest()
315 {
316 Lock lock = rwLock.readLock();
317 lock.lock();
318 try
319 {
320 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
321 {
322 aliveStream.skipSinceNextTest();
323 }
324 }
325 finally
326 {
327 lock.unlock();
328 }
329 }
330
331 public void shutdown( Shutdown shutdownType )
332 {
333 Lock lock = rwLock.readLock();
334 lock.lock();
335 try
336 {
337 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
338 {
339 aliveStream.shutdown( shutdownType );
340 }
341 }
342 finally
343 {
344 lock.unlock();
345 }
346 }
347
348 public void noop()
349 {
350 Lock lock = rwLock.readLock();
351 lock.lock();
352 try
353 {
354 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
355 {
356 aliveStream.noop();
357 }
358 }
359 finally
360 {
361 lock.unlock();
362 }
363 }
364
365 @Override
366 public void acknowledgeByeEventReceived()
367 {
368 Lock lock = rwLock.readLock();
369 lock.lock();
370 try
371 {
372 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
373 {
374 aliveStream.acknowledgeByeEventReceived();
375 }
376 }
377 finally
378 {
379 lock.unlock();
380 }
381 }
382 }
383
384
385
386
387 private final class CachableCommands
388 implements NotifiableTestStream
389 {
390 public void provideNewTest()
391 {
392 }
393
394 public void skipSinceNextTest()
395 {
396 Lock lock = rwLock.readLock();
397 lock.lock();
398 try
399 {
400 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
401 {
402 release();
403 }
404 }
405 finally
406 {
407 lock.unlock();
408 }
409 }
410
411 public void shutdown( Shutdown shutdownType )
412 {
413 Lock lock = rwLock.readLock();
414 lock.lock();
415 try
416 {
417 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
418 {
419 release();
420 }
421 }
422 finally
423 {
424 lock.unlock();
425 }
426 }
427
428 public void noop()
429 {
430 Lock lock = rwLock.readLock();
431 lock.lock();
432 try
433 {
434 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( NOOP ) )
435 {
436 release();
437 }
438 }
439 finally
440 {
441 lock.unlock();
442 }
443 }
444
445 @Override
446 public void acknowledgeByeEventReceived()
447 {
448 Lock lock = rwLock.readLock();
449 lock.lock();
450 try
451 {
452 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( BYE_ACK ) )
453 {
454 release();
455 }
456 }
457 finally
458 {
459 lock.unlock();
460 }
461 }
462
463 private void release()
464 {
465 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
466 {
467 aliveStream.barrier.release();
468 }
469 }
470 }
471
472 private static class Node
473 {
474 private final AtomicReference<Node> next = new AtomicReference<Node>();
475 private final Command command;
476
477 Node( Command command )
478 {
479 this.command = command;
480 }
481 }
482 }
483 }