1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.extensions;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.ReadableByteChannel;
28
29 import org.apache.maven.surefire.api.event.Event;
30 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
31 import org.apache.maven.surefire.api.stream.AbstractStreamDecoder.Memento;
32 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
33 import org.apache.maven.surefire.extensions.EventHandler;
34 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
35 import org.apache.maven.surefire.stream.EventDecoder;
36
37 public class EventConsumerThread extends CloseableDaemonThread {
38 private final ReadableByteChannel channel;
39 private final EventHandler<Event> eventHandler;
40 private final CountdownCloseable countdownCloseable;
41 private final EventDecoder decoder;
42 private final ForkNodeArguments arguments;
43 private volatile boolean disabled;
44
45 public EventConsumerThread(
46 @Nonnull String threadName,
47 @Nonnull ReadableByteChannel channel,
48 @Nonnull EventHandler<Event> eventHandler,
49 @Nonnull CountdownCloseable countdownCloseable,
50 @Nonnull ForkNodeArguments arguments) {
51 super(threadName);
52 decoder = new EventDecoder(channel, arguments);
53 this.channel = channel;
54 this.eventHandler = eventHandler;
55 this.countdownCloseable = countdownCloseable;
56 this.arguments = arguments;
57 }
58
59 @Override
60 public void run() {
61 try (ReadableByteChannel stream = channel;
62 CountdownCloseable c = countdownCloseable;
63 EventDecoder eventDecoder = decoder) {
64 Memento memento = eventDecoder.new Memento();
65 do {
66 Event event = eventDecoder.decode(memento);
67 if (event != null && !disabled) {
68 eventHandler.handleEvent(event);
69 }
70 } while (true);
71 } catch (EOFException | ClosedChannelException e) {
72
73 } catch (IOException e) {
74 if (e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException) {
75 Thread.currentThread().interrupt();
76 } else {
77 arguments.dumpStreamException(e);
78 }
79 }
80 }
81
82 @Override
83 public void disable() {
84 disabled = true;
85 }
86
87 @Override
88 public void close() throws IOException {
89 channel.close();
90 }
91 }