001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.named.ipc;
020
021import java.io.Closeable;
022import java.io.DataInputStream;
023import java.io.DataOutputStream;
024import java.io.EOFException;
025import java.io.File;
026import java.io.FileWriter;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.io.PrintWriter;
030import java.io.RandomAccessFile;
031import java.net.SocketAddress;
032import java.nio.channels.ByteChannel;
033import java.nio.channels.Channels;
034import java.nio.channels.FileLock;
035import java.nio.channels.ServerSocketChannel;
036import java.nio.channels.SocketChannel;
037import java.nio.file.Files;
038import java.nio.file.Path;
039import java.nio.file.Paths;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.List;
044import java.util.Locale;
045import java.util.Map;
046import java.util.Objects;
047import java.util.Random;
048import java.util.concurrent.CompletableFuture;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ExecutionException;
051import java.util.concurrent.ExecutorService;
052import java.util.concurrent.Executors;
053import java.util.concurrent.Future;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.TimeoutException;
056import java.util.concurrent.atomic.AtomicInteger;
057
058import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
059import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
060import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
061import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
062import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
063import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
064import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
065import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
066
067/**
068 * Client side implementation.
069 * The client instance is bound to a given maven repository.
070 *
071 * @since 2.0.1
072 */
073public class IpcClient {
074
075    static final boolean IS_WINDOWS =
076            System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
077
078    protected volatile boolean initialized;
079    protected final Path lockPath;
080    protected final Path logPath;
081    protected final Path syncPath;
082    protected final boolean noFork;
083
084    protected volatile SocketChannel socket;
085    protected volatile DataOutputStream output;
086    protected volatile DataInputStream input;
087    protected volatile Thread receiver;
088
089    protected final AtomicInteger requestId = new AtomicInteger();
090    protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
091
092    IpcClient(Path lockPath, Path logPath, Path syncPath) {
093        this.lockPath = lockPath;
094        this.logPath = logPath;
095        this.syncPath = syncPath;
096        this.noFork = Boolean.parseBoolean(
097                System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
098    }
099
100    void ensureInitialized() throws IOException {
101        if (!initialized) {
102            // caller must block on this method
103            synchronized (this) {
104                if (!initialized) {
105                    socket = createClient();
106                    ByteChannel wrapper = new ByteChannelWrapper(socket);
107                    input = new DataInputStream(Channels.newInputStream(wrapper));
108                    output = new DataOutputStream(Channels.newOutputStream(wrapper));
109                    receiver = new Thread(this::receive);
110                    receiver.setDaemon(true);
111                    receiver.start();
112                    initialized = true;
113                }
114            }
115        }
116    }
117
118    SocketChannel createClient() throws IOException {
119        SocketFamily family =
120                SocketFamily.valueOf(System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY));
121
122        Path lockPath = this.lockPath.toAbsolutePath().normalize();
123        Path lockFile =
124                lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125        if (!Files.isRegularFile(lockFile)) {
126            if (!Files.isDirectory(lockFile.getParent())) {
127                Files.createDirectories(lockFile.getParent());
128            }
129        }
130
131        try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132            try (FileLock lock = raf.getChannel().lock()) {
133                String line = raf.readLine();
134                if (line != null) {
135                    try {
136                        SocketAddress address = SocketFamily.fromString(line);
137                        return SocketChannel.open(address);
138                    } catch (IOException e) {
139                        // ignore
140                    }
141                }
142
143                ServerSocketChannel ss = family.openServerSocket();
144                String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145                String rand = Long.toHexString(new Random().nextLong());
146                String nativeName =
147                        System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148                String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149
150                boolean debug = Boolean.parseBoolean(
151                        System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152                boolean noNative = Boolean.parseBoolean(System.getProperty(
153                        IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154                if (!noNative) {
155                    noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156                }
157                Closeable close;
158                Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159                List<String> args = new ArrayList<>();
160                if (noNative) {
161                    if (noFork) {
162                        IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163                        close = server::close;
164                    } else {
165                        String javaHome = System.getenv("JAVA_HOME");
166                        if (javaHome == null) {
167                            javaHome = System.getProperty("java.home");
168                        }
169                        String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170                        String java = Paths.get(javaHome)
171                                .resolve(javaCmd)
172                                .toAbsolutePath()
173                                .toString();
174                        args.add(java);
175                        String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176                        args.add("-cp");
177                        args.add(classpath);
178                        String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179                        if (timeout != null) {
180                            args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181                        }
182                        args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183                        args.add(IpcServer.class.getName());
184                        args.add(family.name());
185                        args.add(tmpaddr);
186                        args.add(rand);
187                        ProcessBuilder processBuilder = new ProcessBuilder();
188                        ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189                        Files.createDirectories(logPath);
190                        Process process = processBuilder
191                                .directory(lockFile.getParent().toFile())
192                                .command(args)
193                                .redirectOutput(discard)
194                                .redirectError(discard)
195                                .start();
196                        close = process::destroyForcibly;
197                    }
198                } else {
199                    args.add(syncPath.resolve(syncCmd).toString());
200                    String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201                    if (timeout != null) {
202                        args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203                    }
204                    args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205                    args.add(family.name());
206                    args.add(tmpaddr);
207                    args.add(rand);
208                    ProcessBuilder processBuilder = new ProcessBuilder();
209                    ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210                    Files.createDirectories(logPath);
211                    Process process = processBuilder
212                            .directory(lockFile.getParent().toFile())
213                            .command(args)
214                            .redirectOutput(discard)
215                            .redirectError(discard)
216                            .start();
217                    close = process::destroyForcibly;
218                }
219
220                ExecutorService es = Executors.newSingleThreadExecutor();
221                Future<String[]> future = es.submit(() -> {
222                    SocketChannel s = ss.accept();
223                    DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224                    String rand2 = dis.readUTF();
225                    String addr2 = dis.readUTF();
226                    return new String[] {rand2, addr2};
227                });
228                String[] res;
229                try {
230                    res = future.get(5, TimeUnit.SECONDS);
231                } catch (Exception e) {
232                    try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233                        writer.println("Arguments:");
234                        args.forEach(writer::println);
235                        writer.println();
236                        writer.println("Exception:");
237                        e.printStackTrace(writer);
238                    }
239                    close.close();
240                    throw e;
241                } finally {
242                    es.shutdownNow();
243                    ss.close();
244                }
245                if (!Objects.equals(rand, res[0])) {
246                    close.close();
247                    throw new IllegalStateException("IpcServer did not respond with the correct random");
248                }
249
250                SocketAddress addr = SocketFamily.fromString(res[1]);
251                SocketChannel socket = SocketChannel.open(addr);
252
253                raf.seek(0);
254                raf.writeBytes(res[1] + "\n");
255                return socket;
256            } catch (Exception e) {
257                throw new RuntimeException("Unable to create and connect to lock server", e);
258            }
259        }
260    }
261
262    private String getJarPath(Class<?> clazz) {
263        String classpath;
264        String className = clazz.getName().replace('.', '/') + ".class";
265        String url = clazz.getClassLoader().getResource(className).toString();
266        if (url.startsWith("jar:")) {
267            url = url.substring("jar:".length(), url.indexOf("!/"));
268            if (url.startsWith("file:")) {
269                classpath = url.substring("file:".length());
270            } else {
271                throw new IllegalStateException();
272            }
273        } else if (url.startsWith("file:")) {
274            classpath = url.substring("file:".length(), url.indexOf(className));
275        } else {
276            throw new IllegalStateException();
277        }
278        if (IS_WINDOWS) {
279            if (classpath.startsWith("/")) {
280                classpath = classpath.substring(1);
281            }
282            classpath = classpath.replace('/', '\\');
283        }
284
285        return classpath;
286    }
287
288    void receive() {
289        try {
290            while (true) {
291                int id = input.readInt();
292                int sz = input.readInt();
293                List<String> s = new ArrayList<>(sz);
294                for (int i = 0; i < sz; i++) {
295                    s.add(input.readUTF());
296                }
297                CompletableFuture<List<String>> f = responses.remove(id);
298                if (f == null) {
299                    continue;
300                }
301                if (s.isEmpty()) {
302                    f.completeExceptionally(new IOException("Protocol error: empty response"));
303                    continue;
304                }
305                f.complete(s);
306            }
307        } catch (EOFException e) {
308            close(new IOException("Server disconnected", e));
309        } catch (Exception e) {
310            close(e);
311        }
312    }
313
314    List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
315        ensureInitialized();
316        DataOutputStream out = output;
317        if (out == null) {
318            throw new IOException("Connection closed");
319        }
320        int id = requestId.incrementAndGet();
321        CompletableFuture<List<String>> response = new CompletableFuture<>();
322        responses.put(id, response);
323        synchronized (out) {
324            out.writeInt(id);
325            out.writeInt(request.size());
326            for (String s : request) {
327                out.writeUTF(s);
328            }
329            out.flush();
330        }
331        try {
332            return response.get(time, unit);
333        } catch (InterruptedException e) {
334            responses.remove(id);
335            throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
336        } catch (ExecutionException e) {
337            throw new IOException("Execution error", e);
338        } catch (TimeoutException e) {
339            responses.remove(id);
340            throw e;
341        }
342    }
343
344    void close() {
345        if (noFork) {
346            stopServer();
347        }
348        close(new IOException("Closing"));
349    }
350
351    synchronized void close(Throwable e) {
352        initialized = false;
353        if (socket != null) {
354            try {
355                socket.close();
356            } catch (IOException t) {
357                e.addSuppressed(t);
358            }
359            socket = null;
360            input = null;
361            output = null;
362        }
363        if (receiver != null && Thread.currentThread() != receiver) {
364            receiver.interrupt();
365            try {
366                receiver.join(1000);
367            } catch (InterruptedException t) {
368                e.addSuppressed(t);
369            }
370        }
371        responses.values().forEach(f -> f.completeExceptionally(e));
372        responses.clear();
373    }
374
375    String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
376        RuntimeException error = new RuntimeException("Unable to create new sync context");
377        for (int i = 0; i < 2; i++) {
378            try {
379                List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
380                if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
381                    throw new IOException("Unexpected response: " + response);
382                }
383                return response.get(1);
384            } catch (TimeoutException e) {
385                throw e;
386            } catch (Exception e) {
387                close(e);
388                error.addSuppressed(e);
389            }
390        }
391        throw error;
392    }
393
394    void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
395        try {
396            List<String> req = new ArrayList<>(keys.size() + 2);
397            req.add(REQUEST_ACQUIRE);
398            req.add(contextId);
399            req.addAll(keys);
400            List<String> response = send(req, time, unit);
401            if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
402                throw new IOException("Unexpected response: " + response);
403            }
404        } catch (TimeoutException e) {
405            throw e;
406        } catch (Exception e) {
407            close(e);
408            throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
409        }
410    }
411
412    void unlock(String contextId) {
413        try {
414            List<String> response = send(Arrays.asList(REQUEST_CLOSE, contextId), 10, TimeUnit.SECONDS);
415            if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
416                throw new IOException("Unexpected response: " + response);
417            }
418        } catch (Exception e) {
419            close(e);
420            throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
421        }
422    }
423
424    /**
425     * To be used in tests to stop server immediately. Should not be used outside of tests.
426     */
427    void stopServer() {
428        try {
429            List<String> response = send(List.of(REQUEST_STOP), 30, TimeUnit.SECONDS);
430            if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
431                throw new IOException("Unexpected response: " + response);
432            }
433        } catch (Exception e) {
434            close(e);
435            throw new RuntimeException("Unable to stop server", e);
436        }
437    }
438
439    @Override
440    public String toString() {
441        return "IpcClient{"
442                + "lockPath=" + lockPath + ","
443                + "syncServerPath=" + syncPath + ","
444                + "address='" + getAddress() + "'}";
445    }
446
447    private String getAddress() {
448        try {
449            return SocketFamily.toString(socket.getLocalAddress());
450        } catch (IOException e) {
451            return "[not bound]";
452        }
453    }
454}