1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.booter.spi;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.IOException;
24 import java.net.InetSocketAddress;
25 import java.net.MalformedURLException;
26 import java.net.SocketOption;
27 import java.net.URI;
28 import java.net.URISyntaxException;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.AsynchronousSocketChannel;
31 import java.nio.channels.ReadableByteChannel;
32 import java.util.StringTokenizer;
33 import java.util.concurrent.ExecutionException;
34
35 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
36 import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
37 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
38 import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
39
40 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
41 import static java.net.StandardSocketOptions.SO_REUSEADDR;
42 import static java.net.StandardSocketOptions.TCP_NODELAY;
43 import static java.nio.channels.AsynchronousChannelGroup.withFixedThreadPool;
44 import static java.nio.channels.AsynchronousSocketChannel.open;
45 import static java.nio.charset.StandardCharsets.US_ASCII;
46 import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
47 import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
48 import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
49 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
50
51
52
53
54
55
56
57
58 public class SurefireMasterProcessChannelProcessorFactory extends AbstractMasterProcessChannelProcessorFactory {
59 private static final int FLUSH_PERIOD_MILLIS = 100;
60 private volatile AsynchronousSocketChannel clientSocketChannel;
61
62 @Override
63 public boolean canUse(String channelConfig) {
64 return channelConfig.startsWith("tcp://");
65 }
66
67 @Override
68 public void connect(String channelConfig) throws IOException {
69 if (!canUse(channelConfig)) {
70 throw new MalformedURLException("Unknown channel string " + channelConfig);
71 }
72
73 try {
74 URI uri = new URI(channelConfig);
75 InetSocketAddress hostAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
76 clientSocketChannel = open(withFixedThreadPool(2, newDaemonThreadFactory()));
77 setTrueOptions(SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE);
78 clientSocketChannel.connect(hostAddress).get();
79 String sessionId = extractSessionId(uri);
80 if (sessionId != null) {
81 ByteBuffer buff = ByteBuffer.wrap(sessionId.getBytes(US_ASCII));
82 while (buff.hasRemaining()) {
83 clientSocketChannel.write(buff).get();
84 }
85 }
86 } catch (URISyntaxException | InterruptedException e) {
87 throw new IOException(e.getLocalizedMessage(), e);
88 } catch (ExecutionException e) {
89 throw new IOException(e.getLocalizedMessage(), e.getCause());
90 }
91 }
92
93 @Override
94 public MasterProcessChannelDecoder createDecoder(@Nonnull ForkNodeArguments forkingArguments) {
95 ReadableByteChannel bufferedChannel = newBufferedChannel(newInputStream(clientSocketChannel));
96 return new CommandChannelDecoder(bufferedChannel, forkingArguments);
97 }
98
99 @Override
100 public MasterProcessChannelEncoder createEncoder(@Nonnull ForkNodeArguments forkingArguments) {
101 WritableBufferedByteChannel channel = newBufferedChannel(newOutputStream(clientSocketChannel));
102 schedulePeriodicFlusher(FLUSH_PERIOD_MILLIS, channel);
103 return new EventChannelEncoder(channel);
104 }
105
106 @Override
107 public void close() throws IOException {
108 super.close();
109 if (clientSocketChannel != null && clientSocketChannel.isOpen()) {
110 clientSocketChannel.close();
111 }
112 }
113
114 @SafeVarargs
115 private final void setTrueOptions(SocketOption<Boolean>... options) throws IOException {
116 for (SocketOption<Boolean> option : options) {
117 if (clientSocketChannel.supportedOptions().contains(option)) {
118 clientSocketChannel.setOption(option, true);
119 }
120 }
121 }
122
123 private static String extractSessionId(URI uri) {
124 String query = uri.getQuery();
125 if (query == null) {
126 return null;
127 }
128 for (StringTokenizer tokenizer = new StringTokenizer(query, "&"); tokenizer.hasMoreTokens(); ) {
129 String token = tokenizer.nextToken();
130 int delimiter = token.indexOf('=');
131 if (delimiter != -1 && "sessionId".equals(token.substring(0, delimiter))) {
132 return token.substring(delimiter + 1);
133 }
134 }
135 return null;
136 }
137 }