View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.output;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.util.concurrent.ConcurrentLinkedDeque;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
29  
30  import org.apache.maven.surefire.api.event.Event;
31  import org.apache.maven.surefire.extensions.EventHandler;
32  
33  import static java.lang.Thread.currentThread;
34  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
35  
36  /**
37   * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
38   * <br>
39   * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
40   * 6.33 mega messages per second
41   * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
42   * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
43   *
44   * @author Kristian Rosenvold
45   */
46  public final class ThreadedStreamConsumer implements EventHandler<Event>, Closeable {
47      private static final int QUEUE_MAX_ITEMS = 10_000;
48      private static final Event END_ITEM = new FinalEvent();
49  
50      private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>(QUEUE_MAX_ITEMS, END_ITEM);
51      private final AtomicBoolean stop = new AtomicBoolean();
52      private final AtomicBoolean isAlive = new AtomicBoolean(true);
53      private final Thread consumer;
54      private final Pumper pumper;
55  
56      final class Pumper implements Runnable {
57          private final EventHandler<Event> target;
58  
59          private final MultipleFailureException errors = new MultipleFailureException();
60  
61          Pumper(EventHandler<Event> target) {
62              this.target = target;
63          }
64  
65          /**
66           * Calls {@link ForkClient#handleEvent(Event)} which may throw any {@link RuntimeException}.<br>
67           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
68           * try-catch block must be inside of the loop which prevents from loosing events from {@link EventHandler}.
69           * <br>
70           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
71           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
72           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
73           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
74           * and therefore the plugin could be permanently in progress.
75           */
76          @Override
77          public void run() {
78              while (!stop.get() || !synchronizer.isEmptyQueue()) {
79                  try {
80                      Event item = synchronizer.awaitNext();
81  
82                      if (shouldStopQueueing(item)) {
83                          break;
84                      }
85  
86                      target.handleEvent(item);
87                  } catch (Throwable t) {
88                      // ensure the stack trace to be at the instance of the exception
89                      t.getStackTrace();
90                      errors.addException(t);
91                  }
92              }
93  
94              isAlive.set(false);
95          }
96  
97          boolean hasErrors() {
98              return errors.hasNestedExceptions();
99          }
100 
101         void throwErrors() throws IOException {
102             throw errors;
103         }
104     }
105 
106     public ThreadedStreamConsumer(EventHandler<Event> target) {
107         pumper = new Pumper(target);
108         Thread consumer = newDaemonThread(pumper, "ThreadedStreamConsumer");
109         consumer.setUncaughtExceptionHandler((t, e) -> isAlive.set(false));
110         consumer.start();
111         this.consumer = consumer;
112     }
113 
114     @Override
115     public void handleEvent(@Nonnull Event event) {
116         // Do NOT call Thread.isAlive() - slow.
117         // It makes worse performance from 790 ms to 1250 ms for 5 million messages.
118         if (!stop.get() && isAlive.get()) {
119             synchronizer.pushNext(event);
120         }
121     }
122 
123     @Override
124     public void close() throws IOException {
125         isAlive.compareAndSet(true, consumer.isAlive());
126         if (stop.compareAndSet(false, true) && isAlive.get()) {
127             if (currentThread().isInterrupted()) {
128                 synchronizer.markStopped();
129                 consumer.interrupt();
130             } else {
131                 synchronizer.markStopped();
132 
133                 try {
134                     consumer.join();
135                 } catch (InterruptedException e) {
136                     // we should not set interrupted=true in this Thread
137                     // if consumer's Thread was interrupted which is indicated by InterruptedException
138                 }
139 
140                 synchronizer.clearQueue();
141             }
142         }
143 
144         if (pumper.hasErrors()) {
145             pumper.throwErrors();
146         }
147     }
148 
149     /**
150      * Compared item with {@link #END_ITEM} by identity.
151      *
152      * @param item    element from <code>items</code>
153      * @return {@code true} if tail of the queue
154      */
155     private static boolean shouldStopQueueing(Event item) {
156         return item == END_ITEM;
157     }
158 
159     /**
160      *
161      */
162     private static class FinalEvent extends Event {
163         FinalEvent() {
164             super(null);
165         }
166 
167         @Override
168         public boolean isControlCategory() {
169             return false;
170         }
171 
172         @Override
173         public boolean isConsoleCategory() {
174             return false;
175         }
176 
177         @Override
178         public boolean isConsoleErrorCategory() {
179             return false;
180         }
181 
182         @Override
183         public boolean isStandardStreamCategory() {
184             return false;
185         }
186 
187         @Override
188         public boolean isSysPropCategory() {
189             return false;
190         }
191 
192         @Override
193         public boolean isTestCategory() {
194             return false;
195         }
196 
197         @Override
198         public boolean isJvmExitError() {
199             return false;
200         }
201     }
202 
203     /**
204      * This synchronization helper mostly avoids the locks.
205      * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
206      * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
207      *
208      * @param <T> element type in the queue
209      */
210     static class QueueSynchronizer<T> {
211         private final SyncT1 t1 = new SyncT1();
212         private final SyncT2 t2 = new SyncT2();
213         private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
214         private final AtomicInteger queueSize = new AtomicInteger();
215         private final int maxQueueSize;
216         private final T stopItemMarker;
217 
218         QueueSynchronizer(int maxQueueSize, T stopItemMarker) {
219             this.maxQueueSize = maxQueueSize;
220             this.stopItemMarker = stopItemMarker;
221         }
222 
223         private class SyncT1 extends AbstractQueuedSynchronizer {
224             private static final long serialVersionUID = 1L;
225 
226             @Override
227             protected int tryAcquireShared(int arg) {
228                 return queueSize.get() == 0 ? -1 : 1;
229             }
230 
231             @Override
232             protected boolean tryReleaseShared(int arg) {
233                 return true;
234             }
235 
236             void waitIfZero() throws InterruptedException {
237                 acquireSharedInterruptibly(1);
238             }
239 
240             void release() {
241                 releaseShared(0);
242             }
243         }
244 
245         private class SyncT2 extends AbstractQueuedSynchronizer {
246             private static final long serialVersionUID = 1L;
247 
248             @Override
249             protected int tryAcquireShared(int arg) {
250                 return queueSize.get() < maxQueueSize ? 1 : -1;
251             }
252 
253             @Override
254             protected boolean tryReleaseShared(int arg) {
255                 return true;
256             }
257 
258             void awaitMax() {
259                 acquireShared(1);
260             }
261 
262             void tryRelease() {
263                 if (queueSize.get() == 0) {
264                     releaseShared(0);
265                 }
266             }
267         }
268 
269         void markStopped() {
270             addNext(stopItemMarker);
271         }
272 
273         void pushNext(T t) {
274             t2.awaitMax();
275             addNext(t);
276         }
277 
278         T awaitNext() throws InterruptedException {
279             t2.tryRelease();
280             t1.waitIfZero();
281             queueSize.decrementAndGet();
282             return queue.pollFirst();
283         }
284 
285         boolean isEmptyQueue() {
286             return queue.isEmpty();
287         }
288 
289         void clearQueue() {
290             queue.clear();
291         }
292 
293         private void addNext(T t) {
294             queue.addLast(t);
295             if (queueSize.getAndIncrement() == 0) {
296                 t1.release();
297             }
298         }
299     }
300 }