View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.named.ipc;
20  
21  import java.io.Closeable;
22  import java.io.DataInputStream;
23  import java.io.DataOutputStream;
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.FileWriter;
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.io.PrintWriter;
30  import java.io.RandomAccessFile;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.Channels;
34  import java.nio.channels.FileLock;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.nio.file.Files;
38  import java.nio.file.Path;
39  import java.nio.file.Paths;
40  import java.util.ArrayList;
41  import java.util.Arrays;
42  import java.util.Collection;
43  import java.util.List;
44  import java.util.Locale;
45  import java.util.Map;
46  import java.util.Objects;
47  import java.util.Random;
48  import java.util.concurrent.CompletableFuture;
49  import java.util.concurrent.ConcurrentHashMap;
50  import java.util.concurrent.ExecutionException;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Executors;
53  import java.util.concurrent.Future;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  import java.util.concurrent.atomic.AtomicInteger;
57  
58  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66  
67  /**
68   * Client side implementation.
69   * The client instance is bound to a given maven repository.
70   *
71   * @since 2.0.1
72   */
73  public class IpcClient {
74  
75      static final boolean IS_WINDOWS =
76              System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
77  
78      volatile boolean initialized;
79      Path lockPath;
80      Path logPath;
81      Path syncPath;
82      SocketChannel socket;
83      DataOutputStream output;
84      DataInputStream input;
85      Thread receiver;
86      AtomicInteger requestId = new AtomicInteger();
87      Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
88  
89      IpcClient(Path lockPath, Path logPath, Path syncPath) {
90          this.lockPath = lockPath;
91          this.logPath = logPath;
92          this.syncPath = syncPath;
93      }
94  
95      void ensureInitialized() throws IOException {
96          if (!initialized) {
97              synchronized (this) {
98                  if (!initialized) {
99                      socket = createClient();
100                     ByteChannel wrapper = new ByteChannelWrapper(socket);
101                     input = new DataInputStream(Channels.newInputStream(wrapper));
102                     output = new DataOutputStream(Channels.newOutputStream(wrapper));
103                     receiver = new Thread(this::receive);
104                     receiver.setDaemon(true);
105                     receiver.start();
106                     initialized = true;
107                 }
108             }
109         }
110     }
111 
112     SocketChannel createClient() throws IOException {
113         String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
114         SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
115 
116         Path lockPath = this.lockPath.toAbsolutePath().normalize();
117         Path lockFile =
118                 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
119         if (!Files.isRegularFile(lockFile)) {
120             if (!Files.isDirectory(lockFile.getParent())) {
121                 Files.createDirectories(lockFile.getParent());
122             }
123         }
124 
125         try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
126             try (FileLock lock = raf.getChannel().lock()) {
127                 String line = raf.readLine();
128                 if (line != null) {
129                     try {
130                         SocketAddress address = SocketFamily.fromString(line);
131                         return SocketChannel.open(address);
132                     } catch (IOException e) {
133                         // ignore
134                     }
135                 }
136 
137                 ServerSocketChannel ss = family.openServerSocket();
138                 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
139                 String rand = Long.toHexString(new Random().nextLong());
140                 String nativeName =
141                         System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
142                 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
143 
144                 boolean debug = Boolean.parseBoolean(
145                         System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
146                 boolean noNative = Boolean.parseBoolean(System.getProperty(
147                         IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
148                 if (!noNative) {
149                     noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
150                 }
151                 Closeable close;
152                 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
153                 List<String> args = new ArrayList<>();
154                 if (noNative) {
155                     boolean noFork = Boolean.parseBoolean(System.getProperty(
156                             IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
157                     if (noFork) {
158                         IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
159                         close = server::close;
160                     } else {
161                         String javaHome = System.getenv("JAVA_HOME");
162                         if (javaHome == null) {
163                             javaHome = System.getProperty("java.home");
164                         }
165                         String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
166                         String java = Paths.get(javaHome)
167                                 .resolve(javaCmd)
168                                 .toAbsolutePath()
169                                 .toString();
170                         args.add(java);
171                         String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
172                         args.add("-cp");
173                         args.add(classpath);
174                         String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
175                         if (timeout != null) {
176                             args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
177                         }
178                         args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
179                         args.add(IpcServer.class.getName());
180                         args.add(family.name());
181                         args.add(tmpaddr);
182                         args.add(rand);
183                         ProcessBuilder processBuilder = new ProcessBuilder();
184                         ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
185                         Files.createDirectories(logPath);
186                         Process process = processBuilder
187                                 .directory(lockFile.getParent().toFile())
188                                 .command(args)
189                                 .redirectOutput(discard)
190                                 .redirectError(discard)
191                                 .start();
192                         close = process::destroyForcibly;
193                     }
194                 } else {
195                     args.add(syncPath.resolve(syncCmd).toString());
196                     String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
197                     if (timeout != null) {
198                         args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
199                     }
200                     args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
201                     args.add(family.name());
202                     args.add(tmpaddr);
203                     args.add(rand);
204                     ProcessBuilder processBuilder = new ProcessBuilder();
205                     ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
206                     Files.createDirectories(logPath);
207                     Process process = processBuilder
208                             .directory(lockFile.getParent().toFile())
209                             .command(args)
210                             .redirectOutput(discard)
211                             .redirectError(discard)
212                             .start();
213                     close = process::destroyForcibly;
214                 }
215 
216                 ExecutorService es = Executors.newSingleThreadExecutor();
217                 Future<String[]> future = es.submit(() -> {
218                     SocketChannel s = ss.accept();
219                     DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
220                     String rand2 = dis.readUTF();
221                     String addr2 = dis.readUTF();
222                     return new String[] {rand2, addr2};
223                 });
224                 String[] res;
225                 try {
226                     res = future.get(5, TimeUnit.SECONDS);
227                 } catch (Exception e) {
228                     try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
229                         writer.println("Arguments:");
230                         args.forEach(writer::println);
231                         writer.println();
232                         writer.println("Exception:");
233                         e.printStackTrace(writer);
234                     }
235                     close.close();
236                     throw e;
237                 } finally {
238                     es.shutdownNow();
239                     ss.close();
240                 }
241                 if (!Objects.equals(rand, res[0])) {
242                     close.close();
243                     throw new IllegalStateException("IpcServer did not respond with the correct random");
244                 }
245 
246                 SocketAddress addr = SocketFamily.fromString(res[1]);
247                 SocketChannel socket = SocketChannel.open(addr);
248 
249                 raf.seek(0);
250                 raf.writeBytes(res[1] + "\n");
251                 return socket;
252             } catch (Exception e) {
253                 throw new RuntimeException("Unable to create and connect to lock server", e);
254             }
255         }
256     }
257 
258     private String getJarPath(Class<?> clazz) {
259         String classpath;
260         String className = clazz.getName().replace('.', '/') + ".class";
261         String url = clazz.getClassLoader().getResource(className).toString();
262         if (url.startsWith("jar:")) {
263             url = url.substring("jar:".length(), url.indexOf("!/"));
264             if (url.startsWith("file:")) {
265                 classpath = url.substring("file:".length());
266             } else {
267                 throw new IllegalStateException();
268             }
269         } else if (url.startsWith("file:")) {
270             classpath = url.substring("file:".length(), url.indexOf(className));
271         } else {
272             throw new IllegalStateException();
273         }
274         if (IS_WINDOWS) {
275             if (classpath.startsWith("/")) {
276                 classpath = classpath.substring(1);
277             }
278             classpath = classpath.replace('/', '\\');
279         }
280 
281         return classpath;
282     }
283 
284     void receive() {
285         try {
286             while (true) {
287                 int id = input.readInt();
288                 int sz = input.readInt();
289                 List<String> s = new ArrayList<>(sz);
290                 for (int i = 0; i < sz; i++) {
291                     s.add(input.readUTF());
292                 }
293                 CompletableFuture<List<String>> f = responses.remove(id);
294                 if (f == null || s.isEmpty()) {
295                     throw new IllegalStateException("Protocol error");
296                 }
297                 f.complete(s);
298             }
299         } catch (EOFException e) {
300             // server is stopped; just quit
301         } catch (Exception e) {
302             close(e);
303         }
304     }
305 
306     List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
307         ensureInitialized();
308         int id = requestId.incrementAndGet();
309         CompletableFuture<List<String>> response = new CompletableFuture<>();
310         responses.put(id, response);
311         synchronized (output) {
312             output.writeInt(id);
313             output.writeInt(request.size());
314             for (String s : request) {
315                 output.writeUTF(s);
316             }
317             output.flush();
318         }
319         try {
320             return response.get(time, unit);
321         } catch (InterruptedException e) {
322             throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
323         } catch (ExecutionException e) {
324             throw new IOException("Execution error", e);
325         }
326     }
327 
328     void close() {
329         close(new IOException("Closing"));
330     }
331 
332     synchronized void close(Throwable e) {
333         if (socket != null) {
334             try {
335                 socket.close();
336             } catch (IOException t) {
337                 e.addSuppressed(t);
338             }
339             socket = null;
340             input = null;
341             output = null;
342         }
343         if (receiver != null) {
344             receiver.interrupt();
345             try {
346                 receiver.join(1000);
347             } catch (InterruptedException t) {
348                 e.addSuppressed(t);
349             }
350         }
351         responses.values().forEach(f -> f.completeExceptionally(e));
352         responses.clear();
353     }
354 
355     String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
356         RuntimeException error = new RuntimeException("Unable to create new sync context");
357         for (int i = 0; i < 2; i++) {
358             try {
359                 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
360                 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
361                     throw new IOException("Unexpected response: " + response);
362                 }
363                 return response.get(1);
364             } catch (TimeoutException e) {
365                 throw e;
366             } catch (Exception e) {
367                 close(e);
368                 error.addSuppressed(e);
369             }
370         }
371         throw error;
372     }
373 
374     void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
375         try {
376             List<String> req = new ArrayList<>(keys.size() + 2);
377             req.add(REQUEST_ACQUIRE);
378             req.add(contextId);
379             req.addAll(keys);
380             List<String> response = send(req, time, unit);
381             if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
382                 throw new IOException("Unexpected response: " + response);
383             }
384         } catch (TimeoutException e) {
385             throw e;
386         } catch (Exception e) {
387             close(e);
388             throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
389         }
390     }
391 
392     void unlock(String contextId) {
393         try {
394             List<String> response =
395                     send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
396             if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
397                 throw new IOException("Unexpected response: " + response);
398             }
399         } catch (Exception e) {
400             close(e);
401             throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
402         }
403     }
404 
405     /**
406      * To be used in tests to stop server immediately. Should not be used outside of tests.
407      */
408     void stopServer() {
409         try {
410             List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
411             if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
412                 throw new IOException("Unexpected response: " + response);
413             }
414         } catch (Exception e) {
415             close(e);
416             throw new RuntimeException("Unable to stop server", e);
417         }
418     }
419 
420     @Override
421     public String toString() {
422         return "IpcClient{"
423                 + "lockPath=" + lockPath + ","
424                 + "syncServerPath=" + syncPath + ","
425                 + "address='" + getAddress() + "'}";
426     }
427 
428     private String getAddress() {
429         try {
430             return SocketFamily.toString(socket.getLocalAddress());
431         } catch (IOException e) {
432             return "[not bound]";
433         }
434     }
435 }