View Javadoc
1   package org.apache.maven.surefire.booter.spi;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
23  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
24  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
25  import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
26  
27  import javax.annotation.Nonnull;
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.net.MalformedURLException;
31  import java.net.SocketOption;
32  import java.net.URI;
33  import java.net.URISyntaxException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.AsynchronousSocketChannel;
36  import java.nio.channels.ReadableByteChannel;
37  import java.util.StringTokenizer;
38  import java.util.concurrent.ExecutionException;
39  
40  import static java.net.StandardSocketOptions.SO_KEEPALIVE;
41  import static java.net.StandardSocketOptions.SO_REUSEADDR;
42  import static java.net.StandardSocketOptions.TCP_NODELAY;
43  import static java.nio.channels.AsynchronousChannelGroup.withFixedThreadPool;
44  import static java.nio.channels.AsynchronousSocketChannel.open;
45  import static java.nio.charset.StandardCharsets.US_ASCII;
46  import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
47  import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
48  import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
49  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
50  
51  /**
52   * Producer of TCP/IP encoder and decoder.
53   * <br>
54   *
55   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
56   * @since 3.0.0-M5
57   */
58  public class SurefireMasterProcessChannelProcessorFactory
59      extends AbstractMasterProcessChannelProcessorFactory
60  {
61      private static final int FLUSH_PERIOD_MILLIS = 100;
62      private volatile AsynchronousSocketChannel clientSocketChannel;
63  
64      @Override
65      public boolean canUse( String channelConfig )
66      {
67          return channelConfig.startsWith( "tcp://" );
68      }
69  
70      @Override
71      public void connect( String channelConfig ) throws IOException
72      {
73          if ( !canUse( channelConfig ) )
74          {
75              throw new MalformedURLException( "Unknown channel string " + channelConfig );
76          }
77  
78          try
79          {
80              URI uri = new URI( channelConfig );
81              InetSocketAddress hostAddress = new InetSocketAddress( uri.getHost(), uri.getPort() );
82              clientSocketChannel = open( withFixedThreadPool( 2, newDaemonThreadFactory() ) );
83              setTrueOptions( SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE );
84              clientSocketChannel.connect( hostAddress ).get();
85              String sessionId = extractSessionId( uri );
86              if ( sessionId != null )
87              {
88                  ByteBuffer buff = ByteBuffer.wrap( sessionId.getBytes( US_ASCII ) );
89                  while ( buff.hasRemaining() )
90                  {
91                      clientSocketChannel.write( buff ).get();
92                  }
93              }
94          }
95          catch ( URISyntaxException | InterruptedException e )
96          {
97              throw new IOException( e.getLocalizedMessage(), e );
98          }
99          catch ( ExecutionException e )
100         {
101             throw new IOException( e.getLocalizedMessage(), e.getCause() );
102         }
103     }
104 
105     @Override
106     public MasterProcessChannelDecoder createDecoder( @Nonnull ForkNodeArguments forkingArguments )
107     {
108         ReadableByteChannel bufferedChannel = newBufferedChannel( newInputStream( clientSocketChannel ) );
109         return new CommandChannelDecoder( bufferedChannel, forkingArguments );
110     }
111 
112     @Override
113     public MasterProcessChannelEncoder createEncoder( @Nonnull ForkNodeArguments forkingArguments )
114     {
115         WritableBufferedByteChannel channel = newBufferedChannel( newOutputStream( clientSocketChannel ) );
116         schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel );
117         return new EventChannelEncoder( channel );
118     }
119 
120     @Override
121     public void close() throws IOException
122     {
123         super.close();
124         if ( clientSocketChannel != null && clientSocketChannel.isOpen() )
125         {
126             clientSocketChannel.close();
127         }
128     }
129 
130     @SafeVarargs
131     private final void setTrueOptions( SocketOption<Boolean>... options )
132         throws IOException
133     {
134         for ( SocketOption<Boolean> option : options )
135         {
136             if ( clientSocketChannel.supportedOptions().contains( option ) )
137             {
138                 clientSocketChannel.setOption( option, true );
139             }
140         }
141     }
142 
143     private static String extractSessionId( URI uri )
144     {
145         String query = uri.getQuery();
146         if ( query == null )
147         {
148             return null;
149         }
150         for ( StringTokenizer tokenizer = new StringTokenizer( query, "&" ); tokenizer.hasMoreTokens(); )
151         {
152             String token = tokenizer.nextToken();
153             int delimiter = token.indexOf( '=' );
154             if ( delimiter != -1 && "sessionId".equals( token.substring( 0, delimiter ) ) )
155             {
156                 return token.substring( delimiter + 1 );
157             }
158         }
159         return null;
160     }
161 }