1 package org.apache.maven.surefire.api.util.internal;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import javax.annotation.Nonnegative;
23 import javax.annotation.Nonnull;
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.nio.Buffer;
30 import java.nio.ByteBuffer;
31 import java.nio.channels.AsynchronousByteChannel;
32 import java.nio.channels.ClosedChannelException;
33 import java.nio.channels.ReadableByteChannel;
34 import java.nio.channels.WritableByteChannel;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import static java.lang.Math.max;
39 import static java.util.Objects.requireNonNull;
40
41
42
43
44
45
46
47
48
49
50
51
52 public final class Channels
53 {
54 private static final int BUFFER_SIZE = 64 * 1024;
55
56 private Channels()
57 {
58 throw new IllegalStateException( "no instantiable constructor" );
59 }
60
61 public static WritableByteChannel newChannel( @Nonnull OutputStream out )
62 {
63 return newChannel( out, 0 );
64 }
65
66 public static WritableBufferedByteChannel newBufferedChannel( @Nonnull OutputStream out )
67 {
68 return newChannel( out, BUFFER_SIZE );
69 }
70
71 public static ReadableByteChannel newChannel( @Nonnull final InputStream is )
72 {
73 return newChannel( is, 0 );
74 }
75
76 public static ReadableByteChannel newBufferedChannel( @Nonnull final InputStream is )
77 {
78 return newChannel( is, BUFFER_SIZE );
79 }
80
81 public static OutputStream newOutputStream( final AsynchronousByteChannel channel )
82 {
83 return new OutputStream()
84 {
85 @Override
86 public synchronized void write( byte[] b, int off, int len ) throws IOException
87 {
88 if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
89 {
90 throw new IndexOutOfBoundsException(
91 "b.length = " + b.length + ", off = " + off + ", len = " + len );
92 }
93 else if ( len > 0 )
94 {
95 ByteBuffer bb = ByteBuffer.wrap( b, off, len );
96 while ( bb.hasRemaining() )
97 {
98 try
99 {
100 channel.write( bb ).get();
101 }
102 catch ( ExecutionException e )
103 {
104 Throwable t = e.getCause();
105 throw t instanceof IOException
106 ? (IOException) t
107 : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
108 }
109 catch ( Exception e )
110 {
111 throw new IOException( e.getLocalizedMessage(), e );
112 }
113 }
114 }
115 }
116
117 @Override
118 public void write( int b ) throws IOException
119 {
120 write( new byte[] {(byte) b} );
121 }
122
123 @Override
124 public synchronized void close() throws IOException
125 {
126 if ( channel.isOpen() )
127 {
128 try
129 {
130 channel.close();
131 }
132 catch ( ClosedChannelException e )
133 {
134
135 }
136 }
137 }
138 };
139 }
140
141 public static InputStream newInputStream( final AsynchronousByteChannel channel )
142 {
143 return new InputStream()
144 {
145 @Override
146 public synchronized int read( byte[] b, int off, int len ) throws IOException
147 {
148 if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
149 {
150 throw new IndexOutOfBoundsException(
151 "b.length = " + b.length + ", off = " + off + ", len = " + len );
152 }
153 else if ( len == 0 )
154 {
155 return 0;
156 }
157 ByteBuffer bb = ByteBuffer.wrap( b, off, len );
158 try
159 {
160 return channel.read( bb ).get();
161 }
162 catch ( ExecutionException e )
163 {
164 Throwable t = e.getCause();
165 throw t instanceof IOException
166 ? (IOException) t
167 : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
168 }
169 catch ( Exception e )
170 {
171 throw new IOException( e.getLocalizedMessage(), e );
172 }
173 }
174
175 @Override
176 public int read() throws IOException
177 {
178 int count;
179 byte[] b = new byte[1];
180 do
181 {
182 count = read( b, 0, 1 );
183 }
184 while ( count == 0 );
185
186 return count == -1 ? -1 : b[0];
187 }
188
189 @Override
190 public synchronized void close() throws IOException
191 {
192 if ( channel.isOpen() )
193 {
194 try
195 {
196 channel.close();
197 }
198 catch ( ClosedChannelException e )
199 {
200
201 }
202 }
203 }
204 };
205 }
206
207 private static ReadableByteChannel newChannel( @Nonnull InputStream is, @Nonnegative int bufferSize )
208 {
209 requireNonNull( is, "the stream should not be null" );
210 final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream( is, bufferSize );
211
212 return new AbstractNoninterruptibleReadableChannel()
213 {
214 @Override
215 protected int readImpl( ByteBuffer src ) throws IOException
216 {
217 int count = bis.read( src.array(), src.arrayOffset() + ( (Buffer) src ).position(), src.remaining() );
218 if ( count > 0 )
219 {
220 ( (Buffer) src ).position( count + ( (Buffer) src ).position() );
221 }
222 return count;
223 }
224
225 @Override
226 protected void closeImpl() throws IOException
227 {
228 bis.close();
229 }
230 };
231 }
232
233 private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out,
234 @Nonnegative final int bufferSize )
235 {
236 requireNonNull( out, "the stream should not be null" );
237 final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize );
238
239 return new AbstractNoninterruptibleWritableChannel()
240 {
241 private final AtomicLong bytesCounter = new AtomicLong();
242
243 @Override
244 public long countBufferOverflows()
245 {
246 return bufferSize == 0 ? 0 : max( bytesCounter.get() - 1, 0 ) / bufferSize;
247 }
248
249 @Override
250 protected void writeImpl( ByteBuffer src ) throws IOException
251 {
252 int count = src.remaining();
253 bos.write( src.array(), src.arrayOffset() + ( (Buffer) src ).position(), count );
254 bytesCounter.getAndAdd( count );
255 }
256
257 @Override
258 protected void closeImpl() throws IOException
259 {
260 bos.close();
261 }
262
263 @Override
264 protected void flushImpl() throws IOException
265 {
266 bos.flush();
267 }
268 };
269 }
270 }