1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.RepositorySystemSession;
34 import org.eclipse.aether.metadata.Metadata;
35 import org.eclipse.aether.repository.RemoteRepository;
36 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
37 import org.eclipse.aether.spi.connector.ArtifactDownload;
38 import org.eclipse.aether.spi.connector.ArtifactTransfer;
39 import org.eclipse.aether.spi.connector.ArtifactUpload;
40 import org.eclipse.aether.spi.connector.MetadataDownload;
41 import org.eclipse.aether.spi.connector.MetadataTransfer;
42 import org.eclipse.aether.spi.connector.MetadataUpload;
43 import org.eclipse.aether.spi.connector.RepositoryConnector;
44 import org.eclipse.aether.spi.connector.Transfer;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51 import org.eclipse.aether.spi.connector.transport.GetTask;
52 import org.eclipse.aether.spi.connector.transport.PeekTask;
53 import org.eclipse.aether.spi.connector.transport.PutTask;
54 import org.eclipse.aether.spi.connector.transport.Transporter;
55 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56 import org.eclipse.aether.spi.io.ChecksumProcessor;
57 import org.eclipse.aether.spi.io.PathProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.TransferEvent;
62 import org.eclipse.aether.transfer.TransferResource;
63 import org.eclipse.aether.util.ConfigUtils;
64 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65 import org.eclipse.aether.util.concurrency.SmartExecutor;
66 import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import static java.util.Objects.requireNonNull;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
81
82
83
84
85 final class BasicRepositoryConnector implements RepositoryConnector {
86 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
87
88 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
89
90 private final PathProcessor pathProcessor;
91
92 private final ChecksumProcessor checksumProcessor;
93
94 private final RemoteRepository repository;
95
96 private final RepositorySystemSession session;
97
98 private final Transporter transporter;
99
100 private final RepositoryLayout layout;
101
102 private final ChecksumPolicyProvider checksumPolicyProvider;
103
104 private final int maxDownstreamThreads;
105
106 private final int maxUpstreamThreads;
107
108 private final boolean smartChecksums;
109
110 private final boolean parallelPut;
111
112 private final boolean persistedChecksums;
113
114 private final ConcurrentHashMap<Boolean, SmartExecutor> executors;
115
116 private final AtomicBoolean closed;
117
118 @SuppressWarnings("checkstyle:parameternumber")
119 BasicRepositoryConnector(
120 RepositorySystemSession session,
121 RemoteRepository repository,
122 TransporterProvider transporterProvider,
123 RepositoryLayoutProvider layoutProvider,
124 ChecksumPolicyProvider checksumPolicyProvider,
125 PathProcessor pathProcessor,
126 ChecksumProcessor checksumProcessor,
127 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
128 throws NoRepositoryConnectorException {
129 try {
130 layout = layoutProvider.newRepositoryLayout(session, repository);
131 } catch (NoRepositoryLayoutException e) {
132 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
133 }
134 try {
135 transporter = transporterProvider.newTransporter(session, repository);
136 } catch (RuntimeException e) {
137 throw new NoRepositoryConnectorException(
138 repository, "Transporter configuration issue: " + e.getMessage(), e);
139 } catch (Exception e) {
140 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
141 }
142 this.checksumPolicyProvider = checksumPolicyProvider;
143
144 this.session = session;
145 this.repository = repository;
146 this.checksumProcessor = checksumProcessor;
147 this.pathProcessor = pathProcessor;
148 this.providedChecksumsSources = providedChecksumsSources;
149 this.executors = new ConcurrentHashMap<>();
150 this.closed = new AtomicBoolean(false);
151
152 maxUpstreamThreads = ConfigUtils.getInteger(
153 session,
154 DEFAULT_THREADS,
155 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
156 CONFIG_PROP_UPSTREAM_THREADS,
157 CONFIG_PROP_THREADS);
158 maxDownstreamThreads = ConfigUtils.getInteger(
159 session,
160 DEFAULT_THREADS,
161 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
162 CONFIG_PROP_DOWNSTREAM_THREADS,
163 CONFIG_PROP_THREADS);
164 smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
165 parallelPut = ConfigUtils.getBoolean(
166 session,
167 DEFAULT_PARALLEL_PUT,
168 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
169 CONFIG_PROP_PARALLEL_PUT);
170 persistedChecksums =
171 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
172 }
173
174
175
176
177 private SmartExecutor getExecutor(boolean downstream, int tasks) {
178 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
179 if (maxThreads <= 1 || tasks <= 1) {
180
181 return null;
182 }
183
184
185 return executors.computeIfAbsent(
186 downstream,
187 k -> SmartExecutorUtils.smartExecutor(
188 session, null, maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
189 }
190
191 @Override
192 public void close() {
193 if (closed.compareAndSet(false, true)) {
194 for (SmartExecutor executor : executors.values()) {
195 executor.close();
196 }
197 transporter.close();
198 }
199 }
200
201 private void failIfClosed() {
202 if (closed.get()) {
203 throw new IllegalStateException("connector already closed");
204 }
205 }
206
207 @Override
208 public void get(
209 Collection<? extends ArtifactDownload> artifactDownloads,
210 Collection<? extends MetadataDownload> metadataDownloads) {
211 failIfClosed();
212
213 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
214 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
215
216 SmartExecutor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
217 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
219
220 for (MetadataDownload transfer : safeMetadataDownloads) {
221 URI location = layout.getLocation(transfer.getMetadata(), false);
222
223 TransferResource resource = newTransferResource(location, transfer);
224 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
225 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
226
227 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
228 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
229 if (checksumPolicy != null) {
230 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
231 }
232
233 Runnable task = new GetTaskRunner(
234 location,
235 transfer.getPath(),
236 checksumPolicy,
237 checksumAlgorithmFactories,
238 checksumLocations,
239 null,
240 listener);
241 if (executor == null) {
242 task.run();
243 } else {
244 executor.submit(errorForwarder.wrap(task));
245 }
246 }
247
248 for (ArtifactDownload transfer : safeArtifactDownloads) {
249 Map<String, String> providedChecksums = Collections.emptyMap();
250 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
251 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
252 session, transfer, repository, checksumAlgorithmFactories);
253
254 if (provided != null) {
255 providedChecksums = provided;
256 break;
257 }
258 }
259
260 URI location = layout.getLocation(transfer.getArtifact(), false);
261
262 TransferResource resource = newTransferResource(location, transfer);
263 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
264 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
265
266 Runnable task;
267 if (transfer.isExistenceCheck()) {
268 task = new PeekTaskRunner(location, listener);
269 } else {
270 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
271 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
272 if (checksumPolicy != null) {
273 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
274 }
275
276 task = new GetTaskRunner(
277 location,
278 transfer.getPath(),
279 checksumPolicy,
280 checksumAlgorithmFactories,
281 checksumLocations,
282 providedChecksums,
283 listener);
284 }
285 if (executor == null) {
286 task.run();
287 } else {
288 executor.submit(errorForwarder.wrap(task));
289 }
290 }
291
292 errorForwarder.await();
293 }
294
295 @Override
296 public void put(
297 Collection<? extends ArtifactUpload> artifactUploads,
298 Collection<? extends MetadataUpload> metadataUploads) {
299 failIfClosed();
300
301 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
302 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
303
304 SmartExecutor executor =
305 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
306 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
307
308 for (ArtifactUpload transfer : safeArtifactUploads) {
309 URI location = layout.getLocation(transfer.getArtifact(), true);
310
311 TransferResource resource = newTransferResource(location, transfer);
312 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
313 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
314
315 List<RepositoryLayout.ChecksumLocation> checksumLocations =
316 layout.getChecksumLocations(transfer.getArtifact(), true, location);
317
318 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
319 if (executor == null) {
320 task.run();
321 } else {
322 executor.submit(errorForwarder.wrap(task));
323 }
324 }
325
326 errorForwarder.await();
327
328 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
329 for (MetadataUpload transfer : transferGroup) {
330 URI location = layout.getLocation(transfer.getMetadata(), true);
331
332 TransferResource resource = newTransferResource(location, transfer);
333 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
334 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
335
336 List<RepositoryLayout.ChecksumLocation> checksumLocations =
337 layout.getChecksumLocations(transfer.getMetadata(), true, location);
338
339 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
340 if (executor == null) {
341 task.run();
342 } else {
343 executor.submit(errorForwarder.wrap(task));
344 }
345 }
346
347 errorForwarder.await();
348 }
349 }
350
351
352
353
354
355
356 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
357 ArrayList<MetadataUpload> v = new ArrayList<>();
358 ArrayList<MetadataUpload> a = new ArrayList<>();
359 ArrayList<MetadataUpload> g = new ArrayList<>();
360 ArrayList<MetadataUpload> r = new ArrayList<>();
361
362 for (MetadataUpload transfer : metadataUploads) {
363 Metadata metadata = transfer.getMetadata();
364 if (!"".equals(metadata.getVersion())) {
365 v.add(transfer);
366 } else if (!"".equals(metadata.getArtifactId())) {
367 a.add(transfer);
368 } else if (!"".equals(metadata.getGroupId())) {
369 g.add(transfer);
370 } else {
371 r.add(transfer);
372 }
373 }
374
375 List<List<MetadataUpload>> result = new ArrayList<>(4);
376 if (!v.isEmpty()) {
377 result.add(v);
378 }
379 if (!a.isEmpty()) {
380 result.add(a);
381 }
382 if (!g.isEmpty()) {
383 result.add(g);
384 }
385 if (!r.isEmpty()) {
386 result.add(r);
387 }
388 return result;
389 }
390
391 private static <T> Collection<T> safe(Collection<T> items) {
392 return (items != null) ? items : Collections.emptyList();
393 }
394
395 private TransferResource newTransferResource(URI path, Transfer transfer) {
396 if (transfer instanceof ArtifactTransfer) {
397 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
398 return new TransferResource(
399 repository.getId(),
400 repository.getUrl(),
401 path.toString(),
402 artifactTransfer.getPath(),
403 artifactTransfer.getArtifact(),
404 artifactTransfer.getTrace());
405 } else if (transfer instanceof MetadataTransfer) {
406 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
407 return new TransferResource(
408 repository.getId(),
409 repository.getUrl(),
410 path.toString(),
411 metadataTransfer.getPath(),
412 metadataTransfer.getMetadata(),
413 metadataTransfer.getTrace());
414 } else {
415 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
416 }
417 }
418
419 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
420 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
421 if (upload) {
422 builder.setRequestType(TransferEvent.RequestType.PUT);
423 } else if (!peek) {
424 builder.setRequestType(TransferEvent.RequestType.GET);
425 } else {
426 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
427 }
428 return builder;
429 }
430
431 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
432 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
433 }
434
435 @Override
436 public String toString() {
437 return BasicRepositoryConnectorFactory.NAME + "( " + repository + " )";
438 }
439
440 abstract class TaskRunner implements Runnable {
441
442 protected final URI path;
443
444 protected final TransferTransportListener<?> listener;
445
446 TaskRunner(URI path, TransferTransportListener<?> listener) {
447 this.path = path;
448 this.listener = listener;
449 }
450
451 @Override
452 public void run() {
453 try {
454 listener.transferInitiated();
455 runTask();
456 listener.transferSucceeded();
457 } catch (Exception e) {
458 listener.transferFailed(e, transporter.classify(e));
459 }
460 }
461
462 protected abstract void runTask() throws Exception;
463 }
464
465 class PeekTaskRunner extends TaskRunner {
466
467 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
468 super(path, listener);
469 }
470
471 @Override
472 protected void runTask() throws Exception {
473 transporter.peek(new PeekTask(path));
474 }
475 }
476
477 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
478
479 private final Path file;
480
481 private final ChecksumValidator checksumValidator;
482
483 GetTaskRunner(
484 URI path,
485 Path file,
486 ChecksumPolicy checksumPolicy,
487 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
488 List<RepositoryLayout.ChecksumLocation> checksumLocations,
489 Map<String, String> providedChecksums,
490 TransferTransportListener<?> listener) {
491 super(path, listener);
492 this.file = requireNonNull(file, "destination file cannot be null");
493 checksumValidator = new ChecksumValidator(
494 file,
495 checksumAlgorithmFactories,
496 pathProcessor,
497 checksumProcessor,
498 this,
499 checksumPolicy,
500 providedChecksums,
501 safe(checksumLocations));
502 }
503
504 @Override
505 public boolean fetchChecksum(URI remote, Path local) throws Exception {
506 try {
507 transporter.get(new GetTask(remote).setDataPath(local));
508 } catch (Exception e) {
509 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
510 return false;
511 }
512 throw e;
513 }
514 return true;
515 }
516
517 @Override
518 protected void runTask() throws Exception {
519 try (PathProcessor.CollocatedTempFile tempFile = pathProcessor.newTempFile(file)) {
520 final Path tmp = tempFile.getPath();
521 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
522 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
523 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
524 transporter.get(task);
525 try {
526 checksumValidator.validate(
527 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
528 break;
529 } catch (ChecksumFailureException e) {
530 boolean retry = trial < lastTrial && e.isRetryWorthy();
531 if (!retry && !checksumValidator.handle(e)) {
532 throw e;
533 }
534 listener.transferCorrupted(e);
535 if (retry) {
536 checksumValidator.retry();
537 } else {
538 break;
539 }
540 }
541 }
542 tempFile.move();
543 if (persistedChecksums) {
544 checksumValidator.commit();
545 }
546 }
547 }
548 }
549
550 class PutTaskRunner extends TaskRunner {
551
552 private final Path file;
553
554 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
555
556 PutTaskRunner(
557 URI path,
558 Path file,
559 List<RepositoryLayout.ChecksumLocation> checksumLocations,
560 TransferTransportListener<?> listener) {
561 super(path, listener);
562 this.file = requireNonNull(file, "source file cannot be null");
563 this.checksumLocations = safe(checksumLocations);
564 }
565
566 @SuppressWarnings("checkstyle:innerassignment")
567 @Override
568 protected void runTask() throws Exception {
569 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
570 uploadChecksums(file, null);
571 }
572
573
574
575
576
577 private void uploadChecksums(Path path, byte[] bytes) {
578 if (checksumLocations.isEmpty()) {
579 return;
580 }
581 try {
582 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
583 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
584 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
585 }
586
587 Map<String, String> sumsByAlgo;
588 if (bytes != null) {
589 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
590 } else {
591 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
592 }
593
594 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
595 uploadChecksum(
596 checksumLocation.getLocation(),
597 sumsByAlgo.get(checksumLocation
598 .getChecksumAlgorithmFactory()
599 .getName()));
600 }
601 } catch (IOException e) {
602 LOGGER.warn("Failed to upload checksums for {}", file, e);
603 throw new UncheckedIOException(e);
604 }
605 }
606
607 private void uploadChecksum(URI location, Object checksum) {
608 try {
609 if (checksum instanceof Exception) {
610 throw (Exception) checksum;
611 }
612 transporter.put(new PutTask(location).setDataString((String) checksum));
613 } catch (Exception e) {
614 LOGGER.warn("Failed to upload checksum to {}", location, e);
615 }
616 }
617 }
618 }