1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.named.ipc;
20
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.net.SocketAddress;
25 import java.nio.channels.ByteChannel;
26 import java.nio.channels.Channels;
27 import java.nio.channels.ServerSocketChannel;
28 import java.nio.channels.SocketChannel;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40
41
42
43
44
45
46 public class IpcServer {
47
48
49
50
51
52
53
54 public static final String SYSTEM_PROP_NO_FORK = "aether.named.ipc.nofork";
55
56 public static final boolean DEFAULT_NO_FORK = false;
57
58
59
60
61
62
63
64
65 public static final String SYSTEM_PROP_IDLE_TIMEOUT = "aether.named.ipc.idleTimeout";
66
67 public static final int DEFAULT_IDLE_TIMEOUT = 60;
68
69
70
71
72
73
74
75
76 public static final String SYSTEM_PROP_FAMILY = "aether.named.ipc.family";
77
78 public static final String DEFAULT_FAMILY = "unix";
79
80
81
82
83
84
85
86
87 public static final String SYSTEM_PROP_NO_NATIVE = "aether.named.ipc.nonative";
88
89 public static final boolean DEFAULT_NO_NATIVE = true;
90
91
92
93
94
95
96
97
98 public static final String SYSTEM_PROP_NATIVE_NAME = "aether.named.ipc.nativeName";
99
100 public static final String DEFAULT_NATIVE_NAME = "ipc-sync";
101
102
103
104
105
106
107
108
109 public static final String SYSTEM_PROP_DEBUG = "aether.named.ipc.debug";
110
111 public static final boolean DEFAULT_DEBUG = false;
112
113 private final ServerSocketChannel serverSocket;
114 private final Map<SocketChannel, Thread> clients = new HashMap<>();
115 private final AtomicInteger counter = new AtomicInteger();
116 private final Map<String, Lock> locks = new ConcurrentHashMap<>();
117 private final Map<String, Context> contexts = new ConcurrentHashMap<>();
118 private static final boolean DEBUG =
119 Boolean.parseBoolean(System.getProperty(SYSTEM_PROP_DEBUG, Boolean.toString(DEFAULT_DEBUG)));
120 private final long idleTimeout;
121 private volatile long lastUsed;
122 private volatile boolean closing;
123
124 public IpcServer(SocketFamily family) throws IOException {
125 serverSocket = family.openServerSocket();
126 long timeout = TimeUnit.SECONDS.toNanos(DEFAULT_IDLE_TIMEOUT);
127 String str = System.getProperty(SYSTEM_PROP_IDLE_TIMEOUT);
128 if (str != null) {
129 try {
130 TimeUnit unit = TimeUnit.SECONDS;
131 if (str.endsWith("ms")) {
132 unit = TimeUnit.MILLISECONDS;
133 str = str.substring(0, str.length() - 2);
134 }
135 long dur = Long.parseLong(str);
136 timeout = unit.toNanos(dur);
137 } catch (NumberFormatException e) {
138 error("Property " + SYSTEM_PROP_IDLE_TIMEOUT + " specified with invalid value: " + str, e);
139 }
140 }
141 idleTimeout = timeout;
142 }
143
144 public static void main(String[] args) throws Exception {
145
146
147
148
149
150
151
152
153 try {
154 sun.misc.Signal.handle(new sun.misc.Signal("INT"), sun.misc.SignalHandler.SIG_IGN);
155 if (IpcClient.IS_WINDOWS) {
156 sun.misc.Signal.handle(new sun.misc.Signal("TSTP"), sun.misc.SignalHandler.SIG_IGN);
157 }
158 } catch (Throwable t) {
159 error("Unable to ignore INT and TSTP signals", t);
160 }
161
162 String family = args[0];
163 String tmpAddress = args[1];
164 String rand = args[2];
165
166 runServer(SocketFamily.valueOf(family), tmpAddress, rand);
167 }
168
169 static IpcServer runServer(SocketFamily family, String tmpAddress, String rand) throws IOException {
170 IpcServer server = new IpcServer(family);
171 run(server::run, false);
172 String address = SocketFamily.toString(server.getLocalAddress());
173 SocketAddress socketAddress = SocketFamily.fromString(tmpAddress);
174 try (SocketChannel socket = SocketChannel.open(socketAddress)) {
175 try (DataOutputStream dos = new DataOutputStream(Channels.newOutputStream(socket))) {
176 dos.writeUTF(rand);
177 dos.writeUTF(address);
178 dos.flush();
179 }
180 }
181
182 return server;
183 }
184
185 private static void debug(String msg, Object... args) {
186 if (DEBUG) {
187 System.out.printf("[ipc] [debug] " + msg + "\n", args);
188 }
189 }
190
191 private static void info(String msg, Object... args) {
192 System.out.printf("[ipc] [info] " + msg + "\n", args);
193 }
194
195 private static void error(String msg, Throwable t) {
196 System.out.println("[ipc] [error] " + msg);
197 t.printStackTrace(System.out);
198 }
199
200 private static void run(Runnable runnable, boolean daemon) {
201 Thread thread = new Thread(runnable);
202 if (daemon) {
203 thread.setDaemon(true);
204 }
205 thread.start();
206 }
207
208 public SocketAddress getLocalAddress() throws IOException {
209 return serverSocket.getLocalAddress();
210 }
211
212 public void run() {
213 try {
214 info("IpcServer started at %s", getLocalAddress().toString());
215 use();
216 run(this::expirationCheck, true);
217 while (!closing) {
218 SocketChannel socket = this.serverSocket.accept();
219 run(() -> client(socket), false);
220 }
221 } catch (Throwable t) {
222 if (!closing) {
223 error("Error running sync server loop", t);
224 }
225 }
226 }
227
228 private void client(SocketChannel socket) {
229 int c;
230 synchronized (clients) {
231 clients.put(socket, Thread.currentThread());
232 c = clients.size();
233 }
234 info("New client connected (%d connected)", c);
235 use();
236 Map<String, Context> clientContexts = new ConcurrentHashMap<>();
237 try {
238 ByteChannel wrapper = new ByteChannelWrapper(socket);
239 DataInputStream input = new DataInputStream(Channels.newInputStream(wrapper));
240 DataOutputStream output = new DataOutputStream(Channels.newOutputStream(wrapper));
241 while (!closing) {
242 int requestId = input.readInt();
243 int sz = input.readInt();
244 List<String> request = new ArrayList<>(sz);
245 for (int i = 0; i < sz; i++) {
246 request.add(input.readUTF());
247 }
248 if (request.isEmpty()) {
249 throw new IOException("Received invalid request");
250 }
251 use();
252 String contextId;
253 Context context;
254 String command = request.remove(0);
255 switch (command) {
256 case IpcMessages.REQUEST_CONTEXT:
257 if (request.size() != 1) {
258 throw new IOException("Expected one argument for " + command + " but got " + request);
259 }
260 boolean shared = Boolean.parseBoolean(request.remove(0));
261 context = new Context(shared);
262 contexts.put(context.id, context);
263 clientContexts.put(context.id, context);
264 synchronized (output) {
265 debug("Created context %s", context.id);
266 output.writeInt(requestId);
267 output.writeInt(2);
268 output.writeUTF(IpcMessages.RESPONSE_CONTEXT);
269 output.writeUTF(context.id);
270 output.flush();
271 }
272 break;
273 case IpcMessages.REQUEST_ACQUIRE:
274 if (request.size() < 1) {
275 throw new IOException(
276 "Expected at least one argument for " + command + " but got " + request);
277 }
278 contextId = request.remove(0);
279 context = contexts.get(contextId);
280 if (context == null) {
281 throw new IOException(
282 "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
283 }
284 context.lock(request).thenRun(() -> {
285 try {
286 synchronized (output) {
287 debug("Locking in context %s", context.id);
288 output.writeInt(requestId);
289 output.writeInt(1);
290 output.writeUTF(IpcMessages.RESPONSE_ACQUIRE);
291 output.flush();
292 }
293 } catch (IOException e) {
294 try {
295 socket.close();
296 } catch (IOException ioException) {
297 e.addSuppressed(ioException);
298 }
299 error("Error writing lock response", e);
300 }
301 });
302 break;
303 case IpcMessages.REQUEST_CLOSE:
304 if (request.size() != 1) {
305 throw new IOException("Expected one argument for " + command + " but got " + request);
306 }
307 contextId = request.remove(0);
308 context = contexts.remove(contextId);
309 clientContexts.remove(contextId);
310 if (context == null) {
311 throw new IOException(
312 "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
313 }
314 context.unlock();
315 synchronized (output) {
316 debug("Closing context %s", context.id);
317 output.writeInt(requestId);
318 output.writeInt(1);
319 output.writeUTF(IpcMessages.RESPONSE_CLOSE);
320 output.flush();
321 }
322 break;
323 case IpcMessages.REQUEST_STOP:
324 if (request.size() != 0) {
325 throw new IOException("Expected zero argument for " + command + " but got " + request);
326 }
327 synchronized (output) {
328 debug("Stopping server");
329 output.writeInt(requestId);
330 output.writeInt(1);
331 output.writeUTF(IpcMessages.RESPONSE_STOP);
332 output.flush();
333 }
334 close();
335 break;
336 default:
337 throw new IOException("Unknown request: " + request.get(0));
338 }
339 }
340 } catch (Throwable t) {
341 if (!closing) {
342 error("Error processing request", t);
343 }
344 } finally {
345 if (!closing) {
346 info("Client disconnecting...");
347 }
348 clientContexts.values().forEach(context -> {
349 contexts.remove(context.id);
350 context.unlock();
351 });
352 try {
353 socket.close();
354 } catch (IOException ioException) {
355
356 }
357 synchronized (clients) {
358 clients.remove(socket);
359 c = clients.size();
360 }
361 if (!closing) {
362 info("%d clients left", c);
363 }
364 }
365 }
366
367 private void use() {
368 lastUsed = System.nanoTime();
369 }
370
371 private void expirationCheck() {
372 while (true) {
373 long current = System.nanoTime();
374 long left = (lastUsed + idleTimeout) - current;
375 if (left < 0) {
376 info("IpcServer expired, closing");
377 close();
378 break;
379 } else {
380 try {
381 Thread.sleep(TimeUnit.NANOSECONDS.toMillis(left));
382 } catch (InterruptedException e) {
383 info("IpcServer expiration check interrupted, closing");
384 close();
385 break;
386 }
387 }
388 }
389 }
390
391 void close() {
392 closing = true;
393 try {
394 serverSocket.close();
395 } catch (IOException e) {
396 error("Error closing server socket", e);
397 }
398 clients.forEach((s, t) -> {
399 try {
400 s.close();
401 } catch (IOException e) {
402
403 }
404 t.interrupt();
405 });
406 }
407
408 static class Waiter {
409 final Context context;
410 final CompletableFuture<Void> future;
411
412 Waiter(Context context, CompletableFuture<Void> future) {
413 this.context = context;
414 this.future = future;
415 }
416 }
417
418 static class Lock {
419
420 final String key;
421
422 List<Context> holders;
423 List<Waiter> waiters;
424
425 Lock(String key) {
426 this.key = key;
427 }
428
429 public synchronized CompletableFuture<Void> lock(Context context) {
430 if (holders == null) {
431 holders = new ArrayList<>();
432 }
433 if (holders.isEmpty() || holders.get(0).shared && context.shared) {
434 holders.add(context);
435 return CompletableFuture.completedFuture(null);
436 }
437 if (waiters == null) {
438 waiters = new ArrayList<>();
439 }
440
441 CompletableFuture<Void> future = new CompletableFuture<>();
442 waiters.add(new Waiter(context, future));
443 return future;
444 }
445
446 public synchronized void unlock(Context context) {
447 if (holders.remove(context)) {
448 while (waiters != null
449 && !waiters.isEmpty()
450 && (holders.isEmpty() || holders.get(0).shared && waiters.get(0).context.shared)) {
451 Waiter waiter = waiters.remove(0);
452 holders.add(waiter.context);
453 waiter.future.complete(null);
454 }
455 } else if (waiters != null) {
456 for (Iterator<Waiter> it = waiters.iterator(); it.hasNext(); ) {
457 Waiter waiter = it.next();
458 if (waiter.context == context) {
459 it.remove();
460 waiter.future.cancel(false);
461 }
462 }
463 }
464 }
465 }
466
467 class Context {
468
469 final String id;
470 final boolean shared;
471 final List<String> locks = new CopyOnWriteArrayList<>();
472
473 Context(boolean shared) {
474 this.id = String.format("%08x", counter.incrementAndGet());
475 this.shared = shared;
476 }
477
478 public CompletableFuture<?> lock(List<String> keys) {
479 locks.addAll(keys);
480 CompletableFuture<?>[] futures = keys.stream()
481 .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
482 .map(l -> l.lock(this))
483 .toArray(CompletableFuture[]::new);
484 return CompletableFuture.allOf(futures);
485 }
486
487 public void unlock() {
488 locks.stream()
489 .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
490 .forEach(l -> l.unlock(this));
491 }
492 }
493 }