1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.RepositorySystemSession;
34 import org.eclipse.aether.metadata.Metadata;
35 import org.eclipse.aether.repository.RemoteRepository;
36 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
37 import org.eclipse.aether.spi.connector.ArtifactDownload;
38 import org.eclipse.aether.spi.connector.ArtifactTransfer;
39 import org.eclipse.aether.spi.connector.ArtifactUpload;
40 import org.eclipse.aether.spi.connector.MetadataDownload;
41 import org.eclipse.aether.spi.connector.MetadataTransfer;
42 import org.eclipse.aether.spi.connector.MetadataUpload;
43 import org.eclipse.aether.spi.connector.RepositoryConnector;
44 import org.eclipse.aether.spi.connector.Transfer;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51 import org.eclipse.aether.spi.connector.transport.GetTask;
52 import org.eclipse.aether.spi.connector.transport.PeekTask;
53 import org.eclipse.aether.spi.connector.transport.PutTask;
54 import org.eclipse.aether.spi.connector.transport.Transporter;
55 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56 import org.eclipse.aether.spi.io.ChecksumProcessor;
57 import org.eclipse.aether.spi.io.PathProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.TransferEvent;
62 import org.eclipse.aether.transfer.TransferResource;
63 import org.eclipse.aether.util.ConfigUtils;
64 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65 import org.eclipse.aether.util.concurrency.SmartExecutor;
66 import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import static java.util.Objects.requireNonNull;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_INCLUDED_CHECKSUMS;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_INCLUDED_CHECKSUMS;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
81
82
83
84
85 final class BasicRepositoryConnector implements RepositoryConnector {
86 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
87
88 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
89
90 private final PathProcessor pathProcessor;
91
92 private final ChecksumProcessor checksumProcessor;
93
94 private final RemoteRepository repository;
95
96 private final RepositorySystemSession session;
97
98 private final Transporter transporter;
99
100 private final RepositoryLayout layout;
101
102 private final ChecksumPolicyProvider checksumPolicyProvider;
103
104 private final int maxDownstreamThreads;
105
106 private final int maxUpstreamThreads;
107
108 private final boolean includedChecksums;
109
110 private final boolean parallelPut;
111
112 private final boolean persistedChecksums;
113
114 private final ConcurrentHashMap<Boolean, SmartExecutor> executors;
115
116 private final AtomicBoolean closed;
117
118 @SuppressWarnings("checkstyle:parameternumber")
119 BasicRepositoryConnector(
120 RepositorySystemSession session,
121 RemoteRepository repository,
122 TransporterProvider transporterProvider,
123 RepositoryLayoutProvider layoutProvider,
124 ChecksumPolicyProvider checksumPolicyProvider,
125 PathProcessor pathProcessor,
126 ChecksumProcessor checksumProcessor,
127 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
128 throws NoRepositoryConnectorException {
129 try {
130 layout = layoutProvider.newRepositoryLayout(session, repository);
131 } catch (NoRepositoryLayoutException e) {
132 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
133 }
134 try {
135 transporter = transporterProvider.newTransporter(session, repository);
136 } catch (RuntimeException e) {
137 throw new NoRepositoryConnectorException(
138 repository, "Transporter configuration issue: " + e.getMessage(), e);
139 } catch (Exception e) {
140 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
141 }
142 this.checksumPolicyProvider = checksumPolicyProvider;
143
144 this.session = session;
145 this.repository = repository;
146 this.checksumProcessor = checksumProcessor;
147 this.pathProcessor = pathProcessor;
148 this.providedChecksumsSources = providedChecksumsSources;
149 this.executors = new ConcurrentHashMap<>();
150 this.closed = new AtomicBoolean(false);
151
152 maxUpstreamThreads = ConfigUtils.getInteger(
153 session,
154 DEFAULT_THREADS,
155 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
156 CONFIG_PROP_UPSTREAM_THREADS,
157 CONFIG_PROP_THREADS);
158 maxDownstreamThreads = ConfigUtils.getInteger(
159 session,
160 DEFAULT_THREADS,
161 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
162 CONFIG_PROP_DOWNSTREAM_THREADS,
163 CONFIG_PROP_THREADS);
164 includedChecksums = ConfigUtils.getBoolean(
165 session, DEFAULT_INCLUDED_CHECKSUMS, CONFIG_PROP_INCLUDED_CHECKSUMS, "aether.connector.smartChecksums");
166 parallelPut = ConfigUtils.getBoolean(
167 session,
168 DEFAULT_PARALLEL_PUT,
169 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
170 CONFIG_PROP_PARALLEL_PUT);
171 persistedChecksums =
172 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
173 }
174
175
176
177
178 private SmartExecutor getExecutor(boolean downstream, int tasks) {
179 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
180 if (maxThreads <= 1 || tasks <= 1) {
181
182 return null;
183 }
184
185
186 return executors.computeIfAbsent(
187 downstream,
188 k -> SmartExecutorUtils.smartExecutor(
189 session, null, maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
190 }
191
192 @Override
193 public void close() {
194 if (closed.compareAndSet(false, true)) {
195 for (SmartExecutor executor : executors.values()) {
196 executor.close();
197 }
198 transporter.close();
199 }
200 }
201
202 private void failIfClosed() {
203 if (closed.get()) {
204 throw new IllegalStateException("connector already closed");
205 }
206 }
207
208 @Override
209 public void get(
210 Collection<? extends ArtifactDownload> artifactDownloads,
211 Collection<? extends MetadataDownload> metadataDownloads) {
212 failIfClosed();
213
214 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
215 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
216
217 SmartExecutor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
218 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
219 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories(false);
220
221 boolean first = true;
222
223 for (MetadataDownload transfer : safeMetadataDownloads) {
224 URI location = layout.getLocation(transfer.getMetadata(), false);
225
226 TransferResource resource = newTransferResource(location, transfer);
227 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
228 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
229
230 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
231 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
232 if (checksumPolicy != null) {
233 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
234 }
235
236 Runnable task = new GetTaskRunner(
237 location,
238 transfer.getPath(),
239 checksumPolicy,
240 checksumAlgorithmFactories,
241 checksumLocations,
242 null,
243 listener);
244 if (executor == null || first) {
245 task.run();
246 first = false;
247 } else {
248 executor.submit(errorForwarder.wrap(task));
249 }
250 }
251
252 for (ArtifactDownload transfer : safeArtifactDownloads) {
253 Map<String, String> providedChecksums = Collections.emptyMap();
254 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
255 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
256 session, transfer, repository, checksumAlgorithmFactories);
257
258 if (provided != null) {
259 providedChecksums = provided;
260 break;
261 }
262 }
263
264 URI location = layout.getLocation(transfer.getArtifact(), false);
265
266 TransferResource resource = newTransferResource(location, transfer);
267 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
268 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
269
270 Runnable task;
271 if (transfer.isExistenceCheck()) {
272 task = new PeekTaskRunner(location, listener);
273 } else {
274 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
275 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
276 if (checksumPolicy != null) {
277 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
278 }
279
280 task = new GetTaskRunner(
281 location,
282 transfer.getPath(),
283 checksumPolicy,
284 checksumAlgorithmFactories,
285 checksumLocations,
286 providedChecksums,
287 listener);
288 }
289 if (executor == null || first) {
290 task.run();
291 first = false;
292 } else {
293 executor.submit(errorForwarder.wrap(task));
294 }
295 }
296
297 errorForwarder.await();
298 }
299
300 @Override
301 public void put(
302 Collection<? extends ArtifactUpload> artifactUploads,
303 Collection<? extends MetadataUpload> metadataUploads) {
304 failIfClosed();
305
306 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
307 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
308
309 SmartExecutor executor =
310 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
311 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
312
313 boolean first = true;
314
315 for (ArtifactUpload transfer : safeArtifactUploads) {
316 URI location = layout.getLocation(transfer.getArtifact(), true);
317
318 TransferResource resource = newTransferResource(location, transfer);
319 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
320 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
321
322 List<RepositoryLayout.ChecksumLocation> checksumLocations =
323 layout.getChecksumLocations(transfer.getArtifact(), true, location);
324
325 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
326 if (executor == null || first) {
327 task.run();
328 first = false;
329 } else {
330 executor.submit(errorForwarder.wrap(task));
331 }
332 }
333
334 errorForwarder.await();
335
336 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
337 for (MetadataUpload transfer : transferGroup) {
338 URI location = layout.getLocation(transfer.getMetadata(), true);
339
340 TransferResource resource = newTransferResource(location, transfer);
341 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
342 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
343
344 List<RepositoryLayout.ChecksumLocation> checksumLocations =
345 layout.getChecksumLocations(transfer.getMetadata(), true, location);
346
347 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
348 if (executor == null || first) {
349 task.run();
350 first = false;
351 } else {
352 executor.submit(errorForwarder.wrap(task));
353 }
354 }
355
356 errorForwarder.await();
357 }
358 }
359
360
361
362
363
364
365 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
366 ArrayList<MetadataUpload> v = new ArrayList<>();
367 ArrayList<MetadataUpload> a = new ArrayList<>();
368 ArrayList<MetadataUpload> g = new ArrayList<>();
369 ArrayList<MetadataUpload> r = new ArrayList<>();
370
371 for (MetadataUpload transfer : metadataUploads) {
372 Metadata metadata = transfer.getMetadata();
373 if (!"".equals(metadata.getVersion())) {
374 v.add(transfer);
375 } else if (!"".equals(metadata.getArtifactId())) {
376 a.add(transfer);
377 } else if (!"".equals(metadata.getGroupId())) {
378 g.add(transfer);
379 } else {
380 r.add(transfer);
381 }
382 }
383
384 List<List<MetadataUpload>> result = new ArrayList<>(4);
385 if (!v.isEmpty()) {
386 result.add(v);
387 }
388 if (!a.isEmpty()) {
389 result.add(a);
390 }
391 if (!g.isEmpty()) {
392 result.add(g);
393 }
394 if (!r.isEmpty()) {
395 result.add(r);
396 }
397 return result;
398 }
399
400 private static <T> Collection<T> safe(Collection<T> items) {
401 return (items != null) ? items : Collections.emptyList();
402 }
403
404 private TransferResource newTransferResource(URI path, Transfer transfer) {
405 if (transfer instanceof ArtifactTransfer) {
406 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
407 return new TransferResource(
408 repository.getId(),
409 repository.getUrl(),
410 path.toString(),
411 artifactTransfer.getPath(),
412 artifactTransfer.getArtifact(),
413 artifactTransfer.getTrace());
414 } else if (transfer instanceof MetadataTransfer) {
415 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
416 return new TransferResource(
417 repository.getId(),
418 repository.getUrl(),
419 path.toString(),
420 metadataTransfer.getPath(),
421 metadataTransfer.getMetadata(),
422 metadataTransfer.getTrace());
423 } else {
424 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
425 }
426 }
427
428 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
429 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
430 if (upload) {
431 builder.setRequestType(TransferEvent.RequestType.PUT);
432 } else if (!peek) {
433 builder.setRequestType(TransferEvent.RequestType.GET);
434 } else {
435 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
436 }
437 return builder;
438 }
439
440 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
441 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
442 }
443
444 @Override
445 public String toString() {
446 return BasicRepositoryConnectorFactory.NAME + "( " + repository + " )";
447 }
448
449 abstract class TaskRunner implements Runnable {
450
451 protected final URI path;
452
453 protected final TransferTransportListener<?> listener;
454
455 TaskRunner(URI path, TransferTransportListener<?> listener) {
456 this.path = path;
457 this.listener = listener;
458 }
459
460 @Override
461 public void run() {
462 try {
463 listener.transferInitiated();
464 runTask();
465 listener.transferSucceeded();
466 } catch (Exception e) {
467 listener.transferFailed(e, transporter.classify(e));
468 }
469 }
470
471 protected abstract void runTask() throws Exception;
472 }
473
474 class PeekTaskRunner extends TaskRunner {
475
476 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
477 super(path, listener);
478 }
479
480 @Override
481 protected void runTask() throws Exception {
482 transporter.peek(new PeekTask(path));
483 }
484 }
485
486 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
487
488 private final Path file;
489
490 private final ChecksumValidator checksumValidator;
491
492 GetTaskRunner(
493 URI path,
494 Path file,
495 ChecksumPolicy checksumPolicy,
496 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
497 List<RepositoryLayout.ChecksumLocation> checksumLocations,
498 Map<String, String> providedChecksums,
499 TransferTransportListener<?> listener) {
500 super(path, listener);
501 this.file = requireNonNull(file, "destination file cannot be null");
502 checksumValidator = new ChecksumValidator(
503 file,
504 checksumAlgorithmFactories,
505 pathProcessor,
506 checksumProcessor,
507 this,
508 checksumPolicy,
509 providedChecksums,
510 safe(checksumLocations));
511 }
512
513 @Override
514 public boolean fetchChecksum(URI remote, Path local) throws Exception {
515 try {
516 transporter.get(new GetTask(remote).setDataPath(local));
517 } catch (Exception e) {
518 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
519 return false;
520 }
521 throw e;
522 }
523 return true;
524 }
525
526 @Override
527 protected void runTask() throws Exception {
528 try (PathProcessor.CollocatedTempFile tempFile = pathProcessor.newTempFile(file)) {
529 final Path tmp = tempFile.getPath();
530 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
531 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
532 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
533 transporter.get(task);
534 try {
535 checksumValidator.validate(
536 listener.getChecksums(), includedChecksums ? task.getChecksums() : null);
537 break;
538 } catch (ChecksumFailureException e) {
539 boolean retry = trial < lastTrial && e.isRetryWorthy();
540 if (!retry && !checksumValidator.handle(e)) {
541 throw e;
542 }
543 listener.transferCorrupted(e);
544 if (retry) {
545 checksumValidator.retry();
546 } else {
547 break;
548 }
549 }
550 }
551 tempFile.move();
552 if (persistedChecksums) {
553 checksumValidator.commit();
554 }
555 }
556 }
557 }
558
559 class PutTaskRunner extends TaskRunner {
560
561 private final Path file;
562
563 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
564
565 PutTaskRunner(
566 URI path,
567 Path file,
568 List<RepositoryLayout.ChecksumLocation> checksumLocations,
569 TransferTransportListener<?> listener) {
570 super(path, listener);
571 this.file = requireNonNull(file, "source file cannot be null");
572 this.checksumLocations = safe(checksumLocations);
573 }
574
575 @SuppressWarnings("checkstyle:innerassignment")
576 @Override
577 protected void runTask() throws Exception {
578 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
579 uploadChecksums(file, null);
580 }
581
582
583
584
585
586 private void uploadChecksums(Path path, byte[] bytes) {
587 if (checksumLocations.isEmpty()) {
588 return;
589 }
590 try {
591 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
592 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
593 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
594 }
595
596 Map<String, String> sumsByAlgo;
597 if (bytes != null) {
598 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
599 } else {
600 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
601 }
602
603 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
604 uploadChecksum(
605 checksumLocation.getLocation(),
606 sumsByAlgo.get(checksumLocation
607 .getChecksumAlgorithmFactory()
608 .getName()));
609 }
610 } catch (IOException e) {
611 LOGGER.warn("Failed to upload checksums for {}", file, e);
612 throw new UncheckedIOException(e);
613 }
614 }
615
616 private void uploadChecksum(URI location, Object checksum) {
617 try {
618 if (checksum instanceof Exception) {
619 throw (Exception) checksum;
620 }
621 transporter.put(new PutTask(location).setDataString((String) checksum));
622 } catch (Exception e) {
623 LOGGER.warn("Failed to upload checksum to {}", location, e);
624 }
625 }
626 }
627 }