1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.extensions;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.ReadableByteChannel;
28
29 import org.apache.maven.surefire.api.event.Event;
30 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
31 import org.apache.maven.surefire.api.stream.AbstractStreamDecoder.Memento;
32 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
33 import org.apache.maven.surefire.extensions.EventHandler;
34 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
35 import org.apache.maven.surefire.stream.EventDecoder;
36
37
38
39
40 public class EventConsumerThread extends CloseableDaemonThread {
41 private final ReadableByteChannel channel;
42 private final EventHandler<Event> eventHandler;
43 private final CountdownCloseable countdownCloseable;
44 private final EventDecoder decoder;
45 private final ForkNodeArguments arguments;
46 private volatile boolean disabled;
47
48 public EventConsumerThread(
49 @Nonnull String threadName,
50 @Nonnull ReadableByteChannel channel,
51 @Nonnull EventHandler<Event> eventHandler,
52 @Nonnull CountdownCloseable countdownCloseable,
53 @Nonnull ForkNodeArguments arguments) {
54 super(threadName);
55 decoder = new EventDecoder(channel, arguments);
56 this.channel = channel;
57 this.eventHandler = eventHandler;
58 this.countdownCloseable = countdownCloseable;
59 this.arguments = arguments;
60 }
61
62 @Override
63 public void run() {
64 try (ReadableByteChannel stream = channel;
65 CountdownCloseable c = countdownCloseable;
66 EventDecoder eventDecoder = decoder) {
67 Memento memento = eventDecoder.new Memento();
68 do {
69 Event event = eventDecoder.decode(memento);
70 if (event != null && !disabled) {
71 eventHandler.handleEvent(event);
72 }
73 } while (true);
74 } catch (EOFException | ClosedChannelException e) {
75
76 } catch (IOException e) {
77 if (e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException) {
78 Thread.currentThread().interrupt();
79 } else {
80 arguments.dumpStreamException(e);
81 }
82 }
83 }
84
85 @Override
86 public void disable() {
87 disabled = true;
88 }
89
90 @Override
91 public void close() throws IOException {
92 channel.close();
93 }
94 }