1 package org.apache.maven.surefire.booter.spi;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
23 import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
24 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
25 import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
26
27 import javax.annotation.Nonnull;
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.MalformedURLException;
31 import java.net.SocketOption;
32 import java.net.URI;
33 import java.net.URISyntaxException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.AsynchronousSocketChannel;
36 import java.nio.channels.ReadableByteChannel;
37 import java.util.StringTokenizer;
38 import java.util.concurrent.ExecutionException;
39
40 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
41 import static java.net.StandardSocketOptions.SO_REUSEADDR;
42 import static java.net.StandardSocketOptions.TCP_NODELAY;
43 import static java.nio.channels.AsynchronousChannelGroup.withFixedThreadPool;
44 import static java.nio.channels.AsynchronousSocketChannel.open;
45 import static java.nio.charset.StandardCharsets.US_ASCII;
46 import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
47 import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
48 import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
49 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
50
51
52
53
54
55
56
57
58 public class SurefireMasterProcessChannelProcessorFactory
59 extends AbstractMasterProcessChannelProcessorFactory
60 {
61 private static final int FLUSH_PERIOD_MILLIS = 100;
62 private volatile AsynchronousSocketChannel clientSocketChannel;
63
64 @Override
65 public boolean canUse( String channelConfig )
66 {
67 return channelConfig.startsWith( "tcp://" );
68 }
69
70 @Override
71 public void connect( String channelConfig ) throws IOException
72 {
73 if ( !canUse( channelConfig ) )
74 {
75 throw new MalformedURLException( "Unknown channel string " + channelConfig );
76 }
77
78 try
79 {
80 URI uri = new URI( channelConfig );
81 InetSocketAddress hostAddress = new InetSocketAddress( uri.getHost(), uri.getPort() );
82 clientSocketChannel = open( withFixedThreadPool( 2, newDaemonThreadFactory() ) );
83 setTrueOptions( SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE );
84 clientSocketChannel.connect( hostAddress ).get();
85 String sessionId = extractSessionId( uri );
86 if ( sessionId != null )
87 {
88 ByteBuffer buff = ByteBuffer.wrap( sessionId.getBytes( US_ASCII ) );
89 while ( buff.hasRemaining() )
90 {
91 clientSocketChannel.write( buff ).get();
92 }
93 }
94 }
95 catch ( URISyntaxException | InterruptedException e )
96 {
97 throw new IOException( e.getLocalizedMessage(), e );
98 }
99 catch ( ExecutionException e )
100 {
101 throw new IOException( e.getLocalizedMessage(), e.getCause() );
102 }
103 }
104
105 @Override
106 public MasterProcessChannelDecoder createDecoder( @Nonnull ForkNodeArguments forkingArguments )
107 {
108 ReadableByteChannel bufferedChannel = newBufferedChannel( newInputStream( clientSocketChannel ) );
109 return new CommandChannelDecoder( bufferedChannel, forkingArguments );
110 }
111
112 @Override
113 public MasterProcessChannelEncoder createEncoder( @Nonnull ForkNodeArguments forkingArguments )
114 {
115 WritableBufferedByteChannel channel = newBufferedChannel( newOutputStream( clientSocketChannel ) );
116 schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel );
117 return new EventChannelEncoder( channel );
118 }
119
120 @Override
121 public void close() throws IOException
122 {
123 super.close();
124 if ( clientSocketChannel != null && clientSocketChannel.isOpen() )
125 {
126 clientSocketChannel.close();
127 }
128 }
129
130 @SafeVarargs
131 private final void setTrueOptions( SocketOption<Boolean>... options )
132 throws IOException
133 {
134 for ( SocketOption<Boolean> option : options )
135 {
136 if ( clientSocketChannel.supportedOptions().contains( option ) )
137 {
138 clientSocketChannel.setOption( option, true );
139 }
140 }
141 }
142
143 private static String extractSessionId( URI uri )
144 {
145 String query = uri.getQuery();
146 if ( query == null )
147 {
148 return null;
149 }
150 for ( StringTokenizer tokenizer = new StringTokenizer( query, "&" ); tokenizer.hasMoreTokens(); )
151 {
152 String token = tokenizer.nextToken();
153 int delimiter = token.indexOf( '=' );
154 if ( delimiter != -1 && "sessionId".equals( token.substring( 0, delimiter ) ) )
155 {
156 return token.substring( delimiter + 1 );
157 }
158 }
159 return null;
160 }
161 }