View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.connector.basic;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.File;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.UncheckedIOException;
26  import java.net.URI;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import org.eclipse.aether.ConfigurationProperties;
36  import org.eclipse.aether.RepositorySystemSession;
37  import org.eclipse.aether.RequestTrace;
38  import org.eclipse.aether.metadata.Metadata;
39  import org.eclipse.aether.repository.RemoteRepository;
40  import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
41  import org.eclipse.aether.spi.connector.ArtifactDownload;
42  import org.eclipse.aether.spi.connector.ArtifactUpload;
43  import org.eclipse.aether.spi.connector.MetadataDownload;
44  import org.eclipse.aether.spi.connector.MetadataUpload;
45  import org.eclipse.aether.spi.connector.RepositoryConnector;
46  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
47  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
48  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
49  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
50  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52  import org.eclipse.aether.spi.connector.transport.GetTask;
53  import org.eclipse.aether.spi.connector.transport.PeekTask;
54  import org.eclipse.aether.spi.connector.transport.PutTask;
55  import org.eclipse.aether.spi.connector.transport.Transporter;
56  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57  import org.eclipse.aether.spi.io.FileProcessor;
58  import org.eclipse.aether.transfer.ChecksumFailureException;
59  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61  import org.eclipse.aether.transfer.NoTransporterException;
62  import org.eclipse.aether.transfer.TransferEvent;
63  import org.eclipse.aether.transfer.TransferResource;
64  import org.eclipse.aether.transform.FileTransformer;
65  import org.eclipse.aether.util.ConfigUtils;
66  import org.eclipse.aether.util.FileUtils;
67  import org.eclipse.aether.util.concurrency.ExecutorUtils;
68  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
69  import org.slf4j.Logger;
70  import org.slf4j.LoggerFactory;
71  
72  import static java.util.Objects.requireNonNull;
73  
74  /**
75   *
76   */
77  final class BasicRepositoryConnector implements RepositoryConnector {
78  
79      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
80  
81      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82  
83      private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
84  
85      private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
86  
87      private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
88  
89      private final FileProcessor fileProcessor;
90  
91      private final RemoteRepository repository;
92  
93      private final RepositorySystemSession session;
94  
95      private final Transporter transporter;
96  
97      private final RepositoryLayout layout;
98  
99      private final ChecksumPolicyProvider checksumPolicyProvider;
100 
101     private final int maxThreads;
102 
103     private final boolean smartChecksums;
104 
105     private final boolean parallelPut;
106 
107     private final boolean persistedChecksums;
108 
109     private Executor executor;
110 
111     private final AtomicBoolean closed;
112 
113     BasicRepositoryConnector(
114             RepositorySystemSession session,
115             RemoteRepository repository,
116             TransporterProvider transporterProvider,
117             RepositoryLayoutProvider layoutProvider,
118             ChecksumPolicyProvider checksumPolicyProvider,
119             FileProcessor fileProcessor,
120             Map<String, ProvidedChecksumsSource> providedChecksumsSources)
121             throws NoRepositoryConnectorException {
122         try {
123             layout = layoutProvider.newRepositoryLayout(session, repository);
124         } catch (NoRepositoryLayoutException e) {
125             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
126         }
127         try {
128             transporter = transporterProvider.newTransporter(session, repository);
129         } catch (NoTransporterException e) {
130             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
131         }
132         this.checksumPolicyProvider = checksumPolicyProvider;
133 
134         this.session = session;
135         this.repository = repository;
136         this.fileProcessor = fileProcessor;
137         this.providedChecksumsSources = providedChecksumsSources;
138         this.closed = new AtomicBoolean(false);
139 
140         maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
141         smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
142         parallelPut = ConfigUtils.getBoolean(
143                 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
144         persistedChecksums = ConfigUtils.getBoolean(
145                 session,
146                 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
147                 ConfigurationProperties.PERSISTED_CHECKSUMS);
148     }
149 
150     private Executor getExecutor(int tasks) {
151         if (maxThreads <= 1) {
152             return ExecutorUtils.DIRECT_EXECUTOR;
153         }
154         if (tasks <= 1) {
155             return ExecutorUtils.DIRECT_EXECUTOR;
156         }
157         if (executor == null) {
158             executor =
159                     ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
160         }
161         return executor;
162     }
163 
164     @Override
165     public void close() {
166         if (closed.compareAndSet(false, true)) {
167             ExecutorUtils.shutdown(executor);
168             transporter.close();
169         }
170     }
171 
172     private void failIfClosed() {
173         if (closed.get()) {
174             throw new IllegalStateException("connector already closed");
175         }
176     }
177 
178     @Override
179     public void get(
180             Collection<? extends ArtifactDownload> artifactDownloads,
181             Collection<? extends MetadataDownload> metadataDownloads) {
182         failIfClosed();
183 
184         Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
185         Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
186 
187         Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
188         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
189         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
190 
191         for (MetadataDownload transfer : safeMetadataDownloads) {
192             URI location = layout.getLocation(transfer.getMetadata(), false);
193 
194             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
195             TransferEvent.Builder builder = newEventBuilder(resource, false, false);
196             MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
197 
198             ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
199             List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
200             if (checksumPolicy != null) {
201                 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
202             }
203 
204             Runnable task = new GetTaskRunner(
205                     location,
206                     transfer.getFile(),
207                     checksumPolicy,
208                     checksumAlgorithmFactories,
209                     checksumLocations,
210                     null,
211                     listener);
212             executor.execute(errorForwarder.wrap(task));
213         }
214 
215         for (ArtifactDownload transfer : safeArtifactDownloads) {
216             Map<String, String> providedChecksums = Collections.emptyMap();
217             for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
218                 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
219                         session, transfer, repository, checksumAlgorithmFactories);
220 
221                 if (provided != null) {
222                     providedChecksums = provided;
223                     break;
224                 }
225             }
226 
227             URI location = layout.getLocation(transfer.getArtifact(), false);
228 
229             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
230             TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
231             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
232 
233             Runnable task;
234             if (transfer.isExistenceCheck()) {
235                 task = new PeekTaskRunner(location, listener);
236             } else {
237                 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
238                 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
239                 if (checksumPolicy != null) {
240                     checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
241                 }
242 
243                 task = new GetTaskRunner(
244                         location,
245                         transfer.getFile(),
246                         checksumPolicy,
247                         checksumAlgorithmFactories,
248                         checksumLocations,
249                         providedChecksums,
250                         listener);
251             }
252             executor.execute(errorForwarder.wrap(task));
253         }
254 
255         errorForwarder.await();
256     }
257 
258     @Override
259     public void put(
260             Collection<? extends ArtifactUpload> artifactUploads,
261             Collection<? extends MetadataUpload> metadataUploads) {
262         failIfClosed();
263 
264         Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
265         Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
266 
267         Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
268         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
269 
270         for (ArtifactUpload transfer : safeArtifactUploads) {
271             URI location = layout.getLocation(transfer.getArtifact(), true);
272 
273             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
274             TransferEvent.Builder builder = newEventBuilder(resource, true, false);
275             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
276 
277             List<RepositoryLayout.ChecksumLocation> checksumLocations =
278                     layout.getChecksumLocations(transfer.getArtifact(), true, location);
279 
280             Runnable task = new PutTaskRunner(
281                     location, transfer.getFile(), transfer.getFileTransformer(), checksumLocations, listener);
282 
283             executor.execute(errorForwarder.wrap(task));
284         }
285 
286         errorForwarder.await(); // make sure all artifacts are PUT before we go with Metadata
287 
288         for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
289             for (MetadataUpload transfer : transferGroup) {
290                 URI location = layout.getLocation(transfer.getMetadata(), true);
291 
292                 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
293                 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
294                 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
295 
296                 List<RepositoryLayout.ChecksumLocation> checksumLocations =
297                         layout.getChecksumLocations(transfer.getMetadata(), true, location);
298 
299                 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
300 
301                 executor.execute(errorForwarder.wrap(task));
302             }
303 
304             errorForwarder.await(); // make sure each group is done before starting next group
305         }
306     }
307 
308     /**
309      * This method "groups" the Metadata to be uploaded by their level (version, artifact, group and root). This is MUST
310      * as clients consume metadata in opposite order (root, group, artifact, version), and hence, we must deploy and
311      * ensure (in case of parallel deploy) that all V level metadata is deployed before we start deploying A level, etc.
312      */
313     private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
314         ArrayList<MetadataUpload> v = new ArrayList<>();
315         ArrayList<MetadataUpload> a = new ArrayList<>();
316         ArrayList<MetadataUpload> g = new ArrayList<>();
317         ArrayList<MetadataUpload> r = new ArrayList<>();
318 
319         for (MetadataUpload transfer : metadataUploads) {
320             Metadata metadata = transfer.getMetadata();
321             if (!"".equals(metadata.getVersion())) {
322                 v.add(transfer);
323             } else if (!"".equals(metadata.getArtifactId())) {
324                 a.add(transfer);
325             } else if (!"".equals(metadata.getGroupId())) {
326                 g.add(transfer);
327             } else {
328                 r.add(transfer);
329             }
330         }
331 
332         List<List<MetadataUpload>> result = new ArrayList<>(4);
333         if (!v.isEmpty()) {
334             result.add(v);
335         }
336         if (!a.isEmpty()) {
337             result.add(a);
338         }
339         if (!g.isEmpty()) {
340             result.add(g);
341         }
342         if (!r.isEmpty()) {
343             result.add(r);
344         }
345         return result;
346     }
347 
348     private static <T> Collection<T> safe(Collection<T> items) {
349         return (items != null) ? items : Collections.emptyList();
350     }
351 
352     private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
353         return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
354     }
355 
356     private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
357         TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
358         if (upload) {
359             builder.setRequestType(TransferEvent.RequestType.PUT);
360         } else if (!peek) {
361             builder.setRequestType(TransferEvent.RequestType.GET);
362         } else {
363             builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
364         }
365         return builder;
366     }
367 
368     private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
369         return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
370     }
371 
372     @Override
373     public String toString() {
374         return String.valueOf(repository);
375     }
376 
377     abstract class TaskRunner implements Runnable {
378 
379         protected final URI path;
380 
381         protected final TransferTransportListener<?> listener;
382 
383         TaskRunner(URI path, TransferTransportListener<?> listener) {
384             this.path = path;
385             this.listener = listener;
386         }
387 
388         @Override
389         public void run() {
390             try {
391                 listener.transferInitiated();
392                 runTask();
393                 listener.transferSucceeded();
394             } catch (Exception e) {
395                 listener.transferFailed(e, transporter.classify(e));
396             }
397         }
398 
399         protected abstract void runTask() throws Exception;
400     }
401 
402     class PeekTaskRunner extends TaskRunner {
403 
404         PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
405             super(path, listener);
406         }
407 
408         @Override
409         protected void runTask() throws Exception {
410             transporter.peek(new PeekTask(path));
411         }
412     }
413 
414     class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
415 
416         private final File file;
417 
418         private final ChecksumValidator checksumValidator;
419 
420         GetTaskRunner(
421                 URI path,
422                 File file,
423                 ChecksumPolicy checksumPolicy,
424                 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
425                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
426                 Map<String, String> providedChecksums,
427                 TransferTransportListener<?> listener) {
428             super(path, listener);
429             this.file = requireNonNull(file, "destination file cannot be null");
430             checksumValidator = new ChecksumValidator(
431                     file,
432                     checksumAlgorithmFactories,
433                     fileProcessor,
434                     this,
435                     checksumPolicy,
436                     providedChecksums,
437                     safe(checksumLocations));
438         }
439 
440         @Override
441         public boolean fetchChecksum(URI remote, File local) throws Exception {
442             try {
443                 transporter.get(new GetTask(remote).setDataFile(local));
444             } catch (Exception e) {
445                 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
446                     return false;
447                 }
448                 throw e;
449             }
450             return true;
451         }
452 
453         @Override
454         protected void runTask() throws Exception {
455             try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
456                 final File tmp = tempFile.getPath().toFile();
457                 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
458                 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
459                     GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
460                     transporter.get(task);
461                     try {
462                         checksumValidator.validate(
463                                 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
464                         break;
465                     } catch (ChecksumFailureException e) {
466                         boolean retry = trial < lastTrial && e.isRetryWorthy();
467                         if (!retry && !checksumValidator.handle(e)) {
468                             throw e;
469                         }
470                         listener.transferCorrupted(e);
471                         if (retry) {
472                             checksumValidator.retry();
473                         } else {
474                             break;
475                         }
476                     }
477                 }
478                 tempFile.move();
479                 if (persistedChecksums) {
480                     checksumValidator.commit();
481                 }
482             }
483         }
484     }
485 
486     class PutTaskRunner extends TaskRunner {
487 
488         private final File file;
489 
490         private final FileTransformer fileTransformer;
491 
492         private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
493 
494         PutTaskRunner(
495                 URI path,
496                 File file,
497                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
498                 TransferTransportListener<?> listener) {
499             this(path, file, null, checksumLocations, listener);
500         }
501 
502         /**
503          * <strong>IMPORTANT</strong> When using a fileTransformer, the content of the file is stored in memory to
504          * ensure that file content and checksums stay in sync!
505          *
506          * @param path
507          * @param file
508          * @param fileTransformer
509          * @param checksumLocations
510          * @param listener
511          */
512         PutTaskRunner(
513                 URI path,
514                 File file,
515                 FileTransformer fileTransformer,
516                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
517                 TransferTransportListener<?> listener) {
518             super(path, listener);
519             this.file = requireNonNull(file, "source file cannot be null");
520             this.fileTransformer = fileTransformer;
521             this.checksumLocations = safe(checksumLocations);
522         }
523 
524         @SuppressWarnings("checkstyle:innerassignment")
525         @Override
526         protected void runTask() throws Exception {
527             if (fileTransformer != null) {
528                 // transform data once to byte array, ensure constant data for checksum
529                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
530                 byte[] buffer = new byte[1024];
531 
532                 try (InputStream transformData = fileTransformer.transformData(file)) {
533                     for (int read; (read = transformData.read(buffer, 0, buffer.length)) != -1; ) {
534                         baos.write(buffer, 0, read);
535                     }
536                 }
537 
538                 byte[] bytes = baos.toByteArray();
539                 transporter.put(new PutTask(path).setDataBytes(bytes).setListener(listener));
540                 uploadChecksums(file, bytes);
541             } else {
542                 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
543                 uploadChecksums(file, null);
544             }
545         }
546 
547         /**
548          * @param file  source
549          * @param bytes transformed data from file or {@code null}
550          */
551         private void uploadChecksums(File file, byte[] bytes) {
552             if (checksumLocations.isEmpty()) {
553                 return;
554             }
555             try {
556                 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
557                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
558                     algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
559                 }
560 
561                 Map<String, String> sumsByAlgo;
562                 if (bytes != null) {
563                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
564                 } else {
565                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
566                 }
567 
568                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
569                     uploadChecksum(
570                             checksumLocation.getLocation(),
571                             sumsByAlgo.get(checksumLocation
572                                     .getChecksumAlgorithmFactory()
573                                     .getName()));
574                 }
575             } catch (IOException e) {
576                 LOGGER.warn("Failed to upload checksums for {}", file, e);
577                 throw new UncheckedIOException(e);
578             }
579         }
580 
581         private void uploadChecksum(URI location, Object checksum) {
582             try {
583                 if (checksum instanceof Exception) {
584                     throw (Exception) checksum;
585                 }
586                 transporter.put(new PutTask(location).setDataString((String) checksum));
587             } catch (Exception e) {
588                 LOGGER.warn("Failed to upload checksum to {}", location, e);
589             }
590         }
591     }
592 }