View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.connector.basic;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.File;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.UncheckedIOException;
26  import java.net.URI;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import org.eclipse.aether.ConfigurationProperties;
36  import org.eclipse.aether.RepositorySystemSession;
37  import org.eclipse.aether.RequestTrace;
38  import org.eclipse.aether.metadata.Metadata;
39  import org.eclipse.aether.repository.RemoteRepository;
40  import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
41  import org.eclipse.aether.spi.connector.ArtifactDownload;
42  import org.eclipse.aether.spi.connector.ArtifactUpload;
43  import org.eclipse.aether.spi.connector.MetadataDownload;
44  import org.eclipse.aether.spi.connector.MetadataUpload;
45  import org.eclipse.aether.spi.connector.RepositoryConnector;
46  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
47  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
48  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
49  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
50  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52  import org.eclipse.aether.spi.connector.transport.GetTask;
53  import org.eclipse.aether.spi.connector.transport.PeekTask;
54  import org.eclipse.aether.spi.connector.transport.PutTask;
55  import org.eclipse.aether.spi.connector.transport.Transporter;
56  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57  import org.eclipse.aether.spi.io.FileProcessor;
58  import org.eclipse.aether.transfer.ChecksumFailureException;
59  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61  import org.eclipse.aether.transfer.NoTransporterException;
62  import org.eclipse.aether.transfer.TransferEvent;
63  import org.eclipse.aether.transfer.TransferResource;
64  import org.eclipse.aether.transform.FileTransformer;
65  import org.eclipse.aether.util.ConfigUtils;
66  import org.eclipse.aether.util.FileUtils;
67  import org.eclipse.aether.util.concurrency.ExecutorUtils;
68  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
69  import org.slf4j.Logger;
70  import org.slf4j.LoggerFactory;
71  
72  import static java.util.Objects.requireNonNull;
73  
74  /**
75   *
76   */
77  final class BasicRepositoryConnector implements RepositoryConnector {
78  
79      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
80  
81      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82  
83      private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
84  
85      private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
86  
87      private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
88  
89      private final FileProcessor fileProcessor;
90  
91      private final RemoteRepository repository;
92  
93      private final RepositorySystemSession session;
94  
95      private final Transporter transporter;
96  
97      private final RepositoryLayout layout;
98  
99      private final ChecksumPolicyProvider checksumPolicyProvider;
100 
101     private final int maxThreads;
102 
103     private final boolean smartChecksums;
104 
105     private final boolean parallelPut;
106 
107     private final boolean persistedChecksums;
108 
109     private Executor executor;
110 
111     private final AtomicBoolean closed;
112 
113     BasicRepositoryConnector(
114             RepositorySystemSession session,
115             RemoteRepository repository,
116             TransporterProvider transporterProvider,
117             RepositoryLayoutProvider layoutProvider,
118             ChecksumPolicyProvider checksumPolicyProvider,
119             FileProcessor fileProcessor,
120             Map<String, ProvidedChecksumsSource> providedChecksumsSources)
121             throws NoRepositoryConnectorException {
122         try {
123             layout = layoutProvider.newRepositoryLayout(session, repository);
124         } catch (NoRepositoryLayoutException e) {
125             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
126         }
127         try {
128             transporter = transporterProvider.newTransporter(session, repository);
129         } catch (NoTransporterException e) {
130             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
131         }
132         this.checksumPolicyProvider = checksumPolicyProvider;
133 
134         this.session = session;
135         this.repository = repository;
136         this.fileProcessor = fileProcessor;
137         this.providedChecksumsSources = providedChecksumsSources;
138         this.closed = new AtomicBoolean(false);
139 
140         maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
141         smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
142         parallelPut = ConfigUtils.getBoolean(
143                 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
144         persistedChecksums = ConfigUtils.getBoolean(
145                 session,
146                 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
147                 ConfigurationProperties.PERSISTED_CHECKSUMS);
148     }
149 
150     private Executor getExecutor(int tasks) {
151         if (maxThreads <= 1) {
152             return ExecutorUtils.DIRECT_EXECUTOR;
153         }
154         if (tasks <= 1) {
155             return ExecutorUtils.DIRECT_EXECUTOR;
156         }
157         if (executor == null) {
158             executor =
159                     ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
160         }
161         return executor;
162     }
163 
164     @Override
165     public void close() {
166         if (closed.compareAndSet(false, true)) {
167             ExecutorUtils.shutdown(executor);
168             transporter.close();
169         }
170     }
171 
172     private void failIfClosed() {
173         if (closed.get()) {
174             throw new IllegalStateException("connector already closed");
175         }
176     }
177 
178     @Override
179     public void get(
180             Collection<? extends ArtifactDownload> artifactDownloads,
181             Collection<? extends MetadataDownload> metadataDownloads) {
182         failIfClosed();
183 
184         Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
185         Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
186 
187         Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
188         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
189         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
190 
191         boolean first = true;
192 
193         for (MetadataDownload transfer : safeMetadataDownloads) {
194             URI location = layout.getLocation(transfer.getMetadata(), false);
195 
196             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
197             TransferEvent.Builder builder = newEventBuilder(resource, false, false);
198             MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
199 
200             ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
201             List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
202             if (checksumPolicy != null) {
203                 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
204             }
205 
206             Runnable task = new GetTaskRunner(
207                     location,
208                     transfer.getFile(),
209                     checksumPolicy,
210                     checksumAlgorithmFactories,
211                     checksumLocations,
212                     null,
213                     listener);
214             if (first) {
215                 task.run();
216                 first = false;
217             } else {
218                 executor.execute(errorForwarder.wrap(task));
219             }
220         }
221 
222         for (ArtifactDownload transfer : safeArtifactDownloads) {
223             Map<String, String> providedChecksums = Collections.emptyMap();
224             for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
225                 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
226                         session, transfer, repository, checksumAlgorithmFactories);
227 
228                 if (provided != null) {
229                     providedChecksums = provided;
230                     break;
231                 }
232             }
233 
234             URI location = layout.getLocation(transfer.getArtifact(), false);
235 
236             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
237             TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
238             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
239 
240             Runnable task;
241             if (transfer.isExistenceCheck()) {
242                 task = new PeekTaskRunner(location, listener);
243             } else {
244                 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
245                 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
246                 if (checksumPolicy != null) {
247                     checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
248                 }
249 
250                 task = new GetTaskRunner(
251                         location,
252                         transfer.getFile(),
253                         checksumPolicy,
254                         checksumAlgorithmFactories,
255                         checksumLocations,
256                         providedChecksums,
257                         listener);
258             }
259             if (first) {
260                 task.run();
261                 first = false;
262             } else {
263                 executor.execute(errorForwarder.wrap(task));
264             }
265         }
266 
267         errorForwarder.await();
268     }
269 
270     @Override
271     public void put(
272             Collection<? extends ArtifactUpload> artifactUploads,
273             Collection<? extends MetadataUpload> metadataUploads) {
274         failIfClosed();
275 
276         Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
277         Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
278 
279         Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
280         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
281 
282         boolean first = true;
283 
284         for (ArtifactUpload transfer : safeArtifactUploads) {
285             URI location = layout.getLocation(transfer.getArtifact(), true);
286 
287             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
288             TransferEvent.Builder builder = newEventBuilder(resource, true, false);
289             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
290 
291             List<RepositoryLayout.ChecksumLocation> checksumLocations =
292                     layout.getChecksumLocations(transfer.getArtifact(), true, location);
293 
294             Runnable task = new PutTaskRunner(
295                     location, transfer.getFile(), transfer.getFileTransformer(), checksumLocations, listener);
296             if (first) {
297                 task.run();
298                 first = false;
299             } else {
300                 executor.execute(errorForwarder.wrap(task));
301             }
302         }
303 
304         errorForwarder.await(); // make sure all artifacts are PUT before we go with Metadata
305 
306         for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
307             for (MetadataUpload transfer : transferGroup) {
308                 URI location = layout.getLocation(transfer.getMetadata(), true);
309 
310                 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
311                 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
312                 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
313 
314                 List<RepositoryLayout.ChecksumLocation> checksumLocations =
315                         layout.getChecksumLocations(transfer.getMetadata(), true, location);
316 
317                 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
318                 if (first) {
319                     task.run();
320                     first = false;
321                 } else {
322                     executor.execute(errorForwarder.wrap(task));
323                 }
324             }
325 
326             errorForwarder.await(); // make sure each group is done before starting next group
327         }
328     }
329 
330     /**
331      * This method "groups" the Metadata to be uploaded by their level (version, artifact, group and root). This is MUST
332      * as clients consume metadata in opposite order (root, group, artifact, version), and hence, we must deploy and
333      * ensure (in case of parallel deploy) that all V level metadata is deployed before we start deploying A level, etc.
334      */
335     private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
336         ArrayList<MetadataUpload> v = new ArrayList<>();
337         ArrayList<MetadataUpload> a = new ArrayList<>();
338         ArrayList<MetadataUpload> g = new ArrayList<>();
339         ArrayList<MetadataUpload> r = new ArrayList<>();
340 
341         for (MetadataUpload transfer : metadataUploads) {
342             Metadata metadata = transfer.getMetadata();
343             if (!"".equals(metadata.getVersion())) {
344                 v.add(transfer);
345             } else if (!"".equals(metadata.getArtifactId())) {
346                 a.add(transfer);
347             } else if (!"".equals(metadata.getGroupId())) {
348                 g.add(transfer);
349             } else {
350                 r.add(transfer);
351             }
352         }
353 
354         List<List<MetadataUpload>> result = new ArrayList<>(4);
355         if (!v.isEmpty()) {
356             result.add(v);
357         }
358         if (!a.isEmpty()) {
359             result.add(a);
360         }
361         if (!g.isEmpty()) {
362             result.add(g);
363         }
364         if (!r.isEmpty()) {
365             result.add(r);
366         }
367         return result;
368     }
369 
370     private static <T> Collection<T> safe(Collection<T> items) {
371         return (items != null) ? items : Collections.emptyList();
372     }
373 
374     private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
375         return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
376     }
377 
378     private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
379         TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
380         if (upload) {
381             builder.setRequestType(TransferEvent.RequestType.PUT);
382         } else if (!peek) {
383             builder.setRequestType(TransferEvent.RequestType.GET);
384         } else {
385             builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
386         }
387         return builder;
388     }
389 
390     private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
391         return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
392     }
393 
394     @Override
395     public String toString() {
396         return String.valueOf(repository);
397     }
398 
399     abstract class TaskRunner implements Runnable {
400 
401         protected final URI path;
402 
403         protected final TransferTransportListener<?> listener;
404 
405         TaskRunner(URI path, TransferTransportListener<?> listener) {
406             this.path = path;
407             this.listener = listener;
408         }
409 
410         @Override
411         public void run() {
412             try {
413                 listener.transferInitiated();
414                 runTask();
415                 listener.transferSucceeded();
416             } catch (Exception e) {
417                 listener.transferFailed(e, transporter.classify(e));
418             }
419         }
420 
421         protected abstract void runTask() throws Exception;
422     }
423 
424     class PeekTaskRunner extends TaskRunner {
425 
426         PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
427             super(path, listener);
428         }
429 
430         @Override
431         protected void runTask() throws Exception {
432             transporter.peek(new PeekTask(path));
433         }
434     }
435 
436     class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
437 
438         private final File file;
439 
440         private final ChecksumValidator checksumValidator;
441 
442         GetTaskRunner(
443                 URI path,
444                 File file,
445                 ChecksumPolicy checksumPolicy,
446                 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
447                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
448                 Map<String, String> providedChecksums,
449                 TransferTransportListener<?> listener) {
450             super(path, listener);
451             this.file = requireNonNull(file, "destination file cannot be null");
452             checksumValidator = new ChecksumValidator(
453                     file,
454                     checksumAlgorithmFactories,
455                     fileProcessor,
456                     this,
457                     checksumPolicy,
458                     providedChecksums,
459                     safe(checksumLocations));
460         }
461 
462         @Override
463         public boolean fetchChecksum(URI remote, File local) throws Exception {
464             try {
465                 transporter.get(new GetTask(remote).setDataFile(local));
466             } catch (Exception e) {
467                 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
468                     return false;
469                 }
470                 throw e;
471             }
472             return true;
473         }
474 
475         @Override
476         protected void runTask() throws Exception {
477             try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
478                 final File tmp = tempFile.getPath().toFile();
479                 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
480                 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
481                     GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
482                     transporter.get(task);
483                     try {
484                         checksumValidator.validate(
485                                 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
486                         break;
487                     } catch (ChecksumFailureException e) {
488                         boolean retry = trial < lastTrial && e.isRetryWorthy();
489                         if (!retry && !checksumValidator.handle(e)) {
490                             throw e;
491                         }
492                         listener.transferCorrupted(e);
493                         if (retry) {
494                             checksumValidator.retry();
495                         } else {
496                             break;
497                         }
498                     }
499                 }
500                 tempFile.move();
501                 if (persistedChecksums) {
502                     checksumValidator.commit();
503                 }
504             }
505         }
506     }
507 
508     class PutTaskRunner extends TaskRunner {
509 
510         private final File file;
511 
512         private final FileTransformer fileTransformer;
513 
514         private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
515 
516         PutTaskRunner(
517                 URI path,
518                 File file,
519                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
520                 TransferTransportListener<?> listener) {
521             this(path, file, null, checksumLocations, listener);
522         }
523 
524         /**
525          * <strong>IMPORTANT</strong> When using a fileTransformer, the content of the file is stored in memory to
526          * ensure that file content and checksums stay in sync!
527          *
528          * @param path
529          * @param file
530          * @param fileTransformer
531          * @param checksumLocations
532          * @param listener
533          */
534         PutTaskRunner(
535                 URI path,
536                 File file,
537                 FileTransformer fileTransformer,
538                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
539                 TransferTransportListener<?> listener) {
540             super(path, listener);
541             this.file = requireNonNull(file, "source file cannot be null");
542             this.fileTransformer = fileTransformer;
543             this.checksumLocations = safe(checksumLocations);
544         }
545 
546         @SuppressWarnings("checkstyle:innerassignment")
547         @Override
548         protected void runTask() throws Exception {
549             if (fileTransformer != null) {
550                 // transform data once to byte array, ensure constant data for checksum
551                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
552                 byte[] buffer = new byte[1024];
553 
554                 try (InputStream transformData = fileTransformer.transformData(file)) {
555                     for (int read; (read = transformData.read(buffer, 0, buffer.length)) != -1; ) {
556                         baos.write(buffer, 0, read);
557                     }
558                 }
559 
560                 byte[] bytes = baos.toByteArray();
561                 transporter.put(new PutTask(path).setDataBytes(bytes).setListener(listener));
562                 uploadChecksums(file, bytes);
563             } else {
564                 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
565                 uploadChecksums(file, null);
566             }
567         }
568 
569         /**
570          * @param file  source
571          * @param bytes transformed data from file or {@code null}
572          */
573         private void uploadChecksums(File file, byte[] bytes) {
574             if (checksumLocations.isEmpty()) {
575                 return;
576             }
577             try {
578                 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
579                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
580                     algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
581                 }
582 
583                 Map<String, String> sumsByAlgo;
584                 if (bytes != null) {
585                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
586                 } else {
587                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
588                 }
589 
590                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
591                     uploadChecksum(
592                             checksumLocation.getLocation(),
593                             sumsByAlgo.get(checksumLocation
594                                     .getChecksumAlgorithmFactory()
595                                     .getName()));
596                 }
597             } catch (IOException e) {
598                 LOGGER.warn("Failed to upload checksums for {}", file, e);
599                 throw new UncheckedIOException(e);
600             }
601         }
602 
603         private void uploadChecksum(URI location, Object checksum) {
604             try {
605                 if (checksum instanceof Exception) {
606                     throw (Exception) checksum;
607                 }
608                 transporter.put(new PutTask(location).setDataString((String) checksum));
609             } catch (Exception e) {
610                 LOGGER.warn("Failed to upload checksum to {}", location, e);
611             }
612         }
613     }
614 }