View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.api.booter.Command;
23  import org.apache.maven.surefire.api.booter.Shutdown;
24  
25  import java.io.IOException;
26  import java.util.Iterator;
27  import java.util.NoSuchElementException;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Semaphore;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicReference;
33  import java.util.concurrent.locks.Lock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  import static org.apache.maven.surefire.api.booter.Command.BYE_ACK;
37  import static org.apache.maven.surefire.api.booter.Command.NOOP;
38  import static org.apache.maven.surefire.api.booter.Command.SKIP_SINCE_NEXT_TEST;
39  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
40  
41  /**
42   * Dispatches commands without tests.
43   *
44   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
45   * @since 2.19
46   */
47  public final class TestLessInputStream
48          extends DefaultCommandReader
49  {
50      private final Semaphore barrier = new Semaphore( 0 );
51  
52      private final AtomicBoolean closed = new AtomicBoolean();
53  
54      private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
55  
56      private final TestLessInputStreamBuilder builder;
57  
58      private Iterator<Command> cachableCommands;
59  
60      private TestLessInputStream( TestLessInputStreamBuilder builder )
61      {
62          this.builder = builder;
63      }
64  
65      @Override
66      public void provideNewTest()
67      {
68      }
69  
70      @Override
71      public void skipSinceNextTest()
72      {
73          if ( canContinue() )
74          {
75              immediateCommands.add( SKIP_SINCE_NEXT_TEST );
76              barrier.release();
77          }
78      }
79  
80      @Override
81      public void shutdown( Shutdown shutdownType )
82      {
83          if ( canContinue() )
84          {
85              immediateCommands.add( toShutdown( shutdownType ) );
86              barrier.release();
87          }
88      }
89  
90      @Override
91      public void noop()
92      {
93          if ( canContinue() )
94          {
95              immediateCommands.add( NOOP );
96              barrier.release();
97          }
98      }
99  
100     @Override
101     public void acknowledgeByeEventReceived()
102     {
103         if ( canContinue() )
104         {
105             immediateCommands.add( BYE_ACK );
106             barrier.release();
107         }
108     }
109 
110     @Override
111     public boolean isClosed()
112     {
113         return closed.get();
114     }
115 
116     @Override
117     protected Command nextCommand()
118     {
119         Command cmd = immediateCommands.poll();
120         if ( cmd == null )
121         {
122             if ( cachableCommands == null )
123             {
124                 cachableCommands = builder.getIterableCachable().iterator();
125             }
126 
127             cmd = cachableCommands.next();
128         }
129         return cmd;
130     }
131 
132     @Override
133     protected void beforeNextCommand()
134         throws IOException
135     {
136         awaitNextCommand();
137     }
138 
139     @Override
140     public void close()
141     {
142         if ( closed.compareAndSet( false, true ) )
143         {
144             barrier.drainPermits();
145             barrier.release();
146         }
147     }
148 
149     /**
150      * For testing purposes only.
151      *
152      * @return permits used internally by {@link #beforeNextCommand()}
153      */
154     int availablePermits()
155     {
156         return barrier.availablePermits();
157     }
158 
159     private void awaitNextCommand()
160         throws IOException
161     {
162         try
163         {
164             barrier.acquire();
165         }
166         catch ( InterruptedException e )
167         {
168             throw new IOException( e.getLocalizedMessage() );
169         }
170     }
171 
172     /**
173      * Builds {@link TestLessInputStream streams}, registers cachable commands
174      * and provides accessible API to dispatch immediate commands to all atomically
175      * alive streams.
176      */
177     public static final class TestLessInputStreamBuilder
178     {
179         private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
180         private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
181         private final ImmediateCommands immediateCommands = new ImmediateCommands();
182         private final CachableCommands cachableCommands = new CachableCommands();
183         private final Node head = new Node( null );
184         private final Iterable<Command> iterableCachable;
185 
186         public TestLessInputStreamBuilder()
187         {
188             iterableCachable = new Iterable<Command>()
189             {
190                 @Override
191                 public Iterator<Command> iterator()
192                 {
193                     return new CIt();
194                 }
195             };
196         }
197 
198         public TestLessInputStream build()
199         {
200             Lock lock = rwLock.writeLock();
201             lock.lock();
202             try
203             {
204                 TestLessInputStream is = new TestLessInputStream( this );
205                 aliveStreams.offer( is );
206                 return is;
207             }
208             finally
209             {
210                 lock.unlock();
211             }
212         }
213 
214         public void removeStream( TestLessInputStream is )
215         {
216             Lock lock = rwLock.writeLock();
217             lock.lock();
218             try
219             {
220                 aliveStreams.remove( is );
221             }
222             finally
223             {
224                 lock.unlock();
225             }
226         }
227 
228         /**
229          * Only {@link NotifiableTestStream#noop()} and {@link NotifiableTestStream#shutdown(Shutdown)} are supported.
230          * Another methods throw {@link UnsupportedOperationException}.
231          *
232          * @return commands which are immediately transmitted once to all alive forked JVMs, not cached. As opposite
233          * to cached commands, the immediate commands disappear and cannot be seen by any fork initiated after
234          * the command has dispatched.
235          */
236         public NotifiableTestStream getImmediateCommands()
237         {
238             return immediateCommands;
239         }
240 
241         /**
242          * Cached commands are sent to all alive or future alive forks. These are termination commands which are not
243          * reversible and therefore only {@link NotifiableTestStream#shutdown(Shutdown)} and
244          * {@link NotifiableTestStream#skipSinceNextTest()} are supported.
245          * Another methods throw {@link UnsupportedOperationException}.
246          *
247          * @return commands which are cached for currently alive or future forks.
248          */
249         public NotifiableTestStream getCachableCommands()
250         {
251             return cachableCommands;
252         }
253 
254         /**
255          * The iterator is not thread safe.
256          */
257         Iterable<Command> getIterableCachable()
258         {
259             return iterableCachable;
260         }
261 
262         @SuppressWarnings( "checkstyle:innerassignment" )
263         private boolean addTailNodeIfAbsent( Command command )
264         {
265             Node newTail = new Node( command );
266             Node currentTail = head;
267             do
268             {
269                 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
270                 {
271                     currentTail = successor;
272                     if ( command.equals( currentTail.command ) )
273                     {
274                         return false;
275                     }
276                 }
277             } while ( !currentTail.next.compareAndSet( null, newTail ) );
278             return true;
279         }
280 
281         private static Node nextCachedNode( Node current )
282         {
283             return current.next.get();
284         }
285 
286         private final class CIt
287             implements Iterator<Command>
288         {
289             private Node node = TestLessInputStreamBuilder.this.head;
290 
291             @Override
292             public boolean hasNext()
293             {
294                 return examineNext( false ) != null;
295             }
296 
297             @Override
298             public Command next()
299             {
300                 Command command = examineNext( true );
301                 if ( command == null )
302                 {
303                     throw new NoSuchElementException();
304                 }
305                 return command;
306             }
307 
308             @Override
309             public void remove()
310             {
311                 throw new UnsupportedOperationException();
312             }
313 
314             private Command examineNext( boolean store )
315             {
316                 Node next = nextCachedNode( node );
317                 if ( store && next != null )
318                 {
319                     node = next;
320                 }
321                 return next == null ? null : next.command;
322             }
323         }
324 
325         /**
326          * Event is called just now for all alive streams and command is not persisted.
327          */
328         private final class ImmediateCommands
329             implements NotifiableTestStream
330         {
331             @Override
332             public void provideNewTest()
333             {
334                 throw new UnsupportedOperationException();
335             }
336 
337             @Override
338             public void skipSinceNextTest()
339             {
340                 throw new UnsupportedOperationException();
341             }
342 
343             @Override
344             public void shutdown( Shutdown shutdownType )
345             {
346                 Lock lock = rwLock.readLock();
347                 lock.lock();
348                 try
349                 {
350                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
351                     {
352                         aliveStream.shutdown( shutdownType );
353                     }
354                 }
355                 finally
356                 {
357                     lock.unlock();
358                 }
359             }
360 
361             @Override
362             public void noop()
363             {
364                 Lock lock = rwLock.readLock();
365                 lock.lock();
366                 try
367                 {
368                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
369                     {
370                         aliveStream.noop();
371                     }
372                 }
373                 finally
374                 {
375                     lock.unlock();
376                 }
377             }
378 
379             @Override
380             public void acknowledgeByeEventReceived()
381             {
382                 throw new UnsupportedOperationException();
383             }
384         }
385 
386         /**
387          * Event is persisted.
388          */
389         private final class CachableCommands
390             implements NotifiableTestStream
391         {
392             @Override
393             public void provideNewTest()
394             {
395                 throw new UnsupportedOperationException();
396             }
397 
398             @Override
399             public void skipSinceNextTest()
400             {
401                 Lock lock = rwLock.readLock();
402                 lock.lock();
403                 try
404                 {
405                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
406                     {
407                         release();
408                     }
409                 }
410                 finally
411                 {
412                     lock.unlock();
413                 }
414             }
415 
416             @Override
417             public void shutdown( Shutdown shutdownType )
418             {
419                 Lock lock = rwLock.readLock();
420                 lock.lock();
421                 try
422                 {
423                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
424                     {
425                         release();
426                     }
427                 }
428                 finally
429                 {
430                     lock.unlock();
431                 }
432             }
433 
434             @Override
435             public void noop()
436             {
437                 throw new UnsupportedOperationException();
438             }
439 
440             @Override
441             public void acknowledgeByeEventReceived()
442             {
443                 throw new UnsupportedOperationException();
444             }
445 
446             private void release()
447             {
448                 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
449                 {
450                     aliveStream.barrier.release();
451                 }
452             }
453         }
454 
455         private static class Node
456         {
457             private final AtomicReference<Node> next = new AtomicReference<>();
458             private final Command command;
459 
460             Node( Command command )
461             {
462                 this.command = command;
463             }
464         }
465     }
466 }