View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.named.ipc;
20  
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.FileLock;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
57  
58  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66  
67  /**
68   * Client side implementation.
69   * The client instance is bound to a given maven repository.
70   *
71   * @since 2.0.1
72   */
73  public class IpcClient {
74  
75      static final boolean IS_WINDOWS =
76              System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
77  
78      protected volatile boolean initialized;
79      protected final Path lockPath;
80      protected final Path logPath;
81      protected final Path syncPath;
82      protected final boolean noFork;
83  
84      protected SocketChannel socket;
85      protected DataOutputStream output;
86      protected DataInputStream input;
87      protected Thread receiver;
88  
89      protected final AtomicInteger requestId = new AtomicInteger();
90      protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
91  
92      IpcClient(Path lockPath, Path logPath, Path syncPath) {
93          this.lockPath = lockPath;
94          this.logPath = logPath;
95          this.syncPath = syncPath;
96          this.noFork = Boolean.parseBoolean(
97                  System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
98      }
99  
100     void ensureInitialized() throws IOException {
101         if (!initialized) {
102             // caller must block on this method
103             synchronized (this) {
104                 if (!initialized) {
105                     socket = createClient();
106                     ByteChannel wrapper = new ByteChannelWrapper(socket);
107                     input = new DataInputStream(Channels.newInputStream(wrapper));
108                     output = new DataOutputStream(Channels.newOutputStream(wrapper));
109                     receiver = new Thread(this::receive);
110                     receiver.setDaemon(true);
111                     receiver.start();
112                     initialized = true;
113                 }
114             }
115         }
116     }
117 
118     SocketChannel createClient() throws IOException {
119         String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
120         SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
121 
122         Path lockPath = this.lockPath.toAbsolutePath().normalize();
123         Path lockFile =
124                 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125         if (!Files.isRegularFile(lockFile)) {
126             if (!Files.isDirectory(lockFile.getParent())) {
127                 Files.createDirectories(lockFile.getParent());
128             }
129         }
130 
131         try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132             try (FileLock lock = raf.getChannel().lock()) {
133                 String line = raf.readLine();
134                 if (line != null) {
135                     try {
136                         SocketAddress address = SocketFamily.fromString(line);
137                         return SocketChannel.open(address);
138                     } catch (IOException e) {
139                         // ignore
140                     }
141                 }
142 
143                 ServerSocketChannel ss = family.openServerSocket();
144                 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145                 String rand = Long.toHexString(new Random().nextLong());
146                 String nativeName =
147                         System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148                 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149 
150                 boolean debug = Boolean.parseBoolean(
151                         System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152                 boolean noNative = Boolean.parseBoolean(System.getProperty(
153                         IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154                 if (!noNative) {
155                     noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156                 }
157                 Closeable close;
158                 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159                 List<String> args = new ArrayList<>();
160                 if (noNative) {
161                     if (noFork) {
162                         IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163                         close = server::close;
164                     } else {
165                         String javaHome = System.getenv("JAVA_HOME");
166                         if (javaHome == null) {
167                             javaHome = System.getProperty("java.home");
168                         }
169                         String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170                         String java = Paths.get(javaHome)
171                                 .resolve(javaCmd)
172                                 .toAbsolutePath()
173                                 .toString();
174                         args.add(java);
175                         String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176                         args.add("-cp");
177                         args.add(classpath);
178                         String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179                         if (timeout != null) {
180                             args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181                         }
182                         args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183                         args.add(IpcServer.class.getName());
184                         args.add(family.name());
185                         args.add(tmpaddr);
186                         args.add(rand);
187                         ProcessBuilder processBuilder = new ProcessBuilder();
188                         ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189                         Files.createDirectories(logPath);
190                         Process process = processBuilder
191                                 .directory(lockFile.getParent().toFile())
192                                 .command(args)
193                                 .redirectOutput(discard)
194                                 .redirectError(discard)
195                                 .start();
196                         close = process::destroyForcibly;
197                     }
198                 } else {
199                     args.add(syncPath.resolve(syncCmd).toString());
200                     String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201                     if (timeout != null) {
202                         args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203                     }
204                     args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205                     args.add(family.name());
206                     args.add(tmpaddr);
207                     args.add(rand);
208                     ProcessBuilder processBuilder = new ProcessBuilder();
209                     ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210                     Files.createDirectories(logPath);
211                     Process process = processBuilder
212                             .directory(lockFile.getParent().toFile())
213                             .command(args)
214                             .redirectOutput(discard)
215                             .redirectError(discard)
216                             .start();
217                     close = process::destroyForcibly;
218                 }
219 
220                 ExecutorService es = Executors.newSingleThreadExecutor();
221                 Future<String[]> future = es.submit(() -> {
222                     SocketChannel s = ss.accept();
223                     DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224                     String rand2 = dis.readUTF();
225                     String addr2 = dis.readUTF();
226                     return new String[] {rand2, addr2};
227                 });
228                 String[] res;
229                 try {
230                     res = future.get(5, TimeUnit.SECONDS);
231                 } catch (Exception e) {
232                     try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233                         writer.println("Arguments:");
234                         args.forEach(writer::println);
235                         writer.println();
236                         writer.println("Exception:");
237                         e.printStackTrace(writer);
238                     }
239                     close.close();
240                     throw e;
241                 } finally {
242                     es.shutdownNow();
243                     ss.close();
244                 }
245                 if (!Objects.equals(rand, res[0])) {
246                     close.close();
247                     throw new IllegalStateException("IpcServer did not respond with the correct random");
248                 }
249 
250                 SocketAddress addr = SocketFamily.fromString(res[1]);
251                 SocketChannel socket = SocketChannel.open(addr);
252 
253                 raf.seek(0);
254                 raf.writeBytes(res[1] + "\n");
255                 return socket;
256             } catch (Exception e) {
257                 throw new RuntimeException("Unable to create and connect to lock server", e);
258             }
259         }
260     }
261 
262     private String getJarPath(Class<?> clazz) {
263         String classpath;
264         String className = clazz.getName().replace('.', '/') + ".class";
265         String url = clazz.getClassLoader().getResource(className).toString();
266         if (url.startsWith("jar:")) {
267             url = url.substring("jar:".length(), url.indexOf("!/"));
268             if (url.startsWith("file:")) {
269                 classpath = url.substring("file:".length());
270             } else {
271                 throw new IllegalStateException();
272             }
273         } else if (url.startsWith("file:")) {
274             classpath = url.substring("file:".length(), url.indexOf(className));
275         } else {
276             throw new IllegalStateException();
277         }
278         if (IS_WINDOWS) {
279             if (classpath.startsWith("/")) {
280                 classpath = classpath.substring(1);
281             }
282             classpath = classpath.replace('/', '\\');
283         }
284 
285         return classpath;
286     }
287 
288     void receive() {
289         try {
290             while (true) {
291                 int id = input.readInt();
292                 int sz = input.readInt();
293                 List<String> s = new ArrayList<>(sz);
294                 for (int i = 0; i < sz; i++) {
295                     s.add(input.readUTF());
296                 }
297                 CompletableFuture<List<String>> f = responses.remove(id);
298                 if (f == null || s.isEmpty()) {
299                     throw new IllegalStateException("Protocol error");
300                 }
301                 f.complete(s);
302             }
303         } catch (EOFException e) {
304             // server is stopped; just quit
305         } catch (Exception e) {
306             close(e);
307         }
308     }
309 
310     List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
311         ensureInitialized();
312         int id = requestId.incrementAndGet();
313         CompletableFuture<List<String>> response = new CompletableFuture<>();
314         responses.put(id, response);
315         synchronized (output) {
316             output.writeInt(id);
317             output.writeInt(request.size());
318             for (String s : request) {
319                 output.writeUTF(s);
320             }
321             output.flush();
322         }
323         try {
324             return response.get(time, unit);
325         } catch (InterruptedException e) {
326             throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
327         } catch (ExecutionException e) {
328             throw new IOException("Execution error", e);
329         }
330     }
331 
332     void close() {
333         if (noFork) {
334             stopServer();
335         }
336         close(new IOException("Closing"));
337     }
338 
339     synchronized void close(Throwable e) {
340         if (socket != null) {
341             try {
342                 socket.close();
343             } catch (IOException t) {
344                 e.addSuppressed(t);
345             }
346             socket = null;
347             input = null;
348             output = null;
349         }
350         if (receiver != null) {
351             receiver.interrupt();
352             try {
353                 receiver.join(1000);
354             } catch (InterruptedException t) {
355                 e.addSuppressed(t);
356             }
357         }
358         responses.values().forEach(f -> f.completeExceptionally(e));
359         responses.clear();
360     }
361 
362     String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
363         RuntimeException error = new RuntimeException("Unable to create new sync context");
364         for (int i = 0; i < 2; i++) {
365             try {
366                 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
367                 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
368                     throw new IOException("Unexpected response: " + response);
369                 }
370                 return response.get(1);
371             } catch (TimeoutException e) {
372                 throw e;
373             } catch (Exception e) {
374                 close(e);
375                 error.addSuppressed(e);
376             }
377         }
378         throw error;
379     }
380 
381     void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
382         try {
383             List<String> req = new ArrayList<>(keys.size() + 2);
384             req.add(REQUEST_ACQUIRE);
385             req.add(contextId);
386             req.addAll(keys);
387             List<String> response = send(req, time, unit);
388             if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
389                 throw new IOException("Unexpected response: " + response);
390             }
391         } catch (TimeoutException e) {
392             throw e;
393         } catch (Exception e) {
394             close(e);
395             throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
396         }
397     }
398 
399     void unlock(String contextId) {
400         try {
401             List<String> response =
402                     send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
403             if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
404                 throw new IOException("Unexpected response: " + response);
405             }
406         } catch (Exception e) {
407             close(e);
408             throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
409         }
410     }
411 
412     /**
413      * To be used in tests to stop server immediately. Should not be used outside of tests.
414      */
415     void stopServer() {
416         try {
417             List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
418             if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
419                 throw new IOException("Unexpected response: " + response);
420             }
421         } catch (Exception e) {
422             close(e);
423             throw new RuntimeException("Unable to stop server", e);
424         }
425     }
426 
427     @Override
428     public String toString() {
429         return "IpcClient{"
430                 + "lockPath=" + lockPath + ","
431                 + "syncServerPath=" + syncPath + ","
432                 + "address='" + getAddress() + "'}";
433     }
434 
435     private String getAddress() {
436         try {
437             return SocketFamily.toString(socket.getLocalAddress());
438         } catch (IOException e) {
439             return "[not bound]";
440         }
441     }
442 }