View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.lifecycle.internal.builder.multithreaded;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.PrintStream;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.Map;
29  import java.util.Set;
30  import org.apache.maven.lifecycle.internal.ProjectBuildList;
31  import org.apache.maven.lifecycle.internal.ProjectSegment;
32  
33  /**
34   * <strong>NOTE:</strong> This class is not part of any public api and can be changed or deleted without prior notice.
35   * This class in particular may spontaneously self-combust and be replaced by a plexus-compliant thread aware
36   * logger implementation at any time.
37   *
38   * @since 3.0
39   * @author Kristian Rosenvold
40   */
41  @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
42  public class ThreadOutputMuxer {
43      private final Iterator<ProjectSegment> projects;
44  
45      private final ThreadLocal<ProjectSegment> projectBuildThreadLocal = new ThreadLocal<>();
46  
47      private final Map<ProjectSegment, ByteArrayOutputStream> streams = new HashMap<>();
48  
49      private final Map<ProjectSegment, PrintStream> printStreams = new HashMap<>();
50  
51      private final ByteArrayOutputStream defaultOutputStreamForUnknownData = new ByteArrayOutputStream();
52  
53      private final PrintStream defaultPrintStream = new PrintStream(defaultOutputStreamForUnknownData);
54  
55      private final Set<ProjectSegment> completedBuilds = Collections.synchronizedSet(new HashSet<>());
56  
57      private volatile ProjectSegment currentBuild;
58  
59      private final PrintStream originalSystemOUtStream;
60  
61      private final ConsolePrinter printer;
62  
63      /**
64       * A simple but safe solution for printing to the console.
65       */
66      class ConsolePrinter implements Runnable {
67          private volatile boolean running;
68  
69          private final ProjectBuildList projectBuildList;
70  
71          ConsolePrinter(ProjectBuildList projectBuildList) {
72              this.projectBuildList = projectBuildList;
73          }
74  
75          public void run() {
76              running = true;
77              for (ProjectSegment projectBuild : projectBuildList) {
78                  final PrintStream projectStream = printStreams.get(projectBuild);
79                  ByteArrayOutputStream projectOs = streams.get(projectBuild);
80  
81                  do {
82                      synchronized (projectStream) {
83                          try {
84                              projectStream.wait(100);
85                          } catch (InterruptedException e) {
86                              throw new RuntimeException(e);
87                          }
88                          try {
89                              projectOs.writeTo(originalSystemOUtStream);
90                          } catch (IOException e) {
91                              throw new RuntimeException(e);
92                          }
93  
94                          projectOs.reset();
95                      }
96                  } while (!completedBuilds.contains(projectBuild));
97              }
98              running = false;
99          }
100 
101         /*
102         Wait until we are sure the print-stream thread is running.
103          */
104 
105         public void waitUntilRunning(boolean expect) {
106             while (!running == expect) {
107                 try {
108                     Thread.sleep(10);
109                 } catch (InterruptedException e) {
110                     throw new RuntimeException(e);
111                 }
112             }
113         }
114     }
115 
116     public ThreadOutputMuxer(ProjectBuildList segmentChunks, PrintStream originalSystemOut) {
117         projects = segmentChunks.iterator();
118         for (ProjectSegment segmentChunk : segmentChunks) {
119             final ByteArrayOutputStream value = new ByteArrayOutputStream();
120             streams.put(segmentChunk, value);
121             printStreams.put(segmentChunk, new PrintStream(value));
122         }
123         setNext();
124         this.originalSystemOUtStream = originalSystemOut;
125         System.setOut(new ThreadBoundPrintStream(this.originalSystemOUtStream));
126         printer = new ConsolePrinter(segmentChunks);
127         new Thread(printer).start();
128         printer.waitUntilRunning(true);
129     }
130 
131     public void close() {
132         printer.waitUntilRunning(false);
133         System.setOut(this.originalSystemOUtStream);
134     }
135 
136     private void setNext() {
137         currentBuild = projects.hasNext() ? projects.next() : null;
138     }
139 
140     private boolean ownsRealOutputStream(ProjectSegment projectBuild) {
141         return projectBuild.equals(currentBuild);
142     }
143 
144     private PrintStream getThreadBoundPrintStream() {
145         ProjectSegment threadProject = projectBuildThreadLocal.get();
146         if (threadProject == null) {
147             return defaultPrintStream;
148         }
149         if (ownsRealOutputStream(threadProject)) {
150             return originalSystemOUtStream;
151         }
152         return printStreams.get(threadProject);
153     }
154 
155     public void associateThreadWithProjectSegment(ProjectSegment projectBuild) {
156         projectBuildThreadLocal.set(projectBuild);
157     }
158 
159     public void setThisModuleComplete(ProjectSegment projectBuild) {
160         completedBuilds.add(projectBuild);
161         PrintStream stream = printStreams.get(projectBuild);
162         synchronized (stream) {
163             stream.notifyAll();
164         }
165         disconnectThreadFromProject();
166     }
167 
168     private void disconnectThreadFromProject() {
169         projectBuildThreadLocal.remove();
170     }
171 
172     private class ThreadBoundPrintStream extends PrintStream {
173 
174         ThreadBoundPrintStream(PrintStream systemOutStream) {
175             super(systemOutStream);
176         }
177 
178         private PrintStream getOutputStreamForCurrentThread() {
179             return getThreadBoundPrintStream();
180         }
181 
182         @Override
183         public void println() {
184             final PrintStream currentStream = getOutputStreamForCurrentThread();
185             synchronized (currentStream) {
186                 currentStream.println();
187                 currentStream.notifyAll();
188             }
189         }
190 
191         @Override
192         public void print(char c) {
193             final PrintStream currentStream = getOutputStreamForCurrentThread();
194             synchronized (currentStream) {
195                 currentStream.print(c);
196                 currentStream.notifyAll();
197             }
198         }
199 
200         @Override
201         public void println(char x) {
202             final PrintStream currentStream = getOutputStreamForCurrentThread();
203             synchronized (currentStream) {
204                 currentStream.println(x);
205                 currentStream.notifyAll();
206             }
207         }
208 
209         @Override
210         public void print(double d) {
211             final PrintStream currentStream = getOutputStreamForCurrentThread();
212             synchronized (currentStream) {
213                 currentStream.print(d);
214                 currentStream.notifyAll();
215             }
216         }
217 
218         @Override
219         public void println(double x) {
220             final PrintStream currentStream = getOutputStreamForCurrentThread();
221             synchronized (currentStream) {
222                 currentStream.println(x);
223                 currentStream.notifyAll();
224             }
225         }
226 
227         @Override
228         public void print(float f) {
229             final PrintStream currentStream = getOutputStreamForCurrentThread();
230             synchronized (currentStream) {
231                 currentStream.print(f);
232                 currentStream.notifyAll();
233             }
234         }
235 
236         @Override
237         public void println(float x) {
238             final PrintStream currentStream = getOutputStreamForCurrentThread();
239             synchronized (currentStream) {
240                 currentStream.println(x);
241                 currentStream.notifyAll();
242             }
243         }
244 
245         @Override
246         public void print(int i) {
247             final PrintStream currentStream = getOutputStreamForCurrentThread();
248             synchronized (currentStream) {
249                 currentStream.print(i);
250                 currentStream.notifyAll();
251             }
252         }
253 
254         @Override
255         public void println(int x) {
256             final PrintStream currentStream = getOutputStreamForCurrentThread();
257             synchronized (currentStream) {
258                 currentStream.println(x);
259                 currentStream.notifyAll();
260             }
261         }
262 
263         @Override
264         public void print(long l) {
265             final PrintStream currentStream = getOutputStreamForCurrentThread();
266             synchronized (currentStream) {
267                 currentStream.print(l);
268                 currentStream.notifyAll();
269             }
270         }
271 
272         @Override
273         public void println(long x) {
274             final PrintStream currentStream = getOutputStreamForCurrentThread();
275             synchronized (currentStream) {
276                 currentStream.print(x);
277                 currentStream.notifyAll();
278             }
279         }
280 
281         @Override
282         public void print(boolean b) {
283             final PrintStream currentStream = getOutputStreamForCurrentThread();
284             synchronized (currentStream) {
285                 currentStream.print(b);
286                 currentStream.notifyAll();
287             }
288         }
289 
290         @Override
291         public void println(boolean x) {
292             final PrintStream currentStream = getOutputStreamForCurrentThread();
293             synchronized (currentStream) {
294                 currentStream.print(x);
295                 currentStream.notifyAll();
296             }
297         }
298 
299         @Override
300         public void print(char s[]) {
301             final PrintStream currentStream = getOutputStreamForCurrentThread();
302             synchronized (currentStream) {
303                 currentStream.print(s);
304                 currentStream.notifyAll();
305             }
306         }
307 
308         @Override
309         public void println(char x[]) {
310             final PrintStream currentStream = getOutputStreamForCurrentThread();
311             synchronized (currentStream) {
312                 currentStream.print(x);
313                 currentStream.notifyAll();
314             }
315         }
316 
317         @Override
318         public void print(Object obj) {
319             final PrintStream currentStream = getOutputStreamForCurrentThread();
320             synchronized (currentStream) {
321                 currentStream.print(obj);
322                 currentStream.notifyAll();
323             }
324         }
325 
326         @Override
327         public void println(Object x) {
328             final PrintStream currentStream = getOutputStreamForCurrentThread();
329             synchronized (currentStream) {
330                 currentStream.println(x);
331                 currentStream.notifyAll();
332             }
333         }
334 
335         @Override
336         public void print(String s) {
337             final PrintStream currentStream = getOutputStreamForCurrentThread();
338             synchronized (currentStream) {
339                 currentStream.print(s);
340                 currentStream.notifyAll();
341             }
342         }
343 
344         @Override
345         public void println(String x) {
346             final PrintStream currentStream = getOutputStreamForCurrentThread();
347             synchronized (currentStream) {
348                 currentStream.println(x);
349                 currentStream.notifyAll();
350             }
351         }
352 
353         @Override
354         public void write(byte b[], int off, int len) {
355             final PrintStream currentStream = getOutputStreamForCurrentThread();
356             synchronized (currentStream) {
357                 currentStream.write(b, off, len);
358                 currentStream.notifyAll();
359             }
360         }
361 
362         @Override
363         public void close() {
364             getOutputStreamForCurrentThread().close();
365         }
366 
367         @Override
368         public void flush() {
369             getOutputStreamForCurrentThread().flush();
370         }
371 
372         @Override
373         public void write(int b) {
374             final PrintStream currentStream = getOutputStreamForCurrentThread();
375             synchronized (currentStream) {
376                 currentStream.write(b);
377                 currentStream.notifyAll();
378             }
379         }
380 
381         @Override
382         public void write(byte b[]) throws IOException {
383             final PrintStream currentStream = getOutputStreamForCurrentThread();
384             synchronized (currentStream) {
385                 currentStream.write(b);
386                 currentStream.notifyAll();
387             }
388         }
389     }
390 }