1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.transport.jetty;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.Channels;
25 import java.nio.channels.ReadableByteChannel;
26 import java.nio.file.Files;
27 import java.nio.file.StandardOpenOption;
28
29 import org.eclipse.aether.spi.connector.transport.PutTask;
30 import org.eclipse.jetty.client.util.AbstractRequestContent;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.Callback;
34 import org.eclipse.jetty.util.IO;
35
36 class PutTaskRequestContent extends AbstractRequestContent {
37 private final PutTask putTask;
38 private final int bufferSize;
39 private ByteBufferPool bufferPool;
40 private boolean useDirectByteBuffers = true;
41
42 @SuppressWarnings("checkstyle:MagicNumber")
43 PutTaskRequestContent(PutTask putTask) {
44 this(putTask, 4096);
45 }
46
47 PutTaskRequestContent(PutTask putTask, int bufferSize) {
48 super("application/octet-stream");
49 this.putTask = putTask;
50 this.bufferSize = bufferSize;
51 }
52
53 @Override
54 public long getLength() {
55 return putTask.getDataLength();
56 }
57
58 @Override
59 public boolean isReproducible() {
60 return true;
61 }
62
63 public ByteBufferPool getByteBufferPool() {
64 return bufferPool;
65 }
66
67 public void setByteBufferPool(ByteBufferPool byteBufferPool) {
68 this.bufferPool = byteBufferPool;
69 }
70
71 public boolean isUseDirectByteBuffers() {
72 return useDirectByteBuffers;
73 }
74
75 public void setUseDirectByteBuffers(boolean useDirectByteBuffers) {
76 this.useDirectByteBuffers = useDirectByteBuffers;
77 }
78
79 @Override
80 protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent) {
81 return new SubscriptionImpl(consumer, emitInitialContent);
82 }
83
84 private class SubscriptionImpl extends AbstractSubscription {
85 private ReadableByteChannel channel;
86 private long readTotal;
87
88 private SubscriptionImpl(Consumer consumer, boolean emitInitialContent) {
89 super(consumer, emitInitialContent);
90 }
91
92 @Override
93 protected boolean produceContent(Producer producer) throws IOException {
94 ByteBuffer buffer;
95 boolean last;
96 if (channel == null) {
97 if (putTask.getDataPath() != null) {
98 channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
99 } else {
100 channel = Channels.newChannel(putTask.newInputStream());
101 }
102 }
103
104 buffer = bufferPool == null
105 ? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers())
106 : bufferPool.acquire(bufferSize, isUseDirectByteBuffers());
107
108 BufferUtil.clearToFill(buffer);
109 int read = channel.read(buffer);
110 BufferUtil.flipToFlush(buffer, 0);
111 if (!channel.isOpen() && read < 0) {
112 throw new EOFException("EOF reached for " + putTask);
113 }
114 if (read > 0) {
115 readTotal += read;
116 }
117 last = readTotal == getLength();
118 if (last) {
119 IO.close(channel);
120 }
121 return producer.produce(buffer, last, Callback.from(() -> release(buffer)));
122 }
123
124 private void release(ByteBuffer buffer) {
125 if (bufferPool != null) {
126 bufferPool.release(buffer);
127 }
128 }
129
130 @Override
131 public void fail(Throwable failure) {
132 super.fail(failure);
133 IO.close(channel);
134 }
135 }
136 }