1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import org.apache.maven.shared.utils.cli.StreamConsumer;
23 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
24
25 import java.io.Closeable;
26 import java.io.IOException;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import static java.lang.Thread.currentThread;
32
33 /**
34 * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
35 *
36 * @author Kristian Rosenvold
37 */
38 public final class ThreadedStreamConsumer
39 implements StreamConsumer, Closeable
40 {
41 private static final String END_ITEM = "";
42
43 private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
44
45 private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
46
47 private final AtomicBoolean stop = new AtomicBoolean();
48
49 private final Thread thread;
50
51 private final Pumper pumper;
52
53 final class Pumper
54 implements Runnable
55 {
56 private final StreamConsumer target;
57
58 private final MultipleFailureException errors = new MultipleFailureException();
59
60 Pumper( StreamConsumer target )
61 {
62 this.target = target;
63 }
64
65 /**
66 * Calls {@link ForkClient#consumeLine(String)} which may throw any {@link RuntimeException}.<br>
67 * Even if {@link ForkClient} is not fault-tolerant, this method MUST be fault-tolerant and thus the
68 * try-catch block must be inside of the loop which prevents from loosing events from {@link StreamConsumer}.
69 * <br>
70 * If {@link org.apache.maven.plugin.surefire.report.ConsoleOutputFileReporter#writeTestOutput} throws
71 * {@link java.io.IOException} and then {@code target.consumeLine()} throws any RuntimeException, this method
72 * MUST NOT skip reading the events from the forked JVM; otherwise we could simply lost events
73 * e.g. acquire-next-test which means that {@link ForkClient} could hang on waiting for old test to complete
74 * and therefore the plugin could be permanently in progress.
75 */
76 @Override
77 public void run()
78 {
79 while ( !ThreadedStreamConsumer.this.stop.get() )
80 {
81 try
82 {
83 String item = ThreadedStreamConsumer.this.items.take();
84 if ( shouldStopQueueing( item ) )
85 {
86 return;
87 }
88 target.consumeLine( item );
89 }
90 catch ( Throwable t )
91 {
92 errors.addException( t );
93 }
94 }
95 }
96
97 boolean hasErrors()
98 {
99 return errors.hasNestedExceptions();
100 }
101
102 void throwErrors() throws IOException
103 {
104 throw errors;
105 }
106 }
107
108 public ThreadedStreamConsumer( StreamConsumer target )
109 {
110 pumper = new Pumper( target );
111 thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
112 thread.start();
113 }
114
115 @Override
116 public void consumeLine( String s )
117 {
118 if ( stop.get() && !thread.isAlive() )
119 {
120 items.clear();
121 return;
122 }
123
124 try
125 {
126 items.put( s );
127 }
128 catch ( InterruptedException e )
129 {
130 currentThread().interrupt();
131 throw new IllegalStateException( e );
132 }
133 }
134
135 @Override
136 public void close()
137 throws IOException
138 {
139 if ( stop.compareAndSet( false, true ) )
140 {
141 items.clear();
142 try
143 {
144 items.put( END_ITEM );
145 }
146 catch ( InterruptedException e )
147 {
148 currentThread().interrupt();
149 }
150 }
151
152 if ( pumper.hasErrors() )
153 {
154 pumper.throwErrors();
155 }
156 }
157
158 /**
159 * Compared item with {@link #END_ITEM} by identity.
160 *
161 * @param item element from <code>items</code>
162 * @return {@code true} if tail of the queue
163 */
164 private boolean shouldStopQueueing( String item )
165 {
166 return item == END_ITEM;
167 }
168 }