View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.jetty;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.Channels;
25  import java.nio.channels.ReadableByteChannel;
26  import java.nio.file.Files;
27  import java.nio.file.StandardOpenOption;
28  
29  import org.eclipse.aether.spi.connector.transport.PutTask;
30  import org.eclipse.jetty.client.util.AbstractRequestContent;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.Callback;
34  import org.eclipse.jetty.util.IO;
35  
36  class PutTaskRequestContent extends AbstractRequestContent {
37      private final PutTask putTask;
38      private final int bufferSize;
39      private ByteBufferPool bufferPool;
40      private boolean useDirectByteBuffers = true;
41  
42      PutTaskRequestContent(PutTask putTask) {
43          this(putTask, 4096);
44      }
45  
46      PutTaskRequestContent(PutTask putTask, int bufferSize) {
47          super("application/octet-stream");
48          this.putTask = putTask;
49          this.bufferSize = bufferSize;
50      }
51  
52      @Override
53      public long getLength() {
54          return putTask.getDataLength();
55      }
56  
57      @Override
58      public boolean isReproducible() {
59          return true;
60      }
61  
62      public ByteBufferPool getByteBufferPool() {
63          return bufferPool;
64      }
65  
66      public void setByteBufferPool(ByteBufferPool byteBufferPool) {
67          this.bufferPool = byteBufferPool;
68      }
69  
70      public boolean isUseDirectByteBuffers() {
71          return useDirectByteBuffers;
72      }
73  
74      public void setUseDirectByteBuffers(boolean useDirectByteBuffers) {
75          this.useDirectByteBuffers = useDirectByteBuffers;
76      }
77  
78      @Override
79      protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent) {
80          return new SubscriptionImpl(consumer, emitInitialContent);
81      }
82  
83      private class SubscriptionImpl extends AbstractSubscription {
84          private ReadableByteChannel channel;
85          private long readTotal;
86  
87          private SubscriptionImpl(Consumer consumer, boolean emitInitialContent) {
88              super(consumer, emitInitialContent);
89          }
90  
91          @Override
92          protected boolean produceContent(Producer producer) throws IOException {
93              ByteBuffer buffer;
94              boolean last;
95              if (channel == null) {
96                  if (putTask.getDataPath() != null) {
97                      channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
98                  } else {
99                      channel = Channels.newChannel(putTask.newInputStream());
100                 }
101             }
102 
103             buffer = bufferPool == null
104                     ? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers())
105                     : bufferPool.acquire(bufferSize, isUseDirectByteBuffers());
106 
107             BufferUtil.clearToFill(buffer);
108             int read = channel.read(buffer);
109             BufferUtil.flipToFlush(buffer, 0);
110             if (!channel.isOpen() && read < 0) {
111                 throw new EOFException("EOF reached for " + putTask);
112             }
113             if (read > 0) {
114                 readTotal += read;
115             }
116             last = readTotal == getLength();
117             if (last) {
118                 IO.close(channel);
119             }
120             return producer.produce(buffer, last, Callback.from(() -> release(buffer)));
121         }
122 
123         private void release(ByteBuffer buffer) {
124             if (bufferPool != null) {
125                 bufferPool.release(buffer);
126             }
127         }
128 
129         @Override
130         public void fail(Throwable failure) {
131             super.fail(failure);
132             IO.close(channel);
133         }
134     }
135 }