/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.eclipse.aether.named.ipc;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.FileLock;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;

/**
 * Client side implementation.
 * The client instance is bound to a given maven repository.
 * <p>
 * The connection is established lazily on the first request. Requests are written to the socket by the
 * calling thread and answered asynchronously by a single daemon receiver thread, which completes the
 * future registered under the request id.
 *
 * @since 2.0.1
 */
public class IpcClient {

    static final boolean IS_WINDOWS =
            System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");

    /** Set once the socket, the streams and the receiver thread are in place; cleared by {@link #close(Throwable)}. */
    protected volatile boolean initialized;

    /** Directory holding the lock file in which the server address is published. */
    protected final Path lockPath;

    /** Directory receiving the log file of a launched server. */
    protected final Path logPath;

    /** Directory searched for the native server executable. */
    protected final Path syncPath;

    /** When {@code true} (and no native executable is used) the server is run inside this JVM. */
    protected final boolean noFork;

    protected volatile SocketChannel socket;
    protected volatile DataOutputStream output;
    protected volatile DataInputStream input;
    protected volatile Thread receiver;

    /** Source of request ids used to correlate responses. */
    protected final AtomicInteger requestId = new AtomicInteger();

    /** Futures of requests that have been sent but not answered yet, keyed by request id. */
    protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();

    IpcClient(Path lockPath, Path logPath, Path syncPath) {
        this.lockPath = lockPath;
        this.logPath = logPath;
        this.syncPath = syncPath;
        this.noFork = Boolean.parseBoolean(
                System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
    }

    /**
     * Connects to the server (launching it if needed) and starts the receiver thread, unless that
     * has already been done.
     *
     * @throws IOException if the connection cannot be set up
     */
    void ensureInitialized() throws IOException {
        if (!initialized) {
            // caller must block on this method
            synchronized (this) {
                if (!initialized) {
                    socket = createClient();
                    ByteChannel wrapper = new ByteChannelWrapper(socket);
                    input = new DataInputStream(Channels.newInputStream(wrapper));
                    output = new DataOutputStream(Channels.newOutputStream(wrapper));
                    receiver = new Thread(this::receive);
                    receiver.setDaemon(true);
                    receiver.start();
                    initialized = true;
                }
            }
        }
    }

    /**
     * Opens a channel to the lock server.
     * <p>
     * While holding an exclusive lock on the lock file, the address stored in that file is tried first.
     * If there is none, or connecting to it fails, a new server is launched (in-process, as a forked JVM
     * or as the native executable), its address is obtained through a one-shot handshake on a temporary
     * listener, and the address is written back to the lock file.
     *
     * @return a channel connected to the server
     * @throws IOException if the lock file cannot be prepared or opened
     * @throws RuntimeException if the server cannot be launched or connected to
     */
    SocketChannel createClient() throws IOException {
        SocketFamily family =
                SocketFamily.valueOf(System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY));

        Path lockDir = this.lockPath.toAbsolutePath().normalize();
        Path lockFile =
                lockDir.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
        if (!Files.isRegularFile(lockFile) && !Files.isDirectory(lockFile.getParent())) {
            Files.createDirectories(lockFile.getParent());
        }

        try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
            try (FileLock lock = raf.getChannel().lock()) {
                String line = raf.readLine();
                if (line != null) {
                    try {
                        SocketAddress address = SocketFamily.fromString(line);
                        return SocketChannel.open(address);
                    } catch (IOException ignored) {
                        // the recorded address is stale: fall through and launch a new server
                    }
                }

                String rand = Long.toHexString(new Random().nextLong());
                Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
                List<String> args = new ArrayList<>();

                // try-with-resources guarantees the temporary listener is released even when
                // launching the server fails before the handshake is attempted
                try (ServerSocketChannel ss = family.openServerSocket()) {
                    String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
                    String nativeName =
                            System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
                    String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;

                    boolean debug = Boolean.parseBoolean(
                            System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
                    boolean noNative = Boolean.parseBoolean(System.getProperty(
                            IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
                    if (!noNative) {
                        noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
                    }

                    Closeable server;
                    if (noNative && noFork) {
                        IpcServer inProcess = IpcServer.runServer(family, tmpaddr, rand);
                        server = inProcess::close;
                    } else {
                        if (noNative) {
                            args.add(javaExecutable());
                            args.add("-cp");
                            args.add(getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class));
                            addServerOptions(args, debug);
                            args.add(IpcServer.class.getName());
                        } else {
                            args.add(syncPath.resolve(syncCmd).toString());
                            addServerOptions(args, debug);
                        }
                        args.add(family.name());
                        args.add(tmpaddr);
                        args.add(rand);
                        Process process = forkServer(args, lockFile.getParent(), logFile);
                        server = process::destroyForcibly;
                    }

                    String[] res = awaitHandshake(ss, server, logFile, args);
                    if (!Objects.equals(rand, res[0])) {
                        server.close();
                        throw new IllegalStateException("IpcServer did not respond with the correct random");
                    }

                    SocketChannel channel = null;
                    try {
                        channel = SocketChannel.open(SocketFamily.fromString(res[1]));
                        raf.seek(0);
                        raf.writeBytes(res[1] + "\n");
                        return channel;
                    } catch (Exception e) {
                        // without a usable connection and a published address the new server is
                        // unreachable: release the channel and stop the server
                        closeQuietly(channel, e);
                        closeQuietly(server, e);
                        throw e;
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Unable to create and connect to lock server", e);
            }
        }
    }

    /** Resolves the {@code java} launcher below {@code JAVA_HOME}, falling back to the {@code java.home} property. */
    private static String javaExecutable() {
        String javaHome = System.getenv("JAVA_HOME");
        if (javaHome == null) {
            javaHome = System.getProperty("java.home");
        }
        String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
        return Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
    }

    /** Appends the {@code -D} options (idle timeout when configured, debug flag) passed to a launched server. */
    private static void addServerOptions(List<String> args, boolean debug) {
        String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
        if (timeout != null) {
            args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
        }
        args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
    }

    /** Starts the server process with the given command line, sending its stdout and stderr to {@code logFile}. */
    private Process forkServer(List<String> args, Path workDir, Path logFile) throws IOException {
        ProcessBuilder.Redirect toLog = ProcessBuilder.Redirect.to(logFile.toFile());
        Files.createDirectories(logPath);
        return new ProcessBuilder()
                .directory(workDir.toFile())
                .command(args)
                .redirectOutput(toLog)
                .redirectError(toLog)
                .start();
    }

    /**
     * Waits up to 5 seconds for the launched server to connect back on {@code ss} and report the random
     * token and its own address. On failure the arguments and the exception are appended to the log
     * file and the server is stopped. The listener is closed in all cases.
     */
    private String[] awaitHandshake(ServerSocketChannel ss, Closeable server, Path logFile, List<String> args)
            throws InterruptedException, ExecutionException, TimeoutException, IOException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<String[]> future = es.submit(() -> {
                // the accepted channel is only needed for these two reads
                try (SocketChannel s = ss.accept()) {
                    DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
                    String rand2 = dis.readUTF();
                    String addr2 = dis.readUTF();
                    return new String[] {rand2, addr2};
                }
            });
            try {
                return future.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                writeDiagnostics(logFile, args, e);
                closeQuietly(server, e);
                throw e;
            }
        } finally {
            es.shutdownNow();
            // closed here so the listener goes away right after the handshake; the enclosing
            // try-with-resources closes it again, which has no further effect
            ss.close();
        }
    }

    /** Appends the launch arguments and the failure to the log file; a failure to do so is attached as suppressed. */
    private void writeDiagnostics(Path logFile, List<String> args, Exception failure) {
        try {
            // the directory is otherwise only created when a process is forked
            Files.createDirectories(logPath);
            try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
                writer.println("Arguments:");
                args.forEach(writer::println);
                writer.println();
                writer.println("Exception:");
                failure.printStackTrace(writer);
            }
        } catch (IOException | RuntimeException e) {
            failure.addSuppressed(e);
        }
    }

    /** Closes {@code closeable} if non-null, attaching any failure to {@code failure} instead of throwing it. */
    private static void closeQuietly(Closeable closeable, Throwable failure) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException | RuntimeException e) {
                failure.addSuppressed(e);
            }
        }
    }

    /**
     * Returns the file system location (jar file or classes directory) the given class was loaded from,
     * suitable as a classpath entry.
     *
     * @throws IllegalStateException if the class was not loaded from a {@code file:} or {@code jar:file:} URL
     */
    private String getJarPath(Class<?> clazz) {
        String className = clazz.getName().replace('.', '/') + ".class";
        String url = Objects.requireNonNull(
                        clazz.getClassLoader().getResource(className), "Class resource not found: " + className)
                .toString();
        String location;
        if (url.startsWith("jar:")) {
            location = url.substring("jar:".length(), url.indexOf("!/"));
            if (!location.startsWith("file:")) {
                throw new IllegalStateException("Unsupported class location: " + url);
            }
        } else if (url.startsWith("file:")) {
            location = url.substring(0, url.lastIndexOf(className));
        } else {
            throw new IllegalStateException("Unsupported class location: " + url);
        }
        try {
            // converting through a URI decodes percent-escapes (e.g. %20 for a space) and yields
            // the platform's native path syntax
            return Paths.get(URI.create(location)).toString();
        } catch (RuntimeException e) {
            // not a well-formed file URI: fall back to plain string manipulation
            String classpath = location.substring("file:".length());
            if (IS_WINDOWS) {
                if (classpath.startsWith("/")) {
                    classpath = classpath.substring(1);
                }
                classpath = classpath.replace('/', '\\');
            }
            return classpath;
        }
    }

    /**
     * Receiver loop: reads {@code (id, count, strings...)} frames and completes the future registered
     * under {@code id}. An empty frame completes the future exceptionally. When reading fails the
     * connection is closed, unless it has already been closed or replaced by another thread.
     */
    void receive() {
        // capture the stream this receiver belongs to, so that a later failure can be matched
        // against the connection that is current at that time
        DataInputStream in = input;
        if (in == null) {
            // the connection was closed before this thread got to run
            return;
        }
        try {
            while (true) {
                int id = in.readInt();
                int sz = in.readInt();
                List<String> s = new ArrayList<>(sz);
                for (int i = 0; i < sz; i++) {
                    s.add(in.readUTF());
                }
                CompletableFuture<List<String>> f = responses.remove(id);
                if (f == null) {
                    continue;
                }
                if (s.isEmpty()) {
                    f.completeExceptionally(new IOException("Protocol error: empty response"));
                    continue;
                }
                f.complete(s);
            }
        } catch (EOFException e) {
            closeFromReceiver(in, new IOException("Server disconnected", e));
        } catch (Exception e) {
            closeFromReceiver(in, e);
        }
    }

    /**
     * Closes the connection on behalf of the receiver reading from {@code in}, but only while
     * {@code in} is still the current input stream.
     */
    private void closeFromReceiver(DataInputStream in, Throwable cause) {
        if (input != in) {
            // already closed (or re-established) by another thread; returning without taking the
            // monitor lets a closing thread's join() finish immediately
            return;
        }
        synchronized (this) {
            if (input == in) {
                close(cause);
            }
        }
    }

    /**
     * Sends a request and waits for the matching response.
     *
     * @param request the strings making up the request
     * @param time maximum time to wait for the response
     * @param unit unit of {@code time}
     * @return the response strings (never empty)
     * @throws TimeoutException if no response arrived in time
     * @throws IOException if the connection is closed, writing fails, the wait is interrupted (the
     *         interrupt status is restored) or the response was completed exceptionally
     */
    List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
        ensureInitialized();
        DataOutputStream out = output;
        if (out == null) {
            throw new IOException("Connection closed");
        }
        int id = requestId.incrementAndGet();
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        responses.put(id, response);
        try {
            synchronized (out) {
                out.writeInt(id);
                out.writeInt(request.size());
                for (String s : request) {
                    out.writeUTF(s);
                }
                out.flush();
            }
            return response.get(time, unit);
        } catch (InterruptedException e) {
            responses.remove(id);
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
        } catch (ExecutionException e) {
            throw new IOException("Execution error", e);
        } catch (TimeoutException | IOException | RuntimeException e) {
            // nobody will wait for this response any more
            responses.remove(id);
            throw e;
        }
    }

    /**
     * Closes this client. In no-fork mode the server is asked to stop first.
     */
    void close() {
        if (noFork) {
            stopServer();
        }
        close(new IOException("Closing"));
    }

    /**
     * Closes the connection and fails all pending requests with {@code e}. The client may be used
     * again afterwards; the next request re-establishes the connection.
     *
     * @param e the reason, used to complete pending requests; secondary failures are added as suppressed
     */
    synchronized void close(Throwable e) {
        initialized = false;
        // unpublish the connection before closing the channel, so a receiver woken up by the
        // close can tell that the connection it served is gone
        SocketChannel channel = socket;
        socket = null;
        input = null;
        output = null;
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException t) {
                e.addSuppressed(t);
            }
        }
        Thread worker = receiver;
        if (worker != null && Thread.currentThread() != worker) {
            worker.interrupt();
            try {
                worker.join(1000);
            } catch (InterruptedException t) {
                e.addSuppressed(t);
                Thread.currentThread().interrupt();
            }
        }
        responses.values().forEach(f -> f.completeExceptionally(e));
        responses.clear();
    }

    /**
     * Asks the server for a new sync context. A failed attempt closes the connection and is retried once.
     *
     * @param shared whether the context is shared
     * @param time maximum time to wait for the response
     * @param unit unit of {@code time}
     * @return the id of the new context
     * @throws TimeoutException if the server did not answer in time
     * @throws RuntimeException if both attempts failed (the failures are attached as suppressed)
     */
    String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
        RuntimeException error = new RuntimeException("Unable to create new sync context");
        for (int i = 0; i < 2; i++) {
            try {
                List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
                if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
                    throw new IOException("Unexpected response: " + response);
                }
                return response.get(1);
            } catch (TimeoutException e) {
                throw e;
            } catch (Exception e) {
                close(e);
                error.addSuppressed(e);
            }
        }
        throw error;
    }

    /**
     * Acquires the given keys within a context.
     *
     * @param contextId the context id
     * @param keys the keys to acquire
     * @param time maximum time to wait for the response
     * @param unit unit of {@code time}
     * @throws TimeoutException if the server did not answer in time
     * @throws RuntimeException on any other failure; the connection is closed in that case
     */
    void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
        try {
            List<String> req = new ArrayList<>(keys.size() + 2);
            req.add(REQUEST_ACQUIRE);
            req.add(contextId);
            req.addAll(keys);
            List<String> response = send(req, time, unit);
            if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
                throw new IOException("Unexpected response: " + response);
            }
        } catch (TimeoutException e) {
            throw e;
        } catch (Exception e) {
            close(e);
            throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
        }
    }

    /**
     * Closes the given context on the server, waiting up to 10 seconds for the confirmation.
     *
     * @param contextId the context id
     * @throws RuntimeException on failure (including timeout); the connection is closed in that case
     */
    void unlock(String contextId) {
        try {
            List<String> response = send(Arrays.asList(REQUEST_CLOSE, contextId), 10, TimeUnit.SECONDS);
            if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
                throw new IOException("Unexpected response: " + response);
            }
        } catch (Exception e) {
            close(e);
            throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
        }
    }

    /**
     * To be used in tests to stop server immediately. Should not be used outside of tests.
     */
    void stopServer() {
        try {
            List<String> response = send(List.of(REQUEST_STOP), 30, TimeUnit.SECONDS);
            if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
                throw new IOException("Unexpected response: " + response);
            }
        } catch (Exception e) {
            close(e);
            throw new RuntimeException("Unable to stop server", e);
        }
    }

    @Override
    public String toString() {
        return "IpcClient{"
                + "lockPath=" + lockPath + ","
                + "syncServerPath=" + syncPath + ","
                + "address='" + getAddress() + "'}";
    }

    /** Returns the local address of the current connection, or {@code "[not bound]"} when there is none. */
    private String getAddress() {
        // read the volatile field once: it is null before the first request and after close
        SocketChannel channel = socket;
        if (channel == null) {
            return "[not bound]";
        }
        try {
            return SocketFamily.toString(channel.getLocalAddress());
        } catch (IOException e) {
            return "[not bound]";
        }
    }
}