001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.named.ipc; 020 021import java.io.Closeable; 022import java.io.DataInputStream; 023import java.io.DataOutputStream; 024import java.io.EOFException; 025import java.io.File; 026import java.io.FileWriter; 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.io.PrintWriter; 030import java.io.RandomAccessFile; 031import java.net.SocketAddress; 032import java.nio.channels.ByteChannel; 033import java.nio.channels.Channels; 034import java.nio.channels.FileLock; 035import java.nio.channels.ServerSocketChannel; 036import java.nio.channels.SocketChannel; 037import java.nio.file.Files; 038import java.nio.file.Path; 039import java.nio.file.Paths; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Collection; 043import java.util.List; 044import java.util.Locale; 045import java.util.Map; 046import java.util.Objects; 047import java.util.Random; 048import java.util.concurrent.CompletableFuture; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ExecutionException; 051import java.util.concurrent.ExecutorService; 052import java.util.concurrent.Executors; 053import java.util.concurrent.Future; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.TimeoutException; 056import java.util.concurrent.atomic.AtomicInteger; 057 058import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE; 059import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE; 060import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT; 061import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP; 062import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE; 063import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE; 064import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT; 065import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP; 066 067/** 068 * Client side implementation. 069 * The client instance is bound to a given maven repository. 070 * 071 * @since 2.0.1 072 */ 073public class IpcClient { 074 075 static final boolean IS_WINDOWS = 076 System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); 077 078 protected volatile boolean initialized; 079 protected final Path lockPath; 080 protected final Path logPath; 081 protected final Path syncPath; 082 protected final boolean noFork; 083 084 protected SocketChannel socket; 085 protected DataOutputStream output; 086 protected DataInputStream input; 087 protected Thread receiver; 088 089 protected final AtomicInteger requestId = new AtomicInteger(); 090 protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>(); 091 092 IpcClient(Path lockPath, Path logPath, Path syncPath) { 093 this.lockPath = lockPath; 094 this.logPath = logPath; 095 this.syncPath = syncPath; 096 this.noFork = Boolean.parseBoolean( 097 System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK))); 098 } 099 100 void ensureInitialized() throws IOException { 101 if (!initialized) { 102 // caller must block on this method 103 synchronized (this) { 104 if (!initialized) { 105 socket = createClient(); 106 ByteChannel wrapper = new ByteChannelWrapper(socket); 107 input = new DataInputStream(Channels.newInputStream(wrapper)); 108 output = new DataOutputStream(Channels.newOutputStream(wrapper)); 109 receiver = new Thread(this::receive); 110 receiver.setDaemon(true); 111 receiver.start(); 112 initialized = true; 113 } 114 } 115 } 116 } 117 118 SocketChannel createClient() throws IOException { 119 String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY); 120 SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet; 121 122 Path lockPath = this.lockPath.toAbsolutePath().normalize(); 123 Path lockFile = 124 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH)); 125 if (!Files.isRegularFile(lockFile)) { 126 if (!Files.isDirectory(lockFile.getParent())) { 127 Files.createDirectories(lockFile.getParent()); 128 } 129 } 130 131 try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) { 132 try (FileLock lock = raf.getChannel().lock()) { 133 String line = raf.readLine(); 134 if (line != null) { 135 try { 136 SocketAddress address = SocketFamily.fromString(line); 137 return SocketChannel.open(address); 138 } catch (IOException e) { 139 // ignore 140 } 141 } 142 143 ServerSocketChannel ss = family.openServerSocket(); 144 String tmpaddr = SocketFamily.toString(ss.getLocalAddress()); 145 String rand = Long.toHexString(new Random().nextLong()); 146 String nativeName = 147 System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME); 148 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName; 149 150 boolean debug = Boolean.parseBoolean( 151 System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG))); 152 boolean noNative = Boolean.parseBoolean(System.getProperty( 153 IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE))); 154 if (!noNative) { 155 noNative = !Files.isExecutable(syncPath.resolve(syncCmd)); 156 } 157 Closeable close; 158 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log"); 159 List<String> args = new ArrayList<>(); 160 if (noNative) { 161 if (noFork) { 162 IpcServer server = IpcServer.runServer(family, tmpaddr, rand); 163 close = server::close; 164 } else { 165 String javaHome = System.getenv("JAVA_HOME"); 166 if (javaHome == null) { 167 javaHome = System.getProperty("java.home"); 168 } 169 String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java"; 170 String java = Paths.get(javaHome) 171 .resolve(javaCmd) 172 .toAbsolutePath() 173 .toString(); 174 args.add(java); 175 String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class); 176 args.add("-cp"); 177 args.add(classpath); 178 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT); 179 if (timeout != null) { 180 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout); 181 } 182 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug); 183 args.add(IpcServer.class.getName()); 184 args.add(family.name()); 185 args.add(tmpaddr); 186 args.add(rand); 187 ProcessBuilder processBuilder = new ProcessBuilder(); 188 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile()); 189 Files.createDirectories(logPath); 190 Process process = processBuilder 191 .directory(lockFile.getParent().toFile()) 192 .command(args) 193 .redirectOutput(discard) 194 .redirectError(discard) 195 .start(); 196 close = process::destroyForcibly; 197 } 198 } else { 199 args.add(syncPath.resolve(syncCmd).toString()); 200 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT); 201 if (timeout != null) { 202 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout); 203 } 204 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug); 205 args.add(family.name()); 206 args.add(tmpaddr); 207 args.add(rand); 208 ProcessBuilder processBuilder = new ProcessBuilder(); 209 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile()); 210 Files.createDirectories(logPath); 211 Process process = processBuilder 212 .directory(lockFile.getParent().toFile()) 213 .command(args) 214 .redirectOutput(discard) 215 .redirectError(discard) 216 .start(); 217 close = process::destroyForcibly; 218 } 219 220 ExecutorService es = Executors.newSingleThreadExecutor(); 221 Future<String[]> future = es.submit(() -> { 222 SocketChannel s = ss.accept(); 223 DataInputStream dis = new DataInputStream(Channels.newInputStream(s)); 224 String rand2 = dis.readUTF(); 225 String addr2 = dis.readUTF(); 226 return new String[] {rand2, addr2}; 227 }); 228 String[] res; 229 try { 230 res = future.get(5, TimeUnit.SECONDS); 231 } catch (Exception e) { 232 try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) { 233 writer.println("Arguments:"); 234 args.forEach(writer::println); 235 writer.println(); 236 writer.println("Exception:"); 237 e.printStackTrace(writer); 238 } 239 close.close(); 240 throw e; 241 } finally { 242 es.shutdownNow(); 243 ss.close(); 244 } 245 if (!Objects.equals(rand, res[0])) { 246 close.close(); 247 throw new IllegalStateException("IpcServer did not respond with the correct random"); 248 } 249 250 SocketAddress addr = SocketFamily.fromString(res[1]); 251 SocketChannel socket = SocketChannel.open(addr); 252 253 raf.seek(0); 254 raf.writeBytes(res[1] + "\n"); 255 return socket; 256 } catch (Exception e) { 257 throw new RuntimeException("Unable to create and connect to lock server", e); 258 } 259 } 260 } 261 262 private String getJarPath(Class<?> clazz) { 263 String classpath; 264 String className = clazz.getName().replace('.', '/') + ".class"; 265 String url = clazz.getClassLoader().getResource(className).toString(); 266 if (url.startsWith("jar:")) { 267 url = url.substring("jar:".length(), url.indexOf("!/")); 268 if (url.startsWith("file:")) { 269 classpath = url.substring("file:".length()); 270 } else { 271 throw new IllegalStateException(); 272 } 273 } else if (url.startsWith("file:")) { 274 classpath = url.substring("file:".length(), url.indexOf(className)); 275 } else { 276 throw new IllegalStateException(); 277 } 278 if (IS_WINDOWS) { 279 if (classpath.startsWith("/")) { 280 classpath = classpath.substring(1); 281 } 282 classpath = classpath.replace('/', '\\'); 283 } 284 285 return classpath; 286 } 287 288 void receive() { 289 try { 290 while (true) { 291 int id = input.readInt(); 292 int sz = input.readInt(); 293 List<String> s = new ArrayList<>(sz); 294 for (int i = 0; i < sz; i++) { 295 s.add(input.readUTF()); 296 } 297 CompletableFuture<List<String>> f = responses.remove(id); 298 if (f == null || s.isEmpty()) { 299 throw new IllegalStateException("Protocol error"); 300 } 301 f.complete(s); 302 } 303 } catch (EOFException e) { 304 // server is stopped; just quit 305 } catch (Exception e) { 306 close(e); 307 } 308 } 309 310 List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException { 311 ensureInitialized(); 312 int id = requestId.incrementAndGet(); 313 CompletableFuture<List<String>> response = new CompletableFuture<>(); 314 responses.put(id, response); 315 synchronized (output) { 316 output.writeInt(id); 317 output.writeInt(request.size()); 318 for (String s : request) { 319 output.writeUTF(s); 320 } 321 output.flush(); 322 } 323 try { 324 return response.get(time, unit); 325 } catch (InterruptedException e) { 326 throw (IOException) new InterruptedIOException("Interrupted").initCause(e); 327 } catch (ExecutionException e) { 328 throw new IOException("Execution error", e); 329 } 330 } 331 332 void close() { 333 if (noFork) { 334 stopServer(); 335 } 336 close(new IOException("Closing")); 337 } 338 339 synchronized void close(Throwable e) { 340 if (socket != null) { 341 try { 342 socket.close(); 343 } catch (IOException t) { 344 e.addSuppressed(t); 345 } 346 socket = null; 347 input = null; 348 output = null; 349 } 350 if (receiver != null) { 351 receiver.interrupt(); 352 try { 353 receiver.join(1000); 354 } catch (InterruptedException t) { 355 e.addSuppressed(t); 356 } 357 } 358 responses.values().forEach(f -> f.completeExceptionally(e)); 359 responses.clear(); 360 } 361 362 String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException { 363 RuntimeException error = new RuntimeException("Unable to create new sync context"); 364 for (int i = 0; i < 2; i++) { 365 try { 366 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit); 367 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) { 368 throw new IOException("Unexpected response: " + response); 369 } 370 return response.get(1); 371 } catch (TimeoutException e) { 372 throw e; 373 } catch (Exception e) { 374 close(e); 375 error.addSuppressed(e); 376 } 377 } 378 throw error; 379 } 380 381 void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException { 382 try { 383 List<String> req = new ArrayList<>(keys.size() + 2); 384 req.add(REQUEST_ACQUIRE); 385 req.add(contextId); 386 req.addAll(keys); 387 List<String> response = send(req, time, unit); 388 if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) { 389 throw new IOException("Unexpected response: " + response); 390 } 391 } catch (TimeoutException e) { 392 throw e; 393 } catch (Exception e) { 394 close(e); 395 throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e); 396 } 397 } 398 399 void unlock(String contextId) { 400 try { 401 List<String> response = 402 send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS); 403 if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) { 404 throw new IOException("Unexpected response: " + response); 405 } 406 } catch (Exception e) { 407 close(e); 408 throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e); 409 } 410 } 411 412 /** 413 * To be used in tests to stop server immediately. Should not be used outside of tests. 414 */ 415 void stopServer() { 416 try { 417 List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS); 418 if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) { 419 throw new IOException("Unexpected response: " + response); 420 } 421 } catch (Exception e) { 422 close(e); 423 throw new RuntimeException("Unable to stop server", e); 424 } 425 } 426 427 @Override 428 public String toString() { 429 return "IpcClient{" 430 + "lockPath=" + lockPath + "," 431 + "syncServerPath=" + syncPath + "," 432 + "address='" + getAddress() + "'}"; 433 } 434 435 private String getAddress() { 436 try { 437 return SocketFamily.toString(socket.getLocalAddress()); 438 } catch (IOException e) { 439 return "[not bound]"; 440 } 441 } 442}