View Javadoc
1   package org.apache.maven.plugin.surefire.extensions;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.booterclient.output.NativeStdOutStreamConsumer;
23  import org.apache.maven.surefire.api.event.Event;
24  import org.apache.maven.surefire.extensions.CloseableDaemonThread;
25  import org.apache.maven.surefire.extensions.CommandReader;
26  import org.apache.maven.surefire.extensions.EventHandler;
27  import org.apache.maven.surefire.extensions.ForkChannel;
28  import org.apache.maven.surefire.extensions.ForkNodeArguments;
29  import org.apache.maven.surefire.extensions.util.CountdownCloseable;
30  import org.apache.maven.surefire.extensions.util.LineConsumerThread;
31  
32  import javax.annotation.Nonnull;
33  import java.io.Closeable;
34  import java.io.IOException;
35  import java.net.Inet4Address;
36  import java.net.InetAddress;
37  import java.net.InetSocketAddress;
38  import java.net.SocketOption;
39  import java.nio.channels.AsynchronousServerSocketChannel;
40  import java.nio.channels.AsynchronousSocketChannel;
41  import java.nio.channels.ReadableByteChannel;
42  import java.nio.channels.WritableByteChannel;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Executors;
46  
47  import static java.net.StandardSocketOptions.SO_KEEPALIVE;
48  import static java.net.StandardSocketOptions.SO_REUSEADDR;
49  import static java.net.StandardSocketOptions.TCP_NODELAY;
50  import static java.nio.channels.AsynchronousChannelGroup.withThreadPool;
51  import static java.nio.channels.AsynchronousServerSocketChannel.open;
52  import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
53  import static org.apache.maven.surefire.api.util.internal.Channels.newChannel;
54  import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
55  import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
56  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
57  
58  /**
59   * The TCP/IP server accepting only one client connection. The forked JVM connects to the server using the
60   * {@link #getForkNodeConnectionString() connection string}.
61   * The main purpose of this class is to {@link #connectToClient() conect with tthe client}, bind the
62   * {@link #bindCommandReader(CommandReader, WritableByteChannel) command reader} to the internal socket's
63   * {@link java.io.InputStream}, and bind the
64   * {@link #bindEventHandler(EventHandler, CountdownCloseable, ReadableByteChannel) event handler} writing the event
65   * objects to the {@link EventHandler event handler}.
66   * <br>
67   * The objects {@link WritableByteChannel} and {@link ReadableByteChannel} are forked process streams
68   * (standard input and output). Both are ignored in this implementation but they are used in {@link LegacyForkChannel}.
69   * <br>
70   * The channel is closed after the forked JVM has finished normally or the shutdown hook is executed in the plugin.
71   */
72  final class SurefireForkChannel extends ForkChannel
73  {
74      private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool( newDaemonThreadFactory() );
75  
76      private final AsynchronousServerSocketChannel server;
77      private final String localHost;
78      private final int localPort;
79      private volatile AsynchronousSocketChannel worker;
80      private volatile LineConsumerThread out;
81  
82      SurefireForkChannel( @Nonnull ForkNodeArguments arguments ) throws IOException
83      {
84          super( arguments );
85          server = open( withThreadPool( THREAD_POOL ) );
86          setTrueOptions( SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE );
87          InetAddress ip = Inet4Address.getLocalHost();
88          server.bind( new InetSocketAddress( ip, 0 ), 1 );
89          InetSocketAddress localAddress = (InetSocketAddress) server.getLocalAddress();
90          localHost = localAddress.getHostString();
91          localPort = localAddress.getPort();
92      }
93  
94      @Override
95      public void connectToClient() throws IOException
96      {
97          if ( worker != null )
98          {
99              throw new IllegalStateException( "already accepted TCP client connection" );
100         }
101 
102         try
103         {
104             worker = server.accept().get();
105         }
106         catch ( InterruptedException e )
107         {
108             throw new IOException( e.getLocalizedMessage(), e );
109         }
110         catch ( ExecutionException e )
111         {
112             throw new IOException( e.getLocalizedMessage(), e.getCause() );
113         }
114     }
115 
116     @SafeVarargs
117     private final void setTrueOptions( SocketOption<Boolean>... options )
118         throws IOException
119     {
120         for ( SocketOption<Boolean> option : options )
121         {
122             if ( server.supportedOptions().contains( option ) )
123             {
124                 server.setOption( option, true );
125             }
126         }
127     }
128 
129     @Override
130     public String getForkNodeConnectionString()
131     {
132         return "tcp://" + localHost + ":" + localPort;
133     }
134 
135     @Override
136     public int getCountdownCloseablePermits()
137     {
138         return 3;
139     }
140 
141     @Override
142     public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
143                                                     WritableByteChannel stdIn )
144     {
145         // dont use newBufferedChannel here - may cause the command is not sent and the JVM hangs
146         // only newChannel flushes the message
147         // newBufferedChannel does not flush
148         WritableByteChannel channel = newChannel( newOutputStream( worker ) );
149         return new StreamFeeder( "commands-fork-" + getArguments().getForkChannelId(), channel, commands,
150             getArguments().getConsoleLogger() );
151     }
152 
153     @Override
154     public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
155                                                    @Nonnull CountdownCloseable countdownCloseable,
156                                                    ReadableByteChannel stdOut )
157     {
158         out = new LineConsumerThread( "fork-" + getArguments().getForkChannelId() + "-out-thread", stdOut,
159             new NativeStdOutStreamConsumer( getArguments().getConsoleLogger() ), countdownCloseable );
160         out.start();
161 
162         ReadableByteChannel channel = newBufferedChannel( newInputStream( worker ) );
163         return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", channel,
164             eventHandler, countdownCloseable, getArguments() );
165     }
166 
167     @Override
168     public void close() throws IOException
169     {
170         //noinspection unused,EmptyTryBlock,EmptyTryBlock
171         try ( Closeable c1 = worker; Closeable c2 = server; Closeable c3 = out )
172         {
173             // only close all channels
174         }
175     }
176 }