View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.booter.Command;
23  import org.apache.maven.surefire.booter.Shutdown;
24  
25  import java.io.IOException;
26  import java.util.Iterator;
27  import java.util.NoSuchElementException;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Semaphore;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicReference;
33  import java.util.concurrent.locks.Lock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  import static org.apache.maven.surefire.booter.Command.BYE_ACK;
37  import static org.apache.maven.surefire.booter.Command.NOOP;
38  import static org.apache.maven.surefire.booter.Command.SKIP_SINCE_NEXT_TEST;
39  import static org.apache.maven.surefire.booter.Command.toShutdown;
40  
41  /**
42   * Dispatches commands without tests.
43   *
44   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
45   * @since 2.19
46   */
47  public final class TestLessInputStream
48      extends AbstractCommandStream
49  {
50      private final Semaphore barrier = new Semaphore( 0 );
51  
52      private final AtomicBoolean closed = new AtomicBoolean();
53  
54      private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
55  
56      private final TestLessInputStreamBuilder builder;
57  
58      private Iterator<Command> cachableCommands;
59  
60      private TestLessInputStream( TestLessInputStreamBuilder builder )
61      {
62          this.builder = builder;
63      }
64  
65      @Override
66      public void provideNewTest()
67      {
68      }
69  
70      @Override
71      public void skipSinceNextTest()
72      {
73          if ( canContinue() )
74          {
75              immediateCommands.add( SKIP_SINCE_NEXT_TEST );
76              barrier.release();
77          }
78      }
79  
80      @Override
81      public void shutdown( Shutdown shutdownType )
82      {
83          if ( canContinue() )
84          {
85              immediateCommands.add( toShutdown( shutdownType ) );
86              barrier.release();
87          }
88      }
89  
90      @Override
91      public void noop()
92      {
93          if ( canContinue() )
94          {
95              immediateCommands.add( NOOP );
96              barrier.release();
97          }
98      }
99  
100     @Override
101     public void acknowledgeByeEventReceived()
102     {
103         if ( canContinue() )
104         {
105             immediateCommands.add( BYE_ACK );
106             barrier.release();
107         }
108     }
109 
110     @Override
111     protected boolean isClosed()
112     {
113         return closed.get();
114     }
115 
116     @Override
117     protected Command nextCommand()
118     {
119         Command cmd = immediateCommands.poll();
120         if ( cmd == null )
121         {
122             if ( cachableCommands == null )
123             {
124                 cachableCommands = builder.getIterableCachable().iterator();
125             }
126 
127             cmd = cachableCommands.next();
128         }
129         return cmd;
130     }
131 
132     @Override
133     protected void beforeNextCommand()
134         throws IOException
135     {
136         awaitNextCommand();
137     }
138 
139     @Override
140     public void close()
141     {
142         if ( closed.compareAndSet( false, true ) )
143         {
144             invalidateInternalBuffer();
145             barrier.drainPermits();
146             barrier.release();
147         }
148     }
149 
150     /**
151      * For testing purposes only.
152      *
153      * @return permits used internally by {@link #beforeNextCommand()}
154      */
155     int availablePermits()
156     {
157         return barrier.availablePermits();
158     }
159 
160     private void awaitNextCommand()
161         throws IOException
162     {
163         try
164         {
165             barrier.acquire();
166         }
167         catch ( InterruptedException e )
168         {
169             // help GC to free this object because StreamFeeder Thread cannot read it anyway after IOE
170             invalidateInternalBuffer();
171             throw new IOException( e.getLocalizedMessage() );
172         }
173     }
174 
175     /**
176      * Builds {@link TestLessInputStream streams}, registers cachable commands
177      * and provides accessible API to dispatch immediate commands to all atomically
178      * alive streams.
179      */
180     public static final class TestLessInputStreamBuilder
181     {
182         private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
183         private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
184         private final ImmediateCommands immediateCommands = new ImmediateCommands();
185         private final CachableCommands cachableCommands = new CachableCommands();
186         private final Node head = new Node( null );
187         private final Iterable<Command> iterableCachable;
188 
189         public TestLessInputStreamBuilder()
190         {
191             iterableCachable = new Iterable<Command>()
192             {
193                 @Override
194                 public Iterator<Command> iterator()
195                 {
196                     return new CIt();
197                 }
198             };
199         }
200 
201         public TestLessInputStream build()
202         {
203             Lock lock = rwLock.writeLock();
204             lock.lock();
205             try
206             {
207                 TestLessInputStream is = new TestLessInputStream( this );
208                 aliveStreams.offer( is );
209                 return is;
210             }
211             finally
212             {
213                 lock.unlock();
214             }
215         }
216 
217         public void removeStream( TestLessInputStream is )
218         {
219             Lock lock = rwLock.writeLock();
220             lock.lock();
221             try
222             {
223                 aliveStreams.remove( is );
224             }
225             finally
226             {
227                 lock.unlock();
228             }
229         }
230 
231         public NotifiableTestStream getImmediateCommands()
232         {
233             return immediateCommands;
234         }
235 
236         public NotifiableTestStream getCachableCommands()
237         {
238             return cachableCommands;
239         }
240 
241         /**
242          * The iterator is not thread safe.
243          */
244         Iterable<Command> getIterableCachable()
245         {
246             return iterableCachable;
247         }
248 
249         @SuppressWarnings( "checkstyle:innerassignment" )
250         private boolean addTailNodeIfAbsent( Command command )
251         {
252             Node newTail = new Node( command );
253             Node currentTail = head;
254             do
255             {
256                 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
257                 {
258                     currentTail = successor;
259                     if ( command.equals( currentTail.command ) )
260                     {
261                         return false;
262                     }
263                 }
264             } while ( !currentTail.next.compareAndSet( null, newTail ) );
265             return true;
266         }
267 
268         private static Node nextCachedNode( Node current )
269         {
270             return current.next.get();
271         }
272 
273         private final class CIt
274             implements Iterator<Command>
275         {
276             private Node node = TestLessInputStreamBuilder.this.head;
277 
278             @Override
279             public boolean hasNext()
280             {
281                 return examineNext( false ) != null;
282             }
283 
284             @Override
285             public Command next()
286             {
287                 Command command = examineNext( true );
288                 if ( command == null )
289                 {
290                     throw new NoSuchElementException();
291                 }
292                 return command;
293             }
294 
295             @Override
296             public void remove()
297             {
298                 throw new UnsupportedOperationException();
299             }
300 
301             private Command examineNext( boolean store )
302             {
303                 Node next = nextCachedNode( node );
304                 if ( store && next != null )
305                 {
306                     node = next;
307                 }
308                 return next == null ? null : next.command;
309             }
310         }
311 
312         /**
313          * Event is called just now for all alive streams and command is not persisted.
314          */
315         private final class ImmediateCommands
316             implements NotifiableTestStream
317         {
318             @Override
319             public void provideNewTest()
320             {
321             }
322 
323             @Override
324             public void skipSinceNextTest()
325             {
326                 Lock lock = rwLock.readLock();
327                 lock.lock();
328                 try
329                 {
330                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
331                     {
332                         aliveStream.skipSinceNextTest();
333                     }
334                 }
335                 finally
336                 {
337                     lock.unlock();
338                 }
339             }
340 
341             @Override
342             public void shutdown( Shutdown shutdownType )
343             {
344                 Lock lock = rwLock.readLock();
345                 lock.lock();
346                 try
347                 {
348                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
349                     {
350                         aliveStream.shutdown( shutdownType );
351                     }
352                 }
353                 finally
354                 {
355                     lock.unlock();
356                 }
357             }
358 
359             @Override
360             public void noop()
361             {
362                 Lock lock = rwLock.readLock();
363                 lock.lock();
364                 try
365                 {
366                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
367                     {
368                         aliveStream.noop();
369                     }
370                 }
371                 finally
372                 {
373                     lock.unlock();
374                 }
375             }
376 
377             @Override
378             public void acknowledgeByeEventReceived()
379             {
380                 Lock lock = rwLock.readLock();
381                 lock.lock();
382                 try
383                 {
384                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
385                     {
386                         aliveStream.acknowledgeByeEventReceived();
387                     }
388                 }
389                 finally
390                 {
391                     lock.unlock();
392                 }
393             }
394         }
395 
396         /**
397          * Event is persisted.
398          */
399         private final class CachableCommands
400             implements NotifiableTestStream
401         {
402             @Override
403             public void provideNewTest()
404             {
405             }
406 
407             @Override
408             public void skipSinceNextTest()
409             {
410                 Lock lock = rwLock.readLock();
411                 lock.lock();
412                 try
413                 {
414                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
415                     {
416                         release();
417                     }
418                 }
419                 finally
420                 {
421                     lock.unlock();
422                 }
423             }
424 
425             @Override
426             public void shutdown( Shutdown shutdownType )
427             {
428                 Lock lock = rwLock.readLock();
429                 lock.lock();
430                 try
431                 {
432                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
433                     {
434                         release();
435                     }
436                 }
437                 finally
438                 {
439                     lock.unlock();
440                 }
441             }
442 
443             @Override
444             public void noop()
445             {
446                 Lock lock = rwLock.readLock();
447                 lock.lock();
448                 try
449                 {
450                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( NOOP ) )
451                     {
452                         release();
453                     }
454                 }
455                 finally
456                 {
457                     lock.unlock();
458                 }
459             }
460 
461             @Override
462             public void acknowledgeByeEventReceived()
463             {
464                 Lock lock = rwLock.readLock();
465                 lock.lock();
466                 try
467                 {
468                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( BYE_ACK ) )
469                     {
470                         release();
471                     }
472                 }
473                 finally
474                 {
475                     lock.unlock();
476                 }
477             }
478 
479             private void release()
480             {
481                 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
482                 {
483                     aliveStream.barrier.release();
484                 }
485             }
486         }
487 
488         private static class Node
489         {
490             private final AtomicReference<Node> next = new AtomicReference<>();
491             private final Command command;
492 
493             Node( Command command )
494             {
495                 this.command = command;
496             }
497         }
498     }
499 }