001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.named.ipc; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.nio.channels.ByteChannel; 026import java.nio.channels.Channels; 027import java.nio.channels.ServerSocketChannel; 028import java.nio.channels.SocketChannel; 029import java.util.ArrayList; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.CopyOnWriteArrayList; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038 039/** 040 * Implementation of the server side. 041 * The server instance is bound to a given maven repository. 042 * 043 * @since 2.0.1 044 */ 045public class IpcServer { 046 /** 047 * Should the IPC server not fork? (i.e. for testing purposes) 048 * 049 * @configurationSource {@link System#getProperty(String, String)} 050 * @configurationType {@link java.lang.Boolean} 051 * @configurationDefaultValue {@link #DEFAULT_NO_FORK} 052 */ 053 public static final String SYSTEM_PROP_NO_FORK = "aether.named.ipc.nofork"; 054 055 public static final boolean DEFAULT_NO_FORK = false; 056 057 /** 058 * IPC idle timeout in seconds. If there is no IPC request during idle time, it will stop. 059 * 060 * @configurationSource {@link System#getProperty(String, String)} 061 * @configurationType {@link java.lang.Integer} 062 * @configurationDefaultValue {@link #DEFAULT_IDLE_TIMEOUT} 063 */ 064 public static final String SYSTEM_PROP_IDLE_TIMEOUT = "aether.named.ipc.idleTimeout"; 065 066 public static final int DEFAULT_IDLE_TIMEOUT = 300; 067 068 /** 069 * IPC socket family to use. 070 * 071 * @configurationSource {@link System#getProperty(String, String)} 072 * @configurationType {@link java.lang.String} 073 * @configurationDefaultValue {@link #DEFAULT_FAMILY} 074 */ 075 public static final String SYSTEM_PROP_FAMILY = "aether.named.ipc.family"; 076 077 public static final String DEFAULT_FAMILY = "unix"; 078 079 /** 080 * Should the IPC server not use native executable? 081 * 082 * @configurationSource {@link System#getProperty(String, String)} 083 * @configurationType {@link java.lang.Boolean} 084 * @configurationDefaultValue {@link #DEFAULT_NO_NATIVE} 085 */ 086 public static final String SYSTEM_PROP_NO_NATIVE = "aether.named.ipc.nonative"; 087 088 public static final boolean DEFAULT_NO_NATIVE = true; 089 090 /** 091 * The name if the IPC server native executable (without file extension like ".exe") 092 * 093 * @configurationSource {@link System#getProperty(String, String)} 094 * @configurationType {@link java.lang.String} 095 * @configurationDefaultValue {@link #DEFAULT_NATIVE_NAME} 096 */ 097 public static final String SYSTEM_PROP_NATIVE_NAME = "aether.named.ipc.nativeName"; 098 099 public static final String DEFAULT_NATIVE_NAME = "ipc-sync"; 100 101 /** 102 * Should the IPC server log debug messages? (i.e. for testing purposes) 103 * 104 * @configurationSource {@link System#getProperty(String, String)} 105 * @configurationType {@link java.lang.Boolean} 106 * @configurationDefaultValue {@link #DEFAULT_DEBUG} 107 */ 108 public static final String SYSTEM_PROP_DEBUG = "aether.named.ipc.debug"; 109 110 public static final boolean DEFAULT_DEBUG = false; 111 112 private final ServerSocketChannel serverSocket; 113 private final Map<SocketChannel, Thread> clients = new ConcurrentHashMap<>(); 114 private final AtomicInteger counter = new AtomicInteger(); 115 private final Map<String, Lock> locks = new ConcurrentHashMap<>(); 116 private final Map<String, Context> contexts = new ConcurrentHashMap<>(); 117 private static final boolean DEBUG = 118 Boolean.parseBoolean(System.getProperty(SYSTEM_PROP_DEBUG, Boolean.toString(DEFAULT_DEBUG))); 119 private final long idleTimeout; 120 private volatile long lastUsed; 121 private volatile boolean closing; 122 123 public IpcServer(SocketFamily family) throws IOException { 124 serverSocket = family.openServerSocket(); 125 long timeout = TimeUnit.SECONDS.toNanos(DEFAULT_IDLE_TIMEOUT); 126 String str = System.getProperty(SYSTEM_PROP_IDLE_TIMEOUT); 127 if (str != null) { 128 try { 129 TimeUnit unit = TimeUnit.SECONDS; 130 if (str.endsWith("ms")) { 131 unit = TimeUnit.MILLISECONDS; 132 str = str.substring(0, str.length() - 2); 133 } 134 long dur = Long.parseLong(str); 135 timeout = unit.toNanos(dur); 136 } catch (NumberFormatException e) { 137 error("Property " + SYSTEM_PROP_IDLE_TIMEOUT + " specified with invalid value: " + str, e); 138 } 139 } 140 idleTimeout = timeout; 141 } 142 143 public static void main(String[] args) throws Exception { 144 // When spawning a new process, the child process is create within 145 // the same process group. This means that a few signals are sent 146 // to the whole group. This is the case for SIGINT (Ctrl-C) and 147 // SIGTSTP (Ctrl-Z) which are both sent to all the processed in the 148 // group when initiated from the controlling terminal. 149 // This is only a problem when the client creates the daemon, but 150 // without ignoring those signals, a client being interrupted will 151 // also interrupt and kill the daemon. 152 try { 153 sun.misc.Signal.handle(new sun.misc.Signal("INT"), sun.misc.SignalHandler.SIG_IGN); 154 if (!IpcClient.IS_WINDOWS) { 155 sun.misc.Signal.handle(new sun.misc.Signal("TSTP"), sun.misc.SignalHandler.SIG_IGN); 156 } 157 } catch (Throwable t) { 158 error("Unable to ignore INT and TSTP signals", t); 159 } 160 161 String family = args[0]; 162 String tmpAddress = args[1]; 163 String rand = args[2]; 164 165 runServer(SocketFamily.valueOf(family), tmpAddress, rand); 166 } 167 168 static IpcServer runServer(SocketFamily family, String tmpAddress, String rand) throws IOException { 169 IpcServer server = new IpcServer(family); 170 run(server::run, false); // this is one-off 171 String address = SocketFamily.toString(server.getLocalAddress()); 172 SocketAddress socketAddress = SocketFamily.fromString(tmpAddress); 173 try (SocketChannel socket = SocketChannel.open(socketAddress)) { 174 try (DataOutputStream dos = new DataOutputStream(Channels.newOutputStream(socket))) { 175 dos.writeUTF(rand); 176 dos.writeUTF(address); 177 dos.flush(); 178 } 179 } 180 181 return server; 182 } 183 184 private static void debug(String msg, Object... args) { 185 if (DEBUG) { 186 System.out.printf("[ipc] [debug] " + msg + "\n", args); 187 } 188 } 189 190 private static void info(String msg, Object... args) { 191 System.out.printf("[ipc] [info] " + msg + "\n", args); 192 } 193 194 private static void error(String msg, Throwable t) { 195 System.out.println("[ipc] [error] " + msg); 196 t.printStackTrace(System.out); 197 } 198 199 private static void run(Runnable runnable, boolean daemon) { 200 Thread thread = new Thread(runnable); 201 if (daemon) { 202 thread.setDaemon(true); 203 } 204 thread.start(); 205 } 206 207 public SocketAddress getLocalAddress() throws IOException { 208 return serverSocket.getLocalAddress(); 209 } 210 211 public void run() { 212 try { 213 info("IpcServer started at %s", getLocalAddress().toString()); 214 use(); 215 run(this::expirationCheck, true); 216 while (!closing) { 217 SocketChannel socket = this.serverSocket.accept(); 218 run(() -> client(socket), false); 219 } 220 } catch (Throwable t) { 221 if (!closing) { 222 error("Error running sync server loop", t); 223 } 224 } 225 } 226 227 private void client(SocketChannel socket) { 228 int c; 229 synchronized (clients) { 230 clients.put(socket, Thread.currentThread()); 231 c = clients.size(); 232 } 233 info("New client connected (%d connected)", c); 234 use(); 235 Map<String, Context> clientContexts = new ConcurrentHashMap<>(); 236 try { 237 ByteChannel wrapper = new ByteChannelWrapper(socket); 238 DataInputStream input = new DataInputStream(Channels.newInputStream(wrapper)); 239 DataOutputStream output = new DataOutputStream(Channels.newOutputStream(wrapper)); 240 while (!closing) { 241 int requestId = input.readInt(); 242 int sz = input.readInt(); 243 List<String> request = new ArrayList<>(sz); 244 for (int i = 0; i < sz; i++) { 245 request.add(input.readUTF()); 246 } 247 if (request.isEmpty()) { 248 throw new IOException("Received invalid request"); 249 } 250 use(); 251 String contextId; 252 Context context; 253 String command = request.remove(0); 254 switch (command) { 255 case IpcMessages.REQUEST_CONTEXT: 256 if (request.size() != 1) { 257 throw new IOException("Expected one argument for " + command + " but got " + request); 258 } 259 boolean shared = Boolean.parseBoolean(request.remove(0)); 260 context = new Context(shared); 261 contexts.put(context.id, context); 262 clientContexts.put(context.id, context); 263 synchronized (output) { 264 debug("Created context %s", context.id); 265 output.writeInt(requestId); 266 output.writeInt(2); 267 output.writeUTF(IpcMessages.RESPONSE_CONTEXT); 268 output.writeUTF(context.id); 269 output.flush(); 270 } 271 break; 272 case IpcMessages.REQUEST_ACQUIRE: 273 if (request.isEmpty()) { 274 throw new IOException( 275 "Expected at least one argument for " + command + " but got " + request); 276 } 277 contextId = request.remove(0); 278 context = contexts.get(contextId); 279 if (context == null) { 280 throw new IOException( 281 "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet()); 282 } 283 context.lock(request).thenRun(() -> { 284 try { 285 synchronized (output) { 286 debug("Locking in context %s", context.id); 287 output.writeInt(requestId); 288 output.writeInt(1); 289 output.writeUTF(IpcMessages.RESPONSE_ACQUIRE); 290 output.flush(); 291 } 292 } catch (IOException e) { 293 try { 294 socket.close(); 295 } catch (IOException ioException) { 296 e.addSuppressed(ioException); 297 } 298 error("Error writing lock response", e); 299 } 300 }); 301 break; 302 case IpcMessages.REQUEST_CLOSE: 303 if (request.size() != 1) { 304 throw new IOException("Expected one argument for " + command + " but got " + request); 305 } 306 contextId = request.remove(0); 307 context = contexts.remove(contextId); 308 clientContexts.remove(contextId); 309 if (context == null) { 310 throw new IOException( 311 "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet()); 312 } 313 context.unlock(); 314 synchronized (output) { 315 debug("Closing context %s", context.id); 316 output.writeInt(requestId); 317 output.writeInt(1); 318 output.writeUTF(IpcMessages.RESPONSE_CLOSE); 319 output.flush(); 320 } 321 break; 322 case IpcMessages.REQUEST_STOP: 323 if (!request.isEmpty()) { 324 throw new IOException("Expected zero argument for " + command + " but got " + request); 325 } 326 synchronized (output) { 327 debug("Stopping server"); 328 output.writeInt(requestId); 329 output.writeInt(1); 330 output.writeUTF(IpcMessages.RESPONSE_STOP); 331 output.flush(); 332 } 333 close(); 334 break; 335 default: 336 throw new IOException("Unknown request: " + request.get(0)); 337 } 338 } 339 } catch (Throwable t) { 340 if (!closing) { 341 error("Error processing request", t); 342 } 343 } finally { 344 if (!closing) { 345 info("Client disconnecting..."); 346 } 347 clientContexts.values().forEach(context -> { 348 contexts.remove(context.id); 349 context.unlock(); 350 }); 351 try { 352 socket.close(); 353 } catch (IOException ioException) { 354 // ignore 355 } 356 synchronized (clients) { 357 clients.remove(socket); 358 c = clients.size(); 359 } 360 if (!closing) { 361 info("%d clients remained", c); 362 } 363 } 364 } 365 366 private void use() { 367 lastUsed = System.nanoTime(); 368 } 369 370 private void expirationCheck() { 371 while (true) { 372 long current = System.nanoTime(); 373 long left = (lastUsed + idleTimeout) - current; 374 if (clients.isEmpty() && left < 0) { 375 info("IpcServer expired, closing"); 376 close(); 377 break; 378 } else { 379 try { 380 Thread.sleep(Math.max(1, TimeUnit.NANOSECONDS.toMillis(left))); 381 } catch (InterruptedException e) { 382 info("IpcServer expiration check interrupted, closing"); 383 close(); 384 break; 385 } 386 } 387 } 388 } 389 390 void close() { 391 closing = true; 392 try { 393 serverSocket.close(); 394 } catch (IOException e) { 395 error("Error closing server socket", e); 396 } 397 clients.forEach((s, t) -> { 398 try { 399 s.close(); 400 } catch (IOException e) { 401 // ignore 402 } 403 t.interrupt(); 404 }); 405 } 406 407 static class Waiter { 408 final Context context; 409 final CompletableFuture<Void> future; 410 411 Waiter(Context context, CompletableFuture<Void> future) { 412 this.context = context; 413 this.future = future; 414 } 415 } 416 417 static class Lock { 418 419 final String key; 420 421 List<Context> holders; 422 List<Waiter> waiters; 423 424 Lock(String key) { 425 this.key = key; 426 } 427 428 public synchronized CompletableFuture<Void> lock(Context context) { 429 if (holders == null) { 430 holders = new ArrayList<>(); 431 } 432 if (holders.isEmpty() || holders.get(0).shared && context.shared) { 433 holders.add(context); 434 return CompletableFuture.completedFuture(null); 435 } 436 if (waiters == null) { 437 waiters = new ArrayList<>(); 438 } 439 440 CompletableFuture<Void> future = new CompletableFuture<>(); 441 waiters.add(new Waiter(context, future)); 442 return future; 443 } 444 445 public void unlock(Context context) { 446 List<CompletableFuture<Void>> toComplete; 447 synchronized (this) { 448 toComplete = new ArrayList<>(); 449 if (holders.remove(context)) { 450 while (waiters != null 451 && !waiters.isEmpty() 452 && (holders.isEmpty() || holders.get(0).shared && waiters.get(0).context.shared)) { 453 Waiter waiter = waiters.remove(0); 454 holders.add(waiter.context); 455 toComplete.add(waiter.future); 456 } 457 } else if (waiters != null) { 458 for (Iterator<Waiter> it = waiters.iterator(); it.hasNext(); ) { 459 Waiter waiter = it.next(); 460 if (waiter.context == context) { 461 it.remove(); 462 waiter.future.cancel(false); 463 } 464 } 465 } 466 } 467 toComplete.forEach(f -> f.complete(null)); 468 } 469 470 public synchronized boolean isEmpty() { 471 return (holders == null || holders.isEmpty()) && (waiters == null || waiters.isEmpty()); 472 } 473 } 474 475 class Context { 476 477 final String id; 478 final boolean shared; 479 final List<String> locks = new CopyOnWriteArrayList<>(); 480 481 Context(boolean shared) { 482 this.id = String.format("%08x", counter.incrementAndGet()); 483 this.shared = shared; 484 } 485 486 public CompletableFuture<?> lock(List<String> keys) { 487 locks.addAll(keys); 488 CompletableFuture<?>[] futures = keys.stream() 489 .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new)) 490 .map(l -> l.lock(this)) 491 .toArray(CompletableFuture[]::new); 492 return CompletableFuture.allOf(futures); 493 } 494 495 public void unlock() { 496 locks.stream() 497 .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new)) 498 .forEach(l -> { 499 l.unlock(this); 500 IpcServer.this.locks.compute(l.key, (k, v) -> (v == l && v.isEmpty()) ? null : v); 501 }); 502 } 503 } 504}