View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.NoSuchElementException;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.Semaphore;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicReference;
29  import java.util.concurrent.locks.Lock;
30  import java.util.concurrent.locks.ReentrantReadWriteLock;
31  
32  import org.apache.maven.surefire.api.booter.Command;
33  import org.apache.maven.surefire.api.booter.Shutdown;
34  
35  import static org.apache.maven.surefire.api.booter.Command.BYE_ACK;
36  import static org.apache.maven.surefire.api.booter.Command.NOOP;
37  import static org.apache.maven.surefire.api.booter.Command.SKIP_SINCE_NEXT_TEST;
38  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
39  
40  /**
41   * Dispatches commands without tests.
42   *
43   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
44   * @since 2.19
45   */
46  public final class TestLessInputStream extends DefaultCommandReader {
47      private final Semaphore barrier = new Semaphore(0);
48  
49      private final AtomicBoolean closed = new AtomicBoolean();
50  
51      private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
52  
53      private final TestLessInputStreamBuilder builder;
54  
55      private Iterator<Command> cachableCommands;
56  
57      private TestLessInputStream(TestLessInputStreamBuilder builder) {
58          this.builder = builder;
59      }
60  
61      @Override
62      public void provideNewTest() {}
63  
64      @Override
65      public void skipSinceNextTest() {
66          if (canContinue()) {
67              immediateCommands.add(SKIP_SINCE_NEXT_TEST);
68              barrier.release();
69          }
70      }
71  
72      @Override
73      public void shutdown(Shutdown shutdownType) {
74          if (canContinue()) {
75              immediateCommands.add(toShutdown(shutdownType));
76              barrier.release();
77          }
78      }
79  
80      @Override
81      public void noop() {
82          if (canContinue()) {
83              immediateCommands.add(NOOP);
84              barrier.release();
85          }
86      }
87  
88      @Override
89      public void acknowledgeByeEventReceived() {
90          if (canContinue()) {
91              immediateCommands.add(BYE_ACK);
92              barrier.release();
93          }
94      }
95  
96      @Override
97      public boolean isClosed() {
98          return closed.get();
99      }
100 
101     @Override
102     protected Command nextCommand() {
103         Command cmd = immediateCommands.poll();
104         if (cmd == null) {
105             if (cachableCommands == null) {
106                 cachableCommands = builder.getIterableCachable().iterator();
107             }
108 
109             cmd = cachableCommands.next();
110         }
111         return cmd;
112     }
113 
114     @Override
115     protected void beforeNextCommand() throws IOException {
116         awaitNextCommand();
117     }
118 
119     @Override
120     public void close() {
121         if (closed.compareAndSet(false, true)) {
122             barrier.drainPermits();
123             barrier.release();
124         }
125     }
126 
127     /**
128      * For testing purposes only.
129      *
130      * @return permits used internally by {@link #beforeNextCommand()}
131      */
132     int availablePermits() {
133         return barrier.availablePermits();
134     }
135 
136     private void awaitNextCommand() throws IOException {
137         try {
138             barrier.acquire();
139         } catch (InterruptedException e) {
140             throw new IOException(e.getLocalizedMessage());
141         }
142     }
143 
144     /**
145      * Builds {@link TestLessInputStream streams}, registers cachable commands
146      * and provides accessible API to dispatch immediate commands to all atomically
147      * alive streams.
148      */
149     public static final class TestLessInputStreamBuilder {
150         private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
151         private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
152         private final ImmediateCommands immediateCommands = new ImmediateCommands();
153         private final CachableCommands cachableCommands = new CachableCommands();
154         private final Node head = new Node(null);
155         private final Iterable<Command> iterableCachable;
156 
157         public TestLessInputStreamBuilder() {
158             iterableCachable = new Iterable<Command>() {
159                 @Override
160                 public Iterator<Command> iterator() {
161                     return new CIt();
162                 }
163             };
164         }
165 
166         public TestLessInputStream build() {
167             Lock lock = rwLock.writeLock();
168             lock.lock();
169             try {
170                 TestLessInputStream is = new TestLessInputStream(this);
171                 aliveStreams.offer(is);
172                 return is;
173             } finally {
174                 lock.unlock();
175             }
176         }
177 
178         public void removeStream(TestLessInputStream is) {
179             Lock lock = rwLock.writeLock();
180             lock.lock();
181             try {
182                 aliveStreams.remove(is);
183             } finally {
184                 lock.unlock();
185             }
186         }
187 
188         /**
189          * Only {@link NotifiableTestStream#noop()} and {@link NotifiableTestStream#shutdown(Shutdown)} are supported.
190          * Another methods throw {@link UnsupportedOperationException}.
191          *
192          * @return commands which are immediately transmitted once to all alive forked JVMs, not cached. As opposite
193          * to cached commands, the immediate commands disappear and cannot be seen by any fork initiated after
194          * the command has dispatched.
195          */
196         public NotifiableTestStream getImmediateCommands() {
197             return immediateCommands;
198         }
199 
200         /**
201          * Cached commands are sent to all alive or future alive forks. These are termination commands which are not
202          * reversible and therefore only {@link NotifiableTestStream#shutdown(Shutdown)} and
203          * {@link NotifiableTestStream#skipSinceNextTest()} are supported.
204          * Another methods throw {@link UnsupportedOperationException}.
205          *
206          * @return commands which are cached for currently alive or future forks.
207          */
208         public NotifiableTestStream getCachableCommands() {
209             return cachableCommands;
210         }
211 
212         /**
213          * The iterator is not thread safe.
214          */
215         Iterable<Command> getIterableCachable() {
216             return iterableCachable;
217         }
218 
219         @SuppressWarnings("checkstyle:innerassignment")
220         private boolean addTailNodeIfAbsent(Command command) {
221             Node newTail = new Node(command);
222             Node currentTail = head;
223             do {
224                 for (Node successor; (successor = currentTail.next.get()) != null; ) {
225                     currentTail = successor;
226                     if (command.equals(currentTail.command)) {
227                         return false;
228                     }
229                 }
230             } while (!currentTail.next.compareAndSet(null, newTail));
231             return true;
232         }
233 
234         private static Node nextCachedNode(Node current) {
235             return current.next.get();
236         }
237 
238         private final class CIt implements Iterator<Command> {
239             private Node node = TestLessInputStreamBuilder.this.head;
240 
241             @Override
242             public boolean hasNext() {
243                 return examineNext(false) != null;
244             }
245 
246             @Override
247             public Command next() {
248                 Command command = examineNext(true);
249                 if (command == null) {
250                     throw new NoSuchElementException();
251                 }
252                 return command;
253             }
254 
255             @Override
256             public void remove() {
257                 throw new UnsupportedOperationException();
258             }
259 
260             private Command examineNext(boolean store) {
261                 Node next = nextCachedNode(node);
262                 if (store && next != null) {
263                     node = next;
264                 }
265                 return next == null ? null : next.command;
266             }
267         }
268 
269         /**
270          * Event is called just now for all alive streams and command is not persisted.
271          */
272         private final class ImmediateCommands implements NotifiableTestStream {
273             @Override
274             public void provideNewTest() {
275                 throw new UnsupportedOperationException();
276             }
277 
278             @Override
279             public void skipSinceNextTest() {
280                 throw new UnsupportedOperationException();
281             }
282 
283             @Override
284             public void shutdown(Shutdown shutdownType) {
285                 Lock lock = rwLock.readLock();
286                 lock.lock();
287                 try {
288                     for (TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams) {
289                         aliveStream.shutdown(shutdownType);
290                     }
291                 } finally {
292                     lock.unlock();
293                 }
294             }
295 
296             @Override
297             public void noop() {
298                 Lock lock = rwLock.readLock();
299                 lock.lock();
300                 try {
301                     for (TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams) {
302                         aliveStream.noop();
303                     }
304                 } finally {
305                     lock.unlock();
306                 }
307             }
308 
309             @Override
310             public void acknowledgeByeEventReceived() {
311                 throw new UnsupportedOperationException();
312             }
313         }
314 
315         /**
316          * Event is persisted.
317          */
318         private final class CachableCommands implements NotifiableTestStream {
319             @Override
320             public void provideNewTest() {
321                 throw new UnsupportedOperationException();
322             }
323 
324             @Override
325             public void skipSinceNextTest() {
326                 Lock lock = rwLock.readLock();
327                 lock.lock();
328                 try {
329                     if (TestLessInputStreamBuilder.this.addTailNodeIfAbsent(SKIP_SINCE_NEXT_TEST)) {
330                         release();
331                     }
332                 } finally {
333                     lock.unlock();
334                 }
335             }
336 
337             @Override
338             public void shutdown(Shutdown shutdownType) {
339                 Lock lock = rwLock.readLock();
340                 lock.lock();
341                 try {
342                     if (TestLessInputStreamBuilder.this.addTailNodeIfAbsent(toShutdown(shutdownType))) {
343                         release();
344                     }
345                 } finally {
346                     lock.unlock();
347                 }
348             }
349 
350             @Override
351             public void noop() {
352                 throw new UnsupportedOperationException();
353             }
354 
355             @Override
356             public void acknowledgeByeEventReceived() {
357                 throw new UnsupportedOperationException();
358             }
359 
360             private void release() {
361                 for (TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams) {
362                     aliveStream.barrier.release();
363                 }
364             }
365         }
366 
367         private static class Node {
368             private final AtomicReference<Node> next = new AtomicReference<>();
369             private final Command command;
370 
371             Node(Command command) {
372                 this.command = command;
373             }
374         }
375     }
376 }