1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.RepositorySystemSession;
34 import org.eclipse.aether.metadata.Metadata;
35 import org.eclipse.aether.repository.RemoteRepository;
36 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
37 import org.eclipse.aether.spi.connector.ArtifactDownload;
38 import org.eclipse.aether.spi.connector.ArtifactTransfer;
39 import org.eclipse.aether.spi.connector.ArtifactUpload;
40 import org.eclipse.aether.spi.connector.MetadataDownload;
41 import org.eclipse.aether.spi.connector.MetadataTransfer;
42 import org.eclipse.aether.spi.connector.MetadataUpload;
43 import org.eclipse.aether.spi.connector.RepositoryConnector;
44 import org.eclipse.aether.spi.connector.Transfer;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51 import org.eclipse.aether.spi.connector.transport.GetTask;
52 import org.eclipse.aether.spi.connector.transport.PeekTask;
53 import org.eclipse.aether.spi.connector.transport.PutTask;
54 import org.eclipse.aether.spi.connector.transport.Transporter;
55 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56 import org.eclipse.aether.spi.io.ChecksumProcessor;
57 import org.eclipse.aether.spi.io.PathProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.TransferEvent;
62 import org.eclipse.aether.transfer.TransferResource;
63 import org.eclipse.aether.util.ConfigUtils;
64 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65 import org.eclipse.aether.util.concurrency.SmartExecutor;
66 import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import static java.util.Objects.requireNonNull;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
81
82
83
84
85 final class BasicRepositoryConnector implements RepositoryConnector {
86 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
87
88 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
89
90 private final PathProcessor pathProcessor;
91
92 private final ChecksumProcessor checksumProcessor;
93
94 private final RemoteRepository repository;
95
96 private final RepositorySystemSession session;
97
98 private final Transporter transporter;
99
100 private final RepositoryLayout layout;
101
102 private final ChecksumPolicyProvider checksumPolicyProvider;
103
104 private final int maxDownstreamThreads;
105
106 private final int maxUpstreamThreads;
107
108 private final boolean smartChecksums;
109
110 private final boolean parallelPut;
111
112 private final boolean persistedChecksums;
113
114 private final ConcurrentHashMap<Boolean, SmartExecutor> executors;
115
116 private final AtomicBoolean closed;
117
118 @SuppressWarnings("checkstyle:parameternumber")
119 BasicRepositoryConnector(
120 RepositorySystemSession session,
121 RemoteRepository repository,
122 TransporterProvider transporterProvider,
123 RepositoryLayoutProvider layoutProvider,
124 ChecksumPolicyProvider checksumPolicyProvider,
125 PathProcessor pathProcessor,
126 ChecksumProcessor checksumProcessor,
127 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
128 throws NoRepositoryConnectorException {
129 try {
130 layout = layoutProvider.newRepositoryLayout(session, repository);
131 } catch (NoRepositoryLayoutException e) {
132 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
133 }
134 try {
135 transporter = transporterProvider.newTransporter(session, repository);
136 } catch (RuntimeException e) {
137 throw new NoRepositoryConnectorException(
138 repository, "Transporter configuration issue: " + e.getMessage(), e);
139 } catch (Exception e) {
140 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
141 }
142 this.checksumPolicyProvider = checksumPolicyProvider;
143
144 this.session = session;
145 this.repository = repository;
146 this.checksumProcessor = checksumProcessor;
147 this.pathProcessor = pathProcessor;
148 this.providedChecksumsSources = providedChecksumsSources;
149 this.executors = new ConcurrentHashMap<>();
150 this.closed = new AtomicBoolean(false);
151
152 maxUpstreamThreads = ConfigUtils.getInteger(
153 session,
154 DEFAULT_THREADS,
155 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
156 CONFIG_PROP_UPSTREAM_THREADS,
157 CONFIG_PROP_THREADS);
158 maxDownstreamThreads = ConfigUtils.getInteger(
159 session,
160 DEFAULT_THREADS,
161 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
162 CONFIG_PROP_DOWNSTREAM_THREADS,
163 CONFIG_PROP_THREADS);
164 smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
165 parallelPut = ConfigUtils.getBoolean(
166 session,
167 DEFAULT_PARALLEL_PUT,
168 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
169 CONFIG_PROP_PARALLEL_PUT);
170 persistedChecksums =
171 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
172 }
173
174
175
176
177 private SmartExecutor getExecutor(boolean downstream, int tasks) {
178 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
179 if (maxThreads <= 1 || tasks <= 1) {
180
181 return null;
182 }
183
184
185 return executors.computeIfAbsent(
186 downstream,
187 k -> SmartExecutorUtils.smartExecutor(
188 session, null, maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
189 }
190
191 @Override
192 public void close() {
193 if (closed.compareAndSet(false, true)) {
194 for (SmartExecutor executor : executors.values()) {
195 executor.close();
196 }
197 transporter.close();
198 }
199 }
200
201 private void failIfClosed() {
202 if (closed.get()) {
203 throw new IllegalStateException("connector already closed");
204 }
205 }
206
207 @Override
208 public void get(
209 Collection<? extends ArtifactDownload> artifactDownloads,
210 Collection<? extends MetadataDownload> metadataDownloads) {
211 failIfClosed();
212
213 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
214 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
215
216 SmartExecutor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
217 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
219
220 boolean first = true;
221
222 for (MetadataDownload transfer : safeMetadataDownloads) {
223 URI location = layout.getLocation(transfer.getMetadata(), false);
224
225 TransferResource resource = newTransferResource(location, transfer);
226 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
227 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
228
229 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
230 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
231 if (checksumPolicy != null) {
232 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
233 }
234
235 Runnable task = new GetTaskRunner(
236 location,
237 transfer.getPath(),
238 checksumPolicy,
239 checksumAlgorithmFactories,
240 checksumLocations,
241 null,
242 listener);
243 if (executor == null || first) {
244 task.run();
245 first = false;
246 } else {
247 executor.submit(errorForwarder.wrap(task));
248 }
249 }
250
251 for (ArtifactDownload transfer : safeArtifactDownloads) {
252 Map<String, String> providedChecksums = Collections.emptyMap();
253 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
254 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
255 session, transfer, repository, checksumAlgorithmFactories);
256
257 if (provided != null) {
258 providedChecksums = provided;
259 break;
260 }
261 }
262
263 URI location = layout.getLocation(transfer.getArtifact(), false);
264
265 TransferResource resource = newTransferResource(location, transfer);
266 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
267 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
268
269 Runnable task;
270 if (transfer.isExistenceCheck()) {
271 task = new PeekTaskRunner(location, listener);
272 } else {
273 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
274 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
275 if (checksumPolicy != null) {
276 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
277 }
278
279 task = new GetTaskRunner(
280 location,
281 transfer.getPath(),
282 checksumPolicy,
283 checksumAlgorithmFactories,
284 checksumLocations,
285 providedChecksums,
286 listener);
287 }
288 if (executor == null || first) {
289 task.run();
290 first = false;
291 } else {
292 executor.submit(errorForwarder.wrap(task));
293 }
294 }
295
296 errorForwarder.await();
297 }
298
299 @Override
300 public void put(
301 Collection<? extends ArtifactUpload> artifactUploads,
302 Collection<? extends MetadataUpload> metadataUploads) {
303 failIfClosed();
304
305 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
306 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
307
308 SmartExecutor executor =
309 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
310 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
311
312 boolean first = true;
313
314 for (ArtifactUpload transfer : safeArtifactUploads) {
315 URI location = layout.getLocation(transfer.getArtifact(), true);
316
317 TransferResource resource = newTransferResource(location, transfer);
318 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
319 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
320
321 List<RepositoryLayout.ChecksumLocation> checksumLocations =
322 layout.getChecksumLocations(transfer.getArtifact(), true, location);
323
324 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
325 if (executor == null || first) {
326 task.run();
327 first = false;
328 } else {
329 executor.submit(errorForwarder.wrap(task));
330 }
331 }
332
333 errorForwarder.await();
334
335 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
336 for (MetadataUpload transfer : transferGroup) {
337 URI location = layout.getLocation(transfer.getMetadata(), true);
338
339 TransferResource resource = newTransferResource(location, transfer);
340 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
341 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
342
343 List<RepositoryLayout.ChecksumLocation> checksumLocations =
344 layout.getChecksumLocations(transfer.getMetadata(), true, location);
345
346 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
347 if (executor == null || first) {
348 task.run();
349 first = false;
350 } else {
351 executor.submit(errorForwarder.wrap(task));
352 }
353 }
354
355 errorForwarder.await();
356 }
357 }
358
359
360
361
362
363
364 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
365 ArrayList<MetadataUpload> v = new ArrayList<>();
366 ArrayList<MetadataUpload> a = new ArrayList<>();
367 ArrayList<MetadataUpload> g = new ArrayList<>();
368 ArrayList<MetadataUpload> r = new ArrayList<>();
369
370 for (MetadataUpload transfer : metadataUploads) {
371 Metadata metadata = transfer.getMetadata();
372 if (!"".equals(metadata.getVersion())) {
373 v.add(transfer);
374 } else if (!"".equals(metadata.getArtifactId())) {
375 a.add(transfer);
376 } else if (!"".equals(metadata.getGroupId())) {
377 g.add(transfer);
378 } else {
379 r.add(transfer);
380 }
381 }
382
383 List<List<MetadataUpload>> result = new ArrayList<>(4);
384 if (!v.isEmpty()) {
385 result.add(v);
386 }
387 if (!a.isEmpty()) {
388 result.add(a);
389 }
390 if (!g.isEmpty()) {
391 result.add(g);
392 }
393 if (!r.isEmpty()) {
394 result.add(r);
395 }
396 return result;
397 }
398
399 private static <T> Collection<T> safe(Collection<T> items) {
400 return (items != null) ? items : Collections.emptyList();
401 }
402
403 private TransferResource newTransferResource(URI path, Transfer transfer) {
404 if (transfer instanceof ArtifactTransfer) {
405 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
406 return new TransferResource(
407 repository.getId(),
408 repository.getUrl(),
409 path.toString(),
410 artifactTransfer.getPath(),
411 artifactTransfer.getArtifact(),
412 artifactTransfer.getTrace());
413 } else if (transfer instanceof MetadataTransfer) {
414 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
415 return new TransferResource(
416 repository.getId(),
417 repository.getUrl(),
418 path.toString(),
419 metadataTransfer.getPath(),
420 metadataTransfer.getMetadata(),
421 metadataTransfer.getTrace());
422 } else {
423 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
424 }
425 }
426
427 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
428 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
429 if (upload) {
430 builder.setRequestType(TransferEvent.RequestType.PUT);
431 } else if (!peek) {
432 builder.setRequestType(TransferEvent.RequestType.GET);
433 } else {
434 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
435 }
436 return builder;
437 }
438
439 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
440 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
441 }
442
443 @Override
444 public String toString() {
445 return BasicRepositoryConnectorFactory.NAME + "( " + repository + " )";
446 }
447
448 abstract class TaskRunner implements Runnable {
449
450 protected final URI path;
451
452 protected final TransferTransportListener<?> listener;
453
454 TaskRunner(URI path, TransferTransportListener<?> listener) {
455 this.path = path;
456 this.listener = listener;
457 }
458
459 @Override
460 public void run() {
461 try {
462 listener.transferInitiated();
463 runTask();
464 listener.transferSucceeded();
465 } catch (Exception e) {
466 listener.transferFailed(e, transporter.classify(e));
467 }
468 }
469
470 protected abstract void runTask() throws Exception;
471 }
472
473 class PeekTaskRunner extends TaskRunner {
474
475 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
476 super(path, listener);
477 }
478
479 @Override
480 protected void runTask() throws Exception {
481 transporter.peek(new PeekTask(path));
482 }
483 }
484
485 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
486
487 private final Path file;
488
489 private final ChecksumValidator checksumValidator;
490
491 GetTaskRunner(
492 URI path,
493 Path file,
494 ChecksumPolicy checksumPolicy,
495 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
496 List<RepositoryLayout.ChecksumLocation> checksumLocations,
497 Map<String, String> providedChecksums,
498 TransferTransportListener<?> listener) {
499 super(path, listener);
500 this.file = requireNonNull(file, "destination file cannot be null");
501 checksumValidator = new ChecksumValidator(
502 file,
503 checksumAlgorithmFactories,
504 pathProcessor,
505 checksumProcessor,
506 this,
507 checksumPolicy,
508 providedChecksums,
509 safe(checksumLocations));
510 }
511
512 @Override
513 public boolean fetchChecksum(URI remote, Path local) throws Exception {
514 try {
515 transporter.get(new GetTask(remote).setDataPath(local));
516 } catch (Exception e) {
517 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
518 return false;
519 }
520 throw e;
521 }
522 return true;
523 }
524
525 @Override
526 protected void runTask() throws Exception {
527 try (PathProcessor.CollocatedTempFile tempFile = pathProcessor.newTempFile(file)) {
528 final Path tmp = tempFile.getPath();
529 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
530 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
531 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
532 transporter.get(task);
533 try {
534 checksumValidator.validate(
535 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
536 break;
537 } catch (ChecksumFailureException e) {
538 boolean retry = trial < lastTrial && e.isRetryWorthy();
539 if (!retry && !checksumValidator.handle(e)) {
540 throw e;
541 }
542 listener.transferCorrupted(e);
543 if (retry) {
544 checksumValidator.retry();
545 } else {
546 break;
547 }
548 }
549 }
550 tempFile.move();
551 if (persistedChecksums) {
552 checksumValidator.commit();
553 }
554 }
555 }
556 }
557
558 class PutTaskRunner extends TaskRunner {
559
560 private final Path file;
561
562 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
563
564 PutTaskRunner(
565 URI path,
566 Path file,
567 List<RepositoryLayout.ChecksumLocation> checksumLocations,
568 TransferTransportListener<?> listener) {
569 super(path, listener);
570 this.file = requireNonNull(file, "source file cannot be null");
571 this.checksumLocations = safe(checksumLocations);
572 }
573
574 @SuppressWarnings("checkstyle:innerassignment")
575 @Override
576 protected void runTask() throws Exception {
577 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
578 uploadChecksums(file, null);
579 }
580
581
582
583
584
585 private void uploadChecksums(Path path, byte[] bytes) {
586 if (checksumLocations.isEmpty()) {
587 return;
588 }
589 try {
590 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
591 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
592 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
593 }
594
595 Map<String, String> sumsByAlgo;
596 if (bytes != null) {
597 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
598 } else {
599 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
600 }
601
602 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
603 uploadChecksum(
604 checksumLocation.getLocation(),
605 sumsByAlgo.get(checksumLocation
606 .getChecksumAlgorithmFactory()
607 .getName()));
608 }
609 } catch (IOException e) {
610 LOGGER.warn("Failed to upload checksums for {}", file, e);
611 throw new UncheckedIOException(e);
612 }
613 }
614
615 private void uploadChecksum(URI location, Object checksum) {
616 try {
617 if (checksum instanceof Exception) {
618 throw (Exception) checksum;
619 }
620 transporter.put(new PutTask(location).setDataString((String) checksum));
621 } catch (Exception e) {
622 LOGGER.warn("Failed to upload checksum to {}", location, e);
623 }
624 }
625 }
626 }