1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.booter;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.channels.ClosedChannelException;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
35 import org.apache.maven.surefire.api.booter.BiProperty;
36 import org.apache.maven.surefire.api.booter.Command;
37 import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
38 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
39 import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
40 import org.apache.maven.surefire.api.booter.MasterProcessCommand;
41 import org.apache.maven.surefire.api.booter.Shutdown;
42 import org.apache.maven.surefire.api.provider.CommandChainReader;
43 import org.apache.maven.surefire.api.provider.CommandListener;
44 import org.apache.maven.surefire.api.testset.TestSetFailedException;
45
46 import static java.lang.StrictMath.max;
47 import static java.lang.Thread.State.NEW;
48 import static java.lang.Thread.State.RUNNABLE;
49 import static java.lang.Thread.State.TERMINATED;
50 import static java.util.Objects.requireNonNull;
51 import static org.apache.maven.surefire.api.booter.Command.toShutdown;
52 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
53 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
54 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
55 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
56 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
57 import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
58 import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
59
60
61
62
63
64
65
66 public final class CommandReader implements CommandChainReader {
67 private static final String LAST_TEST_SYMBOL = "";
68
69 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
70
71 private final Thread commandThread = newDaemonThread(new CommandRunnable(), "surefire-forkedjvm-command-thread");
72
73 private final AtomicReference<Thread.State> state = new AtomicReference<>(NEW);
74
75 private final CountDownLatch startMonitor = new CountDownLatch(1);
76
77 private final Semaphore nextCommandNotifier = new Semaphore(0);
78
79 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
80
81 private final MasterProcessChannelDecoder decoder;
82
83 private final Shutdown shutdown;
84
85 private final ConsoleLogger logger;
86
87 private int iteratedCount;
88
89 public CommandReader(MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger) {
90 this.decoder = requireNonNull(decoder, "null decoder");
91 this.shutdown = requireNonNull(shutdown, "null Shutdown config");
92 this.logger = requireNonNull(logger, "null logger");
93 state.set(RUNNABLE);
94 commandThread.start();
95 }
96
97 @Override
98 public boolean awaitStarted() throws TestSetFailedException {
99 if (state.get() == RUNNABLE) {
100 try {
101 startMonitor.await();
102 return true;
103 } catch (InterruptedException e) {
104 DumpErrorSingleton.getSingleton().dumpException(e);
105 throw new TestSetFailedException(e.getLocalizedMessage());
106 }
107 } else {
108 return false;
109 }
110 }
111
112 @Override
113 public void addSkipNextTestsListener(CommandListener listener) {
114 addListener(SKIP_SINCE_NEXT_TEST, listener);
115 }
116
117 @Override
118 public void addShutdownListener(CommandListener listener) {
119 addListener(SHUTDOWN, listener);
120 }
121
122 public void addNoopListener(CommandListener listener) {
123 addListener(NOOP, listener);
124 }
125
126 public void addByeAckListener(CommandListener listener) {
127 addListener(BYE_ACK, listener);
128 }
129
130 private void addListener(MasterProcessCommand cmd, CommandListener listener) {
131 listeners.add(new BiProperty<>(cmd, listener));
132 }
133
134
135
136
137
138 Iterator<String> iterated() {
139 return testClasses.subList(0, iteratedCount).iterator();
140 }
141
142
143
144
145
146
147
148
149 Iterable<String> getIterableClasses(MasterProcessChannelEncoder eventChannel) {
150 return new ClassesIterable(eventChannel);
151 }
152
153 public void stop() {
154 if (!isStopped()) {
155 state.set(TERMINATED);
156 makeQueueFull();
157 listeners.clear();
158 commandThread.interrupt();
159 }
160 }
161
162 private boolean isStopped() {
163 return state.get() == TERMINATED;
164 }
165
166
167
168
169 private boolean isQueueFull() {
170
171
172
173
174
175
176
177
178 int searchFrom = max(0, testClasses.size() - 1);
179 return testClasses.indexOf(LAST_TEST_SYMBOL, searchFrom) != -1;
180 }
181
182 private void makeQueueFull() {
183 testClasses.addIfAbsent(LAST_TEST_SYMBOL);
184 }
185
186 private boolean insertToQueue(String test) {
187 return isNotBlank(test) && !isQueueFull() && testClasses.add(test);
188 }
189
190 private final class ClassesIterable implements Iterable<String> {
191 private final MasterProcessChannelEncoder eventChannel;
192
193 ClassesIterable(MasterProcessChannelEncoder eventChannel) {
194 this.eventChannel = eventChannel;
195 }
196
197 @Override
198 public Iterator<String> iterator() {
199 return new ClassesIterator(eventChannel);
200 }
201 }
202
203 private final class ClassesIterator implements Iterator<String> {
204 private final MasterProcessChannelEncoder eventChannel;
205
206 private String clazz;
207
208 private int nextQueueIndex;
209
210 private ClassesIterator(MasterProcessChannelEncoder eventChannel) {
211 this.eventChannel = eventChannel;
212 }
213
214 @Override
215 public boolean hasNext() {
216 popUnread();
217 return isNotBlank(clazz);
218 }
219
220 @Override
221 public String next() {
222 popUnread();
223 try {
224 if (isBlank(clazz)) {
225 throw new NoSuchElementException(CommandReader.this.isStopped() ? "stream was stopped" : "");
226 } else {
227 return clazz;
228 }
229 } finally {
230 clazz = null;
231 }
232 }
233
234 @Override
235 public void remove() {
236 throw new UnsupportedOperationException();
237 }
238
239 private void popUnread() {
240 if (shouldFinish()) {
241 clazz = null;
242 return;
243 }
244
245 if (isBlank(clazz)) {
246 requestNextTest();
247 CommandReader.this.awaitNextTest();
248 if (shouldFinish()) {
249 clazz = null;
250 return;
251 }
252 clazz = CommandReader.this.testClasses.get(nextQueueIndex++);
253 CommandReader.this.iteratedCount = nextQueueIndex;
254 }
255
256 if (CommandReader.this.isStopped()) {
257 clazz = null;
258 }
259 }
260
261 private void requestNextTest() {
262 eventChannel.acquireNextTest();
263 }
264
265 private boolean shouldFinish() {
266 boolean wasLastTestRead = isEndSymbolAt(nextQueueIndex);
267 return CommandReader.this.isStopped() || wasLastTestRead;
268 }
269
270 private boolean isEndSymbolAt(int index) {
271 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
272 }
273 }
274
275 private void awaitNextTest() {
276 nextCommandNotifier.acquireUninterruptibly();
277 }
278
279 private void wakeupIterator() {
280 nextCommandNotifier.release();
281 }
282
283 private final class CommandRunnable implements Runnable {
284 @Override
285 public void run() {
286 CommandReader.this.startMonitor.countDown();
287 boolean isTestSetFinished = false;
288 try (MasterProcessChannelDecoder commandReader = CommandReader.this.decoder) {
289 while (CommandReader.this.state.get() == RUNNABLE) {
290 Command command = commandReader.decode();
291 switch (command.getCommandType()) {
292 case RUN_CLASS:
293 String test = command.getData();
294 boolean inserted = CommandReader.this.insertToQueue(test);
295 if (inserted) {
296 CommandReader.this.wakeupIterator();
297 callListeners(command);
298 }
299 break;
300 case TEST_SET_FINISHED:
301 CommandReader.this.makeQueueFull();
302 isTestSetFinished = true;
303 CommandReader.this.wakeupIterator();
304 callListeners(command);
305 break;
306 case SHUTDOWN:
307 CommandReader.this.makeQueueFull();
308 CommandReader.this.wakeupIterator();
309 callListeners(command);
310 break;
311 case BYE_ACK:
312 callListeners(command);
313
314
315 CommandReader.this.state.set(TERMINATED);
316 break;
317 default:
318 callListeners(command);
319 break;
320 }
321 }
322 } catch (EOFException | ClosedChannelException e) {
323 CommandReader.this.state.set(TERMINATED);
324 if (!isTestSetFinished) {
325 String msg = "TestSet has not finished before stream error has appeared >> "
326 + "initializing exit by non-null configuration: "
327 + CommandReader.this.shutdown;
328 DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
329
330 exitByConfiguration();
331
332 }
333 } catch (IOException e) {
334 CommandReader.this.state.set(TERMINATED);
335
336
337 if (!(e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException)) {
338 String msg = "[SUREFIRE] std/in stream corrupted";
339 DumpErrorSingleton.getSingleton().dumpStreamException(e, msg);
340 CommandReader.this.logger.error(msg, e);
341 }
342 } finally {
343
344 if (!isTestSetFinished) {
345 CommandReader.this.makeQueueFull();
346 }
347 CommandReader.this.wakeupIterator();
348 }
349 }
350
351 private void callListeners(Command cmd) {
352 MasterProcessCommand expectedCommandType = cmd.getCommandType();
353 for (BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners) {
354 MasterProcessCommand commandType = listenerWrapper.getP1();
355 CommandListener listener = listenerWrapper.getP2();
356 if (commandType == null || commandType == expectedCommandType) {
357 listener.update(cmd);
358 }
359 }
360 }
361
362 private void exitByConfiguration() {
363 Shutdown shutdown = CommandReader.this.shutdown;
364 if (shutdown != null) {
365 CommandReader.this.makeQueueFull();
366 CommandReader.this.wakeupIterator();
367 callListeners(toShutdown(shutdown));
368 }
369 }
370 }
371 }