1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.named.ipc;
20
21 import java.io.Closeable;
22 import java.io.DataInputStream;
23 import java.io.DataOutputStream;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileWriter;
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.io.PrintWriter;
30 import java.io.RandomAccessFile;
31 import java.net.SocketAddress;
32 import java.nio.channels.ByteChannel;
33 import java.nio.channels.Channels;
34 import java.nio.channels.FileLock;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.nio.file.Files;
38 import java.nio.file.Path;
39 import java.nio.file.Paths;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.List;
44 import java.util.Locale;
45 import java.util.Map;
46 import java.util.Objects;
47 import java.util.Random;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56 import java.util.concurrent.atomic.AtomicInteger;
57
58 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
59 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
60 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
61 import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
62 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
63 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
64 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
65 import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
66
67
68
69
70
71
72
73 public class IpcClient {
74
75 static final boolean IS_WINDOWS =
76 System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
77
78 volatile boolean initialized;
79 Path lockPath;
80 Path logPath;
81 Path syncPath;
82 SocketChannel socket;
83 DataOutputStream output;
84 DataInputStream input;
85 Thread receiver;
86 AtomicInteger requestId = new AtomicInteger();
87 Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
88
89 IpcClient(Path lockPath, Path logPath, Path syncPath) {
90 this.lockPath = lockPath;
91 this.logPath = logPath;
92 this.syncPath = syncPath;
93 }
94
95 void ensureInitialized() throws IOException {
96 if (!initialized) {
97 synchronized (this) {
98 if (!initialized) {
99 socket = createClient();
100 ByteChannel wrapper = new ByteChannelWrapper(socket);
101 input = new DataInputStream(Channels.newInputStream(wrapper));
102 output = new DataOutputStream(Channels.newOutputStream(wrapper));
103 receiver = new Thread(this::receive);
104 receiver.setDaemon(true);
105 receiver.start();
106 initialized = true;
107 }
108 }
109 }
110 }
111
112 SocketChannel createClient() throws IOException {
113 String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
114 SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
115
116 Path lockPath = this.lockPath.toAbsolutePath().normalize();
117 Path lockFile =
118 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
119 if (!Files.isRegularFile(lockFile)) {
120 if (!Files.isDirectory(lockFile.getParent())) {
121 Files.createDirectories(lockFile.getParent());
122 }
123 }
124
125 try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
126 try (FileLock lock = raf.getChannel().lock()) {
127 String line = raf.readLine();
128 if (line != null) {
129 try {
130 SocketAddress address = SocketFamily.fromString(line);
131 return SocketChannel.open(address);
132 } catch (IOException e) {
133
134 }
135 }
136
137 ServerSocketChannel ss = family.openServerSocket();
138 String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
139 String rand = Long.toHexString(new Random().nextLong());
140 String nativeName =
141 System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
142 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
143
144 boolean debug = Boolean.parseBoolean(
145 System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
146 boolean noNative = Boolean.parseBoolean(System.getProperty(
147 IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
148 if (!noNative) {
149 noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
150 }
151 Closeable close;
152 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
153 List<String> args = new ArrayList<>();
154 if (noNative) {
155 boolean noFork = Boolean.parseBoolean(System.getProperty(
156 IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
157 if (noFork) {
158 IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
159 close = server::close;
160 } else {
161 String javaHome = System.getenv("JAVA_HOME");
162 if (javaHome == null) {
163 javaHome = System.getProperty("java.home");
164 }
165 String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
166 String java = Paths.get(javaHome)
167 .resolve(javaCmd)
168 .toAbsolutePath()
169 .toString();
170 args.add(java);
171 String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
172 args.add("-cp");
173 args.add(classpath);
174 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
175 if (timeout != null) {
176 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
177 }
178 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
179 args.add(IpcServer.class.getName());
180 args.add(family.name());
181 args.add(tmpaddr);
182 args.add(rand);
183 ProcessBuilder processBuilder = new ProcessBuilder();
184 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
185 Files.createDirectories(logPath);
186 Process process = processBuilder
187 .directory(lockFile.getParent().toFile())
188 .command(args)
189 .redirectOutput(discard)
190 .redirectError(discard)
191 .start();
192 close = process::destroyForcibly;
193 }
194 } else {
195 args.add(syncPath.resolve(syncCmd).toString());
196 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
197 if (timeout != null) {
198 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
199 }
200 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
201 args.add(family.name());
202 args.add(tmpaddr);
203 args.add(rand);
204 ProcessBuilder processBuilder = new ProcessBuilder();
205 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
206 Files.createDirectories(logPath);
207 Process process = processBuilder
208 .directory(lockFile.getParent().toFile())
209 .command(args)
210 .redirectOutput(discard)
211 .redirectError(discard)
212 .start();
213 close = process::destroyForcibly;
214 }
215
216 ExecutorService es = Executors.newSingleThreadExecutor();
217 Future<String[]> future = es.submit(() -> {
218 SocketChannel s = ss.accept();
219 DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
220 String rand2 = dis.readUTF();
221 String addr2 = dis.readUTF();
222 return new String[] {rand2, addr2};
223 });
224 String[] res;
225 try {
226 res = future.get(5, TimeUnit.SECONDS);
227 } catch (Exception e) {
228 try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
229 writer.println("Arguments:");
230 args.forEach(writer::println);
231 writer.println();
232 writer.println("Exception:");
233 e.printStackTrace(writer);
234 }
235 close.close();
236 throw e;
237 } finally {
238 es.shutdownNow();
239 ss.close();
240 }
241 if (!Objects.equals(rand, res[0])) {
242 close.close();
243 throw new IllegalStateException("IpcServer did not respond with the correct random");
244 }
245
246 SocketAddress addr = SocketFamily.fromString(res[1]);
247 SocketChannel socket = SocketChannel.open(addr);
248
249 raf.seek(0);
250 raf.writeBytes(res[1] + "\n");
251 return socket;
252 } catch (Exception e) {
253 throw new RuntimeException("Unable to create and connect to lock server", e);
254 }
255 }
256 }
257
258 private String getJarPath(Class<?> clazz) {
259 String classpath;
260 String className = clazz.getName().replace('.', '/') + ".class";
261 String url = clazz.getClassLoader().getResource(className).toString();
262 if (url.startsWith("jar:")) {
263 url = url.substring("jar:".length(), url.indexOf("!/"));
264 if (url.startsWith("file:")) {
265 classpath = url.substring("file:".length());
266 } else {
267 throw new IllegalStateException();
268 }
269 } else if (url.startsWith("file:")) {
270 classpath = url.substring("file:".length(), url.indexOf(className));
271 } else {
272 throw new IllegalStateException();
273 }
274 if (IS_WINDOWS) {
275 if (classpath.startsWith("/")) {
276 classpath = classpath.substring(1);
277 }
278 classpath = classpath.replace('/', '\\');
279 }
280
281 return classpath;
282 }
283
284 void receive() {
285 try {
286 while (true) {
287 int id = input.readInt();
288 int sz = input.readInt();
289 List<String> s = new ArrayList<>(sz);
290 for (int i = 0; i < sz; i++) {
291 s.add(input.readUTF());
292 }
293 CompletableFuture<List<String>> f = responses.remove(id);
294 if (f == null || s.isEmpty()) {
295 throw new IllegalStateException("Protocol error");
296 }
297 f.complete(s);
298 }
299 } catch (EOFException e) {
300
301 } catch (Exception e) {
302 close(e);
303 }
304 }
305
306 List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
307 ensureInitialized();
308 int id = requestId.incrementAndGet();
309 CompletableFuture<List<String>> response = new CompletableFuture<>();
310 responses.put(id, response);
311 synchronized (output) {
312 output.writeInt(id);
313 output.writeInt(request.size());
314 for (String s : request) {
315 output.writeUTF(s);
316 }
317 output.flush();
318 }
319 try {
320 return response.get(time, unit);
321 } catch (InterruptedException e) {
322 throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
323 } catch (ExecutionException e) {
324 throw new IOException("Execution error", e);
325 }
326 }
327
328 void close() {
329 close(new IOException("Closing"));
330 }
331
332 synchronized void close(Throwable e) {
333 if (socket != null) {
334 try {
335 socket.close();
336 } catch (IOException t) {
337 e.addSuppressed(t);
338 }
339 socket = null;
340 input = null;
341 output = null;
342 }
343 if (receiver != null) {
344 receiver.interrupt();
345 try {
346 receiver.join(1000);
347 } catch (InterruptedException t) {
348 e.addSuppressed(t);
349 }
350 }
351 responses.values().forEach(f -> f.completeExceptionally(e));
352 responses.clear();
353 }
354
355 String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
356 RuntimeException error = new RuntimeException("Unable to create new sync context");
357 for (int i = 0; i < 2; i++) {
358 try {
359 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
360 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
361 throw new IOException("Unexpected response: " + response);
362 }
363 return response.get(1);
364 } catch (TimeoutException e) {
365 throw e;
366 } catch (Exception e) {
367 close(e);
368 error.addSuppressed(e);
369 }
370 }
371 throw error;
372 }
373
374 void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
375 try {
376 List<String> req = new ArrayList<>(keys.size() + 2);
377 req.add(REQUEST_ACQUIRE);
378 req.add(contextId);
379 req.addAll(keys);
380 List<String> response = send(req, time, unit);
381 if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
382 throw new IOException("Unexpected response: " + response);
383 }
384 } catch (TimeoutException e) {
385 throw e;
386 } catch (Exception e) {
387 close(e);
388 throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
389 }
390 }
391
392 void unlock(String contextId) {
393 try {
394 List<String> response =
395 send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
396 if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
397 throw new IOException("Unexpected response: " + response);
398 }
399 } catch (Exception e) {
400 close(e);
401 throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
402 }
403 }
404
405
406
407
408 void stopServer() {
409 try {
410 List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
411 if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
412 throw new IOException("Unexpected response: " + response);
413 }
414 } catch (Exception e) {
415 close(e);
416 throw new RuntimeException("Unable to stop server", e);
417 }
418 }
419
420 @Override
421 public String toString() {
422 return "IpcClient{"
423 + "lockPath=" + lockPath + ","
424 + "syncServerPath=" + syncPath + ","
425 + "address='" + getAddress() + "'}";
426 }
427
428 private String getAddress() {
429 try {
430 return SocketFamily.toString(socket.getLocalAddress());
431 } catch (IOException e) {
432 return "[not bound]";
433 }
434 }
435 }