View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.shared.utils.cli.StreamConsumer;
23  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
24  
25  import java.io.Closeable;
26  import java.io.IOException;
27  import java.util.concurrent.ArrayBlockingQueue;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  
31  import static java.lang.Thread.currentThread;
32  
33  /**
34   * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
35   *
36   * @author Kristian Rosenvold
37   */
38  public final class ThreadedStreamConsumer
39          implements StreamConsumer, Closeable
40  {
41      private static final String END_ITEM = "";
42  
43      private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
44  
45      private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
46  
47      private final AtomicBoolean stop = new AtomicBoolean();
48  
49      private final Thread thread;
50  
51      private final Pumper pumper;
52  
53      final class Pumper
54              implements Runnable
55      {
56          private final StreamConsumer target;
57  
58          private final MultipleFailureException errors = new MultipleFailureException();
59  
60          Pumper( StreamConsumer target )
61          {
62              this.target = target;
63          }
64  
65          /**
66           * Calls {@link ForkClient#consumeLine(String)} which may throw any {@link RuntimeException}.<p/>
67           * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
68           * try-catch block must be inside of the loop which prevents from loosing events from {@link StreamConsumer}.
69           * <p/>
70           * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
71           * {@link java.io.IOException} and then <em>target.consumeLine()</em> throws any RuntimeException, this method
72           * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
73           * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
74           * and therefore the plugin could be permanently in progress.
75           */
76          public void run()
77          {
78              while ( !ThreadedStreamConsumer.this.stop.get() )
79              {
80                  try
81                  {
82                      String item = ThreadedStreamConsumer.this.items.take();
83                      if ( shouldStopQueueing( item ) )
84                      {
85                          return;
86                      }
87                      target.consumeLine( item );
88                  }
89                  catch ( Throwable t )
90                  {
91                      errors.addException( t );
92                  }
93              }
94          }
95  
96          boolean hasErrors()
97          {
98              return errors.hasNestedExceptions();
99          }
100 
101         void throwErrors() throws IOException
102         {
103             throw errors;
104         }
105     }
106 
107     public ThreadedStreamConsumer( StreamConsumer target )
108     {
109         pumper = new Pumper( target );
110         thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
111         thread.start();
112     }
113 
114     public void consumeLine( String s )
115     {
116         if ( stop.get() && !thread.isAlive() )
117         {
118             items.clear();
119             return;
120         }
121 
122         try
123         {
124             items.put( s );
125         }
126         catch ( InterruptedException e )
127         {
128             currentThread().interrupt();
129             throw new IllegalStateException( e );
130         }
131     }
132 
133     public void close()
134             throws IOException
135     {
136         if ( stop.compareAndSet( false, true ) )
137         {
138             items.clear();
139             try
140             {
141                 items.put( END_ITEM );
142             }
143             catch ( InterruptedException e )
144             {
145                 currentThread().interrupt();
146             }
147         }
148 
149         if ( pumper.hasErrors() )
150         {
151             pumper.throwErrors();
152         }
153     }
154 
155     /**
156      * Compared item with {@link #END_ITEM} by identity.
157      *
158      * @param item    element from <code>items</code>
159      * @return <tt>true</tt> if tail of the queue
160      */
161     private boolean shouldStopQueueing( String item )
162     {
163         return item == END_ITEM;
164     }
165 }