1 package org.apache.maven.lifecycle.internal.builder.multithreaded;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayOutputStream;
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.maven.lifecycle.internal.ProjectBuildList;
33 import org.apache.maven.lifecycle.internal.ProjectSegment;
34
35
36
37
38
39
40
41
42
43 @SuppressWarnings( { "SynchronizationOnLocalVariableOrMethodParameter" } )
44 public class ThreadOutputMuxer
45 {
46 private final Iterator<ProjectSegment> projects;
47
48 private final ThreadLocal<ProjectSegment> projectBuildThreadLocal = new ThreadLocal<>();
49
50 private final Map<ProjectSegment, ByteArrayOutputStream> streams =
51 new HashMap<>();
52
53 private final Map<ProjectSegment, PrintStream> printStreams = new HashMap<>();
54
55 private final ByteArrayOutputStream defaultOutputStreamForUnknownData = new ByteArrayOutputStream();
56
57 private final PrintStream defaultPrintStream = new PrintStream( defaultOutputStreamForUnknownData );
58
59 private final Set<ProjectSegment> completedBuilds = Collections.synchronizedSet( new HashSet<ProjectSegment>() );
60
61 private volatile ProjectSegment currentBuild;
62
63 private final PrintStream originalSystemOUtStream;
64
65 private final ConsolePrinter printer;
66
67
68
69
70
71 class ConsolePrinter
72 implements Runnable
73 {
74 private volatile boolean running;
75
76 private final ProjectBuildList projectBuildList;
77
78 ConsolePrinter( ProjectBuildList projectBuildList )
79 {
80 this.projectBuildList = projectBuildList;
81 }
82
83 public void run()
84 {
85 running = true;
86 for ( ProjectSegment projectBuild : projectBuildList )
87 {
88 final PrintStream projectStream = printStreams.get( projectBuild );
89 ByteArrayOutputStream projectOs = streams.get( projectBuild );
90
91 do
92 {
93 synchronized ( projectStream )
94 {
95 try
96 {
97 projectStream.wait( 100 );
98 }
99 catch ( InterruptedException e )
100 {
101 throw new RuntimeException( e );
102 }
103 try
104 {
105 projectOs.writeTo( originalSystemOUtStream );
106 }
107 catch ( IOException e )
108 {
109 throw new RuntimeException( e );
110 }
111
112 projectOs.reset();
113 }
114 }
115 while ( !completedBuilds.contains( projectBuild ) );
116 }
117 running = false;
118 }
119
120
121
122
123
124 public void waitUntilRunning( boolean expect )
125 {
126 while ( !running == expect )
127 {
128 try
129 {
130 Thread.sleep( 10 );
131 }
132 catch ( InterruptedException e )
133 {
134 throw new RuntimeException( e );
135 }
136 }
137 }
138 }
139
140 public ThreadOutputMuxer( ProjectBuildList segmentChunks, PrintStream originalSystemOut )
141 {
142 projects = segmentChunks.iterator();
143 for ( ProjectSegment segmentChunk : segmentChunks )
144 {
145 final ByteArrayOutputStream value = new ByteArrayOutputStream();
146 streams.put( segmentChunk, value );
147 printStreams.put( segmentChunk, new PrintStream( value ) );
148 }
149 setNext();
150 this.originalSystemOUtStream = originalSystemOut;
151 System.setOut( new ThreadBoundPrintStream( this.originalSystemOUtStream ) );
152 printer = new ConsolePrinter( segmentChunks );
153 new Thread( printer ).start();
154 printer.waitUntilRunning( true );
155 }
156
157 public void close()
158 {
159 printer.waitUntilRunning( false );
160 System.setOut( this.originalSystemOUtStream );
161 }
162
163 private void setNext()
164 {
165 currentBuild = projects.hasNext() ? projects.next() : null;
166 }
167
168 private boolean ownsRealOutputStream( ProjectSegment projectBuild )
169 {
170 return projectBuild.equals( currentBuild );
171 }
172
173 private PrintStream getThreadBoundPrintStream()
174 {
175 ProjectSegment threadProject = projectBuildThreadLocal.get();
176 if ( threadProject == null )
177 {
178 return defaultPrintStream;
179 }
180 if ( ownsRealOutputStream( threadProject ) )
181 {
182 return originalSystemOUtStream;
183 }
184 return printStreams.get( threadProject );
185 }
186
187 public void associateThreadWithProjectSegment( ProjectSegment projectBuild )
188 {
189 projectBuildThreadLocal.set( projectBuild );
190 }
191
192 public void setThisModuleComplete( ProjectSegment projectBuild )
193 {
194 completedBuilds.add( projectBuild );
195 PrintStream stream = printStreams.get( projectBuild );
196 synchronized ( stream )
197 {
198 stream.notifyAll();
199 }
200 disconnectThreadFromProject();
201 }
202
203 private void disconnectThreadFromProject()
204 {
205 projectBuildThreadLocal.remove();
206 }
207
208 private class ThreadBoundPrintStream
209 extends PrintStream
210 {
211
212 ThreadBoundPrintStream( PrintStream systemOutStream )
213 {
214 super( systemOutStream );
215 }
216
217 private PrintStream getOutputStreamForCurrentThread()
218 {
219 return getThreadBoundPrintStream();
220 }
221
222 @Override
223 public void println()
224 {
225 final PrintStream currentStream = getOutputStreamForCurrentThread();
226 synchronized ( currentStream )
227 {
228 currentStream.println();
229 currentStream.notifyAll();
230 }
231 }
232
233 @Override
234 public void print( char c )
235 {
236 final PrintStream currentStream = getOutputStreamForCurrentThread();
237 synchronized ( currentStream )
238 {
239 currentStream.print( c );
240 currentStream.notifyAll();
241 }
242 }
243
244 @Override
245 public void println( char x )
246 {
247 final PrintStream currentStream = getOutputStreamForCurrentThread();
248 synchronized ( currentStream )
249 {
250 currentStream.println( x );
251 currentStream.notifyAll();
252 }
253 }
254
255 @Override
256 public void print( double d )
257 {
258 final PrintStream currentStream = getOutputStreamForCurrentThread();
259 synchronized ( currentStream )
260 {
261 currentStream.print( d );
262 currentStream.notifyAll();
263 }
264 }
265
266 @Override
267 public void println( double x )
268 {
269 final PrintStream currentStream = getOutputStreamForCurrentThread();
270 synchronized ( currentStream )
271 {
272 currentStream.println( x );
273 currentStream.notifyAll();
274 }
275 }
276
277 @Override
278 public void print( float f )
279 {
280 final PrintStream currentStream = getOutputStreamForCurrentThread();
281 synchronized ( currentStream )
282 {
283 currentStream.print( f );
284 currentStream.notifyAll();
285 }
286 }
287
288 @Override
289 public void println( float x )
290 {
291 final PrintStream currentStream = getOutputStreamForCurrentThread();
292 synchronized ( currentStream )
293 {
294 currentStream.println( x );
295 currentStream.notifyAll();
296 }
297 }
298
299 @Override
300 public void print( int i )
301 {
302 final PrintStream currentStream = getOutputStreamForCurrentThread();
303 synchronized ( currentStream )
304 {
305 currentStream.print( i );
306 currentStream.notifyAll();
307 }
308 }
309
310 @Override
311 public void println( int x )
312 {
313 final PrintStream currentStream = getOutputStreamForCurrentThread();
314 synchronized ( currentStream )
315 {
316 currentStream.println( x );
317 currentStream.notifyAll();
318 }
319 }
320
321 @Override
322 public void print( long l )
323 {
324 final PrintStream currentStream = getOutputStreamForCurrentThread();
325 synchronized ( currentStream )
326 {
327 currentStream.print( l );
328 currentStream.notifyAll();
329 }
330 }
331
332 @Override
333 public void println( long x )
334 {
335 final PrintStream currentStream = getOutputStreamForCurrentThread();
336 synchronized ( currentStream )
337 {
338 currentStream.print( x );
339 currentStream.notifyAll();
340 }
341 }
342
343 @Override
344 public void print( boolean b )
345 {
346 final PrintStream currentStream = getOutputStreamForCurrentThread();
347 synchronized ( currentStream )
348 {
349 currentStream.print( b );
350 currentStream.notifyAll();
351 }
352 }
353
354 @Override
355 public void println( boolean x )
356 {
357 final PrintStream currentStream = getOutputStreamForCurrentThread();
358 synchronized ( currentStream )
359 {
360 currentStream.print( x );
361 currentStream.notifyAll();
362 }
363 }
364
365 @Override
366 public void print( char s[] )
367 {
368 final PrintStream currentStream = getOutputStreamForCurrentThread();
369 synchronized ( currentStream )
370 {
371 currentStream.print( s );
372 currentStream.notifyAll();
373 }
374 }
375
376 @Override
377 public void println( char x[] )
378 {
379 final PrintStream currentStream = getOutputStreamForCurrentThread();
380 synchronized ( currentStream )
381 {
382 currentStream.print( x );
383 currentStream.notifyAll();
384 }
385 }
386
387 @Override
388 public void print( Object obj )
389 {
390 final PrintStream currentStream = getOutputStreamForCurrentThread();
391 synchronized ( currentStream )
392 {
393 currentStream.print( obj );
394 currentStream.notifyAll();
395 }
396 }
397
398 @Override
399 public void println( Object x )
400 {
401 final PrintStream currentStream = getOutputStreamForCurrentThread();
402 synchronized ( currentStream )
403 {
404 currentStream.println( x );
405 currentStream.notifyAll();
406 }
407 }
408
409 @Override
410 public void print( String s )
411 {
412 final PrintStream currentStream = getOutputStreamForCurrentThread();
413 synchronized ( currentStream )
414 {
415 currentStream.print( s );
416 currentStream.notifyAll();
417 }
418 }
419
420 @Override
421 public void println( String x )
422 {
423 final PrintStream currentStream = getOutputStreamForCurrentThread();
424 synchronized ( currentStream )
425 {
426 currentStream.println( x );
427 currentStream.notifyAll();
428 }
429 }
430
431 @Override
432 public void write( byte b[], int off, int len )
433 {
434 final PrintStream currentStream = getOutputStreamForCurrentThread();
435 synchronized ( currentStream )
436 {
437 currentStream.write( b, off, len );
438 currentStream.notifyAll();
439 }
440 }
441
442 @Override
443 public void close()
444 {
445 getOutputStreamForCurrentThread().close();
446 }
447
448 @Override
449 public void flush()
450 {
451 getOutputStreamForCurrentThread().flush();
452 }
453
454 @Override
455 public void write( int b )
456 {
457 final PrintStream currentStream = getOutputStreamForCurrentThread();
458 synchronized ( currentStream )
459 {
460 currentStream.write( b );
461 currentStream.notifyAll();
462 }
463 }
464
465 @Override
466 public void write( byte b[] )
467 throws IOException
468 {
469 final PrintStream currentStream = getOutputStreamForCurrentThread();
470 synchronized ( currentStream )
471 {
472 currentStream.write( b );
473 currentStream.notifyAll();
474 }
475 }
476 }
477 }