1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.transport.jetty;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.Channels;
25 import java.nio.channels.ReadableByteChannel;
26 import java.nio.file.Files;
27 import java.nio.file.StandardOpenOption;
28
29 import org.eclipse.aether.spi.connector.transport.PutTask;
30 import org.eclipse.jetty.client.util.AbstractRequestContent;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.Callback;
34 import org.eclipse.jetty.util.IO;
35
36 class PutTaskRequestContent extends AbstractRequestContent {
37 private final PutTask putTask;
38 private final int bufferSize;
39 private ByteBufferPool bufferPool;
40 private boolean useDirectByteBuffers = true;
41
42 PutTaskRequestContent(PutTask putTask) {
43 this(putTask, 4096);
44 }
45
46 PutTaskRequestContent(PutTask putTask, int bufferSize) {
47 super("application/octet-stream");
48 this.putTask = putTask;
49 this.bufferSize = bufferSize;
50 }
51
52 @Override
53 public long getLength() {
54 return putTask.getDataLength();
55 }
56
57 @Override
58 public boolean isReproducible() {
59 return true;
60 }
61
62 public ByteBufferPool getByteBufferPool() {
63 return bufferPool;
64 }
65
66 public void setByteBufferPool(ByteBufferPool byteBufferPool) {
67 this.bufferPool = byteBufferPool;
68 }
69
70 public boolean isUseDirectByteBuffers() {
71 return useDirectByteBuffers;
72 }
73
74 public void setUseDirectByteBuffers(boolean useDirectByteBuffers) {
75 this.useDirectByteBuffers = useDirectByteBuffers;
76 }
77
78 @Override
79 protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent) {
80 return new SubscriptionImpl(consumer, emitInitialContent);
81 }
82
83 private class SubscriptionImpl extends AbstractSubscription {
84 private ReadableByteChannel channel;
85 private long readTotal;
86
87 private SubscriptionImpl(Consumer consumer, boolean emitInitialContent) {
88 super(consumer, emitInitialContent);
89 }
90
91 @Override
92 protected boolean produceContent(Producer producer) throws IOException {
93 ByteBuffer buffer;
94 boolean last;
95 if (channel == null) {
96 if (putTask.getDataPath() != null) {
97 channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
98 } else {
99 channel = Channels.newChannel(putTask.newInputStream());
100 }
101 }
102
103 buffer = bufferPool == null
104 ? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers())
105 : bufferPool.acquire(bufferSize, isUseDirectByteBuffers());
106
107 BufferUtil.clearToFill(buffer);
108 int read = channel.read(buffer);
109 BufferUtil.flipToFlush(buffer, 0);
110 if (!channel.isOpen() && read < 0) {
111 throw new EOFException("EOF reached for " + putTask);
112 }
113 if (read > 0) {
114 readTotal += read;
115 }
116 last = readTotal == getLength();
117 if (last) {
118 IO.close(channel);
119 }
120 return producer.produce(buffer, last, Callback.from(() -> release(buffer)));
121 }
122
123 private void release(ByteBuffer buffer) {
124 if (bufferPool != null) {
125 bufferPool.release(buffer);
126 }
127 }
128
129 @Override
130 public void fail(Throwable failure) {
131 super.fail(failure);
132 IO.close(channel);
133 }
134 }
135 }