001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.transport.jetty;
020
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.nio.ByteBuffer;
024import java.nio.channels.ByteChannel;
025import java.nio.channels.Channels;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.SeekableByteChannel;
029import java.nio.file.Files;
030import java.nio.file.StandardOpenOption;
031import java.util.Objects;
032import java.util.function.Supplier;
033
034import org.eclipse.aether.spi.connector.transport.PutTask;
035import org.eclipse.jetty.client.ByteBufferRequestContent;
036import org.eclipse.jetty.client.Request;
037import org.eclipse.jetty.io.ByteBufferPool;
038import org.eclipse.jetty.io.Content;
039import org.eclipse.jetty.io.RetainableByteBuffer;
040import org.eclipse.jetty.io.internal.ByteChannelContentSource;
041import org.eclipse.jetty.util.BufferUtil;
042import org.eclipse.jetty.util.ExceptionUtil;
043import org.eclipse.jetty.util.IO;
044import org.eclipse.jetty.util.TypeUtil;
045import org.eclipse.jetty.util.thread.AutoLock;
046import org.eclipse.jetty.util.thread.SerializedInvoker;
047
048/**
049 * Heavily inspired by Jetty's {@code org.eclipse.jetty.io.internal.ByteChannelContentSource} but adjusted to deal with
050 * {@link ReadableByteChannel}s and to support rewind (to be able to retry the requests).
 * Also, Jetty's {@code ByteChannelContentSource} lives in an internal package, so it should not be used directly.
052 * @see <a href="https://javadoc.jetty.org/jetty-12/org/eclipse/jetty/io/internal/ByteChannelContentSource.html">ByteChannelContentSource</a>
053 * @see <a href="https://github.com/jetty/jetty.project/issues/14324">Jetty Issue #14324</a>
054 */
055public class PutTaskRequestContent extends ByteBufferRequestContent implements Request.Content {
056
057    public static Request.Content from(PutTask putTask) {
058        Supplier<ReadableByteChannel> newChannelSupplier;
059        if (putTask.getDataPath() != null) {
060            newChannelSupplier = () -> {
061                try {
062                    return Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
063                } catch (IOException e) {
064                    throw new UncheckedIOException(e);
065                }
066            };
067        } else {
068            newChannelSupplier = () -> {
069                try {
070                    return Channels.newChannel(putTask.newInputStream());
071                } catch (IOException e) {
072                    throw new UncheckedIOException(e);
073                }
074            };
075        }
076        return new PutTaskRequestContent(null, newChannelSupplier, 0L, putTask.getDataLength());
077    }
078
079    private final AutoLock lock = new AutoLock();
080    private final SerializedInvoker invoker = new SerializedInvoker(ByteChannelContentSource.class);
081    private final ByteBufferPool.Sized byteBufferPool;
082    private ReadableByteChannel byteChannel;
083    private final long offset;
084    private final long length;
085    private RetainableByteBuffer buffer;
086    private long offsetRemaining;
087    private long totalRead;
088    private Runnable demandCallback;
089    private Content.Chunk terminal;
090    /** Only necessary for rewind support when leveraging the input stream. */
091    private final Supplier<ReadableByteChannel> newByteChannelSupplier;
092
093    /**
094     * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}.
095     * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
096     * @param newByteChannelSupplier The {@link ByteChannel} supplier.
097     */
098    protected PutTaskRequestContent(
099            ByteBufferPool.Sized byteBufferPool, Supplier<ReadableByteChannel> newByteChannelSupplier) {
100        this(byteBufferPool, newByteChannelSupplier, 0L, -1L);
101    }
102
103    /**
104     * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}.
105     * If the {@link ByteChannel} is an instance of {@link SeekableByteChannel} the implementation will use
106     * {@link SeekableByteChannel#position(long)} to navigate to the starting offset.
107     * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
108     * @param newByteChannelSupplier The {@link ByteChannel} supplier.
109     * @param offset the offset byte of the content to start from.
110     *               Must be greater than or equal to 0 and less than the content length (if known).
111     * @param length the length of the content to make available, -1 for the full length.
112     *               If the size of the content is known, the length may be truncated to the content size minus the offset.
113     * @throws IndexOutOfBoundsException if the offset or length are out of range.
114     * @see TypeUtil#checkOffsetLengthSize(long, long, long)
115     */
116    protected PutTaskRequestContent(
117            ByteBufferPool.Sized byteBufferPool,
118            Supplier<ReadableByteChannel> newByteChannelSupplier,
119            long offset,
120            long length) {
121        this.byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING);
122        this.byteChannel = newByteChannelSupplier.get();
123        this.offset = offset;
124        this.length = TypeUtil.checkOffsetLengthSize(offset, length, -1L);
125        offsetRemaining = offset;
126        this.newByteChannelSupplier = newByteChannelSupplier;
127    }
128
129    protected ReadableByteChannel open() throws IOException {
130        return byteChannel;
131    }
132
133    @Override
134    public void demand(Runnable demandCallback) {
135        try (AutoLock ignored = lock.lock()) {
136            if (this.demandCallback != null) {
137                throw new IllegalStateException("demand pending");
138            }
139            this.demandCallback = demandCallback;
140        }
141        invoker.run(this::invokeDemandCallback);
142    }
143
144    private void invokeDemandCallback() {
145        Runnable demandCallback;
146        try (AutoLock ignored = lock.lock()) {
147            demandCallback = this.demandCallback;
148            this.demandCallback = null;
149        }
150        if (demandCallback != null) {
151            ExceptionUtil.run(demandCallback, this::fail);
152        }
153    }
154
155    protected void lockedSetTerminal(Content.Chunk terminal) {
156        assert lock.isHeldByCurrentThread();
157        if (terminal != null) {
158            ExceptionUtil.addSuppressedIfNotAssociated(terminal.getFailure(), terminal.getFailure());
159        }
160        IO.close(byteChannel);
161        if (buffer != null) {
162            buffer.release();
163        }
164        buffer = null;
165    }
166
167    private void lockedEnsureOpenOrTerminal() {
168        assert lock.isHeldByCurrentThread();
169        if (terminal == null && (byteChannel == null || !byteChannel.isOpen())) {
170            try {
171                byteChannel = open();
172                if (byteChannel == null || !byteChannel.isOpen()) {
173                    lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true));
174                } else if (byteChannel instanceof SeekableByteChannel) {
175                    ((SeekableByteChannel) byteChannel).position(offset);
176                    offsetRemaining = 0;
177                }
178            } catch (IOException e) {
179                lockedSetTerminal(Content.Chunk.from(e, true));
180            }
181        }
182    }
183
184    @Override
185    public Content.Chunk read() {
186        try (AutoLock ignored = lock.lock()) {
187            lockedEnsureOpenOrTerminal();
188
189            if (terminal != null) {
190                return terminal;
191            }
192
193            if (length == 0) {
194                lockedSetTerminal(Content.Chunk.EOF);
195                return Content.Chunk.EOF;
196            }
197
198            if (buffer == null) {
199                buffer = byteBufferPool.acquire();
200            } else if (buffer.isRetained()) {
201                buffer.release();
202                buffer = byteBufferPool.acquire();
203            }
204
205            try {
206                ByteBuffer byteBuffer = buffer.getByteBuffer();
207                if (offsetRemaining > 0) {
208                    // Discard all bytes read until we reach the staring offset.
209                    while (offsetRemaining > 0) {
210                        BufferUtil.clearToFill(byteBuffer);
211                        byteBuffer.limit((int) Math.min(buffer.capacity(), offsetRemaining));
212                        int read = byteChannel.read(byteBuffer);
213                        if (read < 0) {
214                            lockedSetTerminal(Content.Chunk.EOF);
215                            return terminal;
216                        }
217                        if (read == 0) {
218                            return null;
219                        }
220
221                        offsetRemaining -= read;
222                    }
223                }
224
225                BufferUtil.clearToFill(byteBuffer);
226                if (length > 0) {
227                    byteBuffer.limit((int) Math.min(buffer.capacity(), length - totalRead));
228                }
229                int read = byteChannel.read(byteBuffer);
230                BufferUtil.flipToFlush(byteBuffer, 0);
231                if (read == 0) {
232                    return null;
233                }
234                if (read > 0) {
235                    totalRead += read;
236                    buffer.retain();
237                    if (length < 0 || totalRead < length) {
238                        return Content.Chunk.asChunk(byteBuffer, false, buffer);
239                    }
240
241                    Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, buffer);
242                    lockedSetTerminal(Content.Chunk.EOF);
243                    return last;
244                }
245                lockedSetTerminal(Content.Chunk.EOF);
246            } catch (Throwable t) {
247                lockedSetTerminal(Content.Chunk.from(t, true));
248            }
249        }
250        return terminal;
251    }
252
253    @Override
254    public void fail(Throwable failure) {
255        try (AutoLock ignored = lock.lock()) {
256            lockedSetTerminal(Content.Chunk.from(failure, true));
257        }
258    }
259
260    @Override
261    public long getLength() {
262        return length;
263    }
264
265    @Override
266    public boolean rewind() {
267        try (AutoLock ignored = lock.lock()) {
268            // open a new ByteChannel if we don't have a SeekableByteChannel.
269            if (!(byteChannel instanceof SeekableByteChannel)) {
270                try {
271                    byteChannel.close();
272                } catch (IOException e) {
273                    throw new UncheckedIOException(e);
274                }
275                byteChannel = newByteChannelSupplier.get();
276                offsetRemaining = 0;
277                totalRead = 0;
278                return true;
279            }
280
281            // We can remove terminal condition for a rewind that is likely to occur
282            if (terminal != null
283                    && !Content.Chunk.isFailure(terminal)
284                    && (byteChannel == null || byteChannel instanceof SeekableByteChannel)) {
285                terminal = null;
286            }
287
288            lockedEnsureOpenOrTerminal();
289            if (terminal != null || byteChannel == null || !byteChannel.isOpen()) {
290                return false;
291            }
292
293            try {
294                ((SeekableByteChannel) byteChannel).position(offset);
295                offsetRemaining = 0;
296                totalRead = 0;
297                return true;
298            } catch (Throwable t) {
299                lockedSetTerminal(Content.Chunk.from(t, true));
300            }
301
302            return true;
303        }
304    }
305}