/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19  package org.apache.maven.surefire.booter.spi;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.IOException;
24  import java.net.InetSocketAddress;
25  import java.net.MalformedURLException;
26  import java.net.SocketOption;
27  import java.net.URI;
28  import java.net.URISyntaxException;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.AsynchronousSocketChannel;
31  import java.nio.channels.ReadableByteChannel;
32  import java.util.StringTokenizer;
33  import java.util.concurrent.ExecutionException;
34  
35  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
36  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
37  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
38  import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
39  
40  import static java.net.StandardSocketOptions.SO_KEEPALIVE;
41  import static java.net.StandardSocketOptions.SO_REUSEADDR;
42  import static java.net.StandardSocketOptions.TCP_NODELAY;
43  import static java.nio.channels.AsynchronousChannelGroup.withFixedThreadPool;
44  import static java.nio.channels.AsynchronousSocketChannel.open;
45  import static java.nio.charset.StandardCharsets.US_ASCII;
46  import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
47  import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
48  import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
49  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
50  
51  /**
52   * Producer of TCP/IP encoder and decoder.
53   * <br>
54   *
55   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
56   * @since 3.0.0-M5
57   */
58  public class SurefireMasterProcessChannelProcessorFactory extends AbstractMasterProcessChannelProcessorFactory {
59      private static final int FLUSH_PERIOD_MILLIS = 100;
60      private volatile AsynchronousSocketChannel clientSocketChannel;
61  
62      @Override
63      public boolean canUse(String channelConfig) {
64          return channelConfig.startsWith("tcp://");
65      }
66  
67      @Override
68      public void connect(String channelConfig) throws IOException {
69          if (!canUse(channelConfig)) {
70              throw new MalformedURLException("Unknown channel string " + channelConfig);
71          }
72  
73          try {
74              URI uri = new URI(channelConfig);
75              InetSocketAddress hostAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
76              clientSocketChannel = open(withFixedThreadPool(2, newDaemonThreadFactory()));
77              setTrueOptions(SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE);
78              clientSocketChannel.connect(hostAddress).get();
79              String sessionId = extractSessionId(uri);
80              if (sessionId != null) {
81                  ByteBuffer buff = ByteBuffer.wrap(sessionId.getBytes(US_ASCII));
82                  while (buff.hasRemaining()) {
83                      clientSocketChannel.write(buff).get();
84                  }
85              }
86          } catch (URISyntaxException | InterruptedException e) {
87              throw new IOException(e.getLocalizedMessage(), e);
88          } catch (ExecutionException e) {
89              throw new IOException(e.getLocalizedMessage(), e.getCause());
90          }
91      }
92  
93      @Override
94      public MasterProcessChannelDecoder createDecoder(@Nonnull ForkNodeArguments forkingArguments) {
95          ReadableByteChannel bufferedChannel = newBufferedChannel(newInputStream(clientSocketChannel));
96          return new CommandChannelDecoder(bufferedChannel, forkingArguments);
97      }
98  
99      @Override
100     public MasterProcessChannelEncoder createEncoder(@Nonnull ForkNodeArguments forkingArguments) {
101         WritableBufferedByteChannel channel = newBufferedChannel(newOutputStream(clientSocketChannel));
102         schedulePeriodicFlusher(FLUSH_PERIOD_MILLIS, channel);
103         return new EventChannelEncoder(channel);
104     }
105 
106     @Override
107     public void close() throws IOException {
108         super.close();
109         if (clientSocketChannel != null && clientSocketChannel.isOpen()) {
110             clientSocketChannel.close();
111         }
112     }
113 
114     @SafeVarargs
115     private final void setTrueOptions(SocketOption<Boolean>... options) throws IOException {
116         for (SocketOption<Boolean> option : options) {
117             if (clientSocketChannel.supportedOptions().contains(option)) {
118                 clientSocketChannel.setOption(option, true);
119             }
120         }
121     }
122 
123     private static String extractSessionId(URI uri) {
124         String query = uri.getQuery();
125         if (query == null) {
126             return null;
127         }
128         for (StringTokenizer tokenizer = new StringTokenizer(query, "&"); tokenizer.hasMoreTokens(); ) {
129             String token = tokenizer.nextToken();
130             int delimiter = token.indexOf('=');
131             if (delimiter != -1 && "sessionId".equals(token.substring(0, delimiter))) {
132                 return token.substring(delimiter + 1);
133             }
134         }
135         return null;
136     }
137 }