1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.shared.utils.cli.StreamConsumer;
23 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
24
25 import java.io.Closeable;
26 import java.io.IOException;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import static java.lang.Thread.currentThread;
32
33
34
35
36
37
38 public final class ThreadedStreamConsumer
39 implements StreamConsumer, Closeable
40 {
41 private static final String END_ITEM = "";
42
43 private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
44
45 private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
46
47 private final AtomicBoolean stop = new AtomicBoolean();
48
49 private final Thread thread;
50
51 private final Pumper pumper;
52
53 final class Pumper
54 implements Runnable
55 {
56 private final StreamConsumer target;
57
58 private final MultipleFailureException errors = new MultipleFailureException();
59
60 Pumper( StreamConsumer target )
61 {
62 this.target = target;
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76 public void run()
77 {
78 while ( !ThreadedStreamConsumer.this.stop.get() )
79 {
80 try
81 {
82 String item = ThreadedStreamConsumer.this.items.take();
83 if ( shouldStopQueueing( item ) )
84 {
85 return;
86 }
87 target.consumeLine( item );
88 }
89 catch ( Throwable t )
90 {
91 errors.addException( t );
92 }
93 }
94 }
95
96 boolean hasErrors()
97 {
98 return errors.hasNestedExceptions();
99 }
100
101 void throwErrors() throws IOException
102 {
103 throw errors;
104 }
105 }
106
107 public ThreadedStreamConsumer( StreamConsumer target )
108 {
109 pumper = new Pumper( target );
110 thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
111 thread.start();
112 }
113
114 public void consumeLine( String s )
115 {
116 if ( stop.get() && !thread.isAlive() )
117 {
118 items.clear();
119 return;
120 }
121
122 try
123 {
124 items.put( s );
125 }
126 catch ( InterruptedException e )
127 {
128 currentThread().interrupt();
129 throw new IllegalStateException( e );
130 }
131 }
132
133 public void close()
134 throws IOException
135 {
136 if ( stop.compareAndSet( false, true ) )
137 {
138 items.clear();
139 try
140 {
141 items.put( END_ITEM );
142 }
143 catch ( InterruptedException e )
144 {
145 currentThread().interrupt();
146 }
147 }
148
149 if ( pumper.hasErrors() )
150 {
151 pumper.throwErrors();
152 }
153 }
154
155
156
157
158
159
160
161 private boolean shouldStopQueueing( String item )
162 {
163 return item == END_ITEM;
164 }
165 }