1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.api.booter.Command;
23 import org.apache.maven.surefire.api.booter.Shutdown;
24
25 import java.io.IOException;
26 import java.util.Iterator;
27 import java.util.NoSuchElementException;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Semaphore;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33 import java.util.concurrent.locks.Lock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock;
35
36 import static org.apache.maven.surefire.api.booter.Command.BYE_ACK;
37 import static org.apache.maven.surefire.api.booter.Command.NOOP;
38 import static org.apache.maven.surefire.api.booter.Command.SKIP_SINCE_NEXT_TEST;
39 import static org.apache.maven.surefire.api.booter.Command.toShutdown;
40
41
42
43
44
45
46
47 public final class TestLessInputStream
48 extends DefaultCommandReader
49 {
50 private final Semaphore barrier = new Semaphore( 0 );
51
52 private final AtomicBoolean closed = new AtomicBoolean();
53
54 private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
55
56 private final TestLessInputStreamBuilder builder;
57
58 private Iterator<Command> cachableCommands;
59
60 private TestLessInputStream( TestLessInputStreamBuilder builder )
61 {
62 this.builder = builder;
63 }
64
65 @Override
66 public void provideNewTest()
67 {
68 }
69
70 @Override
71 public void skipSinceNextTest()
72 {
73 if ( canContinue() )
74 {
75 immediateCommands.add( SKIP_SINCE_NEXT_TEST );
76 barrier.release();
77 }
78 }
79
80 @Override
81 public void shutdown( Shutdown shutdownType )
82 {
83 if ( canContinue() )
84 {
85 immediateCommands.add( toShutdown( shutdownType ) );
86 barrier.release();
87 }
88 }
89
90 @Override
91 public void noop()
92 {
93 if ( canContinue() )
94 {
95 immediateCommands.add( NOOP );
96 barrier.release();
97 }
98 }
99
100 @Override
101 public void acknowledgeByeEventReceived()
102 {
103 if ( canContinue() )
104 {
105 immediateCommands.add( BYE_ACK );
106 barrier.release();
107 }
108 }
109
110 @Override
111 public boolean isClosed()
112 {
113 return closed.get();
114 }
115
116 @Override
117 protected Command nextCommand()
118 {
119 Command cmd = immediateCommands.poll();
120 if ( cmd == null )
121 {
122 if ( cachableCommands == null )
123 {
124 cachableCommands = builder.getIterableCachable().iterator();
125 }
126
127 cmd = cachableCommands.next();
128 }
129 return cmd;
130 }
131
132 @Override
133 protected void beforeNextCommand()
134 throws IOException
135 {
136 awaitNextCommand();
137 }
138
139 @Override
140 public void close()
141 {
142 if ( closed.compareAndSet( false, true ) )
143 {
144 barrier.drainPermits();
145 barrier.release();
146 }
147 }
148
149
150
151
152
153
154 int availablePermits()
155 {
156 return barrier.availablePermits();
157 }
158
159 private void awaitNextCommand()
160 throws IOException
161 {
162 try
163 {
164 barrier.acquire();
165 }
166 catch ( InterruptedException e )
167 {
168 throw new IOException( e.getLocalizedMessage() );
169 }
170 }
171
172
173
174
175
176
177 public static final class TestLessInputStreamBuilder
178 {
179 private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
180 private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
181 private final ImmediateCommands immediateCommands = new ImmediateCommands();
182 private final CachableCommands cachableCommands = new CachableCommands();
183 private final Node head = new Node( null );
184 private final Iterable<Command> iterableCachable;
185
186 public TestLessInputStreamBuilder()
187 {
188 iterableCachable = new Iterable<Command>()
189 {
190 @Override
191 public Iterator<Command> iterator()
192 {
193 return new CIt();
194 }
195 };
196 }
197
198 public TestLessInputStream build()
199 {
200 Lock lock = rwLock.writeLock();
201 lock.lock();
202 try
203 {
204 TestLessInputStream is = new TestLessInputStream( this );
205 aliveStreams.offer( is );
206 return is;
207 }
208 finally
209 {
210 lock.unlock();
211 }
212 }
213
214 public void removeStream( TestLessInputStream is )
215 {
216 Lock lock = rwLock.writeLock();
217 lock.lock();
218 try
219 {
220 aliveStreams.remove( is );
221 }
222 finally
223 {
224 lock.unlock();
225 }
226 }
227
228
229
230
231
232
233
234
235
236 public NotifiableTestStream getImmediateCommands()
237 {
238 return immediateCommands;
239 }
240
241
242
243
244
245
246
247
248
249 public NotifiableTestStream getCachableCommands()
250 {
251 return cachableCommands;
252 }
253
254
255
256
257 Iterable<Command> getIterableCachable()
258 {
259 return iterableCachable;
260 }
261
262 @SuppressWarnings( "checkstyle:innerassignment" )
263 private boolean addTailNodeIfAbsent( Command command )
264 {
265 Node newTail = new Node( command );
266 Node currentTail = head;
267 do
268 {
269 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
270 {
271 currentTail = successor;
272 if ( command.equals( currentTail.command ) )
273 {
274 return false;
275 }
276 }
277 } while ( !currentTail.next.compareAndSet( null, newTail ) );
278 return true;
279 }
280
281 private static Node nextCachedNode( Node current )
282 {
283 return current.next.get();
284 }
285
286 private final class CIt
287 implements Iterator<Command>
288 {
289 private Node node = TestLessInputStreamBuilder.this.head;
290
291 @Override
292 public boolean hasNext()
293 {
294 return examineNext( false ) != null;
295 }
296
297 @Override
298 public Command next()
299 {
300 Command command = examineNext( true );
301 if ( command == null )
302 {
303 throw new NoSuchElementException();
304 }
305 return command;
306 }
307
308 @Override
309 public void remove()
310 {
311 throw new UnsupportedOperationException();
312 }
313
314 private Command examineNext( boolean store )
315 {
316 Node next = nextCachedNode( node );
317 if ( store && next != null )
318 {
319 node = next;
320 }
321 return next == null ? null : next.command;
322 }
323 }
324
325
326
327
328 private final class ImmediateCommands
329 implements NotifiableTestStream
330 {
331 @Override
332 public void provideNewTest()
333 {
334 throw new UnsupportedOperationException();
335 }
336
337 @Override
338 public void skipSinceNextTest()
339 {
340 throw new UnsupportedOperationException();
341 }
342
343 @Override
344 public void shutdown( Shutdown shutdownType )
345 {
346 Lock lock = rwLock.readLock();
347 lock.lock();
348 try
349 {
350 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
351 {
352 aliveStream.shutdown( shutdownType );
353 }
354 }
355 finally
356 {
357 lock.unlock();
358 }
359 }
360
361 @Override
362 public void noop()
363 {
364 Lock lock = rwLock.readLock();
365 lock.lock();
366 try
367 {
368 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
369 {
370 aliveStream.noop();
371 }
372 }
373 finally
374 {
375 lock.unlock();
376 }
377 }
378
379 @Override
380 public void acknowledgeByeEventReceived()
381 {
382 throw new UnsupportedOperationException();
383 }
384 }
385
386
387
388
389 private final class CachableCommands
390 implements NotifiableTestStream
391 {
392 @Override
393 public void provideNewTest()
394 {
395 throw new UnsupportedOperationException();
396 }
397
398 @Override
399 public void skipSinceNextTest()
400 {
401 Lock lock = rwLock.readLock();
402 lock.lock();
403 try
404 {
405 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
406 {
407 release();
408 }
409 }
410 finally
411 {
412 lock.unlock();
413 }
414 }
415
416 @Override
417 public void shutdown( Shutdown shutdownType )
418 {
419 Lock lock = rwLock.readLock();
420 lock.lock();
421 try
422 {
423 if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
424 {
425 release();
426 }
427 }
428 finally
429 {
430 lock.unlock();
431 }
432 }
433
434 @Override
435 public void noop()
436 {
437 throw new UnsupportedOperationException();
438 }
439
440 @Override
441 public void acknowledgeByeEventReceived()
442 {
443 throw new UnsupportedOperationException();
444 }
445
446 private void release()
447 {
448 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
449 {
450 aliveStream.barrier.release();
451 }
452 }
453 }
454
455 private static class Node
456 {
457 private final AtomicReference<Node> next = new AtomicReference<>();
458 private final Command command;
459
460 Node( Command command )
461 {
462 this.command = command;
463 }
464 }
465 }
466 }