View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import javax.annotation.Nonnull;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentLinkedDeque;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30  
31  import org.apache.maven.surefire.api.event.Event;
32  import org.apache.maven.surefire.extensions.EventHandler;
33  
34  import static java.lang.Thread.currentThread;
35  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
36  
37  /**
38   * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
39   * <br>
40   * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
41   * 6.33 mega messages per second
42   * (158 nano seconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
43   * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
44   *
45   * @author Kristian Rosenvold
46   */
47  public final class ThreadedStreamConsumer
48      implements EventHandler<Event>, Closeable
49  {
50      private static final int QUEUE_MAX_ITEMS = 10_000;
51      private static final Event END_ITEM = new FinalEvent();
52  
53      private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
54      private final AtomicBoolean stop = new AtomicBoolean();
55      private final AtomicBoolean isAlive = new AtomicBoolean( true );
56      private final Thread consumer;
57      private final Pumper pumper;
58  
59      final class Pumper
60          implements Runnable
61      {
62          private final EventHandler<Event> target;
63  
64          private final MultipleFailureException errors = new MultipleFailureException();
65  
66          Pumper( EventHandler<Event> target )
67          {
68              this.target = target;
69          }
70  
71          /**
72           * Calls {@link ForkClient#handleEvent(Event)} which may throw any {@link RuntimeException}.<br>
73           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
74           * try-catch block must be inside of the loop which prevents from loosing events from {@link EventHandler}.
75           * <br>
76           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
77           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
78           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
79           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
80           * and therefore the plugin could be permanently in progress.
81           */
82          @Override
83          public void run()
84          {
85              while ( !stop.get() || !synchronizer.isEmptyQueue() )
86              {
87                  try
88                  {
89                      Event item = synchronizer.awaitNext();
90  
91                      if ( shouldStopQueueing( item ) )
92                      {
93                          break;
94                      }
95  
96                      target.handleEvent( item );
97                  }
98                  catch ( Throwable t )
99                  {
100                     // ensure the stack trace to be at the instance of the exception
101                     t.getStackTrace();
102                     errors.addException( t );
103                 }
104             }
105 
106             isAlive.set( false );
107         }
108 
109         boolean hasErrors()
110         {
111             return errors.hasNestedExceptions();
112         }
113 
114         void throwErrors() throws IOException
115         {
116             throw errors;
117         }
118     }
119 
120     public ThreadedStreamConsumer( EventHandler<Event> target )
121     {
122         pumper = new Pumper( target );
123         Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
124         consumer.setUncaughtExceptionHandler( ( t, e ) -> isAlive.set( false ) );
125         consumer.start();
126         this.consumer = consumer;
127     }
128 
129     @Override
130     public void handleEvent( @Nonnull Event event )
131     {
132         // Do NOT call Thread.isAlive() - slow.
133         // It makes worse performance from 790 millis to 1250 millis for 5 million messages.
134         if ( !stop.get() && isAlive.get() )
135         {
136             synchronizer.pushNext( event );
137         }
138     }
139 
140     @Override
141     public void close()
142         throws IOException
143     {
144         isAlive.compareAndSet( true, consumer.isAlive() );
145         if ( stop.compareAndSet( false, true ) && isAlive.get() )
146         {
147             if ( currentThread().isInterrupted() )
148             {
149                 synchronizer.markStopped();
150                 consumer.interrupt();
151             }
152             else
153             {
154                 synchronizer.markStopped();
155 
156                 try
157                 {
158                     consumer.join();
159                 }
160                 catch ( InterruptedException e )
161                 {
162                     // we should not set interrupted=true in this Thread
163                     // if consumer's Thread was interrupted which is indicated by InterruptedException
164                 }
165 
166                 synchronizer.clearQueue();
167             }
168         }
169 
170         if ( pumper.hasErrors() )
171         {
172             pumper.throwErrors();
173         }
174     }
175 
176     /**
177      * Compared item with {@link #END_ITEM} by identity.
178      *
179      * @param item    element from <code>items</code>
180      * @return {@code true} if tail of the queue
181      */
182     private static boolean shouldStopQueueing( Event item )
183     {
184         return item == END_ITEM;
185     }
186 
187     /**
188      *
189      */
190     private static class FinalEvent extends Event
191     {
192         FinalEvent()
193         {
194             super( null );
195         }
196 
197         @Override
198         public boolean isControlCategory()
199         {
200             return false;
201         }
202 
203         @Override
204         public boolean isConsoleCategory()
205         {
206             return false;
207         }
208 
209         @Override
210         public boolean isConsoleErrorCategory()
211         {
212             return false;
213         }
214 
215         @Override
216         public boolean isStandardStreamCategory()
217         {
218             return false;
219         }
220 
221         @Override
222         public boolean isSysPropCategory()
223         {
224             return false;
225         }
226 
227         @Override
228         public boolean isTestCategory()
229         {
230             return false;
231         }
232 
233         @Override
234         public boolean isJvmExitError()
235         {
236             return false;
237         }
238     }
239 
240     /**
241      * This synchronization helper mostly avoids the locks.
242      * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
243      * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
244      *
245      * @param <T> element type in the queue
246      */
247     static class QueueSynchronizer<T>
248     {
249         private final SyncT1 t1 = new SyncT1();
250         private final SyncT2 t2 = new SyncT2();
251         private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
252         private final AtomicInteger queueSize = new AtomicInteger();
253         private final int maxQueueSize;
254         private final T stopItemMarker;
255 
256         QueueSynchronizer( int maxQueueSize, T stopItemMarker )
257         {
258             this.maxQueueSize = maxQueueSize;
259             this.stopItemMarker = stopItemMarker;
260         }
261 
262         private class SyncT1 extends AbstractQueuedSynchronizer
263         {
264             private static final long serialVersionUID = 1L;
265 
266             @Override
267             protected int tryAcquireShared( int arg )
268             {
269                 return queueSize.get() == 0 ? -1 : 1;
270             }
271 
272             @Override
273             protected boolean tryReleaseShared( int arg )
274             {
275                 return true;
276             }
277 
278             void waitIfZero() throws InterruptedException
279             {
280                 acquireSharedInterruptibly( 1 );
281             }
282 
283             void release()
284             {
285                 releaseShared( 0 );
286             }
287         }
288 
289         private class SyncT2 extends AbstractQueuedSynchronizer
290         {
291             private static final long serialVersionUID = 1L;
292 
293             @Override
294             protected int tryAcquireShared( int arg )
295             {
296                 return queueSize.get() < maxQueueSize ? 1 : -1;
297             }
298 
299             @Override
300             protected boolean tryReleaseShared( int arg )
301             {
302                 return true;
303             }
304 
305             void awaitMax()
306             {
307                 acquireShared( 1 );
308             }
309 
310             void tryRelease()
311             {
312                 if ( queueSize.get() == 0 )
313                 {
314                     releaseShared( 0 );
315                 }
316             }
317         }
318 
319         void markStopped()
320         {
321             addNext( stopItemMarker );
322         }
323 
324         void pushNext( T t )
325         {
326             t2.awaitMax();
327             addNext( t );
328         }
329 
330         T awaitNext() throws InterruptedException
331         {
332             t2.tryRelease();
333             t1.waitIfZero();
334             queueSize.decrementAndGet();
335             return queue.pollFirst();
336         }
337 
338         boolean isEmptyQueue()
339         {
340             return queue.isEmpty();
341         }
342 
343         void clearQueue()
344         {
345             queue.clear();
346         }
347 
348         private void addNext( T t )
349         {
350             queue.addLast( t );
351             if ( queueSize.getAndIncrement() == 0 )
352             {
353                 t1.release();
354             }
355         }
356     }
357 }