View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.wagon;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.lang.reflect.InvocationTargetException;
26  import java.lang.reflect.Method;
27  import java.nio.file.Files;
28  import java.nio.file.StandardCopyOption;
29  import java.util.Locale;
30  import java.util.Map;
31  import java.util.Properties;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.maven.wagon.ConnectionException;
37  import org.apache.maven.wagon.ResourceDoesNotExistException;
38  import org.apache.maven.wagon.StreamingWagon;
39  import org.apache.maven.wagon.Wagon;
40  import org.apache.maven.wagon.WagonException;
41  import org.apache.maven.wagon.authentication.AuthenticationInfo;
42  import org.apache.maven.wagon.proxy.ProxyInfo;
43  import org.apache.maven.wagon.proxy.ProxyInfoProvider;
44  import org.apache.maven.wagon.repository.Repository;
45  import org.apache.maven.wagon.repository.RepositoryPermissions;
46  import org.eclipse.aether.ConfigurationProperties;
47  import org.eclipse.aether.RepositorySystemSession;
48  import org.eclipse.aether.repository.AuthenticationContext;
49  import org.eclipse.aether.repository.Proxy;
50  import org.eclipse.aether.repository.RemoteRepository;
51  import org.eclipse.aether.spi.connector.transport.GetTask;
52  import org.eclipse.aether.spi.connector.transport.PeekTask;
53  import org.eclipse.aether.spi.connector.transport.PutTask;
54  import org.eclipse.aether.spi.connector.transport.TransportTask;
55  import org.eclipse.aether.spi.connector.transport.Transporter;
56  import org.eclipse.aether.spi.io.PathProcessor;
57  import org.eclipse.aether.transfer.NoTransporterException;
58  import org.eclipse.aether.util.ConfigUtils;
59  import org.eclipse.aether.util.connector.transport.http.HttpTransporterUtils;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  import static java.util.Objects.requireNonNull;
64  import static org.eclipse.aether.transport.wagon.WagonTransporterConfigurationKeys.CONFIG_PROP_CONFIG;
65  import static org.eclipse.aether.transport.wagon.WagonTransporterConfigurationKeys.CONFIG_PROP_DIR_MODE;
66  import static org.eclipse.aether.transport.wagon.WagonTransporterConfigurationKeys.CONFIG_PROP_FILE_MODE;
67  import static org.eclipse.aether.transport.wagon.WagonTransporterConfigurationKeys.CONFIG_PROP_GROUP;
68  
69  /**
70   * A transporter using Maven Wagon.
71   */
72  final class WagonTransporter implements Transporter {
73      private static final Logger LOGGER = LoggerFactory.getLogger(WagonTransporter.class);
74  
75      private final RemoteRepository repository;
76  
77      private final RepositorySystemSession session;
78  
79      private final PathProcessor pathProcessor;
80  
81      private final AuthenticationContext repoAuthContext;
82  
83      private final AuthenticationContext proxyAuthContext;
84  
85      private final WagonProvider wagonProvider;
86  
87      private final WagonConfigurator wagonConfigurator;
88  
89      private final String wagonHint;
90  
91      private final Repository wagonRepo;
92  
93      private final AuthenticationInfo wagonAuth;
94  
95      private final ProxyInfoProvider wagonProxy;
96  
97      private final Properties headers;
98  
99      private final Queue<Wagon> wagons = new ConcurrentLinkedQueue<>();
100 
101     private final AtomicBoolean closed = new AtomicBoolean();
102 
103     WagonTransporter(
104             WagonProvider wagonProvider,
105             WagonConfigurator wagonConfigurator,
106             RemoteRepository repository,
107             RepositorySystemSession session,
108             PathProcessor pathProcessor)
109             throws NoTransporterException {
110         this.wagonProvider = wagonProvider;
111         this.wagonConfigurator = wagonConfigurator;
112         this.repository = repository;
113         this.session = session;
114         this.pathProcessor = pathProcessor;
115 
116         wagonRepo = new Repository(repository.getId(), repository.getUrl());
117         wagonRepo.setPermissions(getPermissions(repository.getId(), session));
118 
119         wagonHint = wagonRepo.getProtocol().toLowerCase(Locale.ENGLISH);
120         if (wagonHint.isEmpty()) {
121             throw new NoTransporterException(repository);
122         }
123 
124         try {
125             wagons.add(lookupWagon());
126         } catch (Exception e) {
127             LOGGER.debug("No transport", e);
128             throw new NoTransporterException(repository, e);
129         }
130 
131         repoAuthContext = AuthenticationContext.forRepository(session, repository);
132         proxyAuthContext = AuthenticationContext.forProxy(session, repository);
133 
134         wagonAuth = getAuthenticationInfo(repoAuthContext);
135         wagonProxy = getProxy(repository, proxyAuthContext);
136 
137         headers = new Properties();
138         headers.put("User-Agent", HttpTransporterUtils.getUserAgent(session, repository));
139         Map<String, String> headers = HttpTransporterUtils.getHttpHeaders(session, repository);
140         if (headers != null) {
141             this.headers.putAll(headers);
142         }
143     }
144 
145     private static RepositoryPermissions getPermissions(String repoId, RepositorySystemSession session) {
146         RepositoryPermissions result = null;
147 
148         RepositoryPermissions perms = new RepositoryPermissions();
149 
150         String suffix = '.' + repoId;
151 
152         String fileMode = ConfigUtils.getString(session, null, CONFIG_PROP_FILE_MODE + suffix);
153         if (fileMode != null) {
154             perms.setFileMode(fileMode);
155             result = perms;
156         }
157 
158         String dirMode = ConfigUtils.getString(session, null, CONFIG_PROP_DIR_MODE + suffix);
159         if (dirMode != null) {
160             perms.setDirectoryMode(dirMode);
161             result = perms;
162         }
163 
164         String group = ConfigUtils.getString(session, null, CONFIG_PROP_GROUP + suffix);
165         if (group != null) {
166             perms.setGroup(group);
167             result = perms;
168         }
169 
170         return result;
171     }
172 
173     private AuthenticationInfo getAuthenticationInfo(final AuthenticationContext authContext) {
174         AuthenticationInfo auth = null;
175 
176         if (authContext != null) {
177             auth = new AuthenticationInfo() {
178                 @Override
179                 public String getUserName() {
180                     return authContext.get(AuthenticationContext.USERNAME);
181                 }
182 
183                 @Override
184                 public String getPassword() {
185                     return authContext.get(AuthenticationContext.PASSWORD);
186                 }
187 
188                 @Override
189                 public String getPrivateKey() {
190                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PATH);
191                 }
192 
193                 @Override
194                 public String getPassphrase() {
195                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PASSPHRASE);
196                 }
197             };
198         }
199 
200         return auth;
201     }
202 
203     private ProxyInfoProvider getProxy(RemoteRepository repository, final AuthenticationContext authContext) {
204         ProxyInfoProvider proxy = null;
205 
206         Proxy p = repository.getProxy();
207         if (p != null) {
208             final ProxyInfo prox;
209             if (authContext != null) {
210                 prox = new ProxyInfo() {
211                     @Override
212                     public String getUserName() {
213                         return authContext.get(AuthenticationContext.USERNAME);
214                     }
215 
216                     @Override
217                     public String getPassword() {
218                         return authContext.get(AuthenticationContext.PASSWORD);
219                     }
220 
221                     @Override
222                     public String getNtlmDomain() {
223                         return authContext.get(AuthenticationContext.NTLM_DOMAIN);
224                     }
225 
226                     @Override
227                     public String getNtlmHost() {
228                         return authContext.get(AuthenticationContext.NTLM_WORKSTATION);
229                     }
230                 };
231             } else {
232                 prox = new ProxyInfo();
233             }
234             prox.setType(p.getType());
235             prox.setHost(p.getHost());
236             prox.setPort(p.getPort());
237 
238             proxy = protocol -> prox;
239         }
240 
241         return proxy;
242     }
243 
244     private Wagon lookupWagon() throws Exception {
245         return wagonProvider.lookup(wagonHint);
246     }
247 
248     private void releaseWagon(Wagon wagon) {
249         wagonProvider.release(wagon);
250     }
251 
252     private void connectWagon(Wagon wagon) throws WagonException {
253         if (!headers.isEmpty()) {
254             try {
255                 Method setHttpHeaders = wagon.getClass().getMethod("setHttpHeaders", Properties.class);
256                 setHttpHeaders.invoke(wagon, headers);
257             } catch (NoSuchMethodException e) {
258                 // normal for non-http wagons
259             } catch (InvocationTargetException | IllegalAccessException | RuntimeException e) {
260                 LOGGER.debug(
261                         "Could not set user agent for Wagon {}",
262                         wagon.getClass().getName(),
263                         e);
264             }
265         }
266 
267         int connectTimeout = HttpTransporterUtils.getHttpConnectTimeout(session, repository);
268         int requestTimeout = HttpTransporterUtils.getHttpRequestTimeout(session, repository);
269 
270         wagon.setTimeout(Math.max(Math.max(connectTimeout, requestTimeout), 0));
271         wagon.setInteractive(ConfigUtils.getBoolean(
272                 session, ConfigurationProperties.DEFAULT_INTERACTIVE, ConfigurationProperties.INTERACTIVE));
273 
274         Object configuration = ConfigUtils.getObject(session, null, CONFIG_PROP_CONFIG + "." + repository.getId());
275         if (configuration != null && wagonConfigurator != null) {
276             try {
277                 wagonConfigurator.configure(wagon, configuration);
278             } catch (Exception e) {
279                 LOGGER.warn(
280                         "Could not apply configuration for {} to Wagon {}",
281                         repository.getId(),
282                         wagon.getClass().getName(),
283                         e);
284             }
285         }
286 
287         wagon.connect(wagonRepo, wagonAuth, wagonProxy);
288     }
289 
290     private void disconnectWagon(Wagon wagon) {
291         try {
292             if (wagon != null) {
293                 wagon.disconnect();
294             }
295         } catch (ConnectionException e) {
296             LOGGER.debug("Could not disconnect Wagon {}", wagon, e);
297         }
298     }
299 
300     private Wagon pollWagon() throws Exception {
301         Wagon wagon = wagons.poll();
302 
303         if (wagon == null) {
304             try {
305                 wagon = lookupWagon();
306                 connectWagon(wagon);
307             } catch (Exception e) {
308                 releaseWagon(wagon);
309                 throw e;
310             }
311         } else if (wagon.getRepository() == null) {
312             try {
313                 connectWagon(wagon);
314             } catch (Exception e) {
315                 wagons.add(wagon);
316                 throw e;
317             }
318         }
319 
320         return wagon;
321     }
322 
323     @Override
324     public int classify(Throwable error) {
325         if (error instanceof ResourceDoesNotExistException) {
326             return ERROR_NOT_FOUND;
327         }
328         return ERROR_OTHER;
329     }
330 
331     @Override
332     public void peek(PeekTask task) throws Exception {
333         execute(task, new PeekTaskRunner(task));
334     }
335 
336     @Override
337     public void get(GetTask task) throws Exception {
338         execute(task, new GetTaskRunner(pathProcessor, task));
339     }
340 
341     @Override
342     public void put(PutTask task) throws Exception {
343         execute(task, new PutTaskRunner(pathProcessor, task));
344     }
345 
346     private void execute(TransportTask task, TaskRunner runner) throws Exception {
347         requireNonNull(task, "task cannot be null");
348 
349         if (closed.get()) {
350             throw new IllegalStateException("transporter closed, cannot execute task " + task);
351         }
352         try {
353             WagonTransferListener listener = new WagonTransferListener(task.getListener());
354             Wagon wagon = pollWagon();
355             try {
356                 wagon.addTransferListener(listener);
357                 runner.run(wagon);
358             } finally {
359                 wagon.removeTransferListener(listener);
360                 wagons.add(wagon);
361             }
362         } catch (RuntimeException e) {
363             throw WagonCancelledException.unwrap(e);
364         }
365     }
366 
367     @Override
368     public void close() {
369         if (closed.compareAndSet(false, true)) {
370             AuthenticationContext.close(repoAuthContext);
371             AuthenticationContext.close(proxyAuthContext);
372 
373             for (Wagon wagon = wagons.poll(); wagon != null; wagon = wagons.poll()) {
374                 disconnectWagon(wagon);
375                 releaseWagon(wagon);
376             }
377         }
378     }
379 
380     private interface TaskRunner {
381 
382         void run(Wagon wagon) throws IOException, WagonException;
383     }
384 
385     private static class PeekTaskRunner implements TaskRunner {
386 
387         private final PeekTask task;
388 
389         PeekTaskRunner(PeekTask task) {
390             this.task = task;
391         }
392 
393         @Override
394         public void run(Wagon wagon) throws WagonException {
395             String src = task.getLocation().toString();
396             if (!wagon.resourceExists(src)) {
397                 throw new ResourceDoesNotExistException(
398                         "Could not find " + src + " in " + wagon.getRepository().getUrl());
399             }
400         }
401     }
402 
403     private static class GetTaskRunner implements TaskRunner {
404 
405         private final PathProcessor pathProcessor;
406 
407         private final GetTask task;
408 
409         GetTaskRunner(PathProcessor pathProcessor, GetTask task) {
410             this.pathProcessor = pathProcessor;
411             this.task = task;
412         }
413 
414         @Override
415         public void run(Wagon wagon) throws IOException, WagonException {
416             final String src = task.getLocation().toString();
417             final File file = task.getDataFile();
418             if (file == null && wagon instanceof StreamingWagon) {
419                 try (OutputStream dst = task.newOutputStream()) {
420                     ((StreamingWagon) wagon).getToStream(src, dst);
421                 }
422             } else {
423                 // if file == null -> $TMP used, otherwise we place tmp file next to file
424                 try (PathProcessor.TempFile tempFile =
425                         file == null ? pathProcessor.newTempFile() : pathProcessor.newTempFile(file.toPath())) {
426                     File dst = tempFile.getPath().toFile();
427                     wagon.get(src, dst);
428                     /*
429                      * NOTE: Wagon (1.0-beta-6) doesn't create the destination file when transferring a 0-byte
430                      * resource. So if the resource we asked for didn't cause any exception but doesn't show up in
431                      * the dst file either, Wagon tells us in its weird way the file is empty.
432                      */
433                     if (!dst.exists() && !dst.createNewFile()) {
434                         throw new IOException(String.format("Failure creating file '%s'.", dst.getAbsolutePath()));
435                     }
436 
437                     if (file != null) {
438                         ((PathProcessor.CollocatedTempFile) tempFile).move();
439                     } else {
440                         try (OutputStream outputStream = task.newOutputStream()) {
441                             Files.copy(dst.toPath(), outputStream);
442                         }
443                     }
444                 }
445             }
446         }
447     }
448 
449     private static class PutTaskRunner implements TaskRunner {
450 
451         private final PathProcessor pathProcessor;
452 
453         private final PutTask task;
454 
455         PutTaskRunner(PathProcessor pathProcessor, PutTask task) {
456             this.pathProcessor = pathProcessor;
457             this.task = task;
458         }
459 
460         @Override
461         public void run(Wagon wagon) throws WagonException, IOException {
462             final String dst = task.getLocation().toString();
463             final File file = task.getDataFile();
464             if (file == null && wagon instanceof StreamingWagon) {
465                 try (InputStream src = task.newInputStream()) {
466                     // StreamingWagon uses an internal buffer on src input stream.
467                     ((StreamingWagon) wagon).putFromStream(src, dst, task.getDataLength(), -1);
468                 }
469             } else if (file == null) {
470                 try (PathProcessor.TempFile tempFile = pathProcessor.newTempFile()) {
471                     try (InputStream inputStream = task.newInputStream()) {
472                         Files.copy(inputStream, tempFile.getPath(), StandardCopyOption.REPLACE_EXISTING);
473                     }
474                     wagon.put(tempFile.getPath().toFile(), dst);
475                 }
476             } else {
477                 wagon.put(file, dst);
478             }
479         }
480     }
481 }