1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.booterclient.output;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentLinkedDeque;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30
31 import org.apache.maven.surefire.api.event.Event;
32 import org.apache.maven.surefire.extensions.EventHandler;
33 import org.slf4j.MDC;
34
35 import static java.lang.Thread.currentThread;
36 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
37
38
39
40
41
42
43
44
45
46
47
48 public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
49 private static final int QUEUE_MAX_ITEMS = 10_000;
50 private static final Event END_ITEM = new FinalEvent();
51
52 private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
53 private final AtomicBoolean stop = new AtomicBoolean();
54 private final AtomicBoolean isAlive = new AtomicBoolean(true);
55 private final Thread consumer;
56 private final Pumper pumper;
57
58 final class Pumper implements Runnable {
59 private final EventHandler<Event> target;
60
61 private final MultipleFailureException errors = new MultipleFailureException();
62 private final Map<String, String> parentThreadsMdcContextMap;
63
64 Pumper(EventHandler<Event> target, Map<String, String> mdcContextMap) {
65 this.target = target;
66 this.parentThreadsMdcContextMap = mdcContextMap;
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80 @Override
81 public void run() {
82
83
84
85 if (parentThreadsMdcContextMap != null) {
86 MDC.setContextMap(parentThreadsMdcContextMap);
87 }
88 while (!stop.get() || !synchronizer.isEmptyQueue()) {
89 try {
90 Event item = synchronizer.awaitNext();
91
92 if (shouldStopQueueing(item)) {
93 break;
94 }
95
96 target.handleEvent(item);
97 } catch (Throwable t) {
98
99 t.getStackTrace();
100 errors.addException(t);
101 }
102 }
103
104 isAlive.set(false);
105 }
106
107 boolean hasErrors() {
108 return errors.hasNestedExceptions();
109 }
110
111 void throwErrors() throws IOException {
112 throw errors;
113 }
114 }
115
116 public ThreadedStreamConsumer(EventHandler<Event> target) {
117 pumper = new Pumper(target, MDC.getCopyOfContextMap());
118 Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
119 consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
120 consumer.start();
121 this.consumer = consumer;
122 }
123
124 @Override
125 public void handleEvent(@Nonnull Event event) {
126
127
128 if (!stop.get() && isAlive.get()) {
129 synchronizer.pushNext(event);
130 }
131 }
132
133 @Override
134 public void close() throws IOException {
135 isAlive.compareAndSet(true, consumer.isAlive());
136 if (stop.compareAndSet(false, true) && isAlive.get()) {
137 if (currentThread().isInterrupted()) {
138 synchronizer.markStopped();
139 consumer.interrupt();
140 } else {
141 synchronizer.markStopped();
142
143 try {
144 consumer.join();
145 } catch (InterruptedException e) {
146
147
148 }
149
150 synchronizer.clearQueue();
151 }
152 }
153
154 if (pumper.hasErrors()) {
155 pumper.throwErrors();
156 }
157 }
158
159
160
161
162
163
164
165 private static boolean shouldStopQueueing(Event item) {
166 return item == END_ITEM;
167 }
168
169 private static class FinalEvent extends Event {
170 FinalEvent() {
171 super(null);
172 }
173
174 @Override
175 public boolean isControlCategory() {
176 return false;
177 }
178
179 @Override
180 public boolean isConsoleCategory() {
181 return false;
182 }
183
184 @Override
185 public boolean isConsoleErrorCategory() {
186 return false;
187 }
188
189 @Override
190 public boolean isStandardStreamCategory() {
191 return false;
192 }
193
194 @Override
195 public boolean isSysPropCategory() {
196 return false;
197 }
198
199 @Override
200 public boolean isTestCategory() {
201 return false;
202 }
203
204 @Override
205 public boolean isJvmExitError() {
206 return false;
207 }
208 }
209
210
211
212
213
214
215
216
217 static class QueueSynchronizer<T> {
218 private final SyncT1 t1 = new SyncT1();
219 private final SyncT2 t2 = new SyncT2();
220 private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
221 private final AtomicInteger queueSize = new AtomicInteger();
222 private final int maxQueueSize;
223 private final T stopItemMarker;
224
225 QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
226 this.maxQueueSize = maxQueueSize;
227 this.stopItemMarker = stopItemMarker;
228 }
229
230 private class SyncT1 extends AbstractQueuedSynchronizer {
231 private static final long serialVersionUID = 1L;
232
233 @Override
234 protected int tryAcquireShared(int arg) {
235 return queueSize.get() == 0 ? -1 : 1;
236 }
237
238 @Override
239 protected boolean tryReleaseShared(int arg) {
240 return true;
241 }
242
243 void waitIfZero() throws InterruptedException {
244 acquireSharedInterruptibly(1);
245 }
246
247 void release() {
248 releaseShared(0);
249 }
250 }
251
252 private class SyncT2 extends AbstractQueuedSynchronizer {
253 private static final long serialVersionUID = 1L;
254
255 @Override
256 protected int tryAcquireShared(int arg) {
257 return queueSize.get() < maxQueueSize ? 1 : -1;
258 }
259
260 @Override
261 protected boolean tryReleaseShared(int arg) {
262 return true;
263 }
264
265 void awaitMax() {
266 acquireShared(1);
267 }
268
269 void tryRelease() {
270 if (queueSize.get() == 0) {
271 releaseShared(0);
272 }
273 }
274 }
275
276 void markStopped() {
277 addNext(stopItemMarker);
278 }
279
280 void pushNext(T t) {
281 t2.awaitMax();
282 addNext(t);
283 }
284
285 T awaitNext() throws InterruptedException {
286 t2.tryRelease();
287 t1.waitIfZero();
288 queueSize.decrementAndGet();
289 return queue.pollFirst();
290 }
291
292 boolean isEmptyQueue() {
293 return queue.isEmpty();
294 }
295
296 void clearQueue() {
297 queue.clear();
298 }
299
300 private void addNext(T t) {
301 queue.addLast(t);
302 if (queueSize.getAndIncrement() == 0) {
303 t1.release();
304 }
305 }
306 }
307 }