View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.wagon;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.lang.reflect.InvocationTargetException;
26  import java.lang.reflect.Method;
27  import java.nio.file.Files;
28  import java.nio.file.StandardCopyOption;
29  import java.util.Locale;
30  import java.util.Map;
31  import java.util.Properties;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.maven.wagon.ConnectionException;
37  import org.apache.maven.wagon.ResourceDoesNotExistException;
38  import org.apache.maven.wagon.StreamingWagon;
39  import org.apache.maven.wagon.Wagon;
40  import org.apache.maven.wagon.WagonException;
41  import org.apache.maven.wagon.authentication.AuthenticationInfo;
42  import org.apache.maven.wagon.proxy.ProxyInfo;
43  import org.apache.maven.wagon.proxy.ProxyInfoProvider;
44  import org.apache.maven.wagon.repository.Repository;
45  import org.apache.maven.wagon.repository.RepositoryPermissions;
46  import org.eclipse.aether.ConfigurationProperties;
47  import org.eclipse.aether.RepositorySystemSession;
48  import org.eclipse.aether.repository.AuthenticationContext;
49  import org.eclipse.aether.repository.Proxy;
50  import org.eclipse.aether.repository.RemoteRepository;
51  import org.eclipse.aether.spi.connector.transport.GetTask;
52  import org.eclipse.aether.spi.connector.transport.PeekTask;
53  import org.eclipse.aether.spi.connector.transport.PutTask;
54  import org.eclipse.aether.spi.connector.transport.TransportTask;
55  import org.eclipse.aether.spi.connector.transport.Transporter;
56  import org.eclipse.aether.transfer.NoTransporterException;
57  import org.eclipse.aether.util.ConfigUtils;
58  import org.eclipse.aether.util.FileUtils;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  
62  import static java.util.Objects.requireNonNull;
63  
64  /**
65   * A transporter using Maven Wagon.
66   */
67  final class WagonTransporter implements Transporter {
68  
    /**
     * Prefix of the session configuration key holding the per-repository wagon configuration object; the effective
     * key is this prefix followed by {@code "."} and the repository id.
     */
    private static final String CONFIG_PROP_CONFIG = "aether.connector.wagon.config";

    /**
     * Prefix of the session configuration key for the file mode of the repository permissions; the effective key is
     * this prefix followed by {@code "."} and the repository id.
     */
    private static final String CONFIG_PROP_FILE_MODE = "aether.connector.perms.fileMode";

    /**
     * Prefix of the session configuration key for the directory mode of the repository permissions; the effective
     * key is this prefix followed by {@code "."} and the repository id.
     */
    private static final String CONFIG_PROP_DIR_MODE = "aether.connector.perms.dirMode";

    /**
     * Prefix of the session configuration key for the group of the repository permissions; the effective key is
     * this prefix followed by {@code "."} and the repository id.
     */
    private static final String CONFIG_PROP_GROUP = "aether.connector.perms.group";

    private static final Logger LOGGER = LoggerFactory.getLogger(WagonTransporter.class);

    /** The remote repository this transporter was created for. */
    private final RemoteRepository repository;

    /** The session whose configuration properties are read when wagons are set up. */
    private final RepositorySystemSession session;

    /** Authentication context for the repository; closed in {@link #close()}. */
    private final AuthenticationContext repoAuthContext;

    /** Authentication context for the proxy; closed in {@link #close()}. */
    private final AuthenticationContext proxyAuthContext;

    /** Source of wagon instances; also receives them back when they are released. */
    private final WagonProvider wagonProvider;

    /** Applies the per-repository configuration object to a wagon; skipped when {@code null}. */
    private final WagonConfigurator wagonConfigurator;

    /** Lower-cased protocol of the repository URL, used as the hint when looking up a wagon. */
    private final String wagonHint;

    /** Wagon-side view of the repository (id, URL, permissions) handed to {@code Wagon.connect}. */
    private final Repository wagonRepo;

    /** Wagon-side authentication backed by {@link #repoAuthContext}, or {@code null} if there is none. */
    private final AuthenticationInfo wagonAuth;

    /** Wagon-side proxy provider, or {@code null} if the repository has no proxy. */
    private final ProxyInfoProvider wagonProxy;

    /** HTTP headers (the user agent plus any configured headers) offered to wagons that accept them. */
    private final Properties headers;

    /** Wagons currently not in use by a task; tasks poll one from here and add it back when finished. */
    private final Queue<Wagon> wagons = new ConcurrentLinkedQueue<>();

    /** Set once by {@link #close()}; tasks are rejected afterwards. */
    private final AtomicBoolean closed = new AtomicBoolean();
104 
105     WagonTransporter(
106             WagonProvider wagonProvider,
107             WagonConfigurator wagonConfigurator,
108             RemoteRepository repository,
109             RepositorySystemSession session)
110             throws NoTransporterException {
111         this.wagonProvider = wagonProvider;
112         this.wagonConfigurator = wagonConfigurator;
113         this.repository = repository;
114         this.session = session;
115 
116         wagonRepo = new Repository(repository.getId(), repository.getUrl());
117         wagonRepo.setPermissions(getPermissions(repository.getId(), session));
118 
119         wagonHint = wagonRepo.getProtocol().toLowerCase(Locale.ENGLISH);
120         if (wagonHint.isEmpty()) {
121             throw new NoTransporterException(repository);
122         }
123 
124         try {
125             wagons.add(lookupWagon());
126         } catch (Exception e) {
127             LOGGER.debug("No transport", e);
128             throw new NoTransporterException(repository, e);
129         }
130 
131         repoAuthContext = AuthenticationContext.forRepository(session, repository);
132         proxyAuthContext = AuthenticationContext.forProxy(session, repository);
133 
134         wagonAuth = getAuthenticationInfo(repoAuthContext);
135         wagonProxy = getProxy(repository, proxyAuthContext);
136 
137         headers = new Properties();
138         headers.put(
139                 "User-Agent",
140                 ConfigUtils.getString(
141                         session, ConfigurationProperties.DEFAULT_USER_AGENT, ConfigurationProperties.USER_AGENT));
142         Map<?, ?> headers = ConfigUtils.getMap(
143                 session,
144                 null,
145                 ConfigurationProperties.HTTP_HEADERS + "." + repository.getId(),
146                 ConfigurationProperties.HTTP_HEADERS);
147         if (headers != null) {
148             this.headers.putAll(headers);
149         }
150     }
151 
152     private static RepositoryPermissions getPermissions(String repoId, RepositorySystemSession session) {
153         RepositoryPermissions result = null;
154 
155         RepositoryPermissions perms = new RepositoryPermissions();
156 
157         String suffix = '.' + repoId;
158 
159         String fileMode = ConfigUtils.getString(session, null, CONFIG_PROP_FILE_MODE + suffix);
160         if (fileMode != null) {
161             perms.setFileMode(fileMode);
162             result = perms;
163         }
164 
165         String dirMode = ConfigUtils.getString(session, null, CONFIG_PROP_DIR_MODE + suffix);
166         if (dirMode != null) {
167             perms.setDirectoryMode(dirMode);
168             result = perms;
169         }
170 
171         String group = ConfigUtils.getString(session, null, CONFIG_PROP_GROUP + suffix);
172         if (group != null) {
173             perms.setGroup(group);
174             result = perms;
175         }
176 
177         return result;
178     }
179 
180     private AuthenticationInfo getAuthenticationInfo(final AuthenticationContext authContext) {
181         AuthenticationInfo auth = null;
182 
183         if (authContext != null) {
184             auth = new AuthenticationInfo() {
185                 @Override
186                 public String getUserName() {
187                     return authContext.get(AuthenticationContext.USERNAME);
188                 }
189 
190                 @Override
191                 public String getPassword() {
192                     return authContext.get(AuthenticationContext.PASSWORD);
193                 }
194 
195                 @Override
196                 public String getPrivateKey() {
197                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PATH);
198                 }
199 
200                 @Override
201                 public String getPassphrase() {
202                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PASSPHRASE);
203                 }
204             };
205         }
206 
207         return auth;
208     }
209 
210     private ProxyInfoProvider getProxy(RemoteRepository repository, final AuthenticationContext authContext) {
211         ProxyInfoProvider proxy = null;
212 
213         Proxy p = repository.getProxy();
214         if (p != null) {
215             final ProxyInfo prox;
216             if (authContext != null) {
217                 prox = new ProxyInfo() {
218                     @Override
219                     public String getUserName() {
220                         return authContext.get(AuthenticationContext.USERNAME);
221                     }
222 
223                     @Override
224                     public String getPassword() {
225                         return authContext.get(AuthenticationContext.PASSWORD);
226                     }
227 
228                     @Override
229                     public String getNtlmDomain() {
230                         return authContext.get(AuthenticationContext.NTLM_DOMAIN);
231                     }
232 
233                     @Override
234                     public String getNtlmHost() {
235                         return authContext.get(AuthenticationContext.NTLM_WORKSTATION);
236                     }
237                 };
238             } else {
239                 prox = new ProxyInfo();
240             }
241             prox.setType(p.getType());
242             prox.setHost(p.getHost());
243             prox.setPort(p.getPort());
244 
245             proxy = protocol -> prox;
246         }
247 
248         return proxy;
249     }
250 
251     private Wagon lookupWagon() throws Exception {
252         return wagonProvider.lookup(wagonHint);
253     }
254 
255     private void releaseWagon(Wagon wagon) {
256         wagonProvider.release(wagon);
257     }
258 
259     private void connectWagon(Wagon wagon) throws WagonException {
260         if (!headers.isEmpty()) {
261             try {
262                 Method setHttpHeaders = wagon.getClass().getMethod("setHttpHeaders", Properties.class);
263                 setHttpHeaders.invoke(wagon, headers);
264             } catch (NoSuchMethodException e) {
265                 // normal for non-http wagons
266             } catch (InvocationTargetException | IllegalAccessException | RuntimeException e) {
267                 LOGGER.debug(
268                         "Could not set user agent for Wagon {}",
269                         wagon.getClass().getName(),
270                         e);
271             }
272         }
273 
274         int connectTimeout = ConfigUtils.getInteger(
275                 session, ConfigurationProperties.DEFAULT_CONNECT_TIMEOUT, ConfigurationProperties.CONNECT_TIMEOUT);
276         int requestTimeout = ConfigUtils.getInteger(
277                 session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT, ConfigurationProperties.REQUEST_TIMEOUT);
278 
279         wagon.setTimeout(Math.max(Math.max(connectTimeout, requestTimeout), 0));
280 
281         wagon.setInteractive(ConfigUtils.getBoolean(
282                 session, ConfigurationProperties.DEFAULT_INTERACTIVE, ConfigurationProperties.INTERACTIVE));
283 
284         Object configuration = ConfigUtils.getObject(session, null, CONFIG_PROP_CONFIG + "." + repository.getId());
285         if (configuration != null && wagonConfigurator != null) {
286             try {
287                 wagonConfigurator.configure(wagon, configuration);
288             } catch (Exception e) {
289                 LOGGER.warn(
290                         "Could not apply configuration for {} to Wagon {}",
291                         repository.getId(),
292                         wagon.getClass().getName(),
293                         e);
294             }
295         }
296 
297         wagon.connect(wagonRepo, wagonAuth, wagonProxy);
298     }
299 
300     private void disconnectWagon(Wagon wagon) {
301         try {
302             if (wagon != null) {
303                 wagon.disconnect();
304             }
305         } catch (ConnectionException e) {
306             LOGGER.debug("Could not disconnect Wagon {}", wagon, e);
307         }
308     }
309 
310     private Wagon pollWagon() throws Exception {
311         Wagon wagon = wagons.poll();
312 
313         if (wagon == null) {
314             try {
315                 wagon = lookupWagon();
316                 connectWagon(wagon);
317             } catch (Exception e) {
318                 releaseWagon(wagon);
319                 throw e;
320             }
321         } else if (wagon.getRepository() == null) {
322             try {
323                 connectWagon(wagon);
324             } catch (Exception e) {
325                 wagons.add(wagon);
326                 throw e;
327             }
328         }
329 
330         return wagon;
331     }
332 
333     @Override
334     public int classify(Throwable error) {
335         if (error instanceof ResourceDoesNotExistException) {
336             return ERROR_NOT_FOUND;
337         }
338         return ERROR_OTHER;
339     }
340 
341     @Override
342     public void peek(PeekTask task) throws Exception {
343         execute(task, new PeekTaskRunner(task));
344     }
345 
346     @Override
347     public void get(GetTask task) throws Exception {
348         execute(task, new GetTaskRunner(task));
349     }
350 
351     @Override
352     public void put(PutTask task) throws Exception {
353         execute(task, new PutTaskRunner(task));
354     }
355 
356     private void execute(TransportTask task, TaskRunner runner) throws Exception {
357         requireNonNull(task, "task cannot be null");
358 
359         if (closed.get()) {
360             throw new IllegalStateException("transporter closed, cannot execute task " + task);
361         }
362         try {
363             WagonTransferListener listener = new WagonTransferListener(task.getListener());
364             Wagon wagon = pollWagon();
365             try {
366                 wagon.addTransferListener(listener);
367                 runner.run(wagon);
368             } finally {
369                 wagon.removeTransferListener(listener);
370                 wagons.add(wagon);
371             }
372         } catch (RuntimeException e) {
373             throw WagonCancelledException.unwrap(e);
374         }
375     }
376 
377     @Override
378     public void close() {
379         if (closed.compareAndSet(false, true)) {
380             AuthenticationContext.close(repoAuthContext);
381             AuthenticationContext.close(proxyAuthContext);
382 
383             for (Wagon wagon = wagons.poll(); wagon != null; wagon = wagons.poll()) {
384                 disconnectWagon(wagon);
385                 releaseWagon(wagon);
386             }
387         }
388     }
389 
    /**
     * An operation that is carried out with a wagon on behalf of a transport task.
     */
    private interface TaskRunner {

        /**
         * Performs the operation using the given wagon.
         *
         * @param wagon the wagon to use
         * @throws IOException if an I/O error occurs
         * @throws WagonException if the wagon reports a failure
         */
        void run(Wagon wagon) throws IOException, WagonException;
    }
394 
395     private static class PeekTaskRunner implements TaskRunner {
396 
397         private final PeekTask task;
398 
399         PeekTaskRunner(PeekTask task) {
400             this.task = task;
401         }
402 
403         @Override
404         public void run(Wagon wagon) throws WagonException {
405             String src = task.getLocation().toString();
406             if (!wagon.resourceExists(src)) {
407                 throw new ResourceDoesNotExistException(
408                         "Could not find " + src + " in " + wagon.getRepository().getUrl());
409             }
410         }
411     }
412 
413     private static class GetTaskRunner implements TaskRunner {
414 
415         private final GetTask task;
416 
417         GetTaskRunner(GetTask task) {
418             this.task = task;
419         }
420 
421         @Override
422         public void run(Wagon wagon) throws IOException, WagonException {
423             final String src = task.getLocation().toString();
424             final File file = task.getDataFile();
425             if (file == null && wagon instanceof StreamingWagon) {
426                 try (OutputStream dst = task.newOutputStream()) {
427                     ((StreamingWagon) wagon).getToStream(src, dst);
428                 }
429             } else {
430                 // if file == null -> $TMP used, otherwise we place tmp file next to file
431                 try (FileUtils.TempFile tempFile =
432                         file == null ? FileUtils.newTempFile() : FileUtils.newTempFile(file.toPath())) {
433                     File dst = tempFile.getPath().toFile();
434                     wagon.get(src, dst);
435                     /*
436                      * NOTE: Wagon (1.0-beta-6) doesn't create the destination file when transferring a 0-byte
437                      * resource. So if the resource we asked for didn't cause any exception but doesn't show up in
438                      * the dst file either, Wagon tells us in its weird way the file is empty.
439                      */
440                     if (!dst.exists() && !dst.createNewFile()) {
441                         throw new IOException(String.format("Failure creating file '%s'.", dst.getAbsolutePath()));
442                     }
443 
444                     if (file != null) {
445                         ((FileUtils.CollocatedTempFile) tempFile).move();
446                     } else {
447                         try (OutputStream outputStream = task.newOutputStream()) {
448                             Files.copy(dst.toPath(), outputStream);
449                         }
450                     }
451                 }
452             }
453         }
454     }
455 
456     private static class PutTaskRunner implements TaskRunner {
457 
458         private final PutTask task;
459 
460         PutTaskRunner(PutTask task) {
461             this.task = task;
462         }
463 
464         @Override
465         public void run(Wagon wagon) throws WagonException, IOException {
466             final String dst = task.getLocation().toString();
467             final File file = task.getDataFile();
468             if (file == null && wagon instanceof StreamingWagon) {
469                 try (InputStream src = task.newInputStream()) {
470                     // StreamingWagon uses an internal buffer on src input stream.
471                     ((StreamingWagon) wagon).putFromStream(src, dst, task.getDataLength(), -1);
472                 }
473             } else if (file == null) {
474                 try (FileUtils.TempFile tempFile = FileUtils.newTempFile()) {
475                     try (InputStream inputStream = task.newInputStream()) {
476                         Files.copy(inputStream, tempFile.getPath(), StandardCopyOption.REPLACE_EXISTING);
477                     }
478                     wagon.put(tempFile.getPath().toFile(), dst);
479                 }
480             } else {
481                 wagon.put(file, dst);
482             }
483         }
484     }
485 }