1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.eclipse.aether.RepositorySystemSession;
35 import org.eclipse.aether.metadata.Metadata;
36 import org.eclipse.aether.repository.RemoteRepository;
37 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
38 import org.eclipse.aether.spi.connector.ArtifactDownload;
39 import org.eclipse.aether.spi.connector.ArtifactTransfer;
40 import org.eclipse.aether.spi.connector.ArtifactUpload;
41 import org.eclipse.aether.spi.connector.MetadataDownload;
42 import org.eclipse.aether.spi.connector.MetadataTransfer;
43 import org.eclipse.aether.spi.connector.MetadataUpload;
44 import org.eclipse.aether.spi.connector.RepositoryConnector;
45 import org.eclipse.aether.spi.connector.Transfer;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
49 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52 import org.eclipse.aether.spi.connector.transport.GetTask;
53 import org.eclipse.aether.spi.connector.transport.PeekTask;
54 import org.eclipse.aether.spi.connector.transport.PutTask;
55 import org.eclipse.aether.spi.connector.transport.Transporter;
56 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57 import org.eclipse.aether.spi.io.ChecksumProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.TransferEvent;
62 import org.eclipse.aether.transfer.TransferResource;
63 import org.eclipse.aether.util.ConfigUtils;
64 import org.eclipse.aether.util.FileUtils;
65 import org.eclipse.aether.util.concurrency.ExecutorUtils;
66 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import static java.util.Objects.requireNonNull;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
81
82
83
84
85 final class BasicRepositoryConnector implements RepositoryConnector {
/** Logger of this connector; the nested task runners log through it as well. */
private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);

/** Sources asked, in iteration order, for pre-supplied artifact checksums in {@code get}; first non-null answer wins. */
private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;

/** Handed to every {@code ChecksumValidator} created by a download task. */
private final ChecksumProcessor checksumProcessor;

/** Remote repository this connector talks to. */
private final RemoteRepository repository;

/** Session supplying configuration and used when building transfer events and checksum policies. */
private final RepositorySystemSession session;

/** Transport used for all peek/get/put operations; closed by {@link #close()}. */
private final Transporter transporter;

/** Layout that maps artifacts and metadata to remote locations and checksum locations. */
private final RepositoryLayout layout;

/** Creates the checksum policy for each download; a {@code null} policy means no checksum locations are looked up. */
private final ChecksumPolicyProvider checksumPolicyProvider;

/** Thread limit used by {@code getExecutor} for downloads ({@code downstream == true}). */
private final int maxDownstreamThreads;

/** Thread limit used by {@code getExecutor} for uploads ({@code downstream == false}). */
private final int maxUpstreamThreads;

/** When {@code true}, the checksums reported by the {@code GetTask} are passed to the validator as well. */
private final boolean smartChecksums;

/** When {@code false}, {@code put} requests an executor for a single task, which yields the direct executor. */
private final boolean parallelPut;

/** When {@code true}, a download calls {@code checksumValidator.commit()} after moving the temp file in place. */
private final boolean persistedChecksums;

/** Lazily created executors, keyed by direction ({@code true} = downstream); shut down in {@link #close()}. */
private final ConcurrentHashMap<Boolean, Executor> executors;

/** Set by the first {@link #close()} call; checked at the start of {@code get} and {@code put}. */
private final AtomicBoolean closed;
115
116 BasicRepositoryConnector(
117 RepositorySystemSession session,
118 RemoteRepository repository,
119 TransporterProvider transporterProvider,
120 RepositoryLayoutProvider layoutProvider,
121 ChecksumPolicyProvider checksumPolicyProvider,
122 ChecksumProcessor checksumProcessor,
123 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
124 throws NoRepositoryConnectorException {
125 try {
126 layout = layoutProvider.newRepositoryLayout(session, repository);
127 } catch (NoRepositoryLayoutException e) {
128 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
129 }
130 try {
131 transporter = transporterProvider.newTransporter(session, repository);
132 } catch (RuntimeException e) {
133 throw new NoRepositoryConnectorException(
134 repository, "Transporter configuration issue: " + e.getMessage(), e);
135 } catch (Exception e) {
136 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
137 }
138 this.checksumPolicyProvider = checksumPolicyProvider;
139
140 this.session = session;
141 this.repository = repository;
142 this.checksumProcessor = checksumProcessor;
143 this.providedChecksumsSources = providedChecksumsSources;
144 this.executors = new ConcurrentHashMap<>();
145 this.closed = new AtomicBoolean(false);
146
147 maxUpstreamThreads = ExecutorUtils.threadCount(
148 session,
149 DEFAULT_THREADS,
150 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
151 CONFIG_PROP_UPSTREAM_THREADS,
152 CONFIG_PROP_THREADS);
153 maxDownstreamThreads = ExecutorUtils.threadCount(
154 session,
155 DEFAULT_THREADS,
156 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
157 CONFIG_PROP_DOWNSTREAM_THREADS,
158 CONFIG_PROP_THREADS);
159 smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
160 parallelPut = ConfigUtils.getBoolean(
161 session,
162 DEFAULT_PARALLEL_PUT,
163 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
164 CONFIG_PROP_PARALLEL_PUT);
165 persistedChecksums =
166 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
167 }
168
169 private Executor getExecutor(boolean downstream, int tasks) {
170 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
171 if (maxThreads <= 1) {
172 return ExecutorUtils.DIRECT_EXECUTOR;
173 }
174 if (tasks <= 1) {
175 return ExecutorUtils.DIRECT_EXECUTOR;
176 }
177 return executors.computeIfAbsent(
178 downstream,
179 k -> ExecutorUtils.threadPool(
180 maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
181 }
182
183 @Override
184 public void close() {
185 if (closed.compareAndSet(false, true)) {
186 for (Executor executor : executors.values()) {
187 ExecutorUtils.shutdown(executor);
188 }
189 transporter.close();
190 }
191 }
192
193 private void failIfClosed() {
194 if (closed.get()) {
195 throw new IllegalStateException("connector already closed");
196 }
197 }
198
199 @Override
200 public void get(
201 Collection<? extends ArtifactDownload> artifactDownloads,
202 Collection<? extends MetadataDownload> metadataDownloads) {
203 failIfClosed();
204
205 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
206 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
207
208 Executor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
209 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
210 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
211
212 boolean first = true;
213
214 for (MetadataDownload transfer : safeMetadataDownloads) {
215 URI location = layout.getLocation(transfer.getMetadata(), false);
216
217 TransferResource resource = newTransferResource(location, transfer);
218 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
219 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
220
221 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
222 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
223 if (checksumPolicy != null) {
224 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
225 }
226
227 Runnable task = new GetTaskRunner(
228 location,
229 transfer.getPath(),
230 checksumPolicy,
231 checksumAlgorithmFactories,
232 checksumLocations,
233 null,
234 listener);
235 if (first) {
236 task.run();
237 first = false;
238 } else {
239 executor.execute(errorForwarder.wrap(task));
240 }
241 }
242
243 for (ArtifactDownload transfer : safeArtifactDownloads) {
244 Map<String, String> providedChecksums = Collections.emptyMap();
245 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
246 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
247 session, transfer, repository, checksumAlgorithmFactories);
248
249 if (provided != null) {
250 providedChecksums = provided;
251 break;
252 }
253 }
254
255 URI location = layout.getLocation(transfer.getArtifact(), false);
256
257 TransferResource resource = newTransferResource(location, transfer);
258 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
259 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
260
261 Runnable task;
262 if (transfer.isExistenceCheck()) {
263 task = new PeekTaskRunner(location, listener);
264 } else {
265 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
266 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
267 if (checksumPolicy != null) {
268 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
269 }
270
271 task = new GetTaskRunner(
272 location,
273 transfer.getPath(),
274 checksumPolicy,
275 checksumAlgorithmFactories,
276 checksumLocations,
277 providedChecksums,
278 listener);
279 }
280 if (first) {
281 task.run();
282 first = false;
283 } else {
284 executor.execute(errorForwarder.wrap(task));
285 }
286 }
287
288 errorForwarder.await();
289 }
290
291 @Override
292 public void put(
293 Collection<? extends ArtifactUpload> artifactUploads,
294 Collection<? extends MetadataUpload> metadataUploads) {
295 failIfClosed();
296
297 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
298 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
299
300 Executor executor =
301 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
302 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
303
304 boolean first = true;
305
306 for (ArtifactUpload transfer : safeArtifactUploads) {
307 URI location = layout.getLocation(transfer.getArtifact(), true);
308
309 TransferResource resource = newTransferResource(location, transfer);
310 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
311 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
312
313 List<RepositoryLayout.ChecksumLocation> checksumLocations =
314 layout.getChecksumLocations(transfer.getArtifact(), true, location);
315
316 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
317 if (first) {
318 task.run();
319 first = false;
320 } else {
321 executor.execute(errorForwarder.wrap(task));
322 }
323 }
324
325 errorForwarder.await();
326
327 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
328 for (MetadataUpload transfer : transferGroup) {
329 URI location = layout.getLocation(transfer.getMetadata(), true);
330
331 TransferResource resource = newTransferResource(location, transfer);
332 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
333 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
334
335 List<RepositoryLayout.ChecksumLocation> checksumLocations =
336 layout.getChecksumLocations(transfer.getMetadata(), true, location);
337
338 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
339 if (first) {
340 task.run();
341 first = false;
342 } else {
343 executor.execute(errorForwarder.wrap(task));
344 }
345 }
346
347 errorForwarder.await();
348 }
349 }
350
351
352
353
354
355
356 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
357 ArrayList<MetadataUpload> v = new ArrayList<>();
358 ArrayList<MetadataUpload> a = new ArrayList<>();
359 ArrayList<MetadataUpload> g = new ArrayList<>();
360 ArrayList<MetadataUpload> r = new ArrayList<>();
361
362 for (MetadataUpload transfer : metadataUploads) {
363 Metadata metadata = transfer.getMetadata();
364 if (!"".equals(metadata.getVersion())) {
365 v.add(transfer);
366 } else if (!"".equals(metadata.getArtifactId())) {
367 a.add(transfer);
368 } else if (!"".equals(metadata.getGroupId())) {
369 g.add(transfer);
370 } else {
371 r.add(transfer);
372 }
373 }
374
375 List<List<MetadataUpload>> result = new ArrayList<>(4);
376 if (!v.isEmpty()) {
377 result.add(v);
378 }
379 if (!a.isEmpty()) {
380 result.add(a);
381 }
382 if (!g.isEmpty()) {
383 result.add(g);
384 }
385 if (!r.isEmpty()) {
386 result.add(r);
387 }
388 return result;
389 }
390
391 private static <T> Collection<T> safe(Collection<T> items) {
392 return (items != null) ? items : Collections.emptyList();
393 }
394
395 private TransferResource newTransferResource(URI path, Transfer transfer) {
396 if (transfer instanceof ArtifactTransfer) {
397 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
398 return new TransferResource(
399 repository.getId(),
400 repository.getUrl(),
401 path.toString(),
402 artifactTransfer.getPath(),
403 artifactTransfer.getArtifact(),
404 artifactTransfer.getTrace());
405 } else if (transfer instanceof MetadataTransfer) {
406 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
407 return new TransferResource(
408 repository.getId(),
409 repository.getUrl(),
410 path.toString(),
411 metadataTransfer.getPath(),
412 metadataTransfer.getMetadata(),
413 metadataTransfer.getTrace());
414 } else {
415 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
416 }
417 }
418
419 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
420 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
421 if (upload) {
422 builder.setRequestType(TransferEvent.RequestType.PUT);
423 } else if (!peek) {
424 builder.setRequestType(TransferEvent.RequestType.GET);
425 } else {
426 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
427 }
428 return builder;
429 }
430
431 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
432 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
433 }
434
435 @Override
436 public String toString() {
437 return String.valueOf(repository);
438 }
439
440 abstract class TaskRunner implements Runnable {
441
442 protected final URI path;
443
444 protected final TransferTransportListener<?> listener;
445
446 TaskRunner(URI path, TransferTransportListener<?> listener) {
447 this.path = path;
448 this.listener = listener;
449 }
450
451 @Override
452 public void run() {
453 try {
454 listener.transferInitiated();
455 runTask();
456 listener.transferSucceeded();
457 } catch (Exception e) {
458 listener.transferFailed(e, transporter.classify(e));
459 }
460 }
461
462 protected abstract void runTask() throws Exception;
463 }
464
465 class PeekTaskRunner extends TaskRunner {
466
467 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
468 super(path, listener);
469 }
470
471 @Override
472 protected void runTask() throws Exception {
473 transporter.peek(new PeekTask(path));
474 }
475 }
476
477 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
478
479 private final Path file;
480
481 private final ChecksumValidator checksumValidator;
482
483 GetTaskRunner(
484 URI path,
485 Path file,
486 ChecksumPolicy checksumPolicy,
487 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
488 List<RepositoryLayout.ChecksumLocation> checksumLocations,
489 Map<String, String> providedChecksums,
490 TransferTransportListener<?> listener) {
491 super(path, listener);
492 this.file = requireNonNull(file, "destination file cannot be null");
493 checksumValidator = new ChecksumValidator(
494 file,
495 checksumAlgorithmFactories,
496 checksumProcessor,
497 this,
498 checksumPolicy,
499 providedChecksums,
500 safe(checksumLocations));
501 }
502
503 @Override
504 public boolean fetchChecksum(URI remote, Path local) throws Exception {
505 try {
506 transporter.get(new GetTask(remote).setDataPath(local));
507 } catch (Exception e) {
508 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
509 return false;
510 }
511 throw e;
512 }
513 return true;
514 }
515
516 @Override
517 protected void runTask() throws Exception {
518 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file)) {
519 final Path tmp = tempFile.getPath();
520 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
521 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
522 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
523 transporter.get(task);
524 try {
525 checksumValidator.validate(
526 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
527 break;
528 } catch (ChecksumFailureException e) {
529 boolean retry = trial < lastTrial && e.isRetryWorthy();
530 if (!retry && !checksumValidator.handle(e)) {
531 throw e;
532 }
533 listener.transferCorrupted(e);
534 if (retry) {
535 checksumValidator.retry();
536 } else {
537 break;
538 }
539 }
540 }
541 tempFile.move();
542 if (persistedChecksums) {
543 checksumValidator.commit();
544 }
545 }
546 }
547 }
548
549 class PutTaskRunner extends TaskRunner {
550
551 private final Path file;
552
553 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
554
555 PutTaskRunner(
556 URI path,
557 Path file,
558 List<RepositoryLayout.ChecksumLocation> checksumLocations,
559 TransferTransportListener<?> listener) {
560 super(path, listener);
561 this.file = requireNonNull(file, "source file cannot be null");
562 this.checksumLocations = safe(checksumLocations);
563 }
564
565 @SuppressWarnings("checkstyle:innerassignment")
566 @Override
567 protected void runTask() throws Exception {
568 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
569 uploadChecksums(file, null);
570 }
571
572
573
574
575
576 private void uploadChecksums(Path path, byte[] bytes) {
577 if (checksumLocations.isEmpty()) {
578 return;
579 }
580 try {
581 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
582 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
583 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
584 }
585
586 Map<String, String> sumsByAlgo;
587 if (bytes != null) {
588 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
589 } else {
590 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
591 }
592
593 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
594 uploadChecksum(
595 checksumLocation.getLocation(),
596 sumsByAlgo.get(checksumLocation
597 .getChecksumAlgorithmFactory()
598 .getName()));
599 }
600 } catch (IOException e) {
601 LOGGER.warn("Failed to upload checksums for {}", file, e);
602 throw new UncheckedIOException(e);
603 }
604 }
605
606 private void uploadChecksum(URI location, Object checksum) {
607 try {
608 if (checksum instanceof Exception) {
609 throw (Exception) checksum;
610 }
611 transporter.put(new PutTask(location).setDataString((String) checksum));
612 } catch (Exception e) {
613 LOGGER.warn("Failed to upload checksum to {}", location, e);
614 }
615 }
616 }
617 }