1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.booterclient.output;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentLinkedDeque;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30
31 import org.apache.maven.surefire.api.event.Event;
32 import org.apache.maven.surefire.extensions.EventHandler;
33 import org.slf4j.MDC;
34
35 import static java.lang.Thread.currentThread;
36 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
37
38
39
40
41
42
43
44
45
46
47
48 public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
49 private static final int QUEUE_MAX_ITEMS = 10_000;
50 private static final Event END_ITEM = new FinalEvent();
51
52 private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
53 private final AtomicBoolean stop = new AtomicBoolean();
54 private final AtomicBoolean isAlive = new AtomicBoolean(true);
55 private final Thread consumer;
56 private final Pumper pumper;
57
58 final class Pumper implements Runnable {
59 private final EventHandler<Event> target;
60
61 private final MultipleFailureException errors = new MultipleFailureException();
62 private final Map<String, String> parentThreadsMdcContextMap;
63
64 Pumper(EventHandler<Event> target, Map<String, String> mdcContextMap) {
65 this.target = target;
66 this.parentThreadsMdcContextMap = mdcContextMap;
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80 @Override
81 public void run() {
82
83
84
85 MDC.setContextMap(parentThreadsMdcContextMap);
86 while (!stop.get() || !synchronizer.isEmptyQueue()) {
87 try {
88 Event item = synchronizer.awaitNext();
89
90 if (shouldStopQueueing(item)) {
91 break;
92 }
93
94 target.handleEvent(item);
95 } catch (Throwable t) {
96
97 t.getStackTrace();
98 errors.addException(t);
99 }
100 }
101
102 isAlive.set(false);
103 }
104
105 boolean hasErrors() {
106 return errors.hasNestedExceptions();
107 }
108
109 void throwErrors() throws IOException {
110 throw errors;
111 }
112 }
113
114 public ThreadedStreamConsumer(EventHandler<Event> target) {
115 pumper = new Pumper(target, MDC.getCopyOfContextMap());
116 Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
117 consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
118 consumer.start();
119 this.consumer = consumer;
120 }
121
122 @Override
123 public void handleEvent(@Nonnull Event event) {
124
125
126 if (!stop.get() && isAlive.get()) {
127 synchronizer.pushNext(event);
128 }
129 }
130
131 @Override
132 public void close() throws IOException {
133 isAlive.compareAndSet(true, consumer.isAlive());
134 if (stop.compareAndSet(false, true) && isAlive.get()) {
135 if (currentThread().isInterrupted()) {
136 synchronizer.markStopped();
137 consumer.interrupt();
138 } else {
139 synchronizer.markStopped();
140
141 try {
142 consumer.join();
143 } catch (InterruptedException e) {
144
145
146 }
147
148 synchronizer.clearQueue();
149 }
150 }
151
152 if (pumper.hasErrors()) {
153 pumper.throwErrors();
154 }
155 }
156
157
158
159
160
161
162
163 private static boolean shouldStopQueueing(Event item) {
164 return item == END_ITEM;
165 }
166
167
168
169
170 private static class FinalEvent extends Event {
171 FinalEvent() {
172 super(null);
173 }
174
175 @Override
176 public boolean isControlCategory() {
177 return false;
178 }
179
180 @Override
181 public boolean isConsoleCategory() {
182 return false;
183 }
184
185 @Override
186 public boolean isConsoleErrorCategory() {
187 return false;
188 }
189
190 @Override
191 public boolean isStandardStreamCategory() {
192 return false;
193 }
194
195 @Override
196 public boolean isSysPropCategory() {
197 return false;
198 }
199
200 @Override
201 public boolean isTestCategory() {
202 return false;
203 }
204
205 @Override
206 public boolean isJvmExitError() {
207 return false;
208 }
209 }
210
211
212
213
214
215
216
217
218 static class QueueSynchronizer<T> {
219 private final SyncT1 t1 = new SyncT1();
220 private final SyncT2 t2 = new SyncT2();
221 private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
222 private final AtomicInteger queueSize = new AtomicInteger();
223 private final int maxQueueSize;
224 private final T stopItemMarker;
225
226 QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
227 this.maxQueueSize = maxQueueSize;
228 this.stopItemMarker = stopItemMarker;
229 }
230
231 private class SyncT1 extends AbstractQueuedSynchronizer {
232 private static final long serialVersionUID = 1L;
233
234 @Override
235 protected int tryAcquireShared(int arg) {
236 return queueSize.get() == 0 ? -1 : 1;
237 }
238
239 @Override
240 protected boolean tryReleaseShared(int arg) {
241 return true;
242 }
243
244 void waitIfZero() throws InterruptedException {
245 acquireSharedInterruptibly(1);
246 }
247
248 void release() {
249 releaseShared(0);
250 }
251 }
252
253 private class SyncT2 extends AbstractQueuedSynchronizer {
254 private static final long serialVersionUID = 1L;
255
256 @Override
257 protected int tryAcquireShared(int arg) {
258 return queueSize.get() < maxQueueSize ? 1 : -1;
259 }
260
261 @Override
262 protected boolean tryReleaseShared(int arg) {
263 return true;
264 }
265
266 void awaitMax() {
267 acquireShared(1);
268 }
269
270 void tryRelease() {
271 if (queueSize.get() == 0) {
272 releaseShared(0);
273 }
274 }
275 }
276
277 void markStopped() {
278 addNext(stopItemMarker);
279 }
280
281 void pushNext(T t) {
282 t2.awaitMax();
283 addNext(t);
284 }
285
286 T awaitNext() throws InterruptedException {
287 t2.tryRelease();
288 t1.waitIfZero();
289 queueSize.decrementAndGet();
290 return queue.pollFirst();
291 }
292
293 boolean isEmptyQueue() {
294 return queue.isEmpty();
295 }
296
297 void clearQueue() {
298 queue.clear();
299 }
300
301 private void addNext(T t) {
302 queue.addLast(t);
303 if (queueSize.getAndIncrement() == 0) {
304 t1.release();
305 }
306 }
307 }
308 }