001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.transport.jetty; 020 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.nio.ByteBuffer; 024import java.nio.channels.ByteChannel; 025import java.nio.channels.Channels; 026import java.nio.channels.ClosedChannelException; 027import java.nio.channels.ReadableByteChannel; 028import java.nio.channels.SeekableByteChannel; 029import java.nio.file.Files; 030import java.nio.file.StandardOpenOption; 031import java.util.Objects; 032import java.util.function.Supplier; 033 034import org.eclipse.aether.spi.connector.transport.PutTask; 035import org.eclipse.jetty.client.ByteBufferRequestContent; 036import org.eclipse.jetty.client.Request; 037import org.eclipse.jetty.io.ByteBufferPool; 038import org.eclipse.jetty.io.Content; 039import org.eclipse.jetty.io.RetainableByteBuffer; 040import org.eclipse.jetty.io.internal.ByteChannelContentSource; 041import org.eclipse.jetty.util.BufferUtil; 042import org.eclipse.jetty.util.ExceptionUtil; 043import org.eclipse.jetty.util.IO; 044import org.eclipse.jetty.util.TypeUtil; 045import org.eclipse.jetty.util.thread.AutoLock; 046import org.eclipse.jetty.util.thread.SerializedInvoker; 047 048/** 049 * Heavily inspired by Jetty's {@code org.eclipse.jetty.io.internal.ByteChannelContentSource} but adjusted to deal with 050 * {@link ReadableByteChannel}s and to support rewind (to be able to retry the requests). 051 * Also, Jetty's {@code ByteChannelContentSource} is an internal package so should not be used directly. 052 * @see <a href="https://javadoc.jetty.org/jetty-12/org/eclipse/jetty/io/internal/ByteChannelContentSource.html">ByteChannelContentSource</a> 053 * @see <a href="https://github.com/jetty/jetty.project/issues/14324">Jetty Issue #14324</a> 054 */ 055public class PutTaskRequestContent extends ByteBufferRequestContent implements Request.Content { 056 057 public static Request.Content from(PutTask putTask) { 058 Supplier<ReadableByteChannel> newChannelSupplier; 059 if (putTask.getDataPath() != null) { 060 newChannelSupplier = () -> { 061 try { 062 return Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ); 063 } catch (IOException e) { 064 throw new UncheckedIOException(e); 065 } 066 }; 067 } else { 068 newChannelSupplier = () -> { 069 try { 070 return Channels.newChannel(putTask.newInputStream()); 071 } catch (IOException e) { 072 throw new UncheckedIOException(e); 073 } 074 }; 075 } 076 return new PutTaskRequestContent(null, newChannelSupplier, 0L, putTask.getDataLength()); 077 } 078 079 private final AutoLock lock = new AutoLock(); 080 private final SerializedInvoker invoker = new SerializedInvoker(ByteChannelContentSource.class); 081 private final ByteBufferPool.Sized byteBufferPool; 082 private ReadableByteChannel byteChannel; 083 private final long offset; 084 private final long length; 085 private RetainableByteBuffer buffer; 086 private long offsetRemaining; 087 private long totalRead; 088 private Runnable demandCallback; 089 private Content.Chunk terminal; 090 /** Only necessary for rewind support when leveraging the input stream. */ 091 private final Supplier<ReadableByteChannel> newByteChannelSupplier; 092 093 /** 094 * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}. 095 * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers. 096 * @param newByteChannelSupplier The {@link ByteChannel} supplier. 097 */ 098 protected PutTaskRequestContent( 099 ByteBufferPool.Sized byteBufferPool, Supplier<ReadableByteChannel> newByteChannelSupplier) { 100 this(byteBufferPool, newByteChannelSupplier, 0L, -1L); 101 } 102 103 /** 104 * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}. 105 * If the {@link ByteChannel} is an instance of {@link SeekableByteChannel} the implementation will use 106 * {@link SeekableByteChannel#position(long)} to navigate to the starting offset. 107 * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers. 108 * @param newByteChannelSupplier The {@link ByteChannel} supplier. 109 * @param offset the offset byte of the content to start from. 110 * Must be greater than or equal to 0 and less than the content length (if known). 111 * @param length the length of the content to make available, -1 for the full length. 112 * If the size of the content is known, the length may be truncated to the content size minus the offset. 113 * @throws IndexOutOfBoundsException if the offset or length are out of range. 114 * @see TypeUtil#checkOffsetLengthSize(long, long, long) 115 */ 116 protected PutTaskRequestContent( 117 ByteBufferPool.Sized byteBufferPool, 118 Supplier<ReadableByteChannel> newByteChannelSupplier, 119 long offset, 120 long length) { 121 this.byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING); 122 this.byteChannel = newByteChannelSupplier.get(); 123 this.offset = offset; 124 this.length = TypeUtil.checkOffsetLengthSize(offset, length, -1L); 125 offsetRemaining = offset; 126 this.newByteChannelSupplier = newByteChannelSupplier; 127 } 128 129 protected ReadableByteChannel open() throws IOException { 130 return byteChannel; 131 } 132 133 @Override 134 public void demand(Runnable demandCallback) { 135 try (AutoLock ignored = lock.lock()) { 136 if (this.demandCallback != null) { 137 throw new IllegalStateException("demand pending"); 138 } 139 this.demandCallback = demandCallback; 140 } 141 invoker.run(this::invokeDemandCallback); 142 } 143 144 private void invokeDemandCallback() { 145 Runnable demandCallback; 146 try (AutoLock ignored = lock.lock()) { 147 demandCallback = this.demandCallback; 148 this.demandCallback = null; 149 } 150 if (demandCallback != null) { 151 ExceptionUtil.run(demandCallback, this::fail); 152 } 153 } 154 155 protected void lockedSetTerminal(Content.Chunk terminal) { 156 assert lock.isHeldByCurrentThread(); 157 if (terminal != null) { 158 ExceptionUtil.addSuppressedIfNotAssociated(terminal.getFailure(), terminal.getFailure()); 159 } 160 IO.close(byteChannel); 161 if (buffer != null) { 162 buffer.release(); 163 } 164 buffer = null; 165 } 166 167 private void lockedEnsureOpenOrTerminal() { 168 assert lock.isHeldByCurrentThread(); 169 if (terminal == null && (byteChannel == null || !byteChannel.isOpen())) { 170 try { 171 byteChannel = open(); 172 if (byteChannel == null || !byteChannel.isOpen()) { 173 lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true)); 174 } else if (byteChannel instanceof SeekableByteChannel) { 175 ((SeekableByteChannel) byteChannel).position(offset); 176 offsetRemaining = 0; 177 } 178 } catch (IOException e) { 179 lockedSetTerminal(Content.Chunk.from(e, true)); 180 } 181 } 182 } 183 184 @Override 185 public Content.Chunk read() { 186 try (AutoLock ignored = lock.lock()) { 187 lockedEnsureOpenOrTerminal(); 188 189 if (terminal != null) { 190 return terminal; 191 } 192 193 if (length == 0) { 194 lockedSetTerminal(Content.Chunk.EOF); 195 return Content.Chunk.EOF; 196 } 197 198 if (buffer == null) { 199 buffer = byteBufferPool.acquire(); 200 } else if (buffer.isRetained()) { 201 buffer.release(); 202 buffer = byteBufferPool.acquire(); 203 } 204 205 try { 206 ByteBuffer byteBuffer = buffer.getByteBuffer(); 207 if (offsetRemaining > 0) { 208 // Discard all bytes read until we reach the staring offset. 209 while (offsetRemaining > 0) { 210 BufferUtil.clearToFill(byteBuffer); 211 byteBuffer.limit((int) Math.min(buffer.capacity(), offsetRemaining)); 212 int read = byteChannel.read(byteBuffer); 213 if (read < 0) { 214 lockedSetTerminal(Content.Chunk.EOF); 215 return terminal; 216 } 217 if (read == 0) { 218 return null; 219 } 220 221 offsetRemaining -= read; 222 } 223 } 224 225 BufferUtil.clearToFill(byteBuffer); 226 if (length > 0) { 227 byteBuffer.limit((int) Math.min(buffer.capacity(), length - totalRead)); 228 } 229 int read = byteChannel.read(byteBuffer); 230 BufferUtil.flipToFlush(byteBuffer, 0); 231 if (read == 0) { 232 return null; 233 } 234 if (read > 0) { 235 totalRead += read; 236 buffer.retain(); 237 if (length < 0 || totalRead < length) { 238 return Content.Chunk.asChunk(byteBuffer, false, buffer); 239 } 240 241 Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, buffer); 242 lockedSetTerminal(Content.Chunk.EOF); 243 return last; 244 } 245 lockedSetTerminal(Content.Chunk.EOF); 246 } catch (Throwable t) { 247 lockedSetTerminal(Content.Chunk.from(t, true)); 248 } 249 } 250 return terminal; 251 } 252 253 @Override 254 public void fail(Throwable failure) { 255 try (AutoLock ignored = lock.lock()) { 256 lockedSetTerminal(Content.Chunk.from(failure, true)); 257 } 258 } 259 260 @Override 261 public long getLength() { 262 return length; 263 } 264 265 @Override 266 public boolean rewind() { 267 try (AutoLock ignored = lock.lock()) { 268 // open a new ByteChannel if we don't have a SeekableByteChannel. 269 if (!(byteChannel instanceof SeekableByteChannel)) { 270 try { 271 byteChannel.close(); 272 } catch (IOException e) { 273 throw new UncheckedIOException(e); 274 } 275 byteChannel = newByteChannelSupplier.get(); 276 offsetRemaining = 0; 277 totalRead = 0; 278 return true; 279 } 280 281 // We can remove terminal condition for a rewind that is likely to occur 282 if (terminal != null 283 && !Content.Chunk.isFailure(terminal) 284 && (byteChannel == null || byteChannel instanceof SeekableByteChannel)) { 285 terminal = null; 286 } 287 288 lockedEnsureOpenOrTerminal(); 289 if (terminal != null || byteChannel == null || !byteChannel.isOpen()) { 290 return false; 291 } 292 293 try { 294 ((SeekableByteChannel) byteChannel).position(offset); 295 offsetRemaining = 0; 296 totalRead = 0; 297 return true; 298 } catch (Throwable t) { 299 lockedSetTerminal(Content.Chunk.from(t, true)); 300 } 301 302 return true; 303 } 304 } 305}