View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.named.ipc;
20  
21  import java.io.Closeable;
22  import java.io.DataInputStream;
23  import java.io.DataOutputStream;
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.FileWriter;
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.io.PrintWriter;
30  import java.io.RandomAccessFile;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.Channels;
34  import java.nio.channels.FileLock;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.nio.file.Files;
38  import java.nio.file.Path;
39  import java.nio.file.Paths;
40  import java.util.ArrayList;
41  import java.util.Arrays;
42  import java.util.Collection;
43  import java.util.List;
44  import java.util.Locale;
45  import java.util.Map;
46  import java.util.Objects;
47  import java.util.Random;
48  import java.util.concurrent.CompletableFuture;
49  import java.util.concurrent.ConcurrentHashMap;
50  import java.util.concurrent.ExecutionException;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Executors;
53  import java.util.concurrent.Future;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  import java.util.concurrent.atomic.AtomicInteger;
57  
58  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61  import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65  import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66  
67  /**
68   * Client side implementation.
69   * The client instance is bound to a given maven repository.
70   *
71   * @since 2.0.1
72   */
73  public class IpcClient {
74  
75      static final boolean IS_WINDOWS =
76              System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
77  
78      protected volatile boolean initialized;
79      protected final Path lockPath;
80      protected final Path logPath;
81      protected final Path syncPath;
82      protected final boolean noFork;
83  
84      protected volatile SocketChannel socket;
85      protected volatile DataOutputStream output;
86      protected volatile DataInputStream input;
87      protected volatile Thread receiver;
88  
89      protected final AtomicInteger requestId = new AtomicInteger();
90      protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
91  
92      IpcClient(Path lockPath, Path logPath, Path syncPath) {
93          this.lockPath = lockPath;
94          this.logPath = logPath;
95          this.syncPath = syncPath;
96          this.noFork = Boolean.parseBoolean(
97                  System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
98      }
99  
100     void ensureInitialized() throws IOException {
101         if (!initialized) {
102             // caller must block on this method
103             synchronized (this) {
104                 if (!initialized) {
105                     socket = createClient();
106                     ByteChannel wrapper = new ByteChannelWrapper(socket);
107                     input = new DataInputStream(Channels.newInputStream(wrapper));
108                     output = new DataOutputStream(Channels.newOutputStream(wrapper));
109                     receiver = new Thread(this::receive);
110                     receiver.setDaemon(true);
111                     receiver.start();
112                     initialized = true;
113                 }
114             }
115         }
116     }
117 
118     SocketChannel createClient() throws IOException {
119         SocketFamily family =
120                 SocketFamily.valueOf(System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY));
121 
122         Path lockPath = this.lockPath.toAbsolutePath().normalize();
123         Path lockFile =
124                 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125         if (!Files.isRegularFile(lockFile)) {
126             if (!Files.isDirectory(lockFile.getParent())) {
127                 Files.createDirectories(lockFile.getParent());
128             }
129         }
130 
131         try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132             try (FileLock lock = raf.getChannel().lock()) {
133                 String line = raf.readLine();
134                 if (line != null) {
135                     try {
136                         SocketAddress address = SocketFamily.fromString(line);
137                         return SocketChannel.open(address);
138                     } catch (IOException e) {
139                         // ignore
140                     }
141                 }
142 
143                 ServerSocketChannel ss = family.openServerSocket();
144                 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145                 String rand = Long.toHexString(new Random().nextLong());
146                 String nativeName =
147                         System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148                 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149 
150                 boolean debug = Boolean.parseBoolean(
151                         System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152                 boolean noNative = Boolean.parseBoolean(System.getProperty(
153                         IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154                 if (!noNative) {
155                     noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156                 }
157                 Closeable close;
158                 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159                 List<String> args = new ArrayList<>();
160                 if (noNative) {
161                     if (noFork) {
162                         IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163                         close = server::close;
164                     } else {
165                         String javaHome = System.getenv("JAVA_HOME");
166                         if (javaHome == null) {
167                             javaHome = System.getProperty("java.home");
168                         }
169                         String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170                         String java = Paths.get(javaHome)
171                                 .resolve(javaCmd)
172                                 .toAbsolutePath()
173                                 .toString();
174                         args.add(java);
175                         String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176                         args.add("-cp");
177                         args.add(classpath);
178                         String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179                         if (timeout != null) {
180                             args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181                         }
182                         args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183                         args.add(IpcServer.class.getName());
184                         args.add(family.name());
185                         args.add(tmpaddr);
186                         args.add(rand);
187                         ProcessBuilder processBuilder = new ProcessBuilder();
188                         ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189                         Files.createDirectories(logPath);
190                         Process process = processBuilder
191                                 .directory(lockFile.getParent().toFile())
192                                 .command(args)
193                                 .redirectOutput(discard)
194                                 .redirectError(discard)
195                                 .start();
196                         close = process::destroyForcibly;
197                     }
198                 } else {
199                     args.add(syncPath.resolve(syncCmd).toString());
200                     String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201                     if (timeout != null) {
202                         args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203                     }
204                     args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205                     args.add(family.name());
206                     args.add(tmpaddr);
207                     args.add(rand);
208                     ProcessBuilder processBuilder = new ProcessBuilder();
209                     ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210                     Files.createDirectories(logPath);
211                     Process process = processBuilder
212                             .directory(lockFile.getParent().toFile())
213                             .command(args)
214                             .redirectOutput(discard)
215                             .redirectError(discard)
216                             .start();
217                     close = process::destroyForcibly;
218                 }
219 
220                 ExecutorService es = Executors.newSingleThreadExecutor();
221                 Future<String[]> future = es.submit(() -> {
222                     SocketChannel s = ss.accept();
223                     DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224                     String rand2 = dis.readUTF();
225                     String addr2 = dis.readUTF();
226                     return new String[] {rand2, addr2};
227                 });
228                 String[] res;
229                 try {
230                     res = future.get(5, TimeUnit.SECONDS);
231                 } catch (Exception e) {
232                     try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233                         writer.println("Arguments:");
234                         args.forEach(writer::println);
235                         writer.println();
236                         writer.println("Exception:");
237                         e.printStackTrace(writer);
238                     }
239                     close.close();
240                     throw e;
241                 } finally {
242                     es.shutdownNow();
243                     ss.close();
244                 }
245                 if (!Objects.equals(rand, res[0])) {
246                     close.close();
247                     throw new IllegalStateException("IpcServer did not respond with the correct random");
248                 }
249 
250                 SocketAddress addr = SocketFamily.fromString(res[1]);
251                 SocketChannel socket = SocketChannel.open(addr);
252 
253                 raf.seek(0);
254                 raf.writeBytes(res[1] + "\n");
255                 return socket;
256             } catch (Exception e) {
257                 throw new RuntimeException("Unable to create and connect to lock server", e);
258             }
259         }
260     }
261 
262     private String getJarPath(Class<?> clazz) {
263         String classpath;
264         String className = clazz.getName().replace('.', '/') + ".class";
265         String url = clazz.getClassLoader().getResource(className).toString();
266         if (url.startsWith("jar:")) {
267             url = url.substring("jar:".length(), url.indexOf("!/"));
268             if (url.startsWith("file:")) {
269                 classpath = url.substring("file:".length());
270             } else {
271                 throw new IllegalStateException();
272             }
273         } else if (url.startsWith("file:")) {
274             classpath = url.substring("file:".length(), url.indexOf(className));
275         } else {
276             throw new IllegalStateException();
277         }
278         if (IS_WINDOWS) {
279             if (classpath.startsWith("/")) {
280                 classpath = classpath.substring(1);
281             }
282             classpath = classpath.replace('/', '\\');
283         }
284 
285         return classpath;
286     }
287 
288     void receive() {
289         try {
290             while (true) {
291                 int id = input.readInt();
292                 int sz = input.readInt();
293                 List<String> s = new ArrayList<>(sz);
294                 for (int i = 0; i < sz; i++) {
295                     s.add(input.readUTF());
296                 }
297                 CompletableFuture<List<String>> f = responses.remove(id);
298                 if (f == null) {
299                     continue;
300                 }
301                 if (s.isEmpty()) {
302                     f.completeExceptionally(new IOException("Protocol error: empty response"));
303                     continue;
304                 }
305                 f.complete(s);
306             }
307         } catch (EOFException e) {
308             close(new IOException("Server disconnected", e));
309         } catch (Exception e) {
310             close(e);
311         }
312     }
313 
314     List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
315         ensureInitialized();
316         DataOutputStream out = output;
317         if (out == null) {
318             throw new IOException("Connection closed");
319         }
320         int id = requestId.incrementAndGet();
321         CompletableFuture<List<String>> response = new CompletableFuture<>();
322         responses.put(id, response);
323         synchronized (out) {
324             out.writeInt(id);
325             out.writeInt(request.size());
326             for (String s : request) {
327                 out.writeUTF(s);
328             }
329             out.flush();
330         }
331         try {
332             return response.get(time, unit);
333         } catch (InterruptedException e) {
334             responses.remove(id);
335             throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
336         } catch (ExecutionException e) {
337             throw new IOException("Execution error", e);
338         } catch (TimeoutException e) {
339             responses.remove(id);
340             throw e;
341         }
342     }
343 
344     void close() {
345         if (noFork) {
346             stopServer();
347         }
348         close(new IOException("Closing"));
349     }
350 
351     synchronized void close(Throwable e) {
352         initialized = false;
353         if (socket != null) {
354             try {
355                 socket.close();
356             } catch (IOException t) {
357                 e.addSuppressed(t);
358             }
359             socket = null;
360             input = null;
361             output = null;
362         }
363         if (receiver != null && Thread.currentThread() != receiver) {
364             receiver.interrupt();
365             try {
366                 receiver.join(1000);
367             } catch (InterruptedException t) {
368                 e.addSuppressed(t);
369             }
370         }
371         responses.values().forEach(f -> f.completeExceptionally(e));
372         responses.clear();
373     }
374 
375     String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
376         RuntimeException error = new RuntimeException("Unable to create new sync context");
377         for (int i = 0; i < 2; i++) {
378             try {
379                 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
380                 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
381                     throw new IOException("Unexpected response: " + response);
382                 }
383                 return response.get(1);
384             } catch (TimeoutException e) {
385                 throw e;
386             } catch (Exception e) {
387                 close(e);
388                 error.addSuppressed(e);
389             }
390         }
391         throw error;
392     }
393 
394     void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
395         try {
396             List<String> req = new ArrayList<>(keys.size() + 2);
397             req.add(REQUEST_ACQUIRE);
398             req.add(contextId);
399             req.addAll(keys);
400             List<String> response = send(req, time, unit);
401             if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
402                 throw new IOException("Unexpected response: " + response);
403             }
404         } catch (TimeoutException e) {
405             throw e;
406         } catch (Exception e) {
407             close(e);
408             throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
409         }
410     }
411 
412     void unlock(String contextId) {
413         try {
414             List<String> response = send(Arrays.asList(REQUEST_CLOSE, contextId), 10, TimeUnit.SECONDS);
415             if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
416                 throw new IOException("Unexpected response: " + response);
417             }
418         } catch (Exception e) {
419             close(e);
420             throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
421         }
422     }
423 
424     /**
425      * To be used in tests to stop server immediately. Should not be used outside of tests.
426      */
427     void stopServer() {
428         try {
429             List<String> response = send(List.of(REQUEST_STOP), 30, TimeUnit.SECONDS);
430             if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
431                 throw new IOException("Unexpected response: " + response);
432             }
433         } catch (Exception e) {
434             close(e);
435             throw new RuntimeException("Unable to stop server", e);
436         }
437     }
438 
439     @Override
440     public String toString() {
441         return "IpcClient{"
442                 + "lockPath=" + lockPath + ","
443                 + "syncServerPath=" + syncPath + ","
444                 + "address='" + getAddress() + "'}";
445     }
446 
447     private String getAddress() {
448         try {
449             return SocketFamily.toString(socket.getLocalAddress());
450         } catch (IOException e) {
451             return "[not bound]";
452         }
453     }
454 }