1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.transport.jetty;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ByteChannel;
25 import java.nio.channels.Channels;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.ReadableByteChannel;
28 import java.nio.channels.SeekableByteChannel;
29 import java.nio.file.Files;
30 import java.nio.file.StandardOpenOption;
31 import java.util.Objects;
32 import java.util.function.Supplier;
33
34 import org.eclipse.aether.spi.connector.transport.PutTask;
35 import org.eclipse.jetty.client.ByteBufferRequestContent;
36 import org.eclipse.jetty.client.Request;
37 import org.eclipse.jetty.io.ByteBufferPool;
38 import org.eclipse.jetty.io.Content;
39 import org.eclipse.jetty.io.RetainableByteBuffer;
40 import org.eclipse.jetty.io.internal.ByteChannelContentSource;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.ExceptionUtil;
43 import org.eclipse.jetty.util.IO;
44 import org.eclipse.jetty.util.TypeUtil;
45 import org.eclipse.jetty.util.thread.AutoLock;
46 import org.eclipse.jetty.util.thread.SerializedInvoker;
47
48
49
50
51
52
53
54
55 public class PutTaskRequestContent extends ByteBufferRequestContent implements Request.Content {
56
57 public static Request.Content from(PutTask putTask) {
58 Supplier<ReadableByteChannel> newChannelSupplier;
59 if (putTask.getDataPath() != null) {
60 newChannelSupplier = () -> {
61 try {
62 return Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
63 } catch (IOException e) {
64 throw new UncheckedIOException(e);
65 }
66 };
67 } else {
68 newChannelSupplier = () -> {
69 try {
70 return Channels.newChannel(putTask.newInputStream());
71 } catch (IOException e) {
72 throw new UncheckedIOException(e);
73 }
74 };
75 }
76 return new PutTaskRequestContent(null, newChannelSupplier, 0L, putTask.getDataLength());
77 }
78
79 private final AutoLock lock = new AutoLock();
80 private final SerializedInvoker invoker = new SerializedInvoker(ByteChannelContentSource.class);
81 private final ByteBufferPool.Sized byteBufferPool;
82 private ReadableByteChannel byteChannel;
83 private final long offset;
84 private final long length;
85 private RetainableByteBuffer buffer;
86 private long offsetRemaining;
87 private long totalRead;
88 private Runnable demandCallback;
89 private Content.Chunk terminal;
90
91 private final Supplier<ReadableByteChannel> newByteChannelSupplier;
92
93
94
95
96
97
98 protected PutTaskRequestContent(
99 ByteBufferPool.Sized byteBufferPool, Supplier<ReadableByteChannel> newByteChannelSupplier) {
100 this(byteBufferPool, newByteChannelSupplier, 0L, -1L);
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 protected PutTaskRequestContent(
117 ByteBufferPool.Sized byteBufferPool,
118 Supplier<ReadableByteChannel> newByteChannelSupplier,
119 long offset,
120 long length) {
121 this.byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING);
122 this.byteChannel = newByteChannelSupplier.get();
123 this.offset = offset;
124 this.length = TypeUtil.checkOffsetLengthSize(offset, length, -1L);
125 offsetRemaining = offset;
126 this.newByteChannelSupplier = newByteChannelSupplier;
127 }
128
129 protected ReadableByteChannel open() throws IOException {
130 return byteChannel;
131 }
132
133 @Override
134 public void demand(Runnable demandCallback) {
135 try (AutoLock ignored = lock.lock()) {
136 if (this.demandCallback != null) {
137 throw new IllegalStateException("demand pending");
138 }
139 this.demandCallback = demandCallback;
140 }
141 invoker.run(this::invokeDemandCallback);
142 }
143
144 private void invokeDemandCallback() {
145 Runnable demandCallback;
146 try (AutoLock ignored = lock.lock()) {
147 demandCallback = this.demandCallback;
148 this.demandCallback = null;
149 }
150 if (demandCallback != null) {
151 ExceptionUtil.run(demandCallback, this::fail);
152 }
153 }
154
155 protected void lockedSetTerminal(Content.Chunk terminal) {
156 assert lock.isHeldByCurrentThread();
157 if (terminal != null) {
158 ExceptionUtil.addSuppressedIfNotAssociated(terminal.getFailure(), terminal.getFailure());
159 }
160 IO.close(byteChannel);
161 if (buffer != null) {
162 buffer.release();
163 }
164 buffer = null;
165 }
166
167 private void lockedEnsureOpenOrTerminal() {
168 assert lock.isHeldByCurrentThread();
169 if (terminal == null && (byteChannel == null || !byteChannel.isOpen())) {
170 try {
171 byteChannel = open();
172 if (byteChannel == null || !byteChannel.isOpen()) {
173 lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true));
174 } else if (byteChannel instanceof SeekableByteChannel) {
175 ((SeekableByteChannel) byteChannel).position(offset);
176 offsetRemaining = 0;
177 }
178 } catch (IOException e) {
179 lockedSetTerminal(Content.Chunk.from(e, true));
180 }
181 }
182 }
183
184 @Override
185 public Content.Chunk read() {
186 try (AutoLock ignored = lock.lock()) {
187 lockedEnsureOpenOrTerminal();
188
189 if (terminal != null) {
190 return terminal;
191 }
192
193 if (length == 0) {
194 lockedSetTerminal(Content.Chunk.EOF);
195 return Content.Chunk.EOF;
196 }
197
198 if (buffer == null) {
199 buffer = byteBufferPool.acquire();
200 } else if (buffer.isRetained()) {
201 buffer.release();
202 buffer = byteBufferPool.acquire();
203 }
204
205 try {
206 ByteBuffer byteBuffer = buffer.getByteBuffer();
207 if (offsetRemaining > 0) {
208
209 while (offsetRemaining > 0) {
210 BufferUtil.clearToFill(byteBuffer);
211 byteBuffer.limit((int) Math.min(buffer.capacity(), offsetRemaining));
212 int read = byteChannel.read(byteBuffer);
213 if (read < 0) {
214 lockedSetTerminal(Content.Chunk.EOF);
215 return terminal;
216 }
217 if (read == 0) {
218 return null;
219 }
220
221 offsetRemaining -= read;
222 }
223 }
224
225 BufferUtil.clearToFill(byteBuffer);
226 if (length > 0) {
227 byteBuffer.limit((int) Math.min(buffer.capacity(), length - totalRead));
228 }
229 int read = byteChannel.read(byteBuffer);
230 BufferUtil.flipToFlush(byteBuffer, 0);
231 if (read == 0) {
232 return null;
233 }
234 if (read > 0) {
235 totalRead += read;
236 buffer.retain();
237 if (length < 0 || totalRead < length) {
238 return Content.Chunk.asChunk(byteBuffer, false, buffer);
239 }
240
241 Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, buffer);
242 lockedSetTerminal(Content.Chunk.EOF);
243 return last;
244 }
245 lockedSetTerminal(Content.Chunk.EOF);
246 } catch (Throwable t) {
247 lockedSetTerminal(Content.Chunk.from(t, true));
248 }
249 }
250 return terminal;
251 }
252
253 @Override
254 public void fail(Throwable failure) {
255 try (AutoLock ignored = lock.lock()) {
256 lockedSetTerminal(Content.Chunk.from(failure, true));
257 }
258 }
259
260 @Override
261 public long getLength() {
262 return length;
263 }
264
265 @Override
266 public boolean rewind() {
267 try (AutoLock ignored = lock.lock()) {
268
269 if (!(byteChannel instanceof SeekableByteChannel)) {
270 try {
271 byteChannel.close();
272 } catch (IOException e) {
273 throw new UncheckedIOException(e);
274 }
275 byteChannel = newByteChannelSupplier.get();
276 offsetRemaining = 0;
277 totalRead = 0;
278 return true;
279 }
280
281
282 if (terminal != null
283 && !Content.Chunk.isFailure(terminal)
284 && (byteChannel == null || byteChannel instanceof SeekableByteChannel)) {
285 terminal = null;
286 }
287
288 lockedEnsureOpenOrTerminal();
289 if (terminal != null || byteChannel == null || !byteChannel.isOpen()) {
290 return false;
291 }
292
293 try {
294 ((SeekableByteChannel) byteChannel).position(offset);
295 offsetRemaining = 0;
296 totalRead = 0;
297 return true;
298 } catch (Throwable t) {
299 lockedSetTerminal(Content.Chunk.from(t, true));
300 }
301
302 return true;
303 }
304 }
305 }