1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.named.ipc;
20
21 import java.io.Closeable;
22 import java.io.DataInputStream;
23 import java.io.DataOutputStream;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileWriter;
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.io.PrintWriter;
30 import java.io.RandomAccessFile;
31 import java.net.SocketAddress;
32 import java.nio.channels.ByteChannel;
33 import java.nio.channels.Channels;
34 import java.nio.channels.FileLock;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.nio.file.Files;
38 import java.nio.file.Path;
39 import java.nio.file.Paths;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.List;
44 import java.util.Locale;
45 import java.util.Map;
46 import java.util.Objects;
47 import java.util.Random;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56 import java.util.concurrent.atomic.AtomicInteger;
57
58 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66
67
68
69
70
71
72
73 public class IpcClient {
74
75 static final boolean IS_WINDOWS =
76 System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
77
78 protected volatile boolean initialized;
79 protected final Path lockPath;
80 protected final Path logPath;
81 protected final Path syncPath;
82 protected final boolean noFork;
83
84 protected volatile SocketChannel socket;
85 protected volatile DataOutputStream output;
86 protected volatile DataInputStream input;
87 protected volatile Thread receiver;
88
89 protected final AtomicInteger requestId = new AtomicInteger();
90 protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
91
92 IpcClient(Path lockPath, Path logPath, Path syncPath) {
93 this.lockPath = lockPath;
94 this.logPath = logPath;
95 this.syncPath = syncPath;
96 this.noFork = Boolean.parseBoolean(
97 System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
98 }
99
100 void ensureInitialized() throws IOException {
101 if (!initialized) {
102
103 synchronized (this) {
104 if (!initialized) {
105 socket = createClient();
106 ByteChannel wrapper = new ByteChannelWrapper(socket);
107 input = new DataInputStream(Channels.newInputStream(wrapper));
108 output = new DataOutputStream(Channels.newOutputStream(wrapper));
109 receiver = new Thread(this::receive);
110 receiver.setDaemon(true);
111 receiver.start();
112 initialized = true;
113 }
114 }
115 }
116 }
117
118 SocketChannel createClient() throws IOException {
119 SocketFamily family =
120 SocketFamily.valueOf(System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY));
121
122 Path lockPath = this.lockPath.toAbsolutePath().normalize();
123 Path lockFile =
124 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125 if (!Files.isRegularFile(lockFile)) {
126 if (!Files.isDirectory(lockFile.getParent())) {
127 Files.createDirectories(lockFile.getParent());
128 }
129 }
130
131 try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132 try (FileLock lock = raf.getChannel().lock()) {
133 String line = raf.readLine();
134 if (line != null) {
135 try {
136 SocketAddress address = SocketFamily.fromString(line);
137 return SocketChannel.open(address);
138 } catch (IOException e) {
139
140 }
141 }
142
143 ServerSocketChannel ss = family.openServerSocket();
144 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145 String rand = Long.toHexString(new Random().nextLong());
146 String nativeName =
147 System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149
150 boolean debug = Boolean.parseBoolean(
151 System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152 boolean noNative = Boolean.parseBoolean(System.getProperty(
153 IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154 if (!noNative) {
155 noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156 }
157 Closeable close;
158 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159 List<String> args = new ArrayList<>();
160 if (noNative) {
161 if (noFork) {
162 IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163 close = server::close;
164 } else {
165 String javaHome = System.getenv("JAVA_HOME");
166 if (javaHome == null) {
167 javaHome = System.getProperty("java.home");
168 }
169 String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170 String java = Paths.get(javaHome)
171 .resolve(javaCmd)
172 .toAbsolutePath()
173 .toString();
174 args.add(java);
175 String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176 args.add("-cp");
177 args.add(classpath);
178 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179 if (timeout != null) {
180 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181 }
182 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183 args.add(IpcServer.class.getName());
184 args.add(family.name());
185 args.add(tmpaddr);
186 args.add(rand);
187 ProcessBuilder processBuilder = new ProcessBuilder();
188 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189 Files.createDirectories(logPath);
190 Process process = processBuilder
191 .directory(lockFile.getParent().toFile())
192 .command(args)
193 .redirectOutput(discard)
194 .redirectError(discard)
195 .start();
196 close = process::destroyForcibly;
197 }
198 } else {
199 args.add(syncPath.resolve(syncCmd).toString());
200 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201 if (timeout != null) {
202 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203 }
204 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205 args.add(family.name());
206 args.add(tmpaddr);
207 args.add(rand);
208 ProcessBuilder processBuilder = new ProcessBuilder();
209 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210 Files.createDirectories(logPath);
211 Process process = processBuilder
212 .directory(lockFile.getParent().toFile())
213 .command(args)
214 .redirectOutput(discard)
215 .redirectError(discard)
216 .start();
217 close = process::destroyForcibly;
218 }
219
220 ExecutorService es = Executors.newSingleThreadExecutor();
221 Future<String[]> future = es.submit(() -> {
222 SocketChannel s = ss.accept();
223 DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224 String rand2 = dis.readUTF();
225 String addr2 = dis.readUTF();
226 return new String[] {rand2, addr2};
227 });
228 String[] res;
229 try {
230 res = future.get(5, TimeUnit.SECONDS);
231 } catch (Exception e) {
232 try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233 writer.println("Arguments:");
234 args.forEach(writer::println);
235 writer.println();
236 writer.println("Exception:");
237 e.printStackTrace(writer);
238 }
239 close.close();
240 throw e;
241 } finally {
242 es.shutdownNow();
243 ss.close();
244 }
245 if (!Objects.equals(rand, res[0])) {
246 close.close();
247 throw new IllegalStateException("IpcServer did not respond with the correct random");
248 }
249
250 SocketAddress addr = SocketFamily.fromString(res[1]);
251 SocketChannel socket = SocketChannel.open(addr);
252
253 raf.seek(0);
254 raf.writeBytes(res[1] + "\n");
255 return socket;
256 } catch (Exception e) {
257 throw new RuntimeException("Unable to create and connect to lock server", e);
258 }
259 }
260 }
261
262 private String getJarPath(Class<?> clazz) {
263 String classpath;
264 String className = clazz.getName().replace('.', '/') + ".class";
265 String url = clazz.getClassLoader().getResource(className).toString();
266 if (url.startsWith("jar:")) {
267 url = url.substring("jar:".length(), url.indexOf("!/"));
268 if (url.startsWith("file:")) {
269 classpath = url.substring("file:".length());
270 } else {
271 throw new IllegalStateException();
272 }
273 } else if (url.startsWith("file:")) {
274 classpath = url.substring("file:".length(), url.indexOf(className));
275 } else {
276 throw new IllegalStateException();
277 }
278 if (IS_WINDOWS) {
279 if (classpath.startsWith("/")) {
280 classpath = classpath.substring(1);
281 }
282 classpath = classpath.replace('/', '\\');
283 }
284
285 return classpath;
286 }
287
288 void receive() {
289 try {
290 while (true) {
291 int id = input.readInt();
292 int sz = input.readInt();
293 List<String> s = new ArrayList<>(sz);
294 for (int i = 0; i < sz; i++) {
295 s.add(input.readUTF());
296 }
297 CompletableFuture<List<String>> f = responses.remove(id);
298 if (f == null) {
299 continue;
300 }
301 if (s.isEmpty()) {
302 f.completeExceptionally(new IOException("Protocol error: empty response"));
303 continue;
304 }
305 f.complete(s);
306 }
307 } catch (EOFException e) {
308 close(new IOException("Server disconnected", e));
309 } catch (Exception e) {
310 close(e);
311 }
312 }
313
314 List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
315 ensureInitialized();
316 DataOutputStream out = output;
317 if (out == null) {
318 throw new IOException("Connection closed");
319 }
320 int id = requestId.incrementAndGet();
321 CompletableFuture<List<String>> response = new CompletableFuture<>();
322 responses.put(id, response);
323 synchronized (out) {
324 out.writeInt(id);
325 out.writeInt(request.size());
326 for (String s : request) {
327 out.writeUTF(s);
328 }
329 out.flush();
330 }
331 try {
332 return response.get(time, unit);
333 } catch (InterruptedException e) {
334 responses.remove(id);
335 throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
336 } catch (ExecutionException e) {
337 throw new IOException("Execution error", e);
338 } catch (TimeoutException e) {
339 responses.remove(id);
340 throw e;
341 }
342 }
343
344 void close() {
345 if (noFork) {
346 stopServer();
347 }
348 close(new IOException("Closing"));
349 }
350
351 synchronized void close(Throwable e) {
352 initialized = false;
353 if (socket != null) {
354 try {
355 socket.close();
356 } catch (IOException t) {
357 e.addSuppressed(t);
358 }
359 socket = null;
360 input = null;
361 output = null;
362 }
363 if (receiver != null && Thread.currentThread() != receiver) {
364 receiver.interrupt();
365 try {
366 receiver.join(1000);
367 } catch (InterruptedException t) {
368 e.addSuppressed(t);
369 }
370 }
371 responses.values().forEach(f -> f.completeExceptionally(e));
372 responses.clear();
373 }
374
375 String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
376 RuntimeException error = new RuntimeException("Unable to create new sync context");
377 for (int i = 0; i < 2; i++) {
378 try {
379 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
380 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
381 throw new IOException("Unexpected response: " + response);
382 }
383 return response.get(1);
384 } catch (TimeoutException e) {
385 throw e;
386 } catch (Exception e) {
387 close(e);
388 error.addSuppressed(e);
389 }
390 }
391 throw error;
392 }
393
394 void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
395 try {
396 List<String> req = new ArrayList<>(keys.size() + 2);
397 req.add(REQUEST_ACQUIRE);
398 req.add(contextId);
399 req.addAll(keys);
400 List<String> response = send(req, time, unit);
401 if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
402 throw new IOException("Unexpected response: " + response);
403 }
404 } catch (TimeoutException e) {
405 throw e;
406 } catch (Exception e) {
407 close(e);
408 throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
409 }
410 }
411
412 void unlock(String contextId) {
413 try {
414 List<String> response = send(Arrays.asList(REQUEST_CLOSE, contextId), 10, TimeUnit.SECONDS);
415 if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
416 throw new IOException("Unexpected response: " + response);
417 }
418 } catch (Exception e) {
419 close(e);
420 throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
421 }
422 }
423
424
425
426
427 void stopServer() {
428 try {
429 List<String> response = send(List.of(REQUEST_STOP), 30, TimeUnit.SECONDS);
430 if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
431 throw new IOException("Unexpected response: " + response);
432 }
433 } catch (Exception e) {
434 close(e);
435 throw new RuntimeException("Unable to stop server", e);
436 }
437 }
438
439 @Override
440 public String toString() {
441 return "IpcClient{"
442 + "lockPath=" + lockPath + ","
443 + "syncServerPath=" + syncPath + ","
444 + "address='" + getAddress() + "'}";
445 }
446
447 private String getAddress() {
448 try {
449 return SocketFamily.toString(socket.getLocalAddress());
450 } catch (IOException e) {
451 return "[not bound]";
452 }
453 }
454 }