View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.booter;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.channels.ClosedChannelException;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.Semaphore;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
35  import org.apache.maven.surefire.api.booter.BiProperty;
36  import org.apache.maven.surefire.api.booter.Command;
37  import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
38  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
39  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
40  import org.apache.maven.surefire.api.booter.MasterProcessCommand;
41  import org.apache.maven.surefire.api.booter.Shutdown;
42  import org.apache.maven.surefire.api.provider.CommandChainReader;
43  import org.apache.maven.surefire.api.provider.CommandListener;
44  import org.apache.maven.surefire.api.testset.TestSetFailedException;
45  
46  import static java.lang.StrictMath.max;
47  import static java.lang.Thread.State.NEW;
48  import static java.lang.Thread.State.RUNNABLE;
49  import static java.lang.Thread.State.TERMINATED;
50  import static java.util.Objects.requireNonNull;
51  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
52  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
53  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
54  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
55  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
56  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
57  import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
58  import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
59  
60  /**
61   * Reader of commands coming from plugin(master) process.
62   *
63   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
64   * @since 2.19
65   */
66  public final class CommandReader implements CommandChainReader {
67      private static final String LAST_TEST_SYMBOL = "";
68  
69      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
70  
71      private final Thread commandThread = newDaemonThread(new CommandRunnable(), "surefire-forkedjvm-command-thread");
72  
73      private final AtomicReference<Thread.State> state = new AtomicReference<>(NEW);
74  
75      private final CountDownLatch startMonitor = new CountDownLatch(1);
76  
77      private final Semaphore nextCommandNotifier = new Semaphore(0);
78  
79      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
80  
81      private final MasterProcessChannelDecoder decoder;
82  
83      private final Shutdown shutdown;
84  
85      private final ConsoleLogger logger;
86  
87      private int iteratedCount;
88  
89      public CommandReader(MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger) {
90          this.decoder = requireNonNull(decoder, "null decoder");
91          this.shutdown = requireNonNull(shutdown, "null Shutdown config");
92          this.logger = requireNonNull(logger, "null logger");
93          state.set(RUNNABLE);
94          commandThread.start();
95      }
96  
97      @Override
98      public boolean awaitStarted() throws TestSetFailedException {
99          if (state.get() == RUNNABLE) {
100             try {
101                 startMonitor.await();
102                 return true;
103             } catch (InterruptedException e) {
104                 DumpErrorSingleton.getSingleton().dumpException(e);
105                 throw new TestSetFailedException(e.getLocalizedMessage());
106             }
107         } else {
108             return false;
109         }
110     }
111 
112     @Override
113     public void addSkipNextTestsListener(CommandListener listener) {
114         addListener(SKIP_SINCE_NEXT_TEST, listener);
115     }
116 
117     @Override
118     public void addShutdownListener(CommandListener listener) {
119         addListener(SHUTDOWN, listener);
120     }
121 
122     public void addNoopListener(CommandListener listener) {
123         addListener(NOOP, listener);
124     }
125 
126     public void addByeAckListener(CommandListener listener) {
127         addListener(BYE_ACK, listener);
128     }
129 
130     private void addListener(MasterProcessCommand cmd, CommandListener listener) {
131         listeners.add(new BiProperty<>(cmd, listener));
132     }
133 
134     /**
135      * @return test classes which have been retrieved by
136      * {@link CommandReader#getIterableClasses(MasterProcessChannelEncoder)}.
137      */
138     Iterator<String> iterated() {
139         return testClasses.subList(0, iteratedCount).iterator();
140     }
141 
142     /**
143      * The iterator can be used only in one Thread.
144      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
145      *
146      * @param eventChannel original stream in current JVM process
147      * @return Iterator with test classes lazily loaded as commands from the main process
148      */
149     Iterable<String> getIterableClasses(MasterProcessChannelEncoder eventChannel) {
150         return new ClassesIterable(eventChannel);
151     }
152 
153     public void stop() {
154         if (!isStopped()) {
155             state.set(TERMINATED);
156             makeQueueFull();
157             listeners.clear();
158             commandThread.interrupt();
159         }
160     }
161 
162     private boolean isStopped() {
163         return state.get() == TERMINATED;
164     }
165 
166     /**
167      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
168      */
169     private boolean isQueueFull() {
170         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
171         // and we need both atomic.
172         //
173         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
174         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
175         // The insert operation is concurrent with this method.
176         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
177         // allow adding a string after LAST_TEST_SYMBOL.
178         int searchFrom = max(0, testClasses.size() - 1);
179         return testClasses.indexOf(LAST_TEST_SYMBOL, searchFrom) != -1;
180     }
181 
182     private void makeQueueFull() {
183         testClasses.addIfAbsent(LAST_TEST_SYMBOL);
184     }
185 
186     private boolean insertToQueue(String test) {
187         return isNotBlank(test) && !isQueueFull() && testClasses.add(test);
188     }
189 
190     private final class ClassesIterable implements Iterable<String> {
191         private final MasterProcessChannelEncoder eventChannel;
192 
193         ClassesIterable(MasterProcessChannelEncoder eventChannel) {
194             this.eventChannel = eventChannel;
195         }
196 
197         @Override
198         public Iterator<String> iterator() {
199             return new ClassesIterator(eventChannel);
200         }
201     }
202 
203     private final class ClassesIterator implements Iterator<String> {
204         private final MasterProcessChannelEncoder eventChannel;
205 
206         private String clazz;
207 
208         private int nextQueueIndex;
209 
210         private ClassesIterator(MasterProcessChannelEncoder eventChannel) {
211             this.eventChannel = eventChannel;
212         }
213 
214         @Override
215         public boolean hasNext() {
216             popUnread();
217             return isNotBlank(clazz);
218         }
219 
220         @Override
221         public String next() {
222             popUnread();
223             try {
224                 if (isBlank(clazz)) {
225                     throw new NoSuchElementException(CommandReader.this.isStopped() ? "stream was stopped" : "");
226                 } else {
227                     return clazz;
228                 }
229             } finally {
230                 clazz = null;
231             }
232         }
233 
234         @Override
235         public void remove() {
236             throw new UnsupportedOperationException();
237         }
238 
239         private void popUnread() {
240             if (shouldFinish()) {
241                 clazz = null;
242                 return;
243             }
244 
245             if (isBlank(clazz)) {
246                 requestNextTest();
247                 CommandReader.this.awaitNextTest();
248                 if (shouldFinish()) {
249                     clazz = null;
250                     return;
251                 }
252                 clazz = CommandReader.this.testClasses.get(nextQueueIndex++);
253                 CommandReader.this.iteratedCount = nextQueueIndex;
254             }
255 
256             if (CommandReader.this.isStopped()) {
257                 clazz = null;
258             }
259         }
260 
261         private void requestNextTest() {
262             eventChannel.acquireNextTest();
263         }
264 
265         private boolean shouldFinish() {
266             boolean wasLastTestRead = isEndSymbolAt(nextQueueIndex);
267             return CommandReader.this.isStopped() || wasLastTestRead;
268         }
269 
270         private boolean isEndSymbolAt(int index) {
271             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
272         }
273     }
274 
275     private void awaitNextTest() {
276         nextCommandNotifier.acquireUninterruptibly();
277     }
278 
279     private void wakeupIterator() {
280         nextCommandNotifier.release();
281     }
282 
283     private final class CommandRunnable implements Runnable {
284         @Override
285         public void run() {
286             CommandReader.this.startMonitor.countDown();
287             boolean isTestSetFinished = false;
288             try (MasterProcessChannelDecoder commandReader = CommandReader.this.decoder) {
289                 while (CommandReader.this.state.get() == RUNNABLE) {
290                     Command command = commandReader.decode();
291                     switch (command.getCommandType()) {
292                         case RUN_CLASS:
293                             String test = command.getData();
294                             boolean inserted = CommandReader.this.insertToQueue(test);
295                             if (inserted) {
296                                 CommandReader.this.wakeupIterator();
297                                 callListeners(command);
298                             }
299                             break;
300                         case TEST_SET_FINISHED:
301                             CommandReader.this.makeQueueFull();
302                             isTestSetFinished = true;
303                             CommandReader.this.wakeupIterator();
304                             callListeners(command);
305                             break;
306                         case SHUTDOWN:
307                             CommandReader.this.makeQueueFull();
308                             CommandReader.this.wakeupIterator();
309                             callListeners(command);
310                             break;
311                         case BYE_ACK:
312                             callListeners(command);
313                             // After SHUTDOWN no more commands can come.
314                             // Hence, do NOT go back to blocking in I/O.
315                             CommandReader.this.state.set(TERMINATED);
316                             break;
317                         default:
318                             callListeners(command);
319                             break;
320                     }
321                 }
322             } catch (EOFException | ClosedChannelException e) {
323                 CommandReader.this.state.set(TERMINATED);
324                 if (!isTestSetFinished) {
325                     String msg = "TestSet has not finished before stream error has appeared >> "
326                             + "initializing exit by non-null configuration: "
327                             + CommandReader.this.shutdown;
328                     DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
329 
330                     exitByConfiguration();
331                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
332                 }
333             } catch (IOException e) {
334                 CommandReader.this.state.set(TERMINATED);
335                 // If #stop() method is called, reader thread is interrupted
336                 // and exception is InterruptedIOException or its cause is InterruptedException.
337                 if (!(e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException)) {
338                     String msg = "[SUREFIRE] std/in stream corrupted";
339                     DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
340                     CommandReader.this.logger.error(msg, e);
341                 }
342             } finally {
343                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
344                 if (!isTestSetFinished) {
345                     CommandReader.this.makeQueueFull();
346                 }
347                 CommandReader.this.wakeupIterator();
348             }
349         }
350 
351         private void callListeners(Command cmd) {
352             MasterProcessCommand expectedCommandType = cmd.getCommandType();
353             for (BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners) {
354                 MasterProcessCommand commandType = listenerWrapper.getP1();
355                 CommandListener listener = listenerWrapper.getP2();
356                 if (commandType == null || commandType == expectedCommandType) {
357                     listener.update(cmd);
358                 }
359             }
360         }
361 
362         private void exitByConfiguration() {
363             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
364             if (shutdown != null) {
365                 CommandReader.this.makeQueueFull();
366                 CommandReader.this.wakeupIterator();
367                 callListeners(toShutdown(shutdown));
368             }
369         }
370     }
371 }