1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.maven.surefire.booter.spi;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.IOException;
24  import java.net.InetSocketAddress;
25  import java.net.MalformedURLException;
26  import java.net.SocketOption;
27  import java.net.URI;
28  import java.net.URISyntaxException;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.AsynchronousSocketChannel;
31  import java.nio.channels.ReadableByteChannel;
32  import java.util.StringTokenizer;
33  import java.util.concurrent.ExecutionException;
34  
35  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
36  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
37  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
38  import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
39  
40  import static java.net.StandardSocketOptions.SO_KEEPALIVE;
41  import static java.net.StandardSocketOptions.SO_REUSEADDR;
42  import static java.net.StandardSocketOptions.TCP_NODELAY;
43  import static java.nio.channels.AsynchronousChannelGroup.withFixedThreadPool;
44  import static java.nio.channels.AsynchronousSocketChannel.open;
45  import static java.nio.charset.StandardCharsets.US_ASCII;
46  import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
47  import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
48  import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
49  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
50  
51  
52  
53  
54  
55  
56  
57  
58  public class SurefireMasterProcessChannelProcessorFactory extends AbstractMasterProcessChannelProcessorFactory {
59      private static final int FLUSH_PERIOD_MILLIS = 100;
60      private volatile AsynchronousSocketChannel clientSocketChannel;
61  
62      @Override
63      public boolean canUse(String channelConfig) {
64          return channelConfig.startsWith("tcp://");
65      }
66  
67      @Override
68      public void connect(String channelConfig) throws IOException {
69          if (!canUse(channelConfig)) {
70              throw new MalformedURLException("Unknown channel string " + channelConfig);
71          }
72  
73          try {
74              URI uri = new URI(channelConfig);
75              InetSocketAddress hostAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
76              clientSocketChannel = open(withFixedThreadPool(2, newDaemonThreadFactory()));
77              setTrueOptions(SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE);
78              clientSocketChannel.connect(hostAddress).get();
79              String sessionId = extractSessionId(uri);
80              if (sessionId != null) {
81                  ByteBuffer buff = ByteBuffer.wrap(sessionId.getBytes(US_ASCII));
82                  while (buff.hasRemaining()) {
83                      clientSocketChannel.write(buff).get();
84                  }
85              }
86          } catch (URISyntaxException | InterruptedException e) {
87              throw new IOException(e.getLocalizedMessage(), e);
88          } catch (ExecutionException e) {
89              throw new IOException(e.getLocalizedMessage(), e.getCause());
90          }
91      }
92  
93      @Override
94      public MasterProcessChannelDecoder createDecoder(@Nonnull ForkNodeArguments forkingArguments) {
95          ReadableByteChannel bufferedChannel = newBufferedChannel(newInputStream(clientSocketChannel));
96          return new CommandChannelDecoder(bufferedChannel, forkingArguments);
97      }
98  
99      @Override
100     public MasterProcessChannelEncoder createEncoder(@Nonnull ForkNodeArguments forkingArguments) {
101         WritableBufferedByteChannel channel = newBufferedChannel(newOutputStream(clientSocketChannel));
102         schedulePeriodicFlusher(FLUSH_PERIOD_MILLIS, channel);
103         return new EventChannelEncoder(channel);
104     }
105 
106     @Override
107     public void close() throws IOException {
108         super.close();
109         if (clientSocketChannel != null && clientSocketChannel.isOpen()) {
110             clientSocketChannel.close();
111         }
112     }
113 
114     @SafeVarargs
115     private final void setTrueOptions(SocketOption<Boolean>... options) throws IOException {
116         for (SocketOption<Boolean> option : options) {
117             if (clientSocketChannel.supportedOptions().contains(option)) {
118                 clientSocketChannel.setOption(option, true);
119             }
120         }
121     }
122 
123     private static String extractSessionId(URI uri) {
124         String query = uri.getQuery();
125         if (query == null) {
126             return null;
127         }
128         for (StringTokenizer tokenizer = new StringTokenizer(query, "&"); tokenizer.hasMoreTokens(); ) {
129             String token = tokenizer.nextToken();
130             int delimiter = token.indexOf('=');
131             if (delimiter != -1 && "sessionId".equals(token.substring(0, delimiter))) {
132                 return token.substring(delimiter + 1);
133             }
134         }
135         return null;
136     }
137 }