View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.booter.Command;
23  import org.apache.maven.surefire.booter.Shutdown;
24  
25  import java.io.IOException;
26  import java.util.Iterator;
27  import java.util.NoSuchElementException;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Semaphore;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicReference;
33  import java.util.concurrent.locks.Lock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  import static org.apache.maven.surefire.booter.Command.NOOP;
37  import static org.apache.maven.surefire.booter.Command.SKIP_SINCE_NEXT_TEST;
38  import static org.apache.maven.surefire.booter.Command.toShutdown;
39  
40  /**
41   * Dispatches commands without tests.
42   *
43   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
44   * @since 2.19
45   */
46  public final class TestLessInputStream
47      extends AbstractCommandStream
48  {
49      private final Semaphore barrier = new Semaphore( 0 );
50  
51      private final AtomicBoolean closed = new AtomicBoolean();
52  
53      private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<Command>();
54  
55      private final TestLessInputStreamBuilder builder;
56  
57      private Iterator<Command> cachableCommands;
58  
59      private TestLessInputStream( TestLessInputStreamBuilder builder )
60      {
61          this.builder = builder;
62      }
63  
64      public void provideNewTest()
65      {
66      }
67  
68      public void skipSinceNextTest()
69      {
70          if ( canContinue() )
71          {
72              immediateCommands.add( SKIP_SINCE_NEXT_TEST );
73              barrier.release();
74          }
75      }
76  
77      public void shutdown( Shutdown shutdownType )
78      {
79          if ( canContinue() )
80          {
81              immediateCommands.add( toShutdown( shutdownType ) );
82              barrier.release();
83          }
84      }
85  
86      public void noop()
87      {
88          if ( canContinue() )
89          {
90              immediateCommands.add( NOOP );
91              barrier.release();
92          }
93      }
94  
95      @Override
96      protected boolean isClosed()
97      {
98          return closed.get();
99      }
100 
101     @Override
102     protected boolean canContinue()
103     {
104         return !isClosed();
105     }
106 
107     @Override
108     protected Command nextCommand()
109     {
110         Command cmd = immediateCommands.poll();
111         if ( cmd == null )
112         {
113             if ( cachableCommands == null )
114             {
115                 cachableCommands = builder.getIterableCachable().iterator();
116             }
117 
118             cmd = cachableCommands.next();
119         }
120         return cmd;
121     }
122 
123     @Override
124     protected void beforeNextCommand()
125         throws IOException
126     {
127         awaitNextCommand();
128     }
129 
130     @Override
131     public void close()
132     {
133         if ( closed.compareAndSet( false, true ) )
134         {
135             invalidateInternalBuffer();
136             barrier.drainPermits();
137             barrier.release();
138         }
139     }
140 
141     /**
142      * For testing purposes only.
143      *
144      * @return permits used internally by {@link #beforeNextCommand()}
145      */
146     int availablePermits()
147     {
148         return barrier.availablePermits();
149     }
150 
151     private void awaitNextCommand()
152         throws IOException
153     {
154         try
155         {
156             barrier.acquire();
157         }
158         catch ( InterruptedException e )
159         {
160             // help GC to free this object because StreamFeeder Thread cannot read it anyway after IOE
161             invalidateInternalBuffer();
162             throw new IOException( e.getLocalizedMessage() );
163         }
164     }
165 
166     /**
167      * Builds {@link TestLessInputStream streams}, registers cachable commands
168      * and provides accessible API to dispatch immediate commands to all atomically
169      * alive streams.
170      */
171     public static final class TestLessInputStreamBuilder
172     {
173         private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
174         private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<TestLessInputStream>();
175         private final ImmediateCommands immediateCommands = new ImmediateCommands();
176         private final CachableCommands cachableCommands = new CachableCommands();
177         private final Node head = new Node( null );
178         private final Iterable<Command> iterableCachable;
179 
180         public TestLessInputStreamBuilder()
181         {
182             iterableCachable = new Iterable<Command>()
183             {
184                 public Iterator<Command> iterator()
185                 {
186                     return new CIt();
187                 }
188             };
189         }
190 
191         public TestLessInputStream build()
192         {
193             Lock lock = rwLock.writeLock();
194             lock.lock();
195             try
196             {
197                 TestLessInputStream is = new TestLessInputStream( this );
198                 aliveStreams.offer( is );
199                 return is;
200             }
201             finally
202             {
203                 lock.unlock();
204             }
205         }
206 
207         public void removeStream( TestLessInputStream is )
208         {
209             Lock lock = rwLock.writeLock();
210             lock.lock();
211             try
212             {
213                 aliveStreams.remove( is );
214             }
215             finally
216             {
217                 lock.unlock();
218             }
219         }
220 
221         public NotifiableTestStream getImmediateCommands()
222         {
223             return immediateCommands;
224         }
225 
226         public NotifiableTestStream getCachableCommands()
227         {
228             return cachableCommands;
229         }
230 
231         /**
232          * The iterator is not thread safe.
233          */
234         Iterable<Command> getIterableCachable()
235         {
236             return iterableCachable;
237         }
238 
239         @SuppressWarnings( "checkstyle:innerassignment" )
240         private void addTailNode( Command command )
241         {
242             Node newTail = new Node( command );
243             Node currentTail = head;
244             do
245             {
246                 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
247                 {
248                     currentTail = successor;
249                 }
250             } while ( !currentTail.next.compareAndSet( null, newTail ) );
251         }
252 
253         @SuppressWarnings( "checkstyle:innerassignment" )
254         private boolean addTailNodeIfAbsent( Command command )
255         {
256             Node newTail = new Node( command );
257             Node currentTail = head;
258             do
259             {
260                 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
261                 {
262                     currentTail = successor;
263                     if ( command.equals( currentTail.command ) )
264                     {
265                         return false;
266                     }
267                 }
268             } while ( !currentTail.next.compareAndSet( null, newTail ) );
269             return true;
270         }
271 
272         private static Node nextCachedNode( Node current )
273         {
274             return current.next.get();
275         }
276 
277         private final class CIt
278             implements Iterator<Command>
279         {
280             private Node node = TestLessInputStreamBuilder.this.head;
281 
282             public boolean hasNext()
283             {
284                 return examineNext( false ) != null;
285             }
286 
287             public Command next()
288             {
289                 Command command = examineNext( true );
290                 if ( command == null )
291                 {
292                     throw new NoSuchElementException();
293                 }
294                 return command;
295             }
296 
297             public void remove()
298             {
299                 throw new UnsupportedOperationException();
300             }
301 
302             private Command examineNext( boolean store )
303             {
304                 Node next = nextCachedNode( node );
305                 if ( store && next != null )
306                 {
307                     node = next;
308                 }
309                 return next == null ? null : next.command;
310             }
311         }
312 
313         /**
314          * Event is called just now for all alive streams and command is not persisted.
315          */
316         private final class ImmediateCommands
317             implements NotifiableTestStream
318         {
319             public void provideNewTest()
320             {
321             }
322 
323             public void skipSinceNextTest()
324             {
325                 Lock lock = rwLock.readLock();
326                 lock.lock();
327                 try
328                 {
329                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
330                     {
331                         aliveStream.skipSinceNextTest();
332                     }
333                 }
334                 finally
335                 {
336                     lock.unlock();
337                 }
338             }
339 
340             public void shutdown( Shutdown shutdownType )
341             {
342                 Lock lock = rwLock.readLock();
343                 lock.lock();
344                 try
345                 {
346                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
347                     {
348                         aliveStream.shutdown( shutdownType );
349                     }
350                 }
351                 finally
352                 {
353                     lock.unlock();
354                 }
355             }
356 
357             public void noop()
358             {
359                 Lock lock = rwLock.readLock();
360                 lock.lock();
361                 try
362                 {
363                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
364                     {
365                         aliveStream.noop();
366                     }
367                 }
368                 finally
369                 {
370                     lock.unlock();
371                 }
372             }
373         }
374 
375         /**
376          * Event is persisted.
377          */
378         private final class CachableCommands
379             implements NotifiableTestStream
380         {
381             public void provideNewTest()
382             {
383             }
384 
385             public void skipSinceNextTest()
386             {
387                 Lock lock = rwLock.readLock();
388                 lock.lock();
389                 try
390                 {
391                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
392                     {
393                         release();
394                     }
395                 }
396                 finally
397                 {
398                     lock.unlock();
399                 }
400             }
401 
402             public void shutdown( Shutdown shutdownType )
403             {
404                 Lock lock = rwLock.readLock();
405                 lock.lock();
406                 try
407                 {
408                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
409                     {
410                         release();
411                     }
412                 }
413                 finally
414                 {
415                     lock.unlock();
416                 }
417             }
418 
419             public void noop()
420             {
421                 Lock lock = rwLock.readLock();
422                 lock.lock();
423                 try
424                 {
425                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( NOOP ) )
426                     {
427                         release();
428                     }
429                 }
430                 finally
431                 {
432                     lock.unlock();
433                 }
434             }
435 
436             private void release()
437             {
438                 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
439                 {
440                     aliveStream.barrier.release();
441                 }
442             }
443         }
444 
445         private static class Node
446         {
447             private final AtomicReference<Node> next = new AtomicReference<Node>();
448             private final Command command;
449 
450             Node( Command command )
451             {
452                 this.command = command;
453             }
454         }
455     }
456 }