View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.booter.Command;
23  import org.apache.maven.surefire.booter.Shutdown;
24  
25  import java.io.IOException;
26  import java.util.Iterator;
27  import java.util.NoSuchElementException;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Semaphore;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicReference;
33  import java.util.concurrent.locks.Lock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  import static org.apache.maven.surefire.booter.Command.BYE_ACK;
37  import static org.apache.maven.surefire.booter.Command.NOOP;
38  import static org.apache.maven.surefire.booter.Command.SKIP_SINCE_NEXT_TEST;
39  import static org.apache.maven.surefire.booter.Command.toShutdown;
40  
41  /**
42   * Dispatches commands without tests.
43   *
44   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
45   * @since 2.19
46   */
47  public final class TestLessInputStream
48      extends AbstractCommandStream
49  {
50      private final Semaphore barrier = new Semaphore( 0 );
51  
52      private final AtomicBoolean closed = new AtomicBoolean();
53  
54      private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<Command>();
55  
56      private final TestLessInputStreamBuilder builder;
57  
58      private Iterator<Command> cachableCommands;
59  
60      private TestLessInputStream( TestLessInputStreamBuilder builder )
61      {
62          this.builder = builder;
63      }
64  
65      public void provideNewTest()
66      {
67      }
68  
69      public void skipSinceNextTest()
70      {
71          if ( canContinue() )
72          {
73              immediateCommands.add( SKIP_SINCE_NEXT_TEST );
74              barrier.release();
75          }
76      }
77  
78      public void shutdown( Shutdown shutdownType )
79      {
80          if ( canContinue() )
81          {
82              immediateCommands.add( toShutdown( shutdownType ) );
83              barrier.release();
84          }
85      }
86  
87      public void noop()
88      {
89          if ( canContinue() )
90          {
91              immediateCommands.add( NOOP );
92              barrier.release();
93          }
94      }
95  
96      @Override
97      public void acknowledgeByeEventReceived()
98      {
99          if ( canContinue() )
100         {
101             immediateCommands.add( BYE_ACK );
102             barrier.release();
103         }
104     }
105 
106     @Override
107     protected boolean isClosed()
108     {
109         return closed.get();
110     }
111 
112     @Override
113     protected Command nextCommand()
114     {
115         Command cmd = immediateCommands.poll();
116         if ( cmd == null )
117         {
118             if ( cachableCommands == null )
119             {
120                 cachableCommands = builder.getIterableCachable().iterator();
121             }
122 
123             cmd = cachableCommands.next();
124         }
125         return cmd;
126     }
127 
128     @Override
129     protected void beforeNextCommand()
130         throws IOException
131     {
132         awaitNextCommand();
133     }
134 
135     @Override
136     public void close()
137     {
138         if ( closed.compareAndSet( false, true ) )
139         {
140             invalidateInternalBuffer();
141             barrier.drainPermits();
142             barrier.release();
143         }
144     }
145 
146     /**
147      * For testing purposes only.
148      *
149      * @return permits used internally by {@link #beforeNextCommand()}
150      */
151     int availablePermits()
152     {
153         return barrier.availablePermits();
154     }
155 
156     private void awaitNextCommand()
157         throws IOException
158     {
159         try
160         {
161             barrier.acquire();
162         }
163         catch ( InterruptedException e )
164         {
165             // help GC to free this object because StreamFeeder Thread cannot read it anyway after IOE
166             invalidateInternalBuffer();
167             throw new IOException( e.getLocalizedMessage() );
168         }
169     }
170 
171     /**
172      * Builds {@link TestLessInputStream streams}, registers cachable commands
173      * and provides accessible API to dispatch immediate commands to all atomically
174      * alive streams.
175      */
176     public static final class TestLessInputStreamBuilder
177     {
178         private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
179         private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<TestLessInputStream>();
180         private final ImmediateCommands immediateCommands = new ImmediateCommands();
181         private final CachableCommands cachableCommands = new CachableCommands();
182         private final Node head = new Node( null );
183         private final Iterable<Command> iterableCachable;
184 
185         public TestLessInputStreamBuilder()
186         {
187             iterableCachable = new Iterable<Command>()
188             {
189                 public Iterator<Command> iterator()
190                 {
191                     return new CIt();
192                 }
193             };
194         }
195 
196         public TestLessInputStream build()
197         {
198             Lock lock = rwLock.writeLock();
199             lock.lock();
200             try
201             {
202                 TestLessInputStream is = new TestLessInputStream( this );
203                 aliveStreams.offer( is );
204                 return is;
205             }
206             finally
207             {
208                 lock.unlock();
209             }
210         }
211 
212         public void removeStream( TestLessInputStream is )
213         {
214             Lock lock = rwLock.writeLock();
215             lock.lock();
216             try
217             {
218                 aliveStreams.remove( is );
219             }
220             finally
221             {
222                 lock.unlock();
223             }
224         }
225 
226         public NotifiableTestStream getImmediateCommands()
227         {
228             return immediateCommands;
229         }
230 
231         public NotifiableTestStream getCachableCommands()
232         {
233             return cachableCommands;
234         }
235 
236         /**
237          * The iterator is not thread safe.
238          */
239         Iterable<Command> getIterableCachable()
240         {
241             return iterableCachable;
242         }
243 
244         @SuppressWarnings( "checkstyle:innerassignment" )
245         private boolean addTailNodeIfAbsent( Command command )
246         {
247             Node newTail = new Node( command );
248             Node currentTail = head;
249             do
250             {
251                 for ( Node successor; ( successor = currentTail.next.get() ) != null; )
252                 {
253                     currentTail = successor;
254                     if ( command.equals( currentTail.command ) )
255                     {
256                         return false;
257                     }
258                 }
259             } while ( !currentTail.next.compareAndSet( null, newTail ) );
260             return true;
261         }
262 
263         private static Node nextCachedNode( Node current )
264         {
265             return current.next.get();
266         }
267 
268         private final class CIt
269             implements Iterator<Command>
270         {
271             private Node node = TestLessInputStreamBuilder.this.head;
272 
273             public boolean hasNext()
274             {
275                 return examineNext( false ) != null;
276             }
277 
278             public Command next()
279             {
280                 Command command = examineNext( true );
281                 if ( command == null )
282                 {
283                     throw new NoSuchElementException();
284                 }
285                 return command;
286             }
287 
288             public void remove()
289             {
290                 throw new UnsupportedOperationException();
291             }
292 
293             private Command examineNext( boolean store )
294             {
295                 Node next = nextCachedNode( node );
296                 if ( store && next != null )
297                 {
298                     node = next;
299                 }
300                 return next == null ? null : next.command;
301             }
302         }
303 
304         /**
305          * Event is called just now for all alive streams and command is not persisted.
306          */
307         private final class ImmediateCommands
308             implements NotifiableTestStream
309         {
310             public void provideNewTest()
311             {
312             }
313 
314             public void skipSinceNextTest()
315             {
316                 Lock lock = rwLock.readLock();
317                 lock.lock();
318                 try
319                 {
320                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
321                     {
322                         aliveStream.skipSinceNextTest();
323                     }
324                 }
325                 finally
326                 {
327                     lock.unlock();
328                 }
329             }
330 
331             public void shutdown( Shutdown shutdownType )
332             {
333                 Lock lock = rwLock.readLock();
334                 lock.lock();
335                 try
336                 {
337                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
338                     {
339                         aliveStream.shutdown( shutdownType );
340                     }
341                 }
342                 finally
343                 {
344                     lock.unlock();
345                 }
346             }
347 
348             public void noop()
349             {
350                 Lock lock = rwLock.readLock();
351                 lock.lock();
352                 try
353                 {
354                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
355                     {
356                         aliveStream.noop();
357                     }
358                 }
359                 finally
360                 {
361                     lock.unlock();
362                 }
363             }
364 
365             @Override
366             public void acknowledgeByeEventReceived()
367             {
368                 Lock lock = rwLock.readLock();
369                 lock.lock();
370                 try
371                 {
372                     for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
373                     {
374                         aliveStream.acknowledgeByeEventReceived();
375                     }
376                 }
377                 finally
378                 {
379                     lock.unlock();
380                 }
381             }
382         }
383 
384         /**
385          * Event is persisted.
386          */
387         private final class CachableCommands
388             implements NotifiableTestStream
389         {
390             public void provideNewTest()
391             {
392             }
393 
394             public void skipSinceNextTest()
395             {
396                 Lock lock = rwLock.readLock();
397                 lock.lock();
398                 try
399                 {
400                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
401                     {
402                         release();
403                     }
404                 }
405                 finally
406                 {
407                     lock.unlock();
408                 }
409             }
410 
411             public void shutdown( Shutdown shutdownType )
412             {
413                 Lock lock = rwLock.readLock();
414                 lock.lock();
415                 try
416                 {
417                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
418                     {
419                         release();
420                     }
421                 }
422                 finally
423                 {
424                     lock.unlock();
425                 }
426             }
427 
428             public void noop()
429             {
430                 Lock lock = rwLock.readLock();
431                 lock.lock();
432                 try
433                 {
434                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( NOOP ) )
435                     {
436                         release();
437                     }
438                 }
439                 finally
440                 {
441                     lock.unlock();
442                 }
443             }
444 
445             @Override
446             public void acknowledgeByeEventReceived()
447             {
448                 Lock lock = rwLock.readLock();
449                 lock.lock();
450                 try
451                 {
452                     if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( BYE_ACK ) )
453                     {
454                         release();
455                     }
456                 }
457                 finally
458                 {
459                     lock.unlock();
460                 }
461             }
462 
463             private void release()
464             {
465                 for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
466                 {
467                     aliveStream.barrier.release();
468                 }
469             }
470         }
471 
472         private static class Node
473         {
474             private final AtomicReference<Node> next = new AtomicReference<Node>();
475             private final Command command;
476 
477             Node( Command command )
478             {
479                 this.command = command;
480             }
481         }
482     }
483 }