1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.maven.plugin.surefire.extensions;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.nio.channels.ClosedChannelException;
27  import java.nio.channels.ReadableByteChannel;
28  
29  import org.apache.maven.surefire.api.event.Event;
30  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
31  import org.apache.maven.surefire.api.stream.AbstractStreamDecoder.Memento;
32  import org.apache.maven.surefire.extensions.CloseableDaemonThread;
33  import org.apache.maven.surefire.extensions.EventHandler;
34  import org.apache.maven.surefire.extensions.util.CountdownCloseable;
35  import org.apache.maven.surefire.stream.EventDecoder;
36  
37  
38  
39  
40  public class EventConsumerThread extends CloseableDaemonThread {
41      private final ReadableByteChannel channel;
42      private final EventHandler<Event> eventHandler;
43      private final CountdownCloseable countdownCloseable;
44      private final EventDecoder decoder;
45      private final ForkNodeArguments arguments;
46      private volatile boolean disabled;
47  
48      public EventConsumerThread(
49              @Nonnull String threadName,
50              @Nonnull ReadableByteChannel channel,
51              @Nonnull EventHandler<Event> eventHandler,
52              @Nonnull CountdownCloseable countdownCloseable,
53              @Nonnull ForkNodeArguments arguments) {
54          super(threadName);
55          decoder = new EventDecoder(channel, arguments);
56          this.channel = channel;
57          this.eventHandler = eventHandler;
58          this.countdownCloseable = countdownCloseable;
59          this.arguments = arguments;
60      }
61  
62      @Override
63      public void run() {
64          try (ReadableByteChannel stream = channel;
65                  CountdownCloseable c = countdownCloseable;
66                  EventDecoder eventDecoder = decoder) {
67              Memento memento = eventDecoder.new Memento();
68              do {
69                  Event event = eventDecoder.decode(memento);
70                  if (event != null && !disabled) {
71                      eventHandler.handleEvent(event);
72                  }
73              } while (true);
74          } catch (EOFException | ClosedChannelException e) {
75              
76          } catch (IOException e) {
77              if (e instanceof InterruptedIOException || e.getCause() instanceof InterruptedException) {
78                  Thread.currentThread().interrupt();
79              } else {
80                  arguments.dumpStreamException(e);
81              }
82          }
83      }
84  
85      @Override
86      public void disable() {
87          disabled = true;
88      }
89  
90      @Override
91      public void close() throws IOException {
92          channel.close();
93      }
94  }