1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.named.ipc;
20
21 import java.io.Closeable;
22 import java.io.DataInputStream;
23 import java.io.DataOutputStream;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileWriter;
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.io.PrintWriter;
30 import java.io.RandomAccessFile;
31 import java.net.SocketAddress;
32 import java.nio.channels.ByteChannel;
33 import java.nio.channels.Channels;
34 import java.nio.channels.FileLock;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.nio.file.Files;
38 import java.nio.file.Path;
39 import java.nio.file.Paths;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.List;
44 import java.util.Locale;
45 import java.util.Map;
46 import java.util.Objects;
47 import java.util.Random;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56 import java.util.concurrent.atomic.AtomicInteger;
57
58 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66
67
68
69
70
71
72
73 public class IpcClient {
74
/** {@code true} when the {@code os.name} system property, lower-cased, contains {@code "win"}. */
static final boolean IS_WINDOWS =
        System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");

/** Set to {@code true} by {@link #ensureInitialized()} once the socket, streams and receiver thread exist. */
protected volatile boolean initialized;
/** Directory under which {@link #createClient()} resolves the lock file holding the server address. */
protected final Path lockPath;
/** Directory under which {@link #createClient()} places the log file of a server it starts. */
protected final Path logPath;
/** Directory in which {@link #createClient()} looks for the native server executable. */
protected final Path syncPath;
/**
 * Value of the {@code IpcServer.SYSTEM_PROP_NO_FORK} system property: when {@code true} (and no native
 * executable is used) the server is run through {@code IpcServer.runServer} instead of a spawned process,
 * and {@link #close()} asks the server to stop.
 */
protected final boolean noFork;

/** Channel connected to the server; {@code null} after {@link #close(Throwable)}. */
protected SocketChannel socket;
/** Stream used by {@link #send} to write requests; also the monitor that serializes those writes. */
protected DataOutputStream output;
/** Stream from which {@link #receive()} reads responses. */
protected DataInputStream input;
/** Daemon thread running {@link #receive()}. */
protected Thread receiver;

/** Source of request ids; incremented once per request sent. */
protected final AtomicInteger requestId = new AtomicInteger();
/** Requests awaiting a response, keyed by request id; completed by {@link #receive()}. */
protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
91
92 IpcClient(Path lockPath, Path logPath, Path syncPath) {
93 this.lockPath = lockPath;
94 this.logPath = logPath;
95 this.syncPath = syncPath;
96 this.noFork = Boolean.parseBoolean(
97 System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
98 }
99
100 void ensureInitialized() throws IOException {
101 if (!initialized) {
102
103 synchronized (this) {
104 if (!initialized) {
105 socket = createClient();
106 ByteChannel wrapper = new ByteChannelWrapper(socket);
107 input = new DataInputStream(Channels.newInputStream(wrapper));
108 output = new DataOutputStream(Channels.newOutputStream(wrapper));
109 receiver = new Thread(this::receive);
110 receiver.setDaemon(true);
111 receiver.start();
112 initialized = true;
113 }
114 }
115 }
116 }
117
118 SocketChannel createClient() throws IOException {
119 String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
120 SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
121
122 Path lockPath = this.lockPath.toAbsolutePath().normalize();
123 Path lockFile =
124 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125 if (!Files.isRegularFile(lockFile)) {
126 if (!Files.isDirectory(lockFile.getParent())) {
127 Files.createDirectories(lockFile.getParent());
128 }
129 }
130
131 try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132 try (FileLock lock = raf.getChannel().lock()) {
133 String line = raf.readLine();
134 if (line != null) {
135 try {
136 SocketAddress address = SocketFamily.fromString(line);
137 return SocketChannel.open(address);
138 } catch (IOException e) {
139
140 }
141 }
142
143 ServerSocketChannel ss = family.openServerSocket();
144 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145 String rand = Long.toHexString(new Random().nextLong());
146 String nativeName =
147 System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149
150 boolean debug = Boolean.parseBoolean(
151 System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152 boolean noNative = Boolean.parseBoolean(System.getProperty(
153 IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154 if (!noNative) {
155 noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156 }
157 Closeable close;
158 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159 List<String> args = new ArrayList<>();
160 if (noNative) {
161 if (noFork) {
162 IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163 close = server::close;
164 } else {
165 String javaHome = System.getenv("JAVA_HOME");
166 if (javaHome == null) {
167 javaHome = System.getProperty("java.home");
168 }
169 String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170 String java = Paths.get(javaHome)
171 .resolve(javaCmd)
172 .toAbsolutePath()
173 .toString();
174 args.add(java);
175 String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176 args.add("-cp");
177 args.add(classpath);
178 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179 if (timeout != null) {
180 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181 }
182 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183 args.add(IpcServer.class.getName());
184 args.add(family.name());
185 args.add(tmpaddr);
186 args.add(rand);
187 ProcessBuilder processBuilder = new ProcessBuilder();
188 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189 Files.createDirectories(logPath);
190 Process process = processBuilder
191 .directory(lockFile.getParent().toFile())
192 .command(args)
193 .redirectOutput(discard)
194 .redirectError(discard)
195 .start();
196 close = process::destroyForcibly;
197 }
198 } else {
199 args.add(syncPath.resolve(syncCmd).toString());
200 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201 if (timeout != null) {
202 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203 }
204 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205 args.add(family.name());
206 args.add(tmpaddr);
207 args.add(rand);
208 ProcessBuilder processBuilder = new ProcessBuilder();
209 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210 Files.createDirectories(logPath);
211 Process process = processBuilder
212 .directory(lockFile.getParent().toFile())
213 .command(args)
214 .redirectOutput(discard)
215 .redirectError(discard)
216 .start();
217 close = process::destroyForcibly;
218 }
219
220 ExecutorService es = Executors.newSingleThreadExecutor();
221 Future<String[]> future = es.submit(() -> {
222 SocketChannel s = ss.accept();
223 DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224 String rand2 = dis.readUTF();
225 String addr2 = dis.readUTF();
226 return new String[] {rand2, addr2};
227 });
228 String[] res;
229 try {
230 res = future.get(5, TimeUnit.SECONDS);
231 } catch (Exception e) {
232 try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233 writer.println("Arguments:");
234 args.forEach(writer::println);
235 writer.println();
236 writer.println("Exception:");
237 e.printStackTrace(writer);
238 }
239 close.close();
240 throw e;
241 } finally {
242 es.shutdownNow();
243 ss.close();
244 }
245 if (!Objects.equals(rand, res[0])) {
246 close.close();
247 throw new IllegalStateException("IpcServer did not respond with the correct random");
248 }
249
250 SocketAddress addr = SocketFamily.fromString(res[1]);
251 SocketChannel socket = SocketChannel.open(addr);
252
253 raf.seek(0);
254 raf.writeBytes(res[1] + "\n");
255 return socket;
256 } catch (Exception e) {
257 throw new RuntimeException("Unable to create and connect to lock server", e);
258 }
259 }
260 }
261
262 private String getJarPath(Class<?> clazz) {
263 String classpath;
264 String className = clazz.getName().replace('.', '/') + ".class";
265 String url = clazz.getClassLoader().getResource(className).toString();
266 if (url.startsWith("jar:")) {
267 url = url.substring("jar:".length(), url.indexOf("!/"));
268 if (url.startsWith("file:")) {
269 classpath = url.substring("file:".length());
270 } else {
271 throw new IllegalStateException();
272 }
273 } else if (url.startsWith("file:")) {
274 classpath = url.substring("file:".length(), url.indexOf(className));
275 } else {
276 throw new IllegalStateException();
277 }
278 if (IS_WINDOWS) {
279 if (classpath.startsWith("/")) {
280 classpath = classpath.substring(1);
281 }
282 classpath = classpath.replace('/', '\\');
283 }
284
285 return classpath;
286 }
287
288 void receive() {
289 try {
290 while (true) {
291 int id = input.readInt();
292 int sz = input.readInt();
293 List<String> s = new ArrayList<>(sz);
294 for (int i = 0; i < sz; i++) {
295 s.add(input.readUTF());
296 }
297 CompletableFuture<List<String>> f = responses.remove(id);
298 if (f == null || s.isEmpty()) {
299 throw new IllegalStateException("Protocol error");
300 }
301 f.complete(s);
302 }
303 } catch (EOFException e) {
304
305 } catch (Exception e) {
306 close(e);
307 }
308 }
309
310 List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
311 ensureInitialized();
312 int id = requestId.incrementAndGet();
313 CompletableFuture<List<String>> response = new CompletableFuture<>();
314 responses.put(id, response);
315 synchronized (output) {
316 output.writeInt(id);
317 output.writeInt(request.size());
318 for (String s : request) {
319 output.writeUTF(s);
320 }
321 output.flush();
322 }
323 try {
324 return response.get(time, unit);
325 } catch (InterruptedException e) {
326 throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
327 } catch (ExecutionException e) {
328 throw new IOException("Execution error", e);
329 }
330 }
331
332 void close() {
333 if (noFork) {
334 stopServer();
335 }
336 close(new IOException("Closing"));
337 }
338
339 synchronized void close(Throwable e) {
340 if (socket != null) {
341 try {
342 socket.close();
343 } catch (IOException t) {
344 e.addSuppressed(t);
345 }
346 socket = null;
347 input = null;
348 output = null;
349 }
350 if (receiver != null) {
351 receiver.interrupt();
352 try {
353 receiver.join(1000);
354 } catch (InterruptedException t) {
355 e.addSuppressed(t);
356 }
357 }
358 responses.values().forEach(f -> f.completeExceptionally(e));
359 responses.clear();
360 }
361
362 String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
363 RuntimeException error = new RuntimeException("Unable to create new sync context");
364 for (int i = 0; i < 2; i++) {
365 try {
366 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
367 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
368 throw new IOException("Unexpected response: " + response);
369 }
370 return response.get(1);
371 } catch (TimeoutException e) {
372 throw e;
373 } catch (Exception e) {
374 close(e);
375 error.addSuppressed(e);
376 }
377 }
378 throw error;
379 }
380
381 void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
382 try {
383 List<String> req = new ArrayList<>(keys.size() + 2);
384 req.add(REQUEST_ACQUIRE);
385 req.add(contextId);
386 req.addAll(keys);
387 List<String> response = send(req, time, unit);
388 if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
389 throw new IOException("Unexpected response: " + response);
390 }
391 } catch (TimeoutException e) {
392 throw e;
393 } catch (Exception e) {
394 close(e);
395 throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
396 }
397 }
398
399 void unlock(String contextId) {
400 try {
401 List<String> response =
402 send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
403 if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
404 throw new IOException("Unexpected response: " + response);
405 }
406 } catch (Exception e) {
407 close(e);
408 throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
409 }
410 }
411
412
413
414
415 void stopServer() {
416 try {
417 List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
418 if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
419 throw new IOException("Unexpected response: " + response);
420 }
421 } catch (Exception e) {
422 close(e);
423 throw new RuntimeException("Unable to stop server", e);
424 }
425 }
426
427 @Override
428 public String toString() {
429 return "IpcClient{"
430 + "lockPath=" + lockPath + ","
431 + "syncServerPath=" + syncPath + ","
432 + "address='" + getAddress() + "'}";
433 }
434
435 private String getAddress() {
436 try {
437 return SocketFamily.toString(socket.getLocalAddress());
438 } catch (IOException e) {
439 return "[not bound]";
440 }
441 }
442 }