View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.api.event.Event;
23  import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
24  import org.apache.maven.surefire.extensions.EventHandler;
25  import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
26  
27  import javax.annotation.Nonnull;
28  import java.io.Closeable;
29  import java.io.IOException;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import static java.lang.Thread.currentThread;
35  
36  /**
37   * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
38   *
39   * @author Kristian Rosenvold
40   */
41  public final class ThreadedStreamConsumer
42          implements EventHandler<Event>, Closeable
43  {
44      private static final Event END_ITEM = new FinalEvent();
45  
46      private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
47  
48      private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
49  
50      private final AtomicBoolean stop = new AtomicBoolean();
51  
52      private final Thread thread;
53  
54      private final Pumper pumper;
55  
56      final class Pumper
57              implements Runnable
58      {
59          private final EventHandler<Event> target;
60  
61          private final MultipleFailureException errors = new MultipleFailureException();
62  
63          Pumper( EventHandler<Event> target )
64          {
65              this.target = target;
66          }
67  
68          /**
69           * Calls {@link ForkClient#handleEvent(Event)} which may throw any {@link RuntimeException}.<br>
70           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
71           * try-catch block must be inside of the loop which prevents from loosing events from {@link StreamConsumer}.
72           * <br>
73           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
74           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
75           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
76           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
77           * and therefore the plugin could be permanently in progress.
78           */
79          @Override
80          public void run()
81          {
82              while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
83              {
84                  try
85                  {
86                      Event item = ThreadedStreamConsumer.this.items.take();
87                      if ( shouldStopQueueing( item ) )
88                      {
89                          return;
90                      }
91                      target.handleEvent( item );
92                  }
93                  catch ( Throwable t )
94                  {
95                      errors.addException( t );
96                  }
97              }
98          }
99  
100         boolean hasErrors()
101         {
102             return errors.hasNestedExceptions();
103         }
104 
105         void throwErrors() throws IOException
106         {
107             throw errors;
108         }
109     }
110 
111     public ThreadedStreamConsumer( EventHandler<Event> target )
112     {
113         pumper = new Pumper( target );
114         thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
115         thread.start();
116     }
117 
118     @Override
119     public void handleEvent( @Nonnull Event event )
120     {
121         if ( stop.get() )
122         {
123             return;
124         }
125         else if ( !thread.isAlive() )
126         {
127             items.clear();
128             return;
129         }
130 
131         try
132         {
133             items.put( event );
134         }
135         catch ( InterruptedException e )
136         {
137             currentThread().interrupt();
138             throw new IllegalStateException( e );
139         }
140     }
141 
142     @Override
143     public void close()
144             throws IOException
145     {
146         if ( stop.compareAndSet( false, true ) )
147         {
148             try
149             {
150                 items.put( END_ITEM );
151             }
152             catch ( InterruptedException e )
153             {
154                 currentThread().interrupt();
155             }
156         }
157 
158         if ( pumper.hasErrors() )
159         {
160             pumper.throwErrors();
161         }
162     }
163 
164     /**
165      * Compared item with {@link #END_ITEM} by identity.
166      *
167      * @param item    element from <code>items</code>
168      * @return {@code true} if tail of the queue
169      */
170     private boolean shouldStopQueueing( Event item )
171     {
172         return item == END_ITEM;
173     }
174 
175     /**
176      *
177      */
178     private static class FinalEvent extends Event
179     {
180         FinalEvent()
181         {
182             super( null );
183         }
184 
185         @Override
186         public boolean isControlCategory()
187         {
188             return false;
189         }
190 
191         @Override
192         public boolean isConsoleCategory()
193         {
194             return false;
195         }
196 
197         @Override
198         public boolean isConsoleErrorCategory()
199         {
200             return false;
201         }
202 
203         @Override
204         public boolean isStandardStreamCategory()
205         {
206             return false;
207         }
208 
209         @Override
210         public boolean isSysPropCategory()
211         {
212             return false;
213         }
214 
215         @Override
216         public boolean isTestCategory()
217         {
218             return false;
219         }
220 
221         @Override
222         public boolean isJvmExitError()
223         {
224             return false;
225         }
226     }
227 }