1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.shared.utils.cli.StreamConsumer;
23  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
24  
25  import java.io.Closeable;
26  import java.io.IOException;
27  import java.util.concurrent.ArrayBlockingQueue;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  
31  import static java.lang.Thread.currentThread;
32  
33  /**
34   * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
35   *
36   * @author Kristian Rosenvold
37   */
38  public final class ThreadedStreamConsumer
39          implements StreamConsumer, Closeable
40  {
41      private static final String END_ITEM = "";
42  
43      private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
44  
45      private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
46  
47      private final AtomicBoolean stop = new AtomicBoolean();
48  
49      private final Thread thread;
50  
51      private final Pumper pumper;
52  
53      final class Pumper
54              implements Runnable
55      {
56          private final StreamConsumer target;
57  
58          private final MultipleFailureException errors = new MultipleFailureException();
59  
60          Pumper( StreamConsumer target )
61          {
62              this.target = target;
63          }
64  
65          /**
66           * Calls {@link ForkClient#consumeLine(String)} which may throw any {@link RuntimeException}.<br>
67           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
68           * try-catch block must be inside of the loop which prevents from loosing events from {@link StreamConsumer}.
69           * <br>
70           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
71           * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
72           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
73           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
74           * and therefore the plugin could be permanently in progress.
75           */
76          @Override
77          public void run()
78          {
79              while ( !ThreadedStreamConsumer.this.stop.get() )
80              {
81                  try
82                  {
83                      String item = ThreadedStreamConsumer.this.items.take();
84                      if ( shouldStopQueueing( item ) )
85                      {
86                          return;
87                      }
88                      target.consumeLine( item );
89                  }
90                  catch ( Throwable t )
91                  {
92                      errors.addException( t );
93                  }
94              }
95          }
96  
97          boolean hasErrors()
98          {
99              return errors.hasNestedExceptions();
100         }
101 
102         void throwErrors() throws IOException
103         {
104             throw errors;
105         }
106     }
107 
108     public ThreadedStreamConsumer( StreamConsumer target )
109     {
110         pumper = new Pumper( target );
111         thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
112         thread.start();
113     }
114 
115     @Override
116     public void consumeLine( String s )
117     {
118         if ( stop.get() && !thread.isAlive() )
119         {
120             items.clear();
121             return;
122         }
123 
124         try
125         {
126             items.put( s );
127         }
128         catch ( InterruptedException e )
129         {
130             currentThread().interrupt();
131             throw new IllegalStateException( e );
132         }
133     }
134 
135     @Override
136     public void close()
137             throws IOException
138     {
139         if ( stop.compareAndSet( false, true ) )
140         {
141             items.clear();
142             try
143             {
144                 items.put( END_ITEM );
145             }
146             catch ( InterruptedException e )
147             {
148                 currentThread().interrupt();
149             }
150         }
151 
152         if ( pumper.hasErrors() )
153         {
154             pumper.throwErrors();
155         }
156     }
157 
158     /**
159      * Compared item with {@link #END_ITEM} by identity.
160      *
161      * @param item    element from <code>items</code>
162      * @return {@code true} if tail of the queue
163      */
164     private boolean shouldStopQueueing( String item )
165     {
166         return item == END_ITEM;
167     }
168 }