1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.lifecycle.internal.builder.multithreaded;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.PrintStream;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Set;
30 import org.apache.maven.lifecycle.internal.ProjectBuildList;
31 import org.apache.maven.lifecycle.internal.ProjectSegment;
32
33
34
35
36
37
38
39
40
41 @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
42 public class ThreadOutputMuxer {
43 private final Iterator<ProjectSegment> projects;
44
45 private final ThreadLocal<ProjectSegment> projectBuildThreadLocal = new ThreadLocal<>();
46
47 private final Map<ProjectSegment, ByteArrayOutputStream> streams = new HashMap<>();
48
49 private final Map<ProjectSegment, PrintStream> printStreams = new HashMap<>();
50
51 private final ByteArrayOutputStream defaultOutputStreamForUnknownData = new ByteArrayOutputStream();
52
53 private final PrintStream defaultPrintStream = new PrintStream(defaultOutputStreamForUnknownData);
54
55 private final Set<ProjectSegment> completedBuilds = Collections.synchronizedSet(new HashSet<>());
56
57 private volatile ProjectSegment currentBuild;
58
59 private final PrintStream originalSystemOUtStream;
60
61 private final ConsolePrinter printer;
62
63
64
65
66 class ConsolePrinter implements Runnable {
67 private volatile boolean running;
68
69 private final ProjectBuildList projectBuildList;
70
71 ConsolePrinter(ProjectBuildList projectBuildList) {
72 this.projectBuildList = projectBuildList;
73 }
74
75 public void run() {
76 running = true;
77 for (ProjectSegment projectBuild : projectBuildList) {
78 final PrintStream projectStream = printStreams.get(projectBuild);
79 ByteArrayOutputStream projectOs = streams.get(projectBuild);
80
81 do {
82 synchronized (projectStream) {
83 try {
84 projectStream.wait(100);
85 } catch (InterruptedException e) {
86 throw new RuntimeException(e);
87 }
88 try {
89 projectOs.writeTo(originalSystemOUtStream);
90 } catch (IOException e) {
91 throw new RuntimeException(e);
92 }
93
94 projectOs.reset();
95 }
96 } while (!completedBuilds.contains(projectBuild));
97 }
98 running = false;
99 }
100
101
102
103
104
105 public void waitUntilRunning(boolean expect) {
106 while (!running == expect) {
107 try {
108 Thread.sleep(10);
109 } catch (InterruptedException e) {
110 throw new RuntimeException(e);
111 }
112 }
113 }
114 }
115
116 public ThreadOutputMuxer(ProjectBuildList segmentChunks, PrintStream originalSystemOut) {
117 projects = segmentChunks.iterator();
118 for (ProjectSegment segmentChunk : segmentChunks) {
119 final ByteArrayOutputStream value = new ByteArrayOutputStream();
120 streams.put(segmentChunk, value);
121 printStreams.put(segmentChunk, new PrintStream(value));
122 }
123 setNext();
124 this.originalSystemOUtStream = originalSystemOut;
125 System.setOut(new ThreadBoundPrintStream(this.originalSystemOUtStream));
126 printer = new ConsolePrinter(segmentChunks);
127 new Thread(printer).start();
128 printer.waitUntilRunning(true);
129 }
130
131 public void close() {
132 printer.waitUntilRunning(false);
133 System.setOut(this.originalSystemOUtStream);
134 }
135
136 private void setNext() {
137 currentBuild = projects.hasNext() ? projects.next() : null;
138 }
139
140 private boolean ownsRealOutputStream(ProjectSegment projectBuild) {
141 return projectBuild.equals(currentBuild);
142 }
143
144 private PrintStream getThreadBoundPrintStream() {
145 ProjectSegment threadProject = projectBuildThreadLocal.get();
146 if (threadProject == null) {
147 return defaultPrintStream;
148 }
149 if (ownsRealOutputStream(threadProject)) {
150 return originalSystemOUtStream;
151 }
152 return printStreams.get(threadProject);
153 }
154
155 public void associateThreadWithProjectSegment(ProjectSegment projectBuild) {
156 projectBuildThreadLocal.set(projectBuild);
157 }
158
159 public void setThisModuleComplete(ProjectSegment projectBuild) {
160 completedBuilds.add(projectBuild);
161 PrintStream stream = printStreams.get(projectBuild);
162 synchronized (stream) {
163 stream.notifyAll();
164 }
165 disconnectThreadFromProject();
166 }
167
168 private void disconnectThreadFromProject() {
169 projectBuildThreadLocal.remove();
170 }
171
172 private class ThreadBoundPrintStream extends PrintStream {
173
174 ThreadBoundPrintStream(PrintStream systemOutStream) {
175 super(systemOutStream);
176 }
177
178 private PrintStream getOutputStreamForCurrentThread() {
179 return getThreadBoundPrintStream();
180 }
181
182 @Override
183 public void println() {
184 final PrintStream currentStream = getOutputStreamForCurrentThread();
185 synchronized (currentStream) {
186 currentStream.println();
187 currentStream.notifyAll();
188 }
189 }
190
191 @Override
192 public void print(char c) {
193 final PrintStream currentStream = getOutputStreamForCurrentThread();
194 synchronized (currentStream) {
195 currentStream.print(c);
196 currentStream.notifyAll();
197 }
198 }
199
200 @Override
201 public void println(char x) {
202 final PrintStream currentStream = getOutputStreamForCurrentThread();
203 synchronized (currentStream) {
204 currentStream.println(x);
205 currentStream.notifyAll();
206 }
207 }
208
209 @Override
210 public void print(double d) {
211 final PrintStream currentStream = getOutputStreamForCurrentThread();
212 synchronized (currentStream) {
213 currentStream.print(d);
214 currentStream.notifyAll();
215 }
216 }
217
218 @Override
219 public void println(double x) {
220 final PrintStream currentStream = getOutputStreamForCurrentThread();
221 synchronized (currentStream) {
222 currentStream.println(x);
223 currentStream.notifyAll();
224 }
225 }
226
227 @Override
228 public void print(float f) {
229 final PrintStream currentStream = getOutputStreamForCurrentThread();
230 synchronized (currentStream) {
231 currentStream.print(f);
232 currentStream.notifyAll();
233 }
234 }
235
236 @Override
237 public void println(float x) {
238 final PrintStream currentStream = getOutputStreamForCurrentThread();
239 synchronized (currentStream) {
240 currentStream.println(x);
241 currentStream.notifyAll();
242 }
243 }
244
245 @Override
246 public void print(int i) {
247 final PrintStream currentStream = getOutputStreamForCurrentThread();
248 synchronized (currentStream) {
249 currentStream.print(i);
250 currentStream.notifyAll();
251 }
252 }
253
254 @Override
255 public void println(int x) {
256 final PrintStream currentStream = getOutputStreamForCurrentThread();
257 synchronized (currentStream) {
258 currentStream.println(x);
259 currentStream.notifyAll();
260 }
261 }
262
263 @Override
264 public void print(long l) {
265 final PrintStream currentStream = getOutputStreamForCurrentThread();
266 synchronized (currentStream) {
267 currentStream.print(l);
268 currentStream.notifyAll();
269 }
270 }
271
272 @Override
273 public void println(long x) {
274 final PrintStream currentStream = getOutputStreamForCurrentThread();
275 synchronized (currentStream) {
276 currentStream.print(x);
277 currentStream.notifyAll();
278 }
279 }
280
281 @Override
282 public void print(boolean b) {
283 final PrintStream currentStream = getOutputStreamForCurrentThread();
284 synchronized (currentStream) {
285 currentStream.print(b);
286 currentStream.notifyAll();
287 }
288 }
289
290 @Override
291 public void println(boolean x) {
292 final PrintStream currentStream = getOutputStreamForCurrentThread();
293 synchronized (currentStream) {
294 currentStream.print(x);
295 currentStream.notifyAll();
296 }
297 }
298
299 @Override
300 public void print(char s[]) {
301 final PrintStream currentStream = getOutputStreamForCurrentThread();
302 synchronized (currentStream) {
303 currentStream.print(s);
304 currentStream.notifyAll();
305 }
306 }
307
308 @Override
309 public void println(char x[]) {
310 final PrintStream currentStream = getOutputStreamForCurrentThread();
311 synchronized (currentStream) {
312 currentStream.print(x);
313 currentStream.notifyAll();
314 }
315 }
316
317 @Override
318 public void print(Object obj) {
319 final PrintStream currentStream = getOutputStreamForCurrentThread();
320 synchronized (currentStream) {
321 currentStream.print(obj);
322 currentStream.notifyAll();
323 }
324 }
325
326 @Override
327 public void println(Object x) {
328 final PrintStream currentStream = getOutputStreamForCurrentThread();
329 synchronized (currentStream) {
330 currentStream.println(x);
331 currentStream.notifyAll();
332 }
333 }
334
335 @Override
336 public void print(String s) {
337 final PrintStream currentStream = getOutputStreamForCurrentThread();
338 synchronized (currentStream) {
339 currentStream.print(s);
340 currentStream.notifyAll();
341 }
342 }
343
344 @Override
345 public void println(String x) {
346 final PrintStream currentStream = getOutputStreamForCurrentThread();
347 synchronized (currentStream) {
348 currentStream.println(x);
349 currentStream.notifyAll();
350 }
351 }
352
353 @Override
354 public void write(byte b[], int off, int len) {
355 final PrintStream currentStream = getOutputStreamForCurrentThread();
356 synchronized (currentStream) {
357 currentStream.write(b, off, len);
358 currentStream.notifyAll();
359 }
360 }
361
362 @Override
363 public void close() {
364 getOutputStreamForCurrentThread().close();
365 }
366
367 @Override
368 public void flush() {
369 getOutputStreamForCurrentThread().flush();
370 }
371
372 @Override
373 public void write(int b) {
374 final PrintStream currentStream = getOutputStreamForCurrentThread();
375 synchronized (currentStream) {
376 currentStream.write(b);
377 currentStream.notifyAll();
378 }
379 }
380
381 @Override
382 public void write(byte b[]) throws IOException {
383 final PrintStream currentStream = getOutputStreamForCurrentThread();
384 synchronized (currentStream) {
385 currentStream.write(b);
386 currentStream.notifyAll();
387 }
388 }
389 }
390 }