View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.output;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentLinkedDeque;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30  
31  import org.apache.maven.surefire.api.event.Event;
32  import org.apache.maven.surefire.extensions.EventHandler;
33  import org.slf4j.MDC;
34  
35  import static java.lang.Thread.currentThread;
36  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
37  
38  /**
39   * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
40   * <br>
41   * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
42   * 6.33 mega messages per second
43   * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
44   * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
45   *
46   * @author Kristian Rosenvold
47   */
48  public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
49      private static final int QUEUE_MAX_ITEMS = 10_000;
50      private static final Event END_ITEM = new FinalEvent();
51  
52      private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
53      private final AtomicBoolean stop = new AtomicBoolean();
54      private final AtomicBoolean isAlive = new AtomicBoolean(true);
55      private final Thread consumer;
56      private final Pumper pumper;
57  
58      final class Pumper implements Runnable {
59          private final EventHandler<Event> target;
60  
61          private final MultipleFailureException errors = new MultipleFailureException();
62          private final Map<String, String> parentThreadsMdcContextMap;
63  
64          Pumper(EventHandler<Event> target, Map<String, String> mdcContextMap) {
65              this.target = target;
66              this.parentThreadsMdcContextMap = mdcContextMap;
67          }
68  
69          /**
70           * Calls {@link ForkClient#handleEvent(Event)} which may throw any {@link RuntimeException}.<br>
71           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
72           * try-catch block must be inside of the loop which prevents from loosing events from {@link EventHandler}.
73           * <br>
74           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
75           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
76           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
77           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
78           * and therefore the plugin could be permanently in progress.
79           */
80          @Override
81          public void run() {
82              // copy mdc context map from parent thread so that mvn
83              // tools that build in parallel can use it in there logging
84              // to make clear what module is producing the logs
85              MDC.setContextMap(parentThreadsMdcContextMap);
86              while (!stop.get() || !synchronizer.isEmptyQueue()) {
87                  try {
88                      Event item = synchronizer.awaitNext();
89  
90                      if (shouldStopQueueing(item)) {
91                          break;
92                      }
93  
94                      target.handleEvent(item);
95                  } catch (Throwable t) {
96                      // ensure the stack trace to be at the instance of the exception
97                      t.getStackTrace();
98                      errors.addException(t);
99                  }
100             }
101 
102             isAlive.set(false);
103         }
104 
105         boolean hasErrors() {
106             return errors.hasNestedExceptions();
107         }
108 
109         void throwErrors() throws IOException {
110             throw errors;
111         }
112     }
113 
114     public ThreadedStreamConsumer(EventHandler<Event> target) {
115         pumper = new Pumper(target, MDC.getCopyOfContextMap());
116         Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
117         consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
118         consumer.start();
119         this.consumer = consumer;
120     }
121 
122     @Override
123     public void handleEvent(@Nonnull Event event) {
124         // Do NOT call Thread.isAlive() - slow.
125         // It makes worse performance from 790 ms to 1250 ms for 5 million messages.
126         if (!stop.get() && isAlive.get()) {
127             synchronizer.pushNext(event);
128         }
129     }
130 
131     @Override
132     public void close() throws IOException {
133         isAlive.compareAndSet(true, consumer.isAlive());
134         if (stop.compareAndSet(false, true) && isAlive.get()) {
135             if (currentThread().isInterrupted()) {
136                 synchronizer.markStopped();
137                 consumer.interrupt();
138             } else {
139                 synchronizer.markStopped();
140 
141                 try {
142                     consumer.join();
143                 } catch (InterruptedException e) {
144                     // we should not set interrupted=true in this Thread
145                     // if consumer's Thread was interrupted which is indicated by InterruptedException
146                 }
147 
148                 synchronizer.clearQueue();
149             }
150         }
151 
152         if (pumper.hasErrors()) {
153             pumper.throwErrors();
154         }
155     }
156 
157     /**
158      * Compared item with {@link #END_ITEM} by identity.
159      *
160      * @param item    element from <code>items</code>
161      * @return {@code true} if tail of the queue
162      */
163     private static boolean shouldStopQueueing(Event item) {
164         return item == END_ITEM;
165     }
166 
167     /**
168      *
169      */
170     private static class FinalEvent extends Event {
171         FinalEvent() {
172             super(null);
173         }
174 
175         @Override
176         public boolean isControlCategory() {
177             return false;
178         }
179 
180         @Override
181         public boolean isConsoleCategory() {
182             return false;
183         }
184 
185         @Override
186         public boolean isConsoleErrorCategory() {
187             return false;
188         }
189 
190         @Override
191         public boolean isStandardStreamCategory() {
192             return false;
193         }
194 
195         @Override
196         public boolean isSysPropCategory() {
197             return false;
198         }
199 
200         @Override
201         public boolean isTestCategory() {
202             return false;
203         }
204 
205         @Override
206         public boolean isJvmExitError() {
207             return false;
208         }
209     }
210 
211     /**
212      * This synchronization helper mostly avoids the locks.
213      * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
214      * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
215      *
216      * @param <T> element type in the queue
217      */
218     static class QueueSynchronizer<T> {
219         private final SyncT1 t1 = new SyncT1();
220         private final SyncT2 t2 = new SyncT2();
221         private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
222         private final AtomicInteger queueSize = new AtomicInteger();
223         private final int maxQueueSize;
224         private final T stopItemMarker;
225 
226         QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
227             this.maxQueueSize = maxQueueSize;
228             this.stopItemMarker = stopItemMarker;
229         }
230 
231         private class SyncT1 extends AbstractQueuedSynchronizer {
232             private static final long serialVersionUID = 1L;
233 
234             @Override
235             protected int tryAcquireShared(int arg) {
236                 return queueSize.get() == 0 ? -1 : 1;
237             }
238 
239             @Override
240             protected boolean tryReleaseShared(int arg) {
241                 return true;
242             }
243 
244             void waitIfZero() throws InterruptedException {
245                 acquireSharedInterruptibly(1);
246             }
247 
248             void release() {
249                 releaseShared(0);
250             }
251         }
252 
253         private class SyncT2 extends AbstractQueuedSynchronizer {
254             private static final long serialVersionUID = 1L;
255 
256             @Override
257             protected int tryAcquireShared(int arg) {
258                 return queueSize.get() < maxQueueSize ? 1 : -1;
259             }
260 
261             @Override
262             protected boolean tryReleaseShared(int arg) {
263                 return true;
264             }
265 
266             void awaitMax() {
267                 acquireShared(1);
268             }
269 
270             void tryRelease() {
271                 if (queueSize.get() == 0) {
272                     releaseShared(0);
273                 }
274             }
275         }
276 
277         void markStopped() {
278             addNext(stopItemMarker);
279         }
280 
281         void pushNext(T t) {
282             t2.awaitMax();
283             addNext(t);
284         }
285 
286         T awaitNext() throws InterruptedException {
287             t2.tryRelease();
288             t1.waitIfZero();
289             queueSize.decrementAndGet();
290             return queue.pollFirst();
291         }
292 
293         boolean isEmptyQueue() {
294             return queue.isEmpty();
295         }
296 
297         void clearQueue() {
298             queue.clear();
299         }
300 
301         private void addNext(T t) {
302             queue.addLast(t);
303             if (queueSize.getAndIncrement() == 0) {
304                 t1.release();
305             }
306         }
307     }
308 }