1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.RepositorySystemSession;
34 import org.eclipse.aether.metadata.Metadata;
35 import org.eclipse.aether.repository.RemoteRepository;
36 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
37 import org.eclipse.aether.spi.connector.ArtifactDownload;
38 import org.eclipse.aether.spi.connector.ArtifactTransfer;
39 import org.eclipse.aether.spi.connector.ArtifactUpload;
40 import org.eclipse.aether.spi.connector.MetadataDownload;
41 import org.eclipse.aether.spi.connector.MetadataTransfer;
42 import org.eclipse.aether.spi.connector.MetadataUpload;
43 import org.eclipse.aether.spi.connector.RepositoryConnector;
44 import org.eclipse.aether.spi.connector.Transfer;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51 import org.eclipse.aether.spi.connector.transport.GetTask;
52 import org.eclipse.aether.spi.connector.transport.PeekTask;
53 import org.eclipse.aether.spi.connector.transport.PutTask;
54 import org.eclipse.aether.spi.connector.transport.Transporter;
55 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56 import org.eclipse.aether.spi.io.ChecksumProcessor;
57 import org.eclipse.aether.transfer.ChecksumFailureException;
58 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
59 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
60 import org.eclipse.aether.transfer.TransferEvent;
61 import org.eclipse.aether.transfer.TransferResource;
62 import org.eclipse.aether.util.ConfigUtils;
63 import org.eclipse.aether.util.FileUtils;
64 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65 import org.eclipse.aether.util.concurrency.SmartExecutor;
66 import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import static java.util.Objects.requireNonNull;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
81
82
83
84
85 final class BasicRepositoryConnector implements RepositoryConnector {
86 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
87
88 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
89
90 private final ChecksumProcessor checksumProcessor;
91
92 private final RemoteRepository repository;
93
94 private final RepositorySystemSession session;
95
96 private final Transporter transporter;
97
98 private final RepositoryLayout layout;
99
100 private final ChecksumPolicyProvider checksumPolicyProvider;
101
102 private final int maxDownstreamThreads;
103
104 private final int maxUpstreamThreads;
105
106 private final boolean smartChecksums;
107
108 private final boolean parallelPut;
109
110 private final boolean persistedChecksums;
111
112 private final ConcurrentHashMap<Boolean, SmartExecutor> executors;
113
114 private final AtomicBoolean closed;
115
116 BasicRepositoryConnector(
117 RepositorySystemSession session,
118 RemoteRepository repository,
119 TransporterProvider transporterProvider,
120 RepositoryLayoutProvider layoutProvider,
121 ChecksumPolicyProvider checksumPolicyProvider,
122 ChecksumProcessor checksumProcessor,
123 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
124 throws NoRepositoryConnectorException {
125 try {
126 layout = layoutProvider.newRepositoryLayout(session, repository);
127 } catch (NoRepositoryLayoutException e) {
128 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
129 }
130 try {
131 transporter = transporterProvider.newTransporter(session, repository);
132 } catch (RuntimeException e) {
133 throw new NoRepositoryConnectorException(
134 repository, "Transporter configuration issue: " + e.getMessage(), e);
135 } catch (Exception e) {
136 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
137 }
138 this.checksumPolicyProvider = checksumPolicyProvider;
139
140 this.session = session;
141 this.repository = repository;
142 this.checksumProcessor = checksumProcessor;
143 this.providedChecksumsSources = providedChecksumsSources;
144 this.executors = new ConcurrentHashMap<>();
145 this.closed = new AtomicBoolean(false);
146
147 maxUpstreamThreads = ConfigUtils.getInteger(
148 session,
149 DEFAULT_THREADS,
150 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
151 CONFIG_PROP_UPSTREAM_THREADS,
152 CONFIG_PROP_THREADS);
153 maxDownstreamThreads = ConfigUtils.getInteger(
154 session,
155 DEFAULT_THREADS,
156 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
157 CONFIG_PROP_DOWNSTREAM_THREADS,
158 CONFIG_PROP_THREADS);
159 smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
160 parallelPut = ConfigUtils.getBoolean(
161 session,
162 DEFAULT_PARALLEL_PUT,
163 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
164 CONFIG_PROP_PARALLEL_PUT);
165 persistedChecksums =
166 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
167 }
168
169
170
171
172 private SmartExecutor getExecutor(boolean downstream, int tasks) {
173 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
174 if (maxThreads <= 1 || tasks <= 1) {
175
176 return null;
177 }
178
179
180 return executors.computeIfAbsent(
181 downstream,
182 k -> SmartExecutorUtils.smartExecutor(
183 session, null, maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
184 }
185
186 @Override
187 public void close() {
188 if (closed.compareAndSet(false, true)) {
189 for (SmartExecutor executor : executors.values()) {
190 executor.close();
191 }
192 transporter.close();
193 }
194 }
195
196 private void failIfClosed() {
197 if (closed.get()) {
198 throw new IllegalStateException("connector already closed");
199 }
200 }
201
202 @Override
203 public void get(
204 Collection<? extends ArtifactDownload> artifactDownloads,
205 Collection<? extends MetadataDownload> metadataDownloads) {
206 failIfClosed();
207
208 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
209 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
210
211 SmartExecutor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
212 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
213 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
214
215 boolean first = true;
216
217 for (MetadataDownload transfer : safeMetadataDownloads) {
218 URI location = layout.getLocation(transfer.getMetadata(), false);
219
220 TransferResource resource = newTransferResource(location, transfer);
221 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
222 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
223
224 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
225 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
226 if (checksumPolicy != null) {
227 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
228 }
229
230 Runnable task = new GetTaskRunner(
231 location,
232 transfer.getPath(),
233 checksumPolicy,
234 checksumAlgorithmFactories,
235 checksumLocations,
236 null,
237 listener);
238 if (executor == null || first) {
239 task.run();
240 first = false;
241 } else {
242 executor.submit(errorForwarder.wrap(task));
243 }
244 }
245
246 for (ArtifactDownload transfer : safeArtifactDownloads) {
247 Map<String, String> providedChecksums = Collections.emptyMap();
248 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
249 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
250 session, transfer, repository, checksumAlgorithmFactories);
251
252 if (provided != null) {
253 providedChecksums = provided;
254 break;
255 }
256 }
257
258 URI location = layout.getLocation(transfer.getArtifact(), false);
259
260 TransferResource resource = newTransferResource(location, transfer);
261 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
262 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
263
264 Runnable task;
265 if (transfer.isExistenceCheck()) {
266 task = new PeekTaskRunner(location, listener);
267 } else {
268 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
269 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
270 if (checksumPolicy != null) {
271 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
272 }
273
274 task = new GetTaskRunner(
275 location,
276 transfer.getPath(),
277 checksumPolicy,
278 checksumAlgorithmFactories,
279 checksumLocations,
280 providedChecksums,
281 listener);
282 }
283 if (executor == null || first) {
284 task.run();
285 first = false;
286 } else {
287 executor.submit(errorForwarder.wrap(task));
288 }
289 }
290
291 errorForwarder.await();
292 }
293
294 @Override
295 public void put(
296 Collection<? extends ArtifactUpload> artifactUploads,
297 Collection<? extends MetadataUpload> metadataUploads) {
298 failIfClosed();
299
300 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
301 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
302
303 SmartExecutor executor =
304 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
305 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
306
307 boolean first = true;
308
309 for (ArtifactUpload transfer : safeArtifactUploads) {
310 URI location = layout.getLocation(transfer.getArtifact(), true);
311
312 TransferResource resource = newTransferResource(location, transfer);
313 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
314 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
315
316 List<RepositoryLayout.ChecksumLocation> checksumLocations =
317 layout.getChecksumLocations(transfer.getArtifact(), true, location);
318
319 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
320 if (executor == null || first) {
321 task.run();
322 first = false;
323 } else {
324 executor.submit(errorForwarder.wrap(task));
325 }
326 }
327
328 errorForwarder.await();
329
330 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
331 for (MetadataUpload transfer : transferGroup) {
332 URI location = layout.getLocation(transfer.getMetadata(), true);
333
334 TransferResource resource = newTransferResource(location, transfer);
335 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
336 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
337
338 List<RepositoryLayout.ChecksumLocation> checksumLocations =
339 layout.getChecksumLocations(transfer.getMetadata(), true, location);
340
341 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
342 if (executor == null || first) {
343 task.run();
344 first = false;
345 } else {
346 executor.submit(errorForwarder.wrap(task));
347 }
348 }
349
350 errorForwarder.await();
351 }
352 }
353
354
355
356
357
358
359 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
360 ArrayList<MetadataUpload> v = new ArrayList<>();
361 ArrayList<MetadataUpload> a = new ArrayList<>();
362 ArrayList<MetadataUpload> g = new ArrayList<>();
363 ArrayList<MetadataUpload> r = new ArrayList<>();
364
365 for (MetadataUpload transfer : metadataUploads) {
366 Metadata metadata = transfer.getMetadata();
367 if (!"".equals(metadata.getVersion())) {
368 v.add(transfer);
369 } else if (!"".equals(metadata.getArtifactId())) {
370 a.add(transfer);
371 } else if (!"".equals(metadata.getGroupId())) {
372 g.add(transfer);
373 } else {
374 r.add(transfer);
375 }
376 }
377
378 List<List<MetadataUpload>> result = new ArrayList<>(4);
379 if (!v.isEmpty()) {
380 result.add(v);
381 }
382 if (!a.isEmpty()) {
383 result.add(a);
384 }
385 if (!g.isEmpty()) {
386 result.add(g);
387 }
388 if (!r.isEmpty()) {
389 result.add(r);
390 }
391 return result;
392 }
393
394 private static <T> Collection<T> safe(Collection<T> items) {
395 return (items != null) ? items : Collections.emptyList();
396 }
397
398 private TransferResource newTransferResource(URI path, Transfer transfer) {
399 if (transfer instanceof ArtifactTransfer) {
400 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
401 return new TransferResource(
402 repository.getId(),
403 repository.getUrl(),
404 path.toString(),
405 artifactTransfer.getPath(),
406 artifactTransfer.getArtifact(),
407 artifactTransfer.getTrace());
408 } else if (transfer instanceof MetadataTransfer) {
409 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
410 return new TransferResource(
411 repository.getId(),
412 repository.getUrl(),
413 path.toString(),
414 metadataTransfer.getPath(),
415 metadataTransfer.getMetadata(),
416 metadataTransfer.getTrace());
417 } else {
418 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
419 }
420 }
421
422 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
423 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
424 if (upload) {
425 builder.setRequestType(TransferEvent.RequestType.PUT);
426 } else if (!peek) {
427 builder.setRequestType(TransferEvent.RequestType.GET);
428 } else {
429 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
430 }
431 return builder;
432 }
433
434 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
435 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
436 }
437
438 @Override
439 public String toString() {
440 return BasicRepositoryConnectorFactory.NAME + "( " + repository + " )";
441 }
442
443 abstract class TaskRunner implements Runnable {
444
445 protected final URI path;
446
447 protected final TransferTransportListener<?> listener;
448
449 TaskRunner(URI path, TransferTransportListener<?> listener) {
450 this.path = path;
451 this.listener = listener;
452 }
453
454 @Override
455 public void run() {
456 try {
457 listener.transferInitiated();
458 runTask();
459 listener.transferSucceeded();
460 } catch (Exception e) {
461 listener.transferFailed(e, transporter.classify(e));
462 }
463 }
464
465 protected abstract void runTask() throws Exception;
466 }
467
468 class PeekTaskRunner extends TaskRunner {
469
470 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
471 super(path, listener);
472 }
473
474 @Override
475 protected void runTask() throws Exception {
476 transporter.peek(new PeekTask(path));
477 }
478 }
479
480 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
481
482 private final Path file;
483
484 private final ChecksumValidator checksumValidator;
485
486 GetTaskRunner(
487 URI path,
488 Path file,
489 ChecksumPolicy checksumPolicy,
490 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
491 List<RepositoryLayout.ChecksumLocation> checksumLocations,
492 Map<String, String> providedChecksums,
493 TransferTransportListener<?> listener) {
494 super(path, listener);
495 this.file = requireNonNull(file, "destination file cannot be null");
496 checksumValidator = new ChecksumValidator(
497 file,
498 checksumAlgorithmFactories,
499 checksumProcessor,
500 this,
501 checksumPolicy,
502 providedChecksums,
503 safe(checksumLocations));
504 }
505
506 @Override
507 public boolean fetchChecksum(URI remote, Path local) throws Exception {
508 try {
509 transporter.get(new GetTask(remote).setDataPath(local));
510 } catch (Exception e) {
511 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
512 return false;
513 }
514 throw e;
515 }
516 return true;
517 }
518
519 @Override
520 protected void runTask() throws Exception {
521 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file)) {
522 final Path tmp = tempFile.getPath();
523 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
524 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
525 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
526 transporter.get(task);
527 try {
528 checksumValidator.validate(
529 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
530 break;
531 } catch (ChecksumFailureException e) {
532 boolean retry = trial < lastTrial && e.isRetryWorthy();
533 if (!retry && !checksumValidator.handle(e)) {
534 throw e;
535 }
536 listener.transferCorrupted(e);
537 if (retry) {
538 checksumValidator.retry();
539 } else {
540 break;
541 }
542 }
543 }
544 tempFile.move();
545 if (persistedChecksums) {
546 checksumValidator.commit();
547 }
548 }
549 }
550 }
551
552 class PutTaskRunner extends TaskRunner {
553
554 private final Path file;
555
556 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
557
558 PutTaskRunner(
559 URI path,
560 Path file,
561 List<RepositoryLayout.ChecksumLocation> checksumLocations,
562 TransferTransportListener<?> listener) {
563 super(path, listener);
564 this.file = requireNonNull(file, "source file cannot be null");
565 this.checksumLocations = safe(checksumLocations);
566 }
567
568 @SuppressWarnings("checkstyle:innerassignment")
569 @Override
570 protected void runTask() throws Exception {
571 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
572 uploadChecksums(file, null);
573 }
574
575
576
577
578
579 private void uploadChecksums(Path path, byte[] bytes) {
580 if (checksumLocations.isEmpty()) {
581 return;
582 }
583 try {
584 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
585 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
586 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
587 }
588
589 Map<String, String> sumsByAlgo;
590 if (bytes != null) {
591 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
592 } else {
593 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
594 }
595
596 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
597 uploadChecksum(
598 checksumLocation.getLocation(),
599 sumsByAlgo.get(checksumLocation
600 .getChecksumAlgorithmFactory()
601 .getName()));
602 }
603 } catch (IOException e) {
604 LOGGER.warn("Failed to upload checksums for {}", file, e);
605 throw new UncheckedIOException(e);
606 }
607 }
608
609 private void uploadChecksum(URI location, Object checksum) {
610 try {
611 if (checksum instanceof Exception) {
612 throw (Exception) checksum;
613 }
614 transporter.put(new PutTask(location).setDataString((String) checksum));
615 } catch (Exception e) {
616 LOGGER.warn("Failed to upload checksum to {}", location, e);
617 }
618 }
619 }
620 }