View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.jetty;
20  
21  import java.io.IOException;
22  import java.io.UncheckedIOException;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.ByteChannel;
25  import java.nio.channels.Channels;
26  import java.nio.channels.ClosedChannelException;
27  import java.nio.channels.ReadableByteChannel;
28  import java.nio.channels.SeekableByteChannel;
29  import java.nio.file.Files;
30  import java.nio.file.StandardOpenOption;
31  import java.util.Objects;
32  import java.util.function.Supplier;
33  
34  import org.eclipse.aether.spi.connector.transport.PutTask;
35  import org.eclipse.jetty.client.ByteBufferRequestContent;
36  import org.eclipse.jetty.client.Request;
37  import org.eclipse.jetty.io.ByteBufferPool;
38  import org.eclipse.jetty.io.Content;
39  import org.eclipse.jetty.io.RetainableByteBuffer;
40  import org.eclipse.jetty.io.internal.ByteChannelContentSource;
41  import org.eclipse.jetty.util.BufferUtil;
42  import org.eclipse.jetty.util.ExceptionUtil;
43  import org.eclipse.jetty.util.IO;
44  import org.eclipse.jetty.util.TypeUtil;
45  import org.eclipse.jetty.util.thread.AutoLock;
46  import org.eclipse.jetty.util.thread.SerializedInvoker;
47  
48  /**
49   * Heavily inspired by Jetty's {@code org.eclipse.jetty.io.internal.ByteChannelContentSource} but adjusted to deal with
50   * {@link ReadableByteChannel}s and to support rewind (to be able to retry the requests).
51   * Also, Jetty's {@code ByteChannelContentSource} is an internal package so should not be used directly.
52   * @see <a href="https://javadoc.jetty.org/jetty-12/org/eclipse/jetty/io/internal/ByteChannelContentSource.html">ByteChannelContentSource</a>
53   * @see <a href="https://github.com/jetty/jetty.project/issues/14324">Jetty Issue #14324</a>
54   */
55  public class PutTaskRequestContent extends ByteBufferRequestContent implements Request.Content {
56  
57      public static Request.Content from(PutTask putTask) {
58          Supplier<ReadableByteChannel> newChannelSupplier;
59          if (putTask.getDataPath() != null) {
60              newChannelSupplier = () -> {
61                  try {
62                      return Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
63                  } catch (IOException e) {
64                      throw new UncheckedIOException(e);
65                  }
66              };
67          } else {
68              newChannelSupplier = () -> {
69                  try {
70                      return Channels.newChannel(putTask.newInputStream());
71                  } catch (IOException e) {
72                      throw new UncheckedIOException(e);
73                  }
74              };
75          }
76          return new PutTaskRequestContent(null, newChannelSupplier, 0L, putTask.getDataLength());
77      }
78  
79      private final AutoLock lock = new AutoLock();
80      private final SerializedInvoker invoker = new SerializedInvoker(ByteChannelContentSource.class);
81      private final ByteBufferPool.Sized byteBufferPool;
82      private ReadableByteChannel byteChannel;
83      private final long offset;
84      private final long length;
85      private RetainableByteBuffer buffer;
86      private long offsetRemaining;
87      private long totalRead;
88      private Runnable demandCallback;
89      private Content.Chunk terminal;
90      /** Only necessary for rewind support when leveraging the input stream. */
91      private final Supplier<ReadableByteChannel> newByteChannelSupplier;
92  
93      /**
94       * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}.
95       * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
96       * @param newByteChannelSupplier The {@link ByteChannel} supplier.
97       */
98      protected PutTaskRequestContent(
99              ByteBufferPool.Sized byteBufferPool, Supplier<ReadableByteChannel> newByteChannelSupplier) {
100         this(byteBufferPool, newByteChannelSupplier, 0L, -1L);
101     }
102 
103     /**
104      * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}.
105      * If the {@link ByteChannel} is an instance of {@link SeekableByteChannel} the implementation will use
106      * {@link SeekableByteChannel#position(long)} to navigate to the starting offset.
107      * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
108      * @param newByteChannelSupplier The {@link ByteChannel} supplier.
109      * @param offset the offset byte of the content to start from.
110      *               Must be greater than or equal to 0 and less than the content length (if known).
111      * @param length the length of the content to make available, -1 for the full length.
112      *               If the size of the content is known, the length may be truncated to the content size minus the offset.
113      * @throws IndexOutOfBoundsException if the offset or length are out of range.
114      * @see TypeUtil#checkOffsetLengthSize(long, long, long)
115      */
116     protected PutTaskRequestContent(
117             ByteBufferPool.Sized byteBufferPool,
118             Supplier<ReadableByteChannel> newByteChannelSupplier,
119             long offset,
120             long length) {
121         this.byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING);
122         this.byteChannel = newByteChannelSupplier.get();
123         this.offset = offset;
124         this.length = TypeUtil.checkOffsetLengthSize(offset, length, -1L);
125         offsetRemaining = offset;
126         this.newByteChannelSupplier = newByteChannelSupplier;
127     }
128 
129     protected ReadableByteChannel open() throws IOException {
130         return byteChannel;
131     }
132 
133     @Override
134     public void demand(Runnable demandCallback) {
135         try (AutoLock ignored = lock.lock()) {
136             if (this.demandCallback != null) {
137                 throw new IllegalStateException("demand pending");
138             }
139             this.demandCallback = demandCallback;
140         }
141         invoker.run(this::invokeDemandCallback);
142     }
143 
144     private void invokeDemandCallback() {
145         Runnable demandCallback;
146         try (AutoLock ignored = lock.lock()) {
147             demandCallback = this.demandCallback;
148             this.demandCallback = null;
149         }
150         if (demandCallback != null) {
151             ExceptionUtil.run(demandCallback, this::fail);
152         }
153     }
154 
155     protected void lockedSetTerminal(Content.Chunk terminal) {
156         assert lock.isHeldByCurrentThread();
157         if (terminal != null) {
158             ExceptionUtil.addSuppressedIfNotAssociated(terminal.getFailure(), terminal.getFailure());
159         }
160         IO.close(byteChannel);
161         if (buffer != null) {
162             buffer.release();
163         }
164         buffer = null;
165     }
166 
167     private void lockedEnsureOpenOrTerminal() {
168         assert lock.isHeldByCurrentThread();
169         if (terminal == null && (byteChannel == null || !byteChannel.isOpen())) {
170             try {
171                 byteChannel = open();
172                 if (byteChannel == null || !byteChannel.isOpen()) {
173                     lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true));
174                 } else if (byteChannel instanceof SeekableByteChannel) {
175                     ((SeekableByteChannel) byteChannel).position(offset);
176                     offsetRemaining = 0;
177                 }
178             } catch (IOException e) {
179                 lockedSetTerminal(Content.Chunk.from(e, true));
180             }
181         }
182     }
183 
184     @Override
185     public Content.Chunk read() {
186         try (AutoLock ignored = lock.lock()) {
187             lockedEnsureOpenOrTerminal();
188 
189             if (terminal != null) {
190                 return terminal;
191             }
192 
193             if (length == 0) {
194                 lockedSetTerminal(Content.Chunk.EOF);
195                 return Content.Chunk.EOF;
196             }
197 
198             if (buffer == null) {
199                 buffer = byteBufferPool.acquire();
200             } else if (buffer.isRetained()) {
201                 buffer.release();
202                 buffer = byteBufferPool.acquire();
203             }
204 
205             try {
206                 ByteBuffer byteBuffer = buffer.getByteBuffer();
207                 if (offsetRemaining > 0) {
208                     // Discard all bytes read until we reach the staring offset.
209                     while (offsetRemaining > 0) {
210                         BufferUtil.clearToFill(byteBuffer);
211                         byteBuffer.limit((int) Math.min(buffer.capacity(), offsetRemaining));
212                         int read = byteChannel.read(byteBuffer);
213                         if (read < 0) {
214                             lockedSetTerminal(Content.Chunk.EOF);
215                             return terminal;
216                         }
217                         if (read == 0) {
218                             return null;
219                         }
220 
221                         offsetRemaining -= read;
222                     }
223                 }
224 
225                 BufferUtil.clearToFill(byteBuffer);
226                 if (length > 0) {
227                     byteBuffer.limit((int) Math.min(buffer.capacity(), length - totalRead));
228                 }
229                 int read = byteChannel.read(byteBuffer);
230                 BufferUtil.flipToFlush(byteBuffer, 0);
231                 if (read == 0) {
232                     return null;
233                 }
234                 if (read > 0) {
235                     totalRead += read;
236                     buffer.retain();
237                     if (length < 0 || totalRead < length) {
238                         return Content.Chunk.asChunk(byteBuffer, false, buffer);
239                     }
240 
241                     Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, buffer);
242                     lockedSetTerminal(Content.Chunk.EOF);
243                     return last;
244                 }
245                 lockedSetTerminal(Content.Chunk.EOF);
246             } catch (Throwable t) {
247                 lockedSetTerminal(Content.Chunk.from(t, true));
248             }
249         }
250         return terminal;
251     }
252 
253     @Override
254     public void fail(Throwable failure) {
255         try (AutoLock ignored = lock.lock()) {
256             lockedSetTerminal(Content.Chunk.from(failure, true));
257         }
258     }
259 
260     @Override
261     public long getLength() {
262         return length;
263     }
264 
265     @Override
266     public boolean rewind() {
267         try (AutoLock ignored = lock.lock()) {
268             // open a new ByteChannel if we don't have a SeekableByteChannel.
269             if (!(byteChannel instanceof SeekableByteChannel)) {
270                 try {
271                     byteChannel.close();
272                 } catch (IOException e) {
273                     throw new UncheckedIOException(e);
274                 }
275                 byteChannel = newByteChannelSupplier.get();
276                 offsetRemaining = 0;
277                 totalRead = 0;
278                 return true;
279             }
280 
281             // We can remove terminal condition for a rewind that is likely to occur
282             if (terminal != null
283                     && !Content.Chunk.isFailure(terminal)
284                     && (byteChannel == null || byteChannel instanceof SeekableByteChannel)) {
285                 terminal = null;
286             }
287 
288             lockedEnsureOpenOrTerminal();
289             if (terminal != null || byteChannel == null || !byteChannel.isOpen()) {
290                 return false;
291             }
292 
293             try {
294                 ((SeekableByteChannel) byteChannel).position(offset);
295                 offsetRemaining = 0;
296                 totalRead = 0;
297                 return true;
298             } catch (Throwable t) {
299                 lockedSetTerminal(Content.Chunk.from(t, true));
300             }
301 
302             return true;
303         }
304     }
305 }