View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.util.internal;
20  
21  import javax.annotation.Nonnegative;
22  import javax.annotation.Nonnull;
23  
24  import java.io.BufferedInputStream;
25  import java.io.BufferedOutputStream;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.Buffer;
30  import java.nio.ByteBuffer;
31  import java.nio.channels.AsynchronousByteChannel;
32  import java.nio.channels.ClosedChannelException;
33  import java.nio.channels.ReadableByteChannel;
34  import java.nio.channels.WritableByteChannel;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import static java.lang.Math.max;
39  import static java.util.Objects.requireNonNull;
40  
41  /**
42   * Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream} to the Java {@link
43   * java.nio.channels.Channel}.
44   * <br>
45   * We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility closes the stream as
46   * soon as the particular Thread is interrupted. If the frameworks (Zookeeper, Netty) interrupts the thread, the
47   * communication channels become closed and the JVM hangs. Therefore we developed internal utility which is safe for the
48   * Surefire.
49   *
50   * @since 3.0.0-M5
51   */
52  public final class Channels {
53      private static final int BUFFER_SIZE = 64 * 1024;
54  
55      private Channels() {
56          throw new IllegalStateException("no instantiable constructor");
57      }
58  
59      public static WritableByteChannel newChannel(@Nonnull OutputStream out) {
60          return newChannel(out, 0);
61      }
62  
63      public static WritableBufferedByteChannel newBufferedChannel(@Nonnull OutputStream out) {
64          return newChannel(out, BUFFER_SIZE);
65      }
66  
67      public static ReadableByteChannel newChannel(@Nonnull final InputStream is) {
68          return newChannel(is, 0);
69      }
70  
71      public static ReadableByteChannel newBufferedChannel(@Nonnull final InputStream is) {
72          return newChannel(is, BUFFER_SIZE);
73      }
74  
75      public static OutputStream newOutputStream(final AsynchronousByteChannel channel) {
76          return new OutputStream() {
77              @Override
78              public synchronized void write(byte[] b, int off, int len) throws IOException {
79                  if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
80                      throw new IndexOutOfBoundsException("b.length = " + b.length + ", off = " + off + ", len = " + len);
81                  } else if (len > 0) {
82                      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
83                      while (bb.hasRemaining()) {
84                          try {
85                              channel.write(bb).get();
86                          } catch (ExecutionException e) {
87                              Throwable t = e.getCause();
88                              throw t instanceof IOException
89                                      ? (IOException) t
90                                      : new IOException((t == null ? e : t).getLocalizedMessage(), t);
91                          } catch (Exception e) {
92                              throw new IOException(e.getLocalizedMessage(), e);
93                          }
94                      }
95                  }
96              }
97  
98              @Override
99              public void write(int b) throws IOException {
100                 write(new byte[] {(byte) b});
101             }
102 
103             @Override
104             public synchronized void close() throws IOException {
105                 if (channel.isOpen()) {
106                     try {
107                         channel.close();
108                     } catch (ClosedChannelException e) {
109                         // closed channel anyway
110                     }
111                 }
112             }
113         };
114     }
115 
116     public static InputStream newInputStream(final AsynchronousByteChannel channel) {
117         return new InputStream() {
118             @Override
119             public synchronized int read(byte[] b, int off, int len) throws IOException {
120                 if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
121                     throw new IndexOutOfBoundsException("b.length = " + b.length + ", off = " + off + ", len = " + len);
122                 } else if (len == 0) {
123                     return 0;
124                 }
125                 ByteBuffer bb = ByteBuffer.wrap(b, off, len);
126                 try {
127                     return channel.read(bb).get();
128                 } catch (ExecutionException e) {
129                     Throwable t = e.getCause();
130                     throw t instanceof IOException
131                             ? (IOException) t
132                             : new IOException((t == null ? e : t).getLocalizedMessage(), t);
133                 } catch (Exception e) {
134                     throw new IOException(e.getLocalizedMessage(), e);
135                 }
136             }
137 
138             @Override
139             public int read() throws IOException {
140                 int count;
141                 byte[] b = new byte[1];
142                 do {
143                     count = read(b, 0, 1);
144                 } while (count == 0);
145 
146                 return count == -1 ? -1 : b[0];
147             }
148 
149             @Override
150             public synchronized void close() throws IOException {
151                 if (channel.isOpen()) {
152                     try {
153                         channel.close();
154                     } catch (ClosedChannelException e) {
155                         // closed channel anyway
156                     }
157                 }
158             }
159         };
160     }
161 
162     private static ReadableByteChannel newChannel(@Nonnull InputStream is, @Nonnegative int bufferSize) {
163         requireNonNull(is, "the stream should not be null");
164         final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream(is, bufferSize);
165 
166         return new AbstractNoninterruptibleReadableChannel() {
167             @Override
168             protected int readImpl(ByteBuffer src) throws IOException {
169                 int count = bis.read(src.array(), src.arrayOffset() + ((Buffer) src).position(), src.remaining());
170                 if (count > 0) {
171                     ((Buffer) src).position(count + ((Buffer) src).position());
172                 }
173                 return count;
174             }
175 
176             @Override
177             protected void closeImpl() throws IOException {
178                 bis.close();
179             }
180         };
181     }
182 
183     private static WritableBufferedByteChannel newChannel(
184             @Nonnull OutputStream out, @Nonnegative final int bufferSize) {
185         requireNonNull(out, "the stream should not be null");
186         final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream(out, bufferSize);
187 
188         return new AbstractNoninterruptibleWritableChannel() {
189             private final AtomicLong bytesCounter = new AtomicLong();
190 
191             @Override
192             public long countBufferOverflows() {
193                 return bufferSize == 0 ? 0 : max(bytesCounter.get() - 1, 0) / bufferSize;
194             }
195 
196             @Override
197             protected void writeImpl(ByteBuffer src) throws IOException {
198                 int count = src.remaining();
199                 bos.write(src.array(), src.arrayOffset() + ((Buffer) src).position(), count);
200                 bytesCounter.getAndAdd(count);
201             }
202 
203             @Override
204             protected void closeImpl() throws IOException {
205                 bos.close();
206             }
207 
208             @Override
209             protected void flushImpl() throws IOException {
210                 bos.flush();
211             }
212         };
213     }
214 }