1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import javax.annotation.Nonnull;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.concurrent.ConcurrentLinkedDeque;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30
31 import org.apache.maven.surefire.api.event.Event;
32 import org.apache.maven.surefire.extensions.EventHandler;
33
34 import static java.lang.Thread.currentThread;
35 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
36
37
38
39
40
41
42
43
44
45
46
47 public final class ThreadedStreamConsumer
48 implements EventHandler<Event>, Closeable
49 {
50 private static final int QUEUE_MAX_ITEMS = 10_000;
51 private static final Event END_ITEM = new FinalEvent();
52
53 private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
54 private final AtomicBoolean stop = new AtomicBoolean();
55 private final AtomicBoolean isAlive = new AtomicBoolean( true );
56 private final Thread consumer;
57 private final Pumper pumper;
58
59 final class Pumper
60 implements Runnable
61 {
62 private final EventHandler<Event> target;
63
64 private final MultipleFailureException errors = new MultipleFailureException();
65
66 Pumper( EventHandler<Event> target )
67 {
68 this.target = target;
69 }
70
71
72
73
74
75
76
77
78
79
80
81
82 @Override
83 public void run()
84 {
85 while ( !stop.get() || !synchronizer.isEmptyQueue() )
86 {
87 try
88 {
89 Event item = synchronizer.awaitNext();
90
91 if ( shouldStopQueueing( item ) )
92 {
93 break;
94 }
95
96 target.handleEvent( item );
97 }
98 catch ( Throwable t )
99 {
100
101 t.getStackTrace();
102 errors.addException( t );
103 }
104 }
105
106 isAlive.set( false );
107 }
108
109 boolean hasErrors()
110 {
111 return errors.hasNestedExceptions();
112 }
113
114 void throwErrors() throws IOException
115 {
116 throw errors;
117 }
118 }
119
120 public ThreadedStreamConsumer( EventHandler<Event> target )
121 {
122 pumper = new Pumper( target );
123 Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
124 consumer.setUncaughtExceptionHandler( ( t, e ) -> isAlive.set( false ) );
125 consumer.start();
126 this.consumer = consumer;
127 }
128
129 @Override
130 public void handleEvent( @Nonnull Event event )
131 {
132
133
134 if ( !stop.get() && isAlive.get() )
135 {
136 synchronizer.pushNext( event );
137 }
138 }
139
140 @Override
141 public void close()
142 throws IOException
143 {
144 isAlive.compareAndSet( true, consumer.isAlive() );
145 if ( stop.compareAndSet( false, true ) && isAlive.get() )
146 {
147 if ( currentThread().isInterrupted() )
148 {
149 synchronizer.markStopped();
150 consumer.interrupt();
151 }
152 else
153 {
154 synchronizer.markStopped();
155
156 try
157 {
158 consumer.join();
159 }
160 catch ( InterruptedException e )
161 {
162
163
164 }
165
166 synchronizer.clearQueue();
167 }
168 }
169
170 if ( pumper.hasErrors() )
171 {
172 pumper.throwErrors();
173 }
174 }
175
176
177
178
179
180
181
182 private static boolean shouldStopQueueing( Event item )
183 {
184 return item == END_ITEM;
185 }
186
187
188
189
190 private static class FinalEvent extends Event
191 {
192 FinalEvent()
193 {
194 super( null );
195 }
196
197 @Override
198 public boolean isControlCategory()
199 {
200 return false;
201 }
202
203 @Override
204 public boolean isConsoleCategory()
205 {
206 return false;
207 }
208
209 @Override
210 public boolean isConsoleErrorCategory()
211 {
212 return false;
213 }
214
215 @Override
216 public boolean isStandardStreamCategory()
217 {
218 return false;
219 }
220
221 @Override
222 public boolean isSysPropCategory()
223 {
224 return false;
225 }
226
227 @Override
228 public boolean isTestCategory()
229 {
230 return false;
231 }
232
233 @Override
234 public boolean isJvmExitError()
235 {
236 return false;
237 }
238 }
239
240
241
242
243
244
245
246
247 static class QueueSynchronizer<T>
248 {
249 private final SyncT1 t1 = new SyncT1();
250 private final SyncT2 t2 = new SyncT2();
251 private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
252 private final AtomicInteger queueSize = new AtomicInteger();
253 private final int maxQueueSize;
254 private final T stopItemMarker;
255
256 QueueSynchronizer( int maxQueueSize, T stopItemMarker )
257 {
258 this.maxQueueSize = maxQueueSize;
259 this.stopItemMarker = stopItemMarker;
260 }
261
262 private class SyncT1 extends AbstractQueuedSynchronizer
263 {
264 private static final long serialVersionUID = 1L;
265
266 @Override
267 protected int tryAcquireShared( int arg )
268 {
269 return queueSize.get() == 0 ? -1 : 1;
270 }
271
272 @Override
273 protected boolean tryReleaseShared( int arg )
274 {
275 return true;
276 }
277
278 void waitIfZero() throws InterruptedException
279 {
280 acquireSharedInterruptibly( 1 );
281 }
282
283 void release()
284 {
285 releaseShared( 0 );
286 }
287 }
288
289 private class SyncT2 extends AbstractQueuedSynchronizer
290 {
291 private static final long serialVersionUID = 1L;
292
293 @Override
294 protected int tryAcquireShared( int arg )
295 {
296 return queueSize.get() < maxQueueSize ? 1 : -1;
297 }
298
299 @Override
300 protected boolean tryReleaseShared( int arg )
301 {
302 return true;
303 }
304
305 void awaitMax()
306 {
307 acquireShared( 1 );
308 }
309
310 void tryRelease()
311 {
312 if ( queueSize.get() == 0 )
313 {
314 releaseShared( 0 );
315 }
316 }
317 }
318
319 void markStopped()
320 {
321 addNext( stopItemMarker );
322 }
323
324 void pushNext( T t )
325 {
326 t2.awaitMax();
327 addNext( t );
328 }
329
330 T awaitNext() throws InterruptedException
331 {
332 t2.tryRelease();
333 t1.waitIfZero();
334 queueSize.decrementAndGet();
335 return queue.pollFirst();
336 }
337
338 boolean isEmptyQueue()
339 {
340 return queue.isEmpty();
341 }
342
343 void clearQueue()
344 {
345 queue.clear();
346 }
347
348 private void addNext( T t )
349 {
350 queue.addLast( t );
351 if ( queueSize.getAndIncrement() == 0 )
352 {
353 t1.release();
354 }
355 }
356 }
357 }