View Javadoc
1   package org.apache.maven.surefire.api.util.internal;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import javax.annotation.Nonnegative;
23  import javax.annotation.Nonnull;
24  import java.io.BufferedInputStream;
25  import java.io.BufferedOutputStream;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.AsynchronousByteChannel;
31  import java.nio.channels.ClosedChannelException;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.concurrent.ExecutionException;
35  
36  import static java.util.Objects.requireNonNull;
37  
38  /**
39   * Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream} to the Java {@link
40   * java.nio.channels.Channel}.
41   * <br>
42   * We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility closes the stream as
43   * soon as the particular Thread is interrupted. If the frameworks (Zookeeper, Netty) interrupts the thread, the
44   * communication channels become closed and the JVM hangs. Therefore we developed internal utility which is safe for the
45   * Surefire.
46   *
47   * @since 3.0.0-M5
48   */
49  public final class Channels
50  {
51      private static final int BUFFER_SIZE = 64 * 1024;
52  
53      private Channels()
54      {
55          throw new IllegalStateException( "no instantiable constructor" );
56      }
57  
58      public static WritableByteChannel newChannel( @Nonnull OutputStream out )
59      {
60          return newChannel( out, 0 );
61      }
62  
63      public static WritableBufferedByteChannel newBufferedChannel( @Nonnull OutputStream out )
64      {
65          return newChannel( out, BUFFER_SIZE );
66      }
67  
68      public static ReadableByteChannel newChannel( @Nonnull final InputStream is )
69      {
70          return newChannel( is, 0 );
71      }
72  
73      public static ReadableByteChannel newBufferedChannel( @Nonnull final InputStream is )
74      {
75          return newChannel( is, BUFFER_SIZE );
76      }
77  
78      public static OutputStream newOutputStream( final AsynchronousByteChannel channel )
79      {
80          return new OutputStream()
81          {
82              @Override
83              public synchronized void write( byte[] b, int off, int len ) throws IOException
84              {
85                  if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
86                  {
87                      throw new IndexOutOfBoundsException(
88                          "b.length = " + b.length + ", off = " + off + ", len = " + len );
89                  }
90                  else if ( len > 0 )
91                  {
92                      ByteBuffer bb = ByteBuffer.wrap( b, off, len );
93                      while ( bb.hasRemaining() )
94                      {
95                          try
96                          {
97                              channel.write( bb ).get();
98                          }
99                          catch ( ExecutionException e )
100                         {
101                             Throwable t = e.getCause();
102                             throw t instanceof IOException
103                                 ? (IOException) t
104                                 : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
105                         }
106                         catch ( Exception e )
107                         {
108                             throw new IOException( e.getLocalizedMessage(), e );
109                         }
110                     }
111                 }
112             }
113 
114             @Override
115             public void write( int b ) throws IOException
116             {
117                 write( new byte[] {(byte) b} );
118             }
119 
120             @Override
121             public synchronized void close() throws IOException
122             {
123                 if ( channel.isOpen() )
124                 {
125                     try
126                     {
127                         channel.close();
128                     }
129                     catch ( ClosedChannelException e )
130                     {
131                         // closed channel anyway
132                     }
133                 }
134             }
135         };
136     }
137 
138     public static InputStream newInputStream( final AsynchronousByteChannel channel )
139     {
140         return new InputStream()
141         {
142             @Override
143             public synchronized int read( byte[] b, int off, int len ) throws IOException
144             {
145                 if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
146                 {
147                     throw new IndexOutOfBoundsException(
148                         "b.length = " + b.length + ", off = " + off + ", len = " + len );
149                 }
150                 else if ( len == 0 )
151                 {
152                     return 0;
153                 }
154                 ByteBuffer bb = ByteBuffer.wrap( b, off, len );
155                 try
156                 {
157                     return channel.read( bb ).get();
158                 }
159                 catch ( ExecutionException e )
160                 {
161                     Throwable t = e.getCause();
162                     throw t instanceof IOException
163                         ? (IOException) t
164                         : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
165                 }
166                 catch ( Exception e )
167                 {
168                     throw new IOException( e.getLocalizedMessage(), e );
169                 }
170             }
171 
172             @Override
173             public int read() throws IOException
174             {
175                 int count;
176                 byte[] b = new byte[1];
177                 do
178                 {
179                     count = read( b, 0, 1 );
180                 }
181                 while ( count == 0 );
182 
183                 return count == -1 ? -1 : b[0];
184             }
185 
186             @Override
187             public synchronized void close() throws IOException
188             {
189                 if ( channel.isOpen() )
190                 {
191                     try
192                     {
193                         channel.close();
194                     }
195                     catch ( ClosedChannelException e )
196                     {
197                         // closed channel anyway
198                     }
199                 }
200             }
201         };
202     }
203 
204     private static ReadableByteChannel newChannel( @Nonnull InputStream is, @Nonnegative int bufferSize )
205     {
206         requireNonNull( is, "the stream should not be null" );
207         final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream( is, bufferSize );
208 
209         return new AbstractNoninterruptibleReadableChannel()
210         {
211             @Override
212             protected int readImpl( ByteBuffer src ) throws IOException
213             {
214                 int count = bis.read( src.array(), src.arrayOffset() + src.position(), src.remaining() );
215                 if ( count > 0 )
216                 {
217                     src.position( count + src.position() );
218                 }
219                 return count;
220             }
221 
222             @Override
223             protected void closeImpl() throws IOException
224             {
225                 bis.close();
226             }
227         };
228     }
229 
230     private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out, @Nonnegative int bufferSize )
231     {
232         requireNonNull( out, "the stream should not be null" );
233         final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize );
234 
235         return new AbstractNoninterruptibleWritableChannel()
236         {
237             @Override
238             protected void writeImpl( ByteBuffer src ) throws IOException
239             {
240                 bos.write( src.array(), src.arrayOffset() + src.position(), src.remaining() );
241             }
242 
243             @Override
244             protected void closeImpl() throws IOException
245             {
246                 bos.close();
247             }
248 
249             @Override
250             protected void flushImpl() throws IOException
251             {
252                 bos.flush();
253             }
254         };
255     }
256 }