1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.util.internal.BlockingQueue;
23 import org.apache.maven.surefire.util.internal.BlockingQueueFactory;
24 import org.codehaus.plexus.util.cli.StreamConsumer;
25
26
27
28
29
30
31 public class ThreadedStreamConsumer
32 implements StreamConsumer
33 {
34
35 private final BlockingQueue items = BlockingQueueFactory.createBlockingQueue();
36
37 private static final String poison = "Pioson";
38
39 private final Thread thread;
40
41 private final Pumper pumper;
42
43 static class Pumper
44 implements Runnable
45 {
46 private final BlockingQueue queue;
47
48 private final StreamConsumer target;
49
50 private volatile InterruptedException interruptedException;
51
52
53 Pumper( BlockingQueue queue, StreamConsumer target )
54 {
55 this.queue = queue;
56 this.target = target;
57 }
58
59 public void run()
60 {
61 try
62 {
63 String item = (String) queue.take();
64
65 while ( item != poison )
66 {
67 target.consumeLine( item );
68 item = (String) queue.take();
69 }
70 }
71 catch ( InterruptedException e )
72 {
73 this.interruptedException = e;
74 }
75 }
76
77 public InterruptedException getInterruptedException()
78 {
79 return interruptedException;
80 }
81 }
82
83 public ThreadedStreamConsumer( StreamConsumer target )
84 {
85 pumper = new Pumper( items, target );
86 thread = new Thread( pumper, "ThreadedStreamConsumer" );
87 thread.start();
88 }
89
90 public void consumeLine( String s )
91 {
92 items.add( s );
93 }
94
95
96 public void close()
97 {
98 try
99 {
100 items.add( poison );
101 thread.join();
102
103 if ( pumper.getInterruptedException() != null )
104 {
105 throw pumper.getInterruptedException();
106 }
107 }
108 catch ( InterruptedException e )
109 {
110 throw new RuntimeException( e );
111 }
112 }
113 }