1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.UncheckedIOException;
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import org.eclipse.aether.ConfigurationProperties;
36 import org.eclipse.aether.RepositorySystemSession;
37 import org.eclipse.aether.RequestTrace;
38 import org.eclipse.aether.metadata.Metadata;
39 import org.eclipse.aether.repository.RemoteRepository;
40 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
41 import org.eclipse.aether.spi.connector.ArtifactDownload;
42 import org.eclipse.aether.spi.connector.ArtifactUpload;
43 import org.eclipse.aether.spi.connector.MetadataDownload;
44 import org.eclipse.aether.spi.connector.MetadataUpload;
45 import org.eclipse.aether.spi.connector.RepositoryConnector;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
49 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52 import org.eclipse.aether.spi.connector.transport.GetTask;
53 import org.eclipse.aether.spi.connector.transport.PeekTask;
54 import org.eclipse.aether.spi.connector.transport.PutTask;
55 import org.eclipse.aether.spi.connector.transport.Transporter;
56 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57 import org.eclipse.aether.spi.io.FileProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.NoTransporterException;
62 import org.eclipse.aether.transfer.TransferEvent;
63 import org.eclipse.aether.transfer.TransferResource;
64 import org.eclipse.aether.transform.FileTransformer;
65 import org.eclipse.aether.util.ConfigUtils;
66 import org.eclipse.aether.util.FileUtils;
67 import org.eclipse.aether.util.concurrency.ExecutorUtils;
68 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 import static java.util.Objects.requireNonNull;
73
74
75
76
77 final class BasicRepositoryConnector implements RepositoryConnector {
78
79 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
80
81 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82
83 private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
84
85 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
86
87 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
88
89 private final FileProcessor fileProcessor;
90
91 private final RemoteRepository repository;
92
93 private final RepositorySystemSession session;
94
95 private final Transporter transporter;
96
97 private final RepositoryLayout layout;
98
99 private final ChecksumPolicyProvider checksumPolicyProvider;
100
101 private final int maxThreads;
102
103 private final boolean smartChecksums;
104
105 private final boolean parallelPut;
106
107 private final boolean persistedChecksums;
108
109 private Executor executor;
110
111 private final AtomicBoolean closed;
112
113 BasicRepositoryConnector(
114 RepositorySystemSession session,
115 RemoteRepository repository,
116 TransporterProvider transporterProvider,
117 RepositoryLayoutProvider layoutProvider,
118 ChecksumPolicyProvider checksumPolicyProvider,
119 FileProcessor fileProcessor,
120 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
121 throws NoRepositoryConnectorException {
122 try {
123 layout = layoutProvider.newRepositoryLayout(session, repository);
124 } catch (NoRepositoryLayoutException e) {
125 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
126 }
127 try {
128 transporter = transporterProvider.newTransporter(session, repository);
129 } catch (NoTransporterException e) {
130 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
131 }
132 this.checksumPolicyProvider = checksumPolicyProvider;
133
134 this.session = session;
135 this.repository = repository;
136 this.fileProcessor = fileProcessor;
137 this.providedChecksumsSources = providedChecksumsSources;
138 this.closed = new AtomicBoolean(false);
139
140 maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
141 smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
142 parallelPut = ConfigUtils.getBoolean(
143 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
144 persistedChecksums = ConfigUtils.getBoolean(
145 session,
146 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
147 ConfigurationProperties.PERSISTED_CHECKSUMS);
148 }
149
150 private Executor getExecutor(int tasks) {
151 if (maxThreads <= 1) {
152 return ExecutorUtils.DIRECT_EXECUTOR;
153 }
154 if (tasks <= 1) {
155 return ExecutorUtils.DIRECT_EXECUTOR;
156 }
157 if (executor == null) {
158 executor =
159 ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
160 }
161 return executor;
162 }
163
164 @Override
165 public void close() {
166 if (closed.compareAndSet(false, true)) {
167 ExecutorUtils.shutdown(executor);
168 transporter.close();
169 }
170 }
171
172 private void failIfClosed() {
173 if (closed.get()) {
174 throw new IllegalStateException("connector already closed");
175 }
176 }
177
178 @Override
179 public void get(
180 Collection<? extends ArtifactDownload> artifactDownloads,
181 Collection<? extends MetadataDownload> metadataDownloads) {
182 failIfClosed();
183
184 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
185 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
186
187 Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
188 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
189 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
190
191 for (MetadataDownload transfer : safeMetadataDownloads) {
192 URI location = layout.getLocation(transfer.getMetadata(), false);
193
194 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
195 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
196 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
197
198 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
199 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
200 if (checksumPolicy != null) {
201 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
202 }
203
204 Runnable task = new GetTaskRunner(
205 location,
206 transfer.getFile(),
207 checksumPolicy,
208 checksumAlgorithmFactories,
209 checksumLocations,
210 null,
211 listener);
212 executor.execute(errorForwarder.wrap(task));
213 }
214
215 for (ArtifactDownload transfer : safeArtifactDownloads) {
216 Map<String, String> providedChecksums = Collections.emptyMap();
217 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
218 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
219 session, transfer, repository, checksumAlgorithmFactories);
220
221 if (provided != null) {
222 providedChecksums = provided;
223 break;
224 }
225 }
226
227 URI location = layout.getLocation(transfer.getArtifact(), false);
228
229 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
230 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
231 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
232
233 Runnable task;
234 if (transfer.isExistenceCheck()) {
235 task = new PeekTaskRunner(location, listener);
236 } else {
237 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
238 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
239 if (checksumPolicy != null) {
240 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
241 }
242
243 task = new GetTaskRunner(
244 location,
245 transfer.getFile(),
246 checksumPolicy,
247 checksumAlgorithmFactories,
248 checksumLocations,
249 providedChecksums,
250 listener);
251 }
252 executor.execute(errorForwarder.wrap(task));
253 }
254
255 errorForwarder.await();
256 }
257
258 @Override
259 public void put(
260 Collection<? extends ArtifactUpload> artifactUploads,
261 Collection<? extends MetadataUpload> metadataUploads) {
262 failIfClosed();
263
264 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
265 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
266
267 Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
268 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
269
270 for (ArtifactUpload transfer : safeArtifactUploads) {
271 URI location = layout.getLocation(transfer.getArtifact(), true);
272
273 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
274 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
275 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
276
277 List<RepositoryLayout.ChecksumLocation> checksumLocations =
278 layout.getChecksumLocations(transfer.getArtifact(), true, location);
279
280 Runnable task = new PutTaskRunner(
281 location, transfer.getFile(), transfer.getFileTransformer(), checksumLocations, listener);
282
283 executor.execute(errorForwarder.wrap(task));
284 }
285
286 errorForwarder.await();
287
288 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
289 for (MetadataUpload transfer : transferGroup) {
290 URI location = layout.getLocation(transfer.getMetadata(), true);
291
292 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
293 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
294 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
295
296 List<RepositoryLayout.ChecksumLocation> checksumLocations =
297 layout.getChecksumLocations(transfer.getMetadata(), true, location);
298
299 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
300
301 executor.execute(errorForwarder.wrap(task));
302 }
303
304 errorForwarder.await();
305 }
306 }
307
308
309
310
311
312
313 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
314 ArrayList<MetadataUpload> v = new ArrayList<>();
315 ArrayList<MetadataUpload> a = new ArrayList<>();
316 ArrayList<MetadataUpload> g = new ArrayList<>();
317 ArrayList<MetadataUpload> r = new ArrayList<>();
318
319 for (MetadataUpload transfer : metadataUploads) {
320 Metadata metadata = transfer.getMetadata();
321 if (!"".equals(metadata.getVersion())) {
322 v.add(transfer);
323 } else if (!"".equals(metadata.getArtifactId())) {
324 a.add(transfer);
325 } else if (!"".equals(metadata.getGroupId())) {
326 g.add(transfer);
327 } else {
328 r.add(transfer);
329 }
330 }
331
332 List<List<MetadataUpload>> result = new ArrayList<>(4);
333 if (!v.isEmpty()) {
334 result.add(v);
335 }
336 if (!a.isEmpty()) {
337 result.add(a);
338 }
339 if (!g.isEmpty()) {
340 result.add(g);
341 }
342 if (!r.isEmpty()) {
343 result.add(r);
344 }
345 return result;
346 }
347
348 private static <T> Collection<T> safe(Collection<T> items) {
349 return (items != null) ? items : Collections.emptyList();
350 }
351
352 private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
353 return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
354 }
355
356 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
357 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
358 if (upload) {
359 builder.setRequestType(TransferEvent.RequestType.PUT);
360 } else if (!peek) {
361 builder.setRequestType(TransferEvent.RequestType.GET);
362 } else {
363 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
364 }
365 return builder;
366 }
367
368 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
369 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
370 }
371
372 @Override
373 public String toString() {
374 return String.valueOf(repository);
375 }
376
377 abstract class TaskRunner implements Runnable {
378
379 protected final URI path;
380
381 protected final TransferTransportListener<?> listener;
382
383 TaskRunner(URI path, TransferTransportListener<?> listener) {
384 this.path = path;
385 this.listener = listener;
386 }
387
388 @Override
389 public void run() {
390 try {
391 listener.transferInitiated();
392 runTask();
393 listener.transferSucceeded();
394 } catch (Exception e) {
395 listener.transferFailed(e, transporter.classify(e));
396 }
397 }
398
399 protected abstract void runTask() throws Exception;
400 }
401
402 class PeekTaskRunner extends TaskRunner {
403
404 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
405 super(path, listener);
406 }
407
408 @Override
409 protected void runTask() throws Exception {
410 transporter.peek(new PeekTask(path));
411 }
412 }
413
414 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
415
416 private final File file;
417
418 private final ChecksumValidator checksumValidator;
419
420 GetTaskRunner(
421 URI path,
422 File file,
423 ChecksumPolicy checksumPolicy,
424 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
425 List<RepositoryLayout.ChecksumLocation> checksumLocations,
426 Map<String, String> providedChecksums,
427 TransferTransportListener<?> listener) {
428 super(path, listener);
429 this.file = requireNonNull(file, "destination file cannot be null");
430 checksumValidator = new ChecksumValidator(
431 file,
432 checksumAlgorithmFactories,
433 fileProcessor,
434 this,
435 checksumPolicy,
436 providedChecksums,
437 safe(checksumLocations));
438 }
439
440 @Override
441 public boolean fetchChecksum(URI remote, File local) throws Exception {
442 try {
443 transporter.get(new GetTask(remote).setDataFile(local));
444 } catch (Exception e) {
445 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
446 return false;
447 }
448 throw e;
449 }
450 return true;
451 }
452
453 @Override
454 protected void runTask() throws Exception {
455 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
456 final File tmp = tempFile.getPath().toFile();
457 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
458 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
459 GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
460 transporter.get(task);
461 try {
462 checksumValidator.validate(
463 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
464 break;
465 } catch (ChecksumFailureException e) {
466 boolean retry = trial < lastTrial && e.isRetryWorthy();
467 if (!retry && !checksumValidator.handle(e)) {
468 throw e;
469 }
470 listener.transferCorrupted(e);
471 if (retry) {
472 checksumValidator.retry();
473 } else {
474 break;
475 }
476 }
477 }
478 tempFile.move();
479 if (persistedChecksums) {
480 checksumValidator.commit();
481 }
482 }
483 }
484 }
485
486 class PutTaskRunner extends TaskRunner {
487
488 private final File file;
489
490 private final FileTransformer fileTransformer;
491
492 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
493
494 PutTaskRunner(
495 URI path,
496 File file,
497 List<RepositoryLayout.ChecksumLocation> checksumLocations,
498 TransferTransportListener<?> listener) {
499 this(path, file, null, checksumLocations, listener);
500 }
501
502
503
504
505
506
507
508
509
510
511
512 PutTaskRunner(
513 URI path,
514 File file,
515 FileTransformer fileTransformer,
516 List<RepositoryLayout.ChecksumLocation> checksumLocations,
517 TransferTransportListener<?> listener) {
518 super(path, listener);
519 this.file = requireNonNull(file, "source file cannot be null");
520 this.fileTransformer = fileTransformer;
521 this.checksumLocations = safe(checksumLocations);
522 }
523
524 @SuppressWarnings("checkstyle:innerassignment")
525 @Override
526 protected void runTask() throws Exception {
527 if (fileTransformer != null) {
528
529 ByteArrayOutputStream baos = new ByteArrayOutputStream();
530 byte[] buffer = new byte[1024];
531
532 try (InputStream transformData = fileTransformer.transformData(file)) {
533 for (int read; (read = transformData.read(buffer, 0, buffer.length)) != -1; ) {
534 baos.write(buffer, 0, read);
535 }
536 }
537
538 byte[] bytes = baos.toByteArray();
539 transporter.put(new PutTask(path).setDataBytes(bytes).setListener(listener));
540 uploadChecksums(file, bytes);
541 } else {
542 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
543 uploadChecksums(file, null);
544 }
545 }
546
547
548
549
550
551 private void uploadChecksums(File file, byte[] bytes) {
552 if (checksumLocations.isEmpty()) {
553 return;
554 }
555 try {
556 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
557 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
558 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
559 }
560
561 Map<String, String> sumsByAlgo;
562 if (bytes != null) {
563 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
564 } else {
565 sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
566 }
567
568 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
569 uploadChecksum(
570 checksumLocation.getLocation(),
571 sumsByAlgo.get(checksumLocation
572 .getChecksumAlgorithmFactory()
573 .getName()));
574 }
575 } catch (IOException e) {
576 LOGGER.warn("Failed to upload checksums for {}", file, e);
577 throw new UncheckedIOException(e);
578 }
579 }
580
581 private void uploadChecksum(URI location, Object checksum) {
582 try {
583 if (checksum instanceof Exception) {
584 throw (Exception) checksum;
585 }
586 transporter.put(new PutTask(location).setDataString((String) checksum));
587 } catch (Exception e) {
588 LOGGER.warn("Failed to upload checksum to {}", location, e);
589 }
590 }
591 }
592 }