1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.booterclient.output;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.util.concurrent.ConcurrentLinkedDeque;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
29
30 import org.apache.maven.surefire.api.event.Event;
31 import org.apache.maven.surefire.extensions.EventHandler;
32
33 import static java.lang.Thread.currentThread;
34 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
35
36
37
38
39
40
41
42
43
44
45
46 public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
47 private static final int QUEUE_MAX_ITEMS = 10_000;
48 private static final Event END_ITEM = new FinalEvent();
49
50 private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
51 private final AtomicBoolean stop = new AtomicBoolean();
52 private final AtomicBoolean isAlive = new AtomicBoolean(true);
53 private final Thread consumer;
54 private final Pumper pumper;
55
56 final class Pumper implements Runnable {
57 private final EventHandler<Event> target;
58
59 private final MultipleFailureException errors = new MultipleFailureException();
60
61 Pumper(EventHandler<Event> target) {
62 this.target = target;
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76 @Override
77 public void run() {
78 while (!stop.get() || !synchronizer.isEmptyQueue()) {
79 try {
80 Event item = synchronizer.awaitNext();
81
82 if (shouldStopQueueing(item)) {
83 break;
84 }
85
86 target.handleEvent(item);
87 } catch (Throwable t) {
88
89 t.getStackTrace();
90 errors.addException(t);
91 }
92 }
93
94 isAlive.set(false);
95 }
96
97 boolean hasErrors() {
98 return errors.hasNestedExceptions();
99 }
100
101 void throwErrors() throws IOException {
102 throw errors;
103 }
104 }
105
106 public ThreadedStreamConsumer(EventHandler<Event> target) {
107 pumper = new Pumper(target);
108 Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
109 consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
110 consumer.start();
111 this.consumer = consumer;
112 }
113
114 @Override
115 public void handleEvent(@Nonnull Event event) {
116
117
118 if (!stop.get() && isAlive.get()) {
119 synchronizer.pushNext(event);
120 }
121 }
122
123 @Override
124 public void close() throws IOException {
125 isAlive.compareAndSet(true, consumer.isAlive());
126 if (stop.compareAndSet(false, true) && isAlive.get()) {
127 if (currentThread().isInterrupted()) {
128 synchronizer.markStopped();
129 consumer.interrupt();
130 } else {
131 synchronizer.markStopped();
132
133 try {
134 consumer.join();
135 } catch (InterruptedException e) {
136
137
138 }
139
140 synchronizer.clearQueue();
141 }
142 }
143
144 if (pumper.hasErrors()) {
145 pumper.throwErrors();
146 }
147 }
148
149
150
151
152
153
154
155 private static boolean shouldStopQueueing(Event item) {
156 return item == END_ITEM;
157 }
158
159
160
161
162 private static class FinalEvent extends Event {
163 FinalEvent() {
164 super(null);
165 }
166
167 @Override
168 public boolean isControlCategory() {
169 return false;
170 }
171
172 @Override
173 public boolean isConsoleCategory() {
174 return false;
175 }
176
177 @Override
178 public boolean isConsoleErrorCategory() {
179 return false;
180 }
181
182 @Override
183 public boolean isStandardStreamCategory() {
184 return false;
185 }
186
187 @Override
188 public boolean isSysPropCategory() {
189 return false;
190 }
191
192 @Override
193 public boolean isTestCategory() {
194 return false;
195 }
196
197 @Override
198 public boolean isJvmExitError() {
199 return false;
200 }
201 }
202
203
204
205
206
207
208
209
210 static class QueueSynchronizer<T> {
211 private final SyncT1 t1 = new SyncT1();
212 private final SyncT2 t2 = new SyncT2();
213 private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
214 private final AtomicInteger queueSize = new AtomicInteger();
215 private final int maxQueueSize;
216 private final T stopItemMarker;
217
218 QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
219 this.maxQueueSize = maxQueueSize;
220 this.stopItemMarker = stopItemMarker;
221 }
222
223 private class SyncT1 extends AbstractQueuedSynchronizer {
224 private static final long serialVersionUID = 1L;
225
226 @Override
227 protected int tryAcquireShared(int arg) {
228 return queueSize.get() == 0 ? -1 : 1;
229 }
230
231 @Override
232 protected boolean tryReleaseShared(int arg) {
233 return true;
234 }
235
236 void waitIfZero() throws InterruptedException {
237 acquireSharedInterruptibly(1);
238 }
239
240 void release() {
241 releaseShared(0);
242 }
243 }
244
245 private class SyncT2 extends AbstractQueuedSynchronizer {
246 private static final long serialVersionUID = 1L;
247
248 @Override
249 protected int tryAcquireShared(int arg) {
250 return queueSize.get() < maxQueueSize ? 1 : -1;
251 }
252
253 @Override
254 protected boolean tryReleaseShared(int arg) {
255 return true;
256 }
257
258 void awaitMax() {
259 acquireShared(1);
260 }
261
262 void tryRelease() {
263 if (queueSize.get() == 0) {
264 releaseShared(0);
265 }
266 }
267 }
268
269 void markStopped() {
270 addNext(stopItemMarker);
271 }
272
273 void pushNext(T t) {
274 t2.awaitMax();
275 addNext(t);
276 }
277
278 T awaitNext() throws InterruptedException {
279 t2.tryRelease();
280 t1.waitIfZero();
281 queueSize.decrementAndGet();
282 return queue.pollFirst();
283 }
284
285 boolean isEmptyQueue() {
286 return queue.isEmpty();
287 }
288
289 void clearQueue() {
290 queue.clear();
291 }
292
293 private void addNext(T t) {
294 queue.addLast(t);
295 if (queueSize.getAndIncrement() == 0) {
296 t1.release();
297 }
298 }
299 }
300 }