1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.maven.plugin.surefire.booterclient.output;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.util.concurrent.ConcurrentLinkedDeque;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
29  
30  import org.apache.maven.surefire.api.event.Event;
31  import org.apache.maven.surefire.extensions.EventHandler;
32  
33  import static java.lang.Thread.currentThread;
34  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
47      private static final int QUEUE_MAX_ITEMS = 10_000;
48      private static final Event END_ITEM = new FinalEvent();
49  
50      private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
51      private final AtomicBoolean stop = new AtomicBoolean();
52      private final AtomicBoolean isAlive = new AtomicBoolean(true);
53      private final Thread consumer;
54      private final Pumper pumper;
55  
56      final class Pumper implements Runnable {
57          private final EventHandler<Event> target;
58  
59          private final MultipleFailureException errors = new MultipleFailureException();
60  
61          Pumper(EventHandler<Event> target) {
62              this.target = target;
63          }
64  
65          
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76          @Override
77          public void run() {
78              while (!stop.get() || !synchronizer.isEmptyQueue()) {
79                  try {
80                      Event item = synchronizer.awaitNext();
81  
82                      if (shouldStopQueueing(item)) {
83                          break;
84                      }
85  
86                      target.handleEvent(item);
87                  } catch (Throwable t) {
88                      
89                      t.getStackTrace();
90                      errors.addException(t);
91                  }
92              }
93  
94              isAlive.set(false);
95          }
96  
97          boolean hasErrors() {
98              return errors.hasNestedExceptions();
99          }
100 
101         void throwErrors() throws IOException {
102             throw errors;
103         }
104     }
105 
106     public ThreadedStreamConsumer(EventHandler<Event> target) {
107         pumper = new Pumper(target);
108         Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
109         consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
110         consumer.start();
111         this.consumer = consumer;
112     }
113 
114     @Override
115     public void handleEvent(@Nonnull Event event) {
116         
117         
118         if (!stop.get() && isAlive.get()) {
119             synchronizer.pushNext(event);
120         }
121     }
122 
123     @Override
124     public void close() throws IOException {
125         isAlive.compareAndSet(true, consumer.isAlive());
126         if (stop.compareAndSet(false, true) && isAlive.get()) {
127             if (currentThread().isInterrupted()) {
128                 synchronizer.markStopped();
129                 consumer.interrupt();
130             } else {
131                 synchronizer.markStopped();
132 
133                 try {
134                     consumer.join();
135                 } catch (InterruptedException e) {
136                     
137                     
138                 }
139 
140                 synchronizer.clearQueue();
141             }
142         }
143 
144         if (pumper.hasErrors()) {
145             pumper.throwErrors();
146         }
147     }
148 
149     
150 
151 
152 
153 
154 
155     private static boolean shouldStopQueueing(Event item) {
156         return item == END_ITEM;
157     }
158 
159     
160 
161 
162     private static class FinalEvent extends Event {
163         FinalEvent() {
164             super(null);
165         }
166 
167         @Override
168         public boolean isControlCategory() {
169             return false;
170         }
171 
172         @Override
173         public boolean isConsoleCategory() {
174             return false;
175         }
176 
177         @Override
178         public boolean isConsoleErrorCategory() {
179             return false;
180         }
181 
182         @Override
183         public boolean isStandardStreamCategory() {
184             return false;
185         }
186 
187         @Override
188         public boolean isSysPropCategory() {
189             return false;
190         }
191 
192         @Override
193         public boolean isTestCategory() {
194             return false;
195         }
196 
197         @Override
198         public boolean isJvmExitError() {
199             return false;
200         }
201     }
202 
203     
204 
205 
206 
207 
208 
209 
210     static class QueueSynchronizer<T> {
211         private final SyncT1 t1 = new SyncT1();
212         private final SyncT2 t2 = new SyncT2();
213         private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
214         private final AtomicInteger queueSize = new AtomicInteger();
215         private final int maxQueueSize;
216         private final T stopItemMarker;
217 
218         QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
219             this.maxQueueSize = maxQueueSize;
220             this.stopItemMarker = stopItemMarker;
221         }
222 
223         private class SyncT1 extends AbstractQueuedSynchronizer {
224             private static final long serialVersionUID = 1L;
225 
226             @Override
227             protected int tryAcquireShared(int arg) {
228                 return queueSize.get() == 0 ? -1 : 1;
229             }
230 
231             @Override
232             protected boolean tryReleaseShared(int arg) {
233                 return true;
234             }
235 
236             void waitIfZero() throws InterruptedException {
237                 acquireSharedInterruptibly(1);
238             }
239 
240             void release() {
241                 releaseShared(0);
242             }
243         }
244 
245         private class SyncT2 extends AbstractQueuedSynchronizer {
246             private static final long serialVersionUID = 1L;
247 
248             @Override
249             protected int tryAcquireShared(int arg) {
250                 return queueSize.get() < maxQueueSize ? 1 : -1;
251             }
252 
253             @Override
254             protected boolean tryReleaseShared(int arg) {
255                 return true;
256             }
257 
258             void awaitMax() {
259                 acquireShared(1);
260             }
261 
262             void tryRelease() {
263                 if (queueSize.get() == 0) {
264                     releaseShared(0);
265                 }
266             }
267         }
268 
269         void markStopped() {
270             addNext(stopItemMarker);
271         }
272 
273         void pushNext(T t) {
274             t2.awaitMax();
275             addNext(t);
276         }
277 
278         T awaitNext() throws InterruptedException {
279             t2.tryRelease();
280             t1.waitIfZero();
281             queueSize.decrementAndGet();
282             return queue.pollFirst();
283         }
284 
285         boolean isEmptyQueue() {
286             return queue.isEmpty();
287         }
288 
289         void clearQueue() {
290             queue.clear();
291         }
292 
293         private void addNext(T t) {
294             queue.addLast(t);
295             if (queueSize.getAndIncrement() == 0) {
296                 t1.release();
297             }
298         }
299     }
300 }