1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.eclipse.aether.RepositorySystemSession;
35 import org.eclipse.aether.metadata.Metadata;
36 import org.eclipse.aether.repository.RemoteRepository;
37 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
38 import org.eclipse.aether.spi.connector.ArtifactDownload;
39 import org.eclipse.aether.spi.connector.ArtifactTransfer;
40 import org.eclipse.aether.spi.connector.ArtifactUpload;
41 import org.eclipse.aether.spi.connector.MetadataDownload;
42 import org.eclipse.aether.spi.connector.MetadataTransfer;
43 import org.eclipse.aether.spi.connector.MetadataUpload;
44 import org.eclipse.aether.spi.connector.RepositoryConnector;
45 import org.eclipse.aether.spi.connector.Transfer;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
49 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52 import org.eclipse.aether.spi.connector.transport.GetTask;
53 import org.eclipse.aether.spi.connector.transport.PeekTask;
54 import org.eclipse.aether.spi.connector.transport.PutTask;
55 import org.eclipse.aether.spi.connector.transport.Transporter;
56 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57 import org.eclipse.aether.spi.io.ChecksumProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.NoTransporterException;
62 import org.eclipse.aether.transfer.TransferEvent;
63 import org.eclipse.aether.transfer.TransferResource;
64 import org.eclipse.aether.util.ConfigUtils;
65 import org.eclipse.aether.util.FileUtils;
66 import org.eclipse.aether.util.concurrency.ExecutorUtils;
67 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 import static java.util.Objects.requireNonNull;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
77 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
78 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
79 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
80 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
81 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
82
83
84
85
86 final class BasicRepositoryConnector implements RepositoryConnector {
87 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
88
89 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
90
91 private final ChecksumProcessor checksumProcessor;
92
93 private final RemoteRepository repository;
94
95 private final RepositorySystemSession session;
96
97 private final Transporter transporter;
98
99 private final RepositoryLayout layout;
100
101 private final ChecksumPolicyProvider checksumPolicyProvider;
102
103 private final int maxDownstreamThreads;
104
105 private final int maxUpstreamThreads;
106
107 private final boolean smartChecksums;
108
109 private final boolean parallelPut;
110
111 private final boolean persistedChecksums;
112
113 private final ConcurrentHashMap<Boolean, Executor> executors;
114
115 private final AtomicBoolean closed;
116
117 BasicRepositoryConnector(
118 RepositorySystemSession session,
119 RemoteRepository repository,
120 TransporterProvider transporterProvider,
121 RepositoryLayoutProvider layoutProvider,
122 ChecksumPolicyProvider checksumPolicyProvider,
123 ChecksumProcessor checksumProcessor,
124 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
125 throws NoRepositoryConnectorException {
126 try {
127 layout = layoutProvider.newRepositoryLayout(session, repository);
128 } catch (NoRepositoryLayoutException e) {
129 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
130 }
131 try {
132 transporter = transporterProvider.newTransporter(session, repository);
133 } catch (NoTransporterException e) {
134 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
135 }
136 this.checksumPolicyProvider = checksumPolicyProvider;
137
138 this.session = session;
139 this.repository = repository;
140 this.checksumProcessor = checksumProcessor;
141 this.providedChecksumsSources = providedChecksumsSources;
142 this.executors = new ConcurrentHashMap<>();
143 this.closed = new AtomicBoolean(false);
144
145 maxUpstreamThreads = ExecutorUtils.threadCount(
146 session,
147 DEFAULT_THREADS,
148 CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
149 CONFIG_PROP_UPSTREAM_THREADS,
150 CONFIG_PROP_THREADS);
151 maxDownstreamThreads = ExecutorUtils.threadCount(
152 session,
153 DEFAULT_THREADS,
154 CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
155 CONFIG_PROP_DOWNSTREAM_THREADS,
156 CONFIG_PROP_THREADS);
157 smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
158 parallelPut = ConfigUtils.getBoolean(
159 session,
160 DEFAULT_PARALLEL_PUT,
161 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
162 CONFIG_PROP_PARALLEL_PUT);
163 persistedChecksums =
164 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
165 }
166
167 private Executor getExecutor(boolean downstream, int tasks) {
168 int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
169 if (maxThreads <= 1) {
170 return ExecutorUtils.DIRECT_EXECUTOR;
171 }
172 if (tasks <= 1) {
173 return ExecutorUtils.DIRECT_EXECUTOR;
174 }
175 return executors.computeIfAbsent(
176 downstream,
177 k -> ExecutorUtils.threadPool(
178 maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
179 }
180
181 @Override
182 public void close() {
183 if (closed.compareAndSet(false, true)) {
184 for (Executor executor : executors.values()) {
185 ExecutorUtils.shutdown(executor);
186 }
187 transporter.close();
188 }
189 }
190
191 private void failIfClosed() {
192 if (closed.get()) {
193 throw new IllegalStateException("connector already closed");
194 }
195 }
196
197 @Override
198 public void get(
199 Collection<? extends ArtifactDownload> artifactDownloads,
200 Collection<? extends MetadataDownload> metadataDownloads) {
201 failIfClosed();
202
203 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
204 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
205
206 Executor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
207 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
208 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
209
210 boolean first = true;
211
212 for (MetadataDownload transfer : safeMetadataDownloads) {
213 URI location = layout.getLocation(transfer.getMetadata(), false);
214
215 TransferResource resource = newTransferResource(location, transfer);
216 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
217 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
218
219 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
220 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
221 if (checksumPolicy != null) {
222 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
223 }
224
225 Runnable task = new GetTaskRunner(
226 location,
227 transfer.getPath(),
228 checksumPolicy,
229 checksumAlgorithmFactories,
230 checksumLocations,
231 null,
232 listener);
233 if (first) {
234 task.run();
235 first = false;
236 } else {
237 executor.execute(errorForwarder.wrap(task));
238 }
239 }
240
241 for (ArtifactDownload transfer : safeArtifactDownloads) {
242 Map<String, String> providedChecksums = Collections.emptyMap();
243 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
244 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
245 session, transfer, repository, checksumAlgorithmFactories);
246
247 if (provided != null) {
248 providedChecksums = provided;
249 break;
250 }
251 }
252
253 URI location = layout.getLocation(transfer.getArtifact(), false);
254
255 TransferResource resource = newTransferResource(location, transfer);
256 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
257 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
258
259 Runnable task;
260 if (transfer.isExistenceCheck()) {
261 task = new PeekTaskRunner(location, listener);
262 } else {
263 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
264 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
265 if (checksumPolicy != null) {
266 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
267 }
268
269 task = new GetTaskRunner(
270 location,
271 transfer.getPath(),
272 checksumPolicy,
273 checksumAlgorithmFactories,
274 checksumLocations,
275 providedChecksums,
276 listener);
277 }
278 if (first) {
279 task.run();
280 first = false;
281 } else {
282 executor.execute(errorForwarder.wrap(task));
283 }
284 }
285
286 errorForwarder.await();
287 }
288
289 @Override
290 public void put(
291 Collection<? extends ArtifactUpload> artifactUploads,
292 Collection<? extends MetadataUpload> metadataUploads) {
293 failIfClosed();
294
295 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
296 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
297
298 Executor executor =
299 getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
300 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
301
302 boolean first = true;
303
304 for (ArtifactUpload transfer : safeArtifactUploads) {
305 URI location = layout.getLocation(transfer.getArtifact(), true);
306
307 TransferResource resource = newTransferResource(location, transfer);
308 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
309 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
310
311 List<RepositoryLayout.ChecksumLocation> checksumLocations =
312 layout.getChecksumLocations(transfer.getArtifact(), true, location);
313
314 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
315 if (first) {
316 task.run();
317 first = false;
318 } else {
319 executor.execute(errorForwarder.wrap(task));
320 }
321 }
322
323 errorForwarder.await();
324
325 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
326 for (MetadataUpload transfer : transferGroup) {
327 URI location = layout.getLocation(transfer.getMetadata(), true);
328
329 TransferResource resource = newTransferResource(location, transfer);
330 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
331 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
332
333 List<RepositoryLayout.ChecksumLocation> checksumLocations =
334 layout.getChecksumLocations(transfer.getMetadata(), true, location);
335
336 Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
337 if (first) {
338 task.run();
339 first = false;
340 } else {
341 executor.execute(errorForwarder.wrap(task));
342 }
343 }
344
345 errorForwarder.await();
346 }
347 }
348
349
350
351
352
353
354 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
355 ArrayList<MetadataUpload> v = new ArrayList<>();
356 ArrayList<MetadataUpload> a = new ArrayList<>();
357 ArrayList<MetadataUpload> g = new ArrayList<>();
358 ArrayList<MetadataUpload> r = new ArrayList<>();
359
360 for (MetadataUpload transfer : metadataUploads) {
361 Metadata metadata = transfer.getMetadata();
362 if (!"".equals(metadata.getVersion())) {
363 v.add(transfer);
364 } else if (!"".equals(metadata.getArtifactId())) {
365 a.add(transfer);
366 } else if (!"".equals(metadata.getGroupId())) {
367 g.add(transfer);
368 } else {
369 r.add(transfer);
370 }
371 }
372
373 List<List<MetadataUpload>> result = new ArrayList<>(4);
374 if (!v.isEmpty()) {
375 result.add(v);
376 }
377 if (!a.isEmpty()) {
378 result.add(a);
379 }
380 if (!g.isEmpty()) {
381 result.add(g);
382 }
383 if (!r.isEmpty()) {
384 result.add(r);
385 }
386 return result;
387 }
388
389 private static <T> Collection<T> safe(Collection<T> items) {
390 return (items != null) ? items : Collections.emptyList();
391 }
392
393 private TransferResource newTransferResource(URI path, Transfer transfer) {
394 if (transfer instanceof ArtifactTransfer) {
395 ArtifactTransfer artifactTransfer = (ArtifactTransfer) transfer;
396 return new TransferResource(
397 repository.getId(),
398 repository.getUrl(),
399 path.toString(),
400 artifactTransfer.getPath(),
401 artifactTransfer.getArtifact(),
402 artifactTransfer.getTrace());
403 } else if (transfer instanceof MetadataTransfer) {
404 MetadataTransfer metadataTransfer = (MetadataTransfer) transfer;
405 return new TransferResource(
406 repository.getId(),
407 repository.getUrl(),
408 path.toString(),
409 metadataTransfer.getPath(),
410 metadataTransfer.getMetadata(),
411 metadataTransfer.getTrace());
412 } else {
413 throw new IllegalArgumentException("Accepting only artifact or metadata transfers");
414 }
415 }
416
417 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
418 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
419 if (upload) {
420 builder.setRequestType(TransferEvent.RequestType.PUT);
421 } else if (!peek) {
422 builder.setRequestType(TransferEvent.RequestType.GET);
423 } else {
424 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
425 }
426 return builder;
427 }
428
429 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
430 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
431 }
432
433 @Override
434 public String toString() {
435 return String.valueOf(repository);
436 }
437
438 abstract class TaskRunner implements Runnable {
439
440 protected final URI path;
441
442 protected final TransferTransportListener<?> listener;
443
444 TaskRunner(URI path, TransferTransportListener<?> listener) {
445 this.path = path;
446 this.listener = listener;
447 }
448
449 @Override
450 public void run() {
451 try {
452 listener.transferInitiated();
453 runTask();
454 listener.transferSucceeded();
455 } catch (Exception e) {
456 listener.transferFailed(e, transporter.classify(e));
457 }
458 }
459
460 protected abstract void runTask() throws Exception;
461 }
462
463 class PeekTaskRunner extends TaskRunner {
464
465 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
466 super(path, listener);
467 }
468
469 @Override
470 protected void runTask() throws Exception {
471 transporter.peek(new PeekTask(path));
472 }
473 }
474
475 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
476
477 private final Path file;
478
479 private final ChecksumValidator checksumValidator;
480
481 GetTaskRunner(
482 URI path,
483 Path file,
484 ChecksumPolicy checksumPolicy,
485 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
486 List<RepositoryLayout.ChecksumLocation> checksumLocations,
487 Map<String, String> providedChecksums,
488 TransferTransportListener<?> listener) {
489 super(path, listener);
490 this.file = requireNonNull(file, "destination file cannot be null");
491 checksumValidator = new ChecksumValidator(
492 file,
493 checksumAlgorithmFactories,
494 checksumProcessor,
495 this,
496 checksumPolicy,
497 providedChecksums,
498 safe(checksumLocations));
499 }
500
501 @Override
502 public boolean fetchChecksum(URI remote, Path local) throws Exception {
503 try {
504 transporter.get(new GetTask(remote).setDataPath(local));
505 } catch (Exception e) {
506 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
507 return false;
508 }
509 throw e;
510 }
511 return true;
512 }
513
514 @Override
515 protected void runTask() throws Exception {
516 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file)) {
517 final Path tmp = tempFile.getPath();
518 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
519 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
520 GetTask task = new GetTask(path).setDataPath(tmp, false).setListener(listener);
521 transporter.get(task);
522 try {
523 checksumValidator.validate(
524 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
525 break;
526 } catch (ChecksumFailureException e) {
527 boolean retry = trial < lastTrial && e.isRetryWorthy();
528 if (!retry && !checksumValidator.handle(e)) {
529 throw e;
530 }
531 listener.transferCorrupted(e);
532 if (retry) {
533 checksumValidator.retry();
534 } else {
535 break;
536 }
537 }
538 }
539 tempFile.move();
540 if (persistedChecksums) {
541 checksumValidator.commit();
542 }
543 }
544 }
545 }
546
547 class PutTaskRunner extends TaskRunner {
548
549 private final Path file;
550
551 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
552
553 PutTaskRunner(
554 URI path,
555 Path file,
556 List<RepositoryLayout.ChecksumLocation> checksumLocations,
557 TransferTransportListener<?> listener) {
558 super(path, listener);
559 this.file = requireNonNull(file, "source file cannot be null");
560 this.checksumLocations = safe(checksumLocations);
561 }
562
563 @SuppressWarnings("checkstyle:innerassignment")
564 @Override
565 protected void runTask() throws Exception {
566 transporter.put(new PutTask(path).setDataPath(file).setListener(listener));
567 uploadChecksums(file, null);
568 }
569
570
571
572
573
574 private void uploadChecksums(Path path, byte[] bytes) {
575 if (checksumLocations.isEmpty()) {
576 return;
577 }
578 try {
579 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
580 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
581 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
582 }
583
584 Map<String, String> sumsByAlgo;
585 if (bytes != null) {
586 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
587 } else {
588 sumsByAlgo = ChecksumAlgorithmHelper.calculate(path, algorithms);
589 }
590
591 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
592 uploadChecksum(
593 checksumLocation.getLocation(),
594 sumsByAlgo.get(checksumLocation
595 .getChecksumAlgorithmFactory()
596 .getName()));
597 }
598 } catch (IOException e) {
599 LOGGER.warn("Failed to upload checksums for {}", file, e);
600 throw new UncheckedIOException(e);
601 }
602 }
603
604 private void uploadChecksum(URI location, Object checksum) {
605 try {
606 if (checksum instanceof Exception) {
607 throw (Exception) checksum;
608 }
609 transporter.put(new PutTask(location).setDataString((String) checksum));
610 } catch (Exception e) {
611 LOGGER.warn("Failed to upload checksum to {}", location, e);
612 }
613 }
614 }
615 }