1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.util.internal;
20
21 import javax.annotation.Nonnegative;
22 import javax.annotation.Nonnull;
23
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.nio.Buffer;
30 import java.nio.ByteBuffer;
31 import java.nio.channels.AsynchronousByteChannel;
32 import java.nio.channels.ClosedChannelException;
33 import java.nio.channels.ReadableByteChannel;
34 import java.nio.channels.WritableByteChannel;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import static java.lang.Math.max;
39 import static java.util.Objects.requireNonNull;
40
41
42
43
44
45
46
47
48
49
50
51
52 public final class Channels {
53 private static final int BUFFER_SIZE = 64 * 1024;
54
55 private Channels() {
56 throw new IllegalStateException("no instantiable constructor");
57 }
58
59 public static WritableByteChannel newChannel(@Nonnull OutputStream out) {
60 return newChannel(out, 0);
61 }
62
63 public static WritableBufferedByteChannel newBufferedChannel(@Nonnull OutputStream out) {
64 return newChannel(out, BUFFER_SIZE);
65 }
66
67 public static ReadableByteChannel newChannel(@Nonnull final InputStream is) {
68 return newChannel(is, 0);
69 }
70
71 public static ReadableByteChannel newBufferedChannel(@Nonnull final InputStream is) {
72 return newChannel(is, BUFFER_SIZE);
73 }
74
75 public static OutputStream newOutputStream(final AsynchronousByteChannel channel) {
76 return new OutputStream() {
77 @Override
78 public synchronized void write(byte[] b, int off, int len) throws IOException {
79 if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
80 throw new IndexOutOfBoundsException("b.length = " + b.length + ", off = " + off + ", len = " + len);
81 } else if (len > 0) {
82 ByteBuffer bb = ByteBuffer.wrap(b, off, len);
83 while (bb.hasRemaining()) {
84 try {
85 channel.write(bb).get();
86 } catch (ExecutionException e) {
87 Throwable t = e.getCause();
88 throw t instanceof IOException
89 ? (IOException) t
90 : new IOException((t == null ? e : t).getLocalizedMessage(), t);
91 } catch (Exception e) {
92 throw new IOException(e.getLocalizedMessage(), e);
93 }
94 }
95 }
96 }
97
98 @Override
99 public void write(int b) throws IOException {
100 write(new byte[] {(byte) b});
101 }
102
103 @Override
104 public synchronized void close() throws IOException {
105 if (channel.isOpen()) {
106 try {
107 channel.close();
108 } catch (ClosedChannelException e) {
109
110 }
111 }
112 }
113 };
114 }
115
116 public static InputStream newInputStream(final AsynchronousByteChannel channel) {
117 return new InputStream() {
118 @Override
119 public synchronized int read(byte[] b, int off, int len) throws IOException {
120 if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
121 throw new IndexOutOfBoundsException("b.length = " + b.length + ", off = " + off + ", len = " + len);
122 } else if (len == 0) {
123 return 0;
124 }
125 ByteBuffer bb = ByteBuffer.wrap(b, off, len);
126 try {
127 return channel.read(bb).get();
128 } catch (ExecutionException e) {
129 Throwable t = e.getCause();
130 throw t instanceof IOException
131 ? (IOException) t
132 : new IOException((t == null ? e : t).getLocalizedMessage(), t);
133 } catch (Exception e) {
134 throw new IOException(e.getLocalizedMessage(), e);
135 }
136 }
137
138 @Override
139 public int read() throws IOException {
140 int count;
141 byte[] b = new byte[1];
142 do {
143 count = read(b, 0, 1);
144 } while (count == 0);
145
146 return count == -1 ? -1 : b[0];
147 }
148
149 @Override
150 public synchronized void close() throws IOException {
151 if (channel.isOpen()) {
152 try {
153 channel.close();
154 } catch (ClosedChannelException e) {
155
156 }
157 }
158 }
159 };
160 }
161
162 private static ReadableByteChannel newChannel(@Nonnull InputStream is, @Nonnegative int bufferSize) {
163 requireNonNull(is, "the stream should not be null");
164 final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream(is, bufferSize);
165
166 return new AbstractNoninterruptibleReadableChannel() {
167 @Override
168 protected int readImpl(ByteBuffer src) throws IOException {
169 int count = bis.read(src.array(), src.arrayOffset() + ((Buffer) src).position(), src.remaining());
170 if (count > 0) {
171 ((Buffer) src).position(count + ((Buffer) src).position());
172 }
173 return count;
174 }
175
176 @Override
177 protected void closeImpl() throws IOException {
178 bis.close();
179 }
180 };
181 }
182
183 private static WritableBufferedByteChannel newChannel(
184 @Nonnull OutputStream out, @Nonnegative final int bufferSize) {
185 requireNonNull(out, "the stream should not be null");
186 final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream(out, bufferSize);
187
188 return new AbstractNoninterruptibleWritableChannel() {
189 private final AtomicLong bytesCounter = new AtomicLong();
190
191 @Override
192 public long countBufferOverflows() {
193 return bufferSize == 0 ? 0 : max(bytesCounter.get() - 1, 0) / bufferSize;
194 }
195
196 @Override
197 protected void writeImpl(ByteBuffer src) throws IOException {
198 int count = src.remaining();
199 bos.write(src.array(), src.arrayOffset() + ((Buffer) src).position(), count);
200 bytesCounter.getAndAdd(count);
201 }
202
203 @Override
204 protected void closeImpl() throws IOException {
205 bos.close();
206 }
207
208 @Override
209 protected void flushImpl() throws IOException {
210 bos.flush();
211 }
212 };
213 }
214 }