1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.api.event.Event;
23 import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
24 import org.apache.maven.surefire.extensions.EventHandler;
25 import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
26
27 import javax.annotation.Nonnull;
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import static java.lang.Thread.currentThread;
35
36
37
38
39
40
41 public final class ThreadedStreamConsumer
42 implements EventHandler<Event>, Closeable
43 {
44 private static final Event END_ITEM = new FinalEvent();
45
46 private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
47
48 private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
49
50 private final AtomicBoolean stop = new AtomicBoolean();
51
52 private final Thread thread;
53
54 private final Pumper pumper;
55
56 final class Pumper
57 implements Runnable
58 {
59 private final EventHandler<Event> target;
60
61 private final MultipleFailureException errors = new MultipleFailureException();
62
63 Pumper( EventHandler<Event> target )
64 {
65 this.target = target;
66 }
67
68
69
70
71
72
73
74
75
76
77
78
79 @Override
80 public void run()
81 {
82 while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
83 {
84 try
85 {
86 Event item = ThreadedStreamConsumer.this.items.take();
87 if ( shouldStopQueueing( item ) )
88 {
89 return;
90 }
91 target.handleEvent( item );
92 }
93 catch ( Throwable t )
94 {
95 errors.addException( t );
96 }
97 }
98 }
99
100 boolean hasErrors()
101 {
102 return errors.hasNestedExceptions();
103 }
104
105 void throwErrors() throws IOException
106 {
107 throw errors;
108 }
109 }
110
111 public ThreadedStreamConsumer( EventHandler<Event> target )
112 {
113 pumper = new Pumper( target );
114 thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
115 thread.start();
116 }
117
118 @Override
119 public void handleEvent( @Nonnull Event event )
120 {
121 if ( stop.get() )
122 {
123 return;
124 }
125 else if ( !thread.isAlive() )
126 {
127 items.clear();
128 return;
129 }
130
131 try
132 {
133 items.put( event );
134 }
135 catch ( InterruptedException e )
136 {
137 currentThread().interrupt();
138 throw new IllegalStateException( e );
139 }
140 }
141
142 @Override
143 public void close()
144 throws IOException
145 {
146 if ( stop.compareAndSet( false, true ) )
147 {
148 try
149 {
150 items.put( END_ITEM );
151 }
152 catch ( InterruptedException e )
153 {
154 currentThread().interrupt();
155 }
156 }
157
158 if ( pumper.hasErrors() )
159 {
160 pumper.throwErrors();
161 }
162 }
163
164
165
166
167
168
169
170 private boolean shouldStopQueueing( Event item )
171 {
172 return item == END_ITEM;
173 }
174
175
176
177
178 private static class FinalEvent extends Event
179 {
180 FinalEvent()
181 {
182 super( null );
183 }
184
185 @Override
186 public boolean isControlCategory()
187 {
188 return false;
189 }
190
191 @Override
192 public boolean isConsoleCategory()
193 {
194 return false;
195 }
196
197 @Override
198 public boolean isConsoleErrorCategory()
199 {
200 return false;
201 }
202
203 @Override
204 public boolean isStandardStreamCategory()
205 {
206 return false;
207 }
208
209 @Override
210 public boolean isSysPropCategory()
211 {
212 return false;
213 }
214
215 @Override
216 public boolean isTestCategory()
217 {
218 return false;
219 }
220
221 @Override
222 public boolean isJvmExitError()
223 {
224 return false;
225 }
226 }
227 }