1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.maven.surefire.booter;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.channels.ClosedChannelException;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.Semaphore;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
35  import org.apache.maven.surefire.api.booter.BiProperty;
36  import org.apache.maven.surefire.api.booter.Command;
37  import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
38  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
39  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
40  import org.apache.maven.surefire.api.booter.MasterProcessCommand;
41  import org.apache.maven.surefire.api.booter.Shutdown;
42  import org.apache.maven.surefire.api.provider.CommandChainReader;
43  import org.apache.maven.surefire.api.provider.CommandListener;
44  import org.apache.maven.surefire.api.testset.TestSetFailedException;
45  
46  import static java.lang.StrictMath.max;
47  import static java.lang.Thread.State.NEW;
48  import static java.lang.Thread.State.RUNNABLE;
49  import static java.lang.Thread.State.TERMINATED;
50  import static java.util.Objects.requireNonNull;
51  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
52  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
53  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
54  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
55  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
56  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
57  import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
58  import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
59  
60  
61  
62  
63  
64  
65  
66  public final class CommandReader implements CommandChainReader {
67      private static final String LAST_TEST_SYMBOL = "";
68  
69      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
70  
71      private final Thread commandThread = newDaemonThread(new CommandRunnable(), "surefire-forkedjvm-command-thread");
72  
73      private final AtomicReference<Thread.State> state = new AtomicReference<>(NEW);
74  
75      private final CountDownLatch startMonitor = new CountDownLatch(1);
76  
77      private final Semaphore nextCommandNotifier = new Semaphore(0);
78  
79      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
80  
81      private final MasterProcessChannelDecoder decoder;
82  
83      private final Shutdown shutdown;
84  
85      private final ConsoleLogger logger;
86  
87      private int iteratedCount;
88  
89      public CommandReader(MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger) {
90          this.decoder = requireNonNull(decoder, "null decoder");
91          this.shutdown = requireNonNull(shutdown, "null Shutdown config");
92          this.logger = requireNonNull(logger, "null logger");
93          state.set(RUNNABLE);
94          commandThread.start();
95      }
96  
97      @Override
98      public boolean awaitStarted() throws TestSetFailedException {
99          if (state.get() == RUNNABLE) {
100             try {
101                 startMonitor.await();
102                 return true;
103             } catch (InterruptedException e) {
104                 DumpErrorSingleton.getSingleton().dumpException(e);
105                 throw new TestSetFailedException(e.getLocalizedMessage());
106             }
107         } else {
108             return false;
109         }
110     }
111 
112     @Override
113     public void addSkipNextTestsListener(CommandListener listener) {
114         addListener(SKIP_SINCE_NEXT_TEST, listener);
115     }
116 
117     @Override
118     public void addShutdownListener(CommandListener listener) {
119         addListener(SHUTDOWN, listener);
120     }
121 
122     public void addNoopListener(CommandListener listener) {
123         addListener(NOOP, listener);
124     }
125 
126     public void addByeAckListener(CommandListener listener) {
127         addListener(BYE_ACK, listener);
128     }
129 
130     private void addListener(MasterProcessCommand cmd, CommandListener listener) {
131         listeners.add(new BiProperty<>(cmd, listener));
132     }
133 
134     
135 
136 
137 
138     Iterator<String> iterated() {
139         return testClasses.subList(0, iteratedCount).iterator();
140     }
141 
142     
143 
144 
145 
146 
147 
148 
149     Iterable<String> getIterableClasses(MasterProcessChannelEncoder eventChannel) {
150         return new ClassesIterable(eventChannel);
151     }
152 
153     public void stop() {
154         if (!isStopped()) {
155             state.set(TERMINATED);
156             makeQueueFull();
157             listeners.clear();
158             commandThread.interrupt();
159         }
160     }
161 
162     private boolean isStopped() {
163         return state.get() == TERMINATED;
164     }
165 
166     
167 
168 
169     private boolean isQueueFull() {
170         
171         
172         
173         
174         
175         
176         
177         
178         int searchFrom = max(0, testClasses.size() - 1);
179         return testClasses.indexOf(LAST_TEST_SYMBOL, searchFrom) != -1;
180     }
181 
182     private void makeQueueFull() {
183         testClasses.addIfAbsent(LAST_TEST_SYMBOL);
184     }
185 
186     private boolean insertToQueue(String test) {
187         return isNotBlank(test) && !isQueueFull() && testClasses.add(test);
188     }
189 
190     private final class ClassesIterable implements Iterable<String> {
191         private final MasterProcessChannelEncoder eventChannel;
192 
193         ClassesIterable(MasterProcessChannelEncoder eventChannel) {
194             this.eventChannel = eventChannel;
195         }
196 
197         @Override
198         public Iterator<String> iterator() {
199             return new ClassesIterator(eventChannel);
200         }
201     }
202 
203     private final class ClassesIterator implements Iterator<String> {
204         private final MasterProcessChannelEncoder eventChannel;
205 
206         private String clazz;
207 
208         private int nextQueueIndex;
209 
210         private ClassesIterator(MasterProcessChannelEncoder eventChannel) {
211             this.eventChannel = eventChannel;
212         }
213 
214         @Override
215         public boolean hasNext() {
216             popUnread();
217             return isNotBlank(clazz);
218         }
219 
220         @Override
221         public String next() {
222             popUnread();
223             try {
224                 if (isBlank(clazz)) {
225                     throw new NoSuchElementException(CommandReader.this.isStopped() ? "stream was stopped" : "");
226                 } else {
227                     return clazz;
228                 }
229             } finally {
230                 clazz = null;
231             }
232         }
233 
234         @Override
235         public void remove() {
236             throw new UnsupportedOperationException();
237         }
238 
239         private void popUnread() {
240             if (shouldFinish()) {
241                 clazz = null;
242                 return;
243             }
244 
245             if (isBlank(clazz)) {
246                 requestNextTest();
247                 CommandReader.this.awaitNextTest();
248                 if (shouldFinish()) {
249                     clazz = null;
250                     return;
251                 }
252                 clazz = CommandReader.this.testClasses.get(nextQueueIndex++);
253                 CommandReader.this.iteratedCount = nextQueueIndex;
254             }
255 
256             if (CommandReader.this.isStopped()) {
257                 clazz = null;
258             }
259         }
260 
261         private void requestNextTest() {
262             eventChannel.acquireNextTest();
263         }
264 
265         private boolean shouldFinish() {
266             boolean wasLastTestRead = isEndSymbolAt(nextQueueIndex);
267             return CommandReader.this.isStopped() || wasLastTestRead;
268         }
269 
270         private boolean isEndSymbolAt(int index) {
271             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
272         }
273     }
274 
275     private void awaitNextTest() {
276         nextCommandNotifier.acquireUninterruptibly();
277     }
278 
279     private void wakeupIterator() {
280         nextCommandNotifier.release();
281     }
282 
283     private final class CommandRunnable implements Runnable {
284         @Override
285         public void run() {
286             CommandReader.this.startMonitor.countDown();
287             boolean isTestSetFinished = false;
288             try (MasterProcessChannelDecoder commandReader = CommandReader.this.decoder) {
289                 while (CommandReader.this.state.get() == RUNNABLE) {
290                     Command command = commandReader.decode();
291                     switch (command.getCommandType()) {
292                         case RUN_CLASS:
293                             String test = command.getData();
294                             boolean inserted = CommandReader.this.insertToQueue(test);
295                             if (inserted) {
296                                 CommandReader.this.wakeupIterator();
297                                 callListeners(command);
298                             }
299                             break;
300                         case TEST_SET_FINISHED:
301                             CommandReader.this.makeQueueFull();
302                             isTestSetFinished = true;
303                             CommandReader.this.wakeupIterator();
304                             callListeners(command);
305                             break;
306                         case SHUTDOWN:
307                             CommandReader.this.makeQueueFull();
308                             CommandReader.this.wakeupIterator();
309                             callListeners(command);
310                             break;
311                         case BYE_ACK:
312                             callListeners(command);
313                             
314                             
315                             CommandReader.this.state.set(TERMINATED);
316                             break;
317                         default:
318                             callListeners(command);
319                             break;
320                     }
321                 }
322             } catch (EOFException | ClosedChannelException e) {
323                 CommandReader.this.state.set(TERMINATED);
324                 if (!isTestSetFinished) {
325                     String msg = "TestSet has not finished before stream error has appeared >> "
326                             + "initializing exit by non-null configuration: "
327                             + CommandReader.this.shutdown;
328                     DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
329 
330                     exitByConfiguration();
331                     
332                 }
333             } catch (IOException e) {
334                 CommandReader.this.state.set(TERMINATED);
335                 
336                 
337                 if (!(e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException)) {
338                     String msg = "[SUREFIRE] std/in stream corrupted";
339                     DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
340                     CommandReader.this.logger.error(msg, e);
341                 }
342             } finally {
343                 
344                 if (!isTestSetFinished) {
345                     CommandReader.this.makeQueueFull();
346                 }
347                 CommandReader.this.wakeupIterator();
348             }
349         }
350 
351         private void callListeners(Command cmd) {
352             MasterProcessCommand expectedCommandType = cmd.getCommandType();
353             for (BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners) {
354                 MasterProcessCommand commandType = listenerWrapper.getP1();
355                 CommandListener listener = listenerWrapper.getP2();
356                 if (commandType == null || commandType == expectedCommandType) {
357                     listener.update(cmd);
358                 }
359             }
360         }
361 
362         private void exitByConfiguration() {
363             Shutdown shutdown = CommandReader.this.shutdown; 
364             if (shutdown != null) {
365                 CommandReader.this.makeQueueFull();
366                 CommandReader.this.wakeupIterator();
367                 callListeners(toShutdown(shutdown));
368             }
369         }
370     }
371 }