001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.named.ipc; 020 021import java.io.Closeable; 022import java.io.DataInputStream; 023import java.io.DataOutputStream; 024import java.io.EOFException; 025import java.io.File; 026import java.io.FileWriter; 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.io.PrintWriter; 030import java.io.RandomAccessFile; 031import java.net.SocketAddress; 032import java.nio.channels.ByteChannel; 033import java.nio.channels.Channels; 034import java.nio.channels.FileLock; 035import java.nio.channels.ServerSocketChannel; 036import java.nio.channels.SocketChannel; 037import java.nio.file.Files; 038import java.nio.file.Path; 039import java.nio.file.Paths; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Collection; 043import java.util.List; 044import java.util.Locale; 045import java.util.Map; 046import java.util.Objects; 047import java.util.Random; 048import java.util.concurrent.CompletableFuture; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ExecutionException; 051import java.util.concurrent.ExecutorService; 052import java.util.concurrent.Executors; 053import java.util.concurrent.Future; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.TimeoutException; 056import java.util.concurrent.atomic.AtomicInteger; 057 058import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE; 059import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE; 060import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT; 061import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP; 062import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE; 063import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE; 064import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT; 065import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP; 066 067/** 068 * Client side implementation. 069 * The client instance is bound to a given maven repository. 070 * 071 * @since 2.0.1 072 */ 073public class IpcClient { 074 075 static final boolean IS_WINDOWS = 076 System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); 077 078 volatile boolean initialized; 079 Path lockPath; 080 Path logPath; 081 Path syncPath; 082 SocketChannel socket; 083 DataOutputStream output; 084 DataInputStream input; 085 Thread receiver; 086 AtomicInteger requestId = new AtomicInteger(); 087 Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>(); 088 089 IpcClient(Path lockPath, Path logPath, Path syncPath) { 090 this.lockPath = lockPath; 091 this.logPath = logPath; 092 this.syncPath = syncPath; 093 } 094 095 void ensureInitialized() throws IOException { 096 if (!initialized) { 097 synchronized (this) { 098 if (!initialized) { 099 socket = createClient(); 100 ByteChannel wrapper = new ByteChannelWrapper(socket); 101 input = new DataInputStream(Channels.newInputStream(wrapper)); 102 output = new DataOutputStream(Channels.newOutputStream(wrapper)); 103 receiver = new Thread(this::receive); 104 receiver.setDaemon(true); 105 receiver.start(); 106 initialized = true; 107 } 108 } 109 } 110 } 111 112 SocketChannel createClient() throws IOException { 113 String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY); 114 SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet; 115 116 Path lockPath = this.lockPath.toAbsolutePath().normalize(); 117 Path lockFile = 118 lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH)); 119 if (!Files.isRegularFile(lockFile)) { 120 if (!Files.isDirectory(lockFile.getParent())) { 121 Files.createDirectories(lockFile.getParent()); 122 } 123 } 124 125 try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) { 126 try (FileLock lock = raf.getChannel().lock()) { 127 String line = raf.readLine(); 128 if (line != null) { 129 try { 130 SocketAddress address = SocketFamily.fromString(line); 131 return SocketChannel.open(address); 132 } catch (IOException e) { 133 // ignore 134 } 135 } 136 137 ServerSocketChannel ss = family.openServerSocket(); 138 String tmpaddr = SocketFamily.toString(ss.getLocalAddress()); 139 String rand = Long.toHexString(new Random().nextLong()); 140 String nativeName = 141 System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME); 142 String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName; 143 144 boolean debug = Boolean.parseBoolean( 145 System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG))); 146 boolean noNative = Boolean.parseBoolean(System.getProperty( 147 IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE))); 148 if (!noNative) { 149 noNative = !Files.isExecutable(syncPath.resolve(syncCmd)); 150 } 151 Closeable close; 152 Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log"); 153 List<String> args = new ArrayList<>(); 154 if (noNative) { 155 boolean noFork = Boolean.parseBoolean(System.getProperty( 156 IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK))); 157 if (noFork) { 158 IpcServer server = IpcServer.runServer(family, tmpaddr, rand); 159 close = server::close; 160 } else { 161 String javaHome = System.getenv("JAVA_HOME"); 162 if (javaHome == null) { 163 javaHome = System.getProperty("java.home"); 164 } 165 String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java"; 166 String java = Paths.get(javaHome) 167 .resolve(javaCmd) 168 .toAbsolutePath() 169 .toString(); 170 args.add(java); 171 String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class); 172 args.add("-cp"); 173 args.add(classpath); 174 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT); 175 if (timeout != null) { 176 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout); 177 } 178 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug); 179 args.add(IpcServer.class.getName()); 180 args.add(family.name()); 181 args.add(tmpaddr); 182 args.add(rand); 183 ProcessBuilder processBuilder = new ProcessBuilder(); 184 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile()); 185 Files.createDirectories(logPath); 186 Process process = processBuilder 187 .directory(lockFile.getParent().toFile()) 188 .command(args) 189 .redirectOutput(discard) 190 .redirectError(discard) 191 .start(); 192 close = process::destroyForcibly; 193 } 194 } else { 195 args.add(syncPath.resolve(syncCmd).toString()); 196 String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT); 197 if (timeout != null) { 198 args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout); 199 } 200 args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug); 201 args.add(family.name()); 202 args.add(tmpaddr); 203 args.add(rand); 204 ProcessBuilder processBuilder = new ProcessBuilder(); 205 ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile()); 206 Files.createDirectories(logPath); 207 Process process = processBuilder 208 .directory(lockFile.getParent().toFile()) 209 .command(args) 210 .redirectOutput(discard) 211 .redirectError(discard) 212 .start(); 213 close = process::destroyForcibly; 214 } 215 216 ExecutorService es = Executors.newSingleThreadExecutor(); 217 Future<String[]> future = es.submit(() -> { 218 SocketChannel s = ss.accept(); 219 DataInputStream dis = new DataInputStream(Channels.newInputStream(s)); 220 String rand2 = dis.readUTF(); 221 String addr2 = dis.readUTF(); 222 return new String[] {rand2, addr2}; 223 }); 224 String[] res; 225 try { 226 res = future.get(5, TimeUnit.SECONDS); 227 } catch (Exception e) { 228 try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) { 229 writer.println("Arguments:"); 230 args.forEach(writer::println); 231 writer.println(); 232 writer.println("Exception:"); 233 e.printStackTrace(writer); 234 } 235 close.close(); 236 throw e; 237 } finally { 238 es.shutdownNow(); 239 ss.close(); 240 } 241 if (!Objects.equals(rand, res[0])) { 242 close.close(); 243 throw new IllegalStateException("IpcServer did not respond with the correct random"); 244 } 245 246 SocketAddress addr = SocketFamily.fromString(res[1]); 247 SocketChannel socket = SocketChannel.open(addr); 248 249 raf.seek(0); 250 raf.writeBytes(res[1] + "\n"); 251 return socket; 252 } catch (Exception e) { 253 throw new RuntimeException("Unable to create and connect to lock server", e); 254 } 255 } 256 } 257 258 private String getJarPath(Class<?> clazz) { 259 String classpath; 260 String className = clazz.getName().replace('.', '/') + ".class"; 261 String url = clazz.getClassLoader().getResource(className).toString(); 262 if (url.startsWith("jar:")) { 263 url = url.substring("jar:".length(), url.indexOf("!/")); 264 if (url.startsWith("file:")) { 265 classpath = url.substring("file:".length()); 266 } else { 267 throw new IllegalStateException(); 268 } 269 } else if (url.startsWith("file:")) { 270 classpath = url.substring("file:".length(), url.indexOf(className)); 271 } else { 272 throw new IllegalStateException(); 273 } 274 if (IS_WINDOWS) { 275 if (classpath.startsWith("/")) { 276 classpath = classpath.substring(1); 277 } 278 classpath = classpath.replace('/', '\\'); 279 } 280 281 return classpath; 282 } 283 284 void receive() { 285 try { 286 while (true) { 287 int id = input.readInt(); 288 int sz = input.readInt(); 289 List<String> s = new ArrayList<>(sz); 290 for (int i = 0; i < sz; i++) { 291 s.add(input.readUTF()); 292 } 293 CompletableFuture<List<String>> f = responses.remove(id); 294 if (f == null || s.isEmpty()) { 295 throw new IllegalStateException("Protocol error"); 296 } 297 f.complete(s); 298 } 299 } catch (EOFException e) { 300 // server is stopped; just quit 301 } catch (Exception e) { 302 close(e); 303 } 304 } 305 306 List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException { 307 ensureInitialized(); 308 int id = requestId.incrementAndGet(); 309 CompletableFuture<List<String>> response = new CompletableFuture<>(); 310 responses.put(id, response); 311 synchronized (output) { 312 output.writeInt(id); 313 output.writeInt(request.size()); 314 for (String s : request) { 315 output.writeUTF(s); 316 } 317 output.flush(); 318 } 319 try { 320 return response.get(time, unit); 321 } catch (InterruptedException e) { 322 throw (IOException) new InterruptedIOException("Interrupted").initCause(e); 323 } catch (ExecutionException e) { 324 throw new IOException("Execution error", e); 325 } 326 } 327 328 void close() { 329 close(new IOException("Closing")); 330 } 331 332 synchronized void close(Throwable e) { 333 if (socket != null) { 334 try { 335 socket.close(); 336 } catch (IOException t) { 337 e.addSuppressed(t); 338 } 339 socket = null; 340 input = null; 341 output = null; 342 } 343 if (receiver != null) { 344 receiver.interrupt(); 345 try { 346 receiver.join(1000); 347 } catch (InterruptedException t) { 348 e.addSuppressed(t); 349 } 350 } 351 responses.values().forEach(f -> f.completeExceptionally(e)); 352 responses.clear(); 353 } 354 355 String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException { 356 RuntimeException error = new RuntimeException("Unable to create new sync context"); 357 for (int i = 0; i < 2; i++) { 358 try { 359 List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit); 360 if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) { 361 throw new IOException("Unexpected response: " + response); 362 } 363 return response.get(1); 364 } catch (TimeoutException e) { 365 throw e; 366 } catch (Exception e) { 367 close(e); 368 error.addSuppressed(e); 369 } 370 } 371 throw error; 372 } 373 374 void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException { 375 try { 376 List<String> req = new ArrayList<>(keys.size() + 2); 377 req.add(REQUEST_ACQUIRE); 378 req.add(contextId); 379 req.addAll(keys); 380 List<String> response = send(req, time, unit); 381 if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) { 382 throw new IOException("Unexpected response: " + response); 383 } 384 } catch (TimeoutException e) { 385 throw e; 386 } catch (Exception e) { 387 close(e); 388 throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e); 389 } 390 } 391 392 void unlock(String contextId) { 393 try { 394 List<String> response = 395 send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS); 396 if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) { 397 throw new IOException("Unexpected response: " + response); 398 } 399 } catch (Exception e) { 400 close(e); 401 throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e); 402 } 403 } 404 405 /** 406 * To be used in tests to stop server immediately. Should not be used outside of tests. 407 */ 408 void stopServer() { 409 try { 410 List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS); 411 if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) { 412 throw new IOException("Unexpected response: " + response); 413 } 414 } catch (Exception e) { 415 close(e); 416 throw new RuntimeException("Unable to stop server", e); 417 } 418 } 419 420 @Override 421 public String toString() { 422 return "IpcClient{" 423 + "lockPath=" + lockPath + "," 424 + "syncServerPath=" + syncPath + "," 425 + "address='" + getAddress() + "'}"; 426 } 427 428 private String getAddress() { 429 try { 430 return SocketFamily.toString(socket.getLocalAddress()); 431 } catch (IOException e) { 432 return "[not bound]"; 433 } 434 } 435}