View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.output;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentLinkedDeque;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30  
31  import org.apache.maven.surefire.api.event.Event;
32  import org.apache.maven.surefire.extensions.EventHandler;
33  import org.slf4j.MDC;
34  
35  import static java.lang.Thread.currentThread;
36  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
37  
38  /**
39   * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
40   * <br>
41   * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
42   * 6.33 mega messages per second
43   * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
44   * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
45   *
46   * @author Kristian Rosenvold
47   */
48  public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
        // Capacity handed over to the QueueSynchronizer below as its maxQueueSize.
49      private static final int QUEUE_MAX_ITEMS = 10_000;
        // Sentinel appended by QueueSynchronizer.markStopped(); recognized by identity in shouldStopQueueing().
50      private static final Event END_ITEM = new FinalEvent();
51  
52      private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
        // Flipped to true by the first call of close(); read by handleEvent() and by the Pumper loop.
53      private final AtomicBoolean stop = new AtomicBoolean();
        // Starts as true; cleared when Pumper.run() leaves its loop, by the consumer's uncaught-exception handler,
        // or by close() when the consumer thread is found to be dead.
54      private final AtomicBoolean isAlive = new AtomicBoolean(true);
        // Thread executing the pumper; created and started in the constructor.
55      private final Thread consumer;
56      private final Pumper pumper;
57  
        /**
         * The task executed by the consumer thread: takes events from the {@code synchronizer},
         * forwards them to {@code target} and collects every {@link Throwable} thrown on the way.
         */
58      final class Pumper implements Runnable {
59          private final EventHandler<Event> target;
60  
            // Accumulates everything caught in run(); thrown as a whole by throwErrors().
61          private final MultipleFailureException errors = new MultipleFailureException();
            // MDC context captured on the constructing thread; may be null, see the check in run().
62          private final Map<String, String> parentThreadsMdcContextMap;
63  
64          Pumper(EventHandler<Event> target, Map<String, String> mdcContextMap) {
65              this.target = target;
66              this.parentThreadsMdcContextMap = mdcContextMap;
67          }
68  
69          /**
70           * Calls {@link ForkClient#handleEvent(Event)} which may throw any {@link RuntimeException}.<br>
71           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
72           * try-catch block must be inside of the loop which prevents from losing events from {@link EventHandler}.
73           * <br>
74           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
75           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
76           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lose events
77           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
78           * and therefore the plugin could be permanently in progress.
79           */
80          @Override
81          public void run() {
82              // copy mdc context map from parent thread so that mvn
83              // tools that build in parallel can use it in their logging
84              // to make clear what module is producing the logs
85              if (parentThreadsMdcContextMap != null) {
86                  MDC.setContextMap(parentThreadsMdcContextMap);
87              }
                // Keep pumping until stop was requested AND the queue has been drained.
88              while (!stop.get() || !synchronizer.isEmptyQueue()) {
89                  try {
90                      Event item = synchronizer.awaitNext();
91  
                        // The stop marker ends the loop and is never forwarded to the target.
92                      if (shouldStopQueueing(item)) {
93                          break;
94                      }
95  
96                      target.handleEvent(item);
97                  } catch (Throwable t) {
                        // The failure is only recorded here; the loop goes on with the next event.
98                      // ensure the stack trace to be at the instance of the exception
99                      t.getStackTrace();
100                     errors.addException(t);
101                 }
102             }
103 
                // Tells handleEvent() and close() that nothing consumes the queue anymore.
104             isAlive.set(false);
105         }
106 
            /**
             * @return {@code true} if at least one exception has been recorded in {@code errors}
             */
107         boolean hasErrors() {
108             return errors.hasNestedExceptions();
109         }
110 
            /**
             * Unconditionally throws the {@code errors} instance; the caller in {@code close()} checks
             * {@link #hasErrors()} first.
             *
             * @throws IOException always, the collected {@code errors}
             */
111         void throwErrors() throws IOException {
112             throw errors;
113         }
114     }
115 
        /**
         * Creates the {@link Pumper} for the given handler and immediately starts the consumer thread
         * named "ThreadedStreamConsumer". A copy of the calling thread's MDC context map is handed
         * over to the {@link Pumper}.
         *
         * @param target handler which receives the events on the consumer thread
         */
116     public ThreadedStreamConsumer(EventHandler<Event> target) {
117         pumper = new Pumper(target, MDC.getCopyOfContextMap());
118         Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
            // If anything escapes Pumper.run(), the producer side must stop pushing events.
119         consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
120         consumer.start();
121         this.consumer = consumer;
122     }
123 
        /**
         * Enqueues the event for the consumer thread.
         * The event is dropped without any notice if {@link #close()} was already called or the consumer
         * is marked as not alive. The call may block while the queue holds {@code QUEUE_MAX_ITEMS}
         * or more elements.
         *
         * @param event the event to enqueue
         */
124     @Override
125     public void handleEvent(@Nonnull Event event) {
126         // Do NOT call Thread.isAlive() - slow.
127         // It makes worse performance from 790 ms to 1250 ms for 5 million messages.
128         if (!stop.get() && isAlive.get()) {
129             synchronizer.pushNext(event);
130         }
131     }
132 
        /**
         * Stops the consumer and reports the failures collected by the {@link Pumper}.
         * <br>
         * Only the first call which flips {@code stop} while the consumer is still alive enqueues the stop marker.
         * If the calling thread is already interrupted, the consumer thread is interrupted and not awaited;
         * otherwise this method joins the consumer thread and clears the queue afterwards.
         *
         * @throws IOException the errors collected by the {@link Pumper}, if there are any
         */
133     @Override
134     public void close() throws IOException {
            // Refresh the cheap flag from the real thread state (only ever switches it from true to false).
135         isAlive.compareAndSet(true, consumer.isAlive());
136         if (stop.compareAndSet(false, true) && isAlive.get()) {
137             if (currentThread().isInterrupted()) {
                    // Do not block an interrupted caller: enqueue the stop marker and interrupt the consumer.
138                 synchronizer.markStopped();
139                 consumer.interrupt();
140             } else {
141                 synchronizer.markStopped();
142 
143                 try {
144                     consumer.join();
145                 } catch (InterruptedException e) {
146                     // we should not set interrupted=true in this Thread
147                     // if consumer's Thread was interrupted which is indicated by InterruptedException
                        // NOTE(review): per the Thread.join() contract this exception reports an interrupt of the
                        // calling thread and clears its interrupt status; it is swallowed here on purpose - confirm.
148                 }
149 
                    // NOTE(review): also reached when join() was interrupted, i.e. possibly while the consumer
                    // thread is still running - confirm this is acceptable.
150                 synchronizer.clearQueue();
151             }
152         }
153 
154         if (pumper.hasErrors()) {
155             pumper.throwErrors();
156         }
157     }
158 
159     /**
160      * Compares the item with {@link #END_ITEM} by identity.
161      *
162      * @param item    element taken from the queue
163      * @return {@code true} if the item is the {@link #END_ITEM} sentinel
164      */
165     private static boolean shouldStopQueueing(Event item) {
166         return item == END_ITEM;
167     }
168 
        /**
         * Event type of the {@link #END_ITEM} sentinel. It passes {@code null} to the super constructor
         * and every overridden predicate below returns {@code false}.
         */
169     private static class FinalEvent extends Event {
170         FinalEvent() {
171             super(null);
172         }
173 
174         @Override
175         public boolean isControlCategory() {
176             return false;
177         }
178 
179         @Override
180         public boolean isConsoleCategory() {
181             return false;
182         }
183 
184         @Override
185         public boolean isConsoleErrorCategory() {
186             return false;
187         }
188 
189         @Override
190         public boolean isStandardStreamCategory() {
191             return false;
192         }
193 
194         @Override
195         public boolean isSysPropCategory() {
196             return false;
197         }
198 
199         @Override
200         public boolean isTestCategory() {
201             return false;
202         }
203 
204         @Override
205         public boolean isJvmExitError() {
206             return false;
207         }
208     }
209 
210     /**
211      * This synchronization helper mostly avoids the locks.
212      * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
213      * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
214      *
215      * @param <T> element type in the queue
216      */
217     static class QueueSynchronizer<T> {
218         private final SyncT1 t1 = new SyncT1();
219         private final SyncT2 t2 = new SyncT2();
220         private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
            // Element counter maintained next to the deque; both synchronizers decide on this value only.
221         private final AtomicInteger queueSize = new AtomicInteger();
222         private final int maxQueueSize;
223         private final T stopItemMarker;
224 
            /**
             * @param maxQueueSize   number of counted elements at which {@link #pushNext(Object)} starts to wait
             * @param stopItemMarker element appended by {@link #markStopped()}
             */
225         QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
226             this.maxQueueSize = maxQueueSize;
227             this.stopItemMarker = stopItemMarker;
228         }
229 
            /**
             * Reader-side gate: the shared acquisition fails while {@code queueSize} is zero
             * and succeeds otherwise.
             */
230         private class SyncT1 extends AbstractQueuedSynchronizer {
231             private static final long serialVersionUID = 1L;
232 
233             @Override
234             protected int tryAcquireShared(int arg) {
                    // negative = acquisition failed (the caller waits), positive = acquired
235                 return queueSize.get() == 0 ? -1 : 1;
236             }
237 
238             @Override
239             protected boolean tryReleaseShared(int arg) {
                    // Always lets a release proceed; waiters then re-evaluate tryAcquireShared().
240                 return true;
241             }
242 
                /**
                 * Waits until {@code queueSize} is not zero.
                 *
                 * @throws InterruptedException if the waiting thread is interrupted
                 */
243             void waitIfZero() throws InterruptedException {
244                 acquireSharedInterruptibly(1);
245             }
246 
                // Signals a waiting reader; called from addNext() when the counter leaves zero.
247             void release() {
248                 releaseShared(0);
249             }
250         }
251 
            /**
             * Writer-side gate: the shared acquisition succeeds while {@code queueSize} is below
             * {@code maxQueueSize} and fails otherwise.
             */
252         private class SyncT2 extends AbstractQueuedSynchronizer {
253             private static final long serialVersionUID = 1L;
254 
255             @Override
256             protected int tryAcquireShared(int arg) {
                    // positive = acquired, negative = acquisition failed (the caller waits)
257                 return queueSize.get() < maxQueueSize ? 1 : -1;
258             }
259 
260             @Override
261             protected boolean tryReleaseShared(int arg) {
                    // Always lets a release proceed; waiters then re-evaluate tryAcquireShared().
262                 return true;
263             }
264 
                // Waits (not interruptibly) while the counter is at or above maxQueueSize.
265             void awaitMax() {
266                 acquireShared(1);
267             }
268 
                // Signals waiting writers, but only if the counter is zero at the time of the call.
269             void tryRelease() {
270                 if (queueSize.get() == 0) {
271                     releaseShared(0);
272                 }
273             }
274         }
275 
            /**
             * Appends the stop marker. Unlike {@link #pushNext(Object)} this does not wait for free capacity.
             */
276         void markStopped() {
277             addNext(stopItemMarker);
278         }
279 
            /**
             * Appends the element, waiting first while the counter is at or above {@code maxQueueSize}.
             *
             * @param t element to append
             */
280         void pushNext(T t) {
281             t2.awaitMax();
282             addNext(t);
283         }
284 
            /**
             * Takes the head of the queue, waiting while the counter is zero.
             *
             * @return the polled head element
             * @throws InterruptedException if the calling thread is interrupted while waiting
             */
285         T awaitNext() throws InterruptedException {
                // Signal the writer first if the counter is already zero, then wait for the next element.
286             t2.tryRelease();
287             t1.waitIfZero();
288             queueSize.decrementAndGet();
                // NOTE(review): pollFirst() returns null on an empty deque, which looks possible only if
                // clearQueue() ran concurrently - confirm.
289             return queue.pollFirst();
290         }
291 
            /**
             * @return {@code true} if the deque itself is empty; the {@code queueSize} counter is not consulted
             */
292         boolean isEmptyQueue() {
293             return queue.isEmpty();
294         }
295 
            /**
             * Removes all elements from the deque. The {@code queueSize} counter is left untouched.
             */
296         void clearQueue() {
297             queue.clear();
298         }
299 
            /**
             * Appends the element and then increments the counter; the reader is signalled only when
             * the counter was zero before the increment.
             */
300         private void addNext(T t) {
                // The element is added before the counter is incremented.
301             queue.addLast(t);
302             if (queueSize.getAndIncrement() == 0) {
303                 t1.release();
304             }
305         }
306     }
307 }