1 package org.apache.maven.plugin.surefire.extensions;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.api.event.Event;
23 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
24 import org.apache.maven.surefire.api.stream.AbstractStreamDecoder.Memento;
25 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
26 import org.apache.maven.surefire.extensions.EventHandler;
27 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
28 import org.apache.maven.surefire.stream.EventDecoder;
29
30 import javax.annotation.Nonnull;
31 import java.io.EOFException;
32 import java.io.IOException;
33 import java.io.InterruptedIOException;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.ReadableByteChannel;
36
37
38
39
40 public class EventConsumerThread extends CloseableDaemonThread
41 {
42 private final ReadableByteChannel channel;
43 private final EventHandler<Event> eventHandler;
44 private final CountdownCloseable countdownCloseable;
45 private final EventDecoder decoder;
46 private final ForkNodeArguments arguments;
47 private volatile boolean disabled;
48
49 public EventConsumerThread( @Nonnull String threadName,
50 @Nonnull ReadableByteChannel channel,
51 @Nonnull EventHandler<Event> eventHandler,
52 @Nonnull CountdownCloseable countdownCloseable,
53 @Nonnull ForkNodeArguments arguments )
54 {
55 super( threadName );
56 decoder = new EventDecoder( channel, arguments );
57 this.channel = channel;
58 this.eventHandler = eventHandler;
59 this.countdownCloseable = countdownCloseable;
60 this.arguments = arguments;
61 }
62
63 @Override
64 public void run()
65 {
66 try ( ReadableByteChannel stream = channel;
67 CountdownCloseable c = countdownCloseable;
68 EventDecoder eventDecoder = decoder )
69 {
70 Memento memento = eventDecoder.new Memento();
71 do
72 {
73 Event event = eventDecoder.decode( memento );
74 if ( event != null && !disabled )
75 {
76 eventHandler.handleEvent( event );
77 }
78 }
79 while ( true );
80 }
81 catch ( EOFException | ClosedChannelException e )
82 {
83
84 }
85 catch ( IOException e )
86 {
87 if ( e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException )
88 {
89 Thread.currentThread().interrupt();
90 }
91 else
92 {
93 arguments.dumpStreamException( e );
94 }
95 }
96 }
97
98 @Override
99 public void disable()
100 {
101 disabled = true;
102 }
103
104 @Override
105 public void close() throws IOException
106 {
107 channel.close();
108 }
109 }