001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.named.ipc;
020
021import java.io.Closeable;
022import java.io.DataInputStream;
023import java.io.DataOutputStream;
024import java.io.EOFException;
025import java.io.File;
026import java.io.FileWriter;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.io.PrintWriter;
030import java.io.RandomAccessFile;
031import java.net.SocketAddress;
032import java.nio.channels.ByteChannel;
033import java.nio.channels.Channels;
034import java.nio.channels.FileLock;
035import java.nio.channels.ServerSocketChannel;
036import java.nio.channels.SocketChannel;
037import java.nio.file.Files;
038import java.nio.file.Path;
039import java.nio.file.Paths;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.List;
044import java.util.Locale;
045import java.util.Map;
046import java.util.Objects;
047import java.util.Random;
048import java.util.concurrent.CompletableFuture;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ExecutionException;
051import java.util.concurrent.ExecutorService;
052import java.util.concurrent.Executors;
053import java.util.concurrent.Future;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.TimeoutException;
056import java.util.concurrent.atomic.AtomicInteger;
057
058import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
059import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
060import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
061import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
062import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
063import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
064import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
065import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
066
067/**
068 * Client side implementation.
069 * The client instance is bound to a given maven repository.
070 *
071 * @since 2.0.1
072 */
073public class IpcClient {
074
075    static final boolean IS_WINDOWS =
076            System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
077
078    volatile boolean initialized;
079    Path lockPath;
080    Path logPath;
081    Path syncPath;
082    SocketChannel socket;
083    DataOutputStream output;
084    DataInputStream input;
085    Thread receiver;
086    AtomicInteger requestId = new AtomicInteger();
087    Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
088
089    IpcClient(Path lockPath, Path logPath, Path syncPath) {
090        this.lockPath = lockPath;
091        this.logPath = logPath;
092        this.syncPath = syncPath;
093    }
094
095    void ensureInitialized() throws IOException {
096        if (!initialized) {
097            synchronized (this) {
098                if (!initialized) {
099                    socket = createClient();
100                    ByteChannel wrapper = new ByteChannelWrapper(socket);
101                    input = new DataInputStream(Channels.newInputStream(wrapper));
102                    output = new DataOutputStream(Channels.newOutputStream(wrapper));
103                    receiver = new Thread(this::receive);
104                    receiver.setDaemon(true);
105                    receiver.start();
106                    initialized = true;
107                }
108            }
109        }
110    }
111
112    SocketChannel createClient() throws IOException {
113        String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
114        SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
115
116        Path lockPath = this.lockPath.toAbsolutePath().normalize();
117        Path lockFile =
118                lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
119        if (!Files.isRegularFile(lockFile)) {
120            if (!Files.isDirectory(lockFile.getParent())) {
121                Files.createDirectories(lockFile.getParent());
122            }
123        }
124
125        try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
126            try (FileLock lock = raf.getChannel().lock()) {
127                String line = raf.readLine();
128                if (line != null) {
129                    try {
130                        SocketAddress address = SocketFamily.fromString(line);
131                        return SocketChannel.open(address);
132                    } catch (IOException e) {
133                        // ignore
134                    }
135                }
136
137                ServerSocketChannel ss = family.openServerSocket();
138                String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
139                String rand = Long.toHexString(new Random().nextLong());
140                String nativeName =
141                        System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
142                String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
143
144                boolean debug = Boolean.parseBoolean(
145                        System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
146                boolean noNative = Boolean.parseBoolean(System.getProperty(
147                        IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
148                if (!noNative) {
149                    noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
150                }
151                Closeable close;
152                Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
153                List<String> args = new ArrayList<>();
154                if (noNative) {
155                    boolean noFork = Boolean.parseBoolean(System.getProperty(
156                            IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
157                    if (noFork) {
158                        IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
159                        close = server::close;
160                    } else {
161                        String javaHome = System.getenv("JAVA_HOME");
162                        if (javaHome == null) {
163                            javaHome = System.getProperty("java.home");
164                        }
165                        String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
166                        String java = Paths.get(javaHome)
167                                .resolve(javaCmd)
168                                .toAbsolutePath()
169                                .toString();
170                        args.add(java);
171                        String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
172                        args.add("-cp");
173                        args.add(classpath);
174                        String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
175                        if (timeout != null) {
176                            args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
177                        }
178                        args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
179                        args.add(IpcServer.class.getName());
180                        args.add(family.name());
181                        args.add(tmpaddr);
182                        args.add(rand);
183                        ProcessBuilder processBuilder = new ProcessBuilder();
184                        ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
185                        Files.createDirectories(logPath);
186                        Process process = processBuilder
187                                .directory(lockFile.getParent().toFile())
188                                .command(args)
189                                .redirectOutput(discard)
190                                .redirectError(discard)
191                                .start();
192                        close = process::destroyForcibly;
193                    }
194                } else {
195                    args.add(syncPath.resolve(syncCmd).toString());
196                    String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
197                    if (timeout != null) {
198                        args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
199                    }
200                    args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
201                    args.add(family.name());
202                    args.add(tmpaddr);
203                    args.add(rand);
204                    ProcessBuilder processBuilder = new ProcessBuilder();
205                    ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
206                    Files.createDirectories(logPath);
207                    Process process = processBuilder
208                            .directory(lockFile.getParent().toFile())
209                            .command(args)
210                            .redirectOutput(discard)
211                            .redirectError(discard)
212                            .start();
213                    close = process::destroyForcibly;
214                }
215
216                ExecutorService es = Executors.newSingleThreadExecutor();
217                Future<String[]> future = es.submit(() -> {
218                    SocketChannel s = ss.accept();
219                    DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
220                    String rand2 = dis.readUTF();
221                    String addr2 = dis.readUTF();
222                    return new String[] {rand2, addr2};
223                });
224                String[] res;
225                try {
226                    res = future.get(5, TimeUnit.SECONDS);
227                } catch (Exception e) {
228                    try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
229                        writer.println("Arguments:");
230                        args.forEach(writer::println);
231                        writer.println();
232                        writer.println("Exception:");
233                        e.printStackTrace(writer);
234                    }
235                    close.close();
236                    throw e;
237                } finally {
238                    es.shutdownNow();
239                    ss.close();
240                }
241                if (!Objects.equals(rand, res[0])) {
242                    close.close();
243                    throw new IllegalStateException("IpcServer did not respond with the correct random");
244                }
245
246                SocketAddress addr = SocketFamily.fromString(res[1]);
247                SocketChannel socket = SocketChannel.open(addr);
248
249                raf.seek(0);
250                raf.writeBytes(res[1] + "\n");
251                return socket;
252            } catch (Exception e) {
253                throw new RuntimeException("Unable to create and connect to lock server", e);
254            }
255        }
256    }
257
258    private String getJarPath(Class<?> clazz) {
259        String classpath;
260        String className = clazz.getName().replace('.', '/') + ".class";
261        String url = clazz.getClassLoader().getResource(className).toString();
262        if (url.startsWith("jar:")) {
263            url = url.substring("jar:".length(), url.indexOf("!/"));
264            if (url.startsWith("file:")) {
265                classpath = url.substring("file:".length());
266            } else {
267                throw new IllegalStateException();
268            }
269        } else if (url.startsWith("file:")) {
270            classpath = url.substring("file:".length(), url.indexOf(className));
271        } else {
272            throw new IllegalStateException();
273        }
274        if (IS_WINDOWS) {
275            if (classpath.startsWith("/")) {
276                classpath = classpath.substring(1);
277            }
278            classpath = classpath.replace('/', '\\');
279        }
280
281        return classpath;
282    }
283
284    void receive() {
285        try {
286            while (true) {
287                int id = input.readInt();
288                int sz = input.readInt();
289                List<String> s = new ArrayList<>(sz);
290                for (int i = 0; i < sz; i++) {
291                    s.add(input.readUTF());
292                }
293                CompletableFuture<List<String>> f = responses.remove(id);
294                if (f == null || s.isEmpty()) {
295                    throw new IllegalStateException("Protocol error");
296                }
297                f.complete(s);
298            }
299        } catch (EOFException e) {
300            // server is stopped; just quit
301        } catch (Exception e) {
302            close(e);
303        }
304    }
305
306    List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
307        ensureInitialized();
308        int id = requestId.incrementAndGet();
309        CompletableFuture<List<String>> response = new CompletableFuture<>();
310        responses.put(id, response);
311        synchronized (output) {
312            output.writeInt(id);
313            output.writeInt(request.size());
314            for (String s : request) {
315                output.writeUTF(s);
316            }
317            output.flush();
318        }
319        try {
320            return response.get(time, unit);
321        } catch (InterruptedException e) {
322            throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
323        } catch (ExecutionException e) {
324            throw new IOException("Execution error", e);
325        }
326    }
327
328    void close() {
329        close(new IOException("Closing"));
330    }
331
332    synchronized void close(Throwable e) {
333        if (socket != null) {
334            try {
335                socket.close();
336            } catch (IOException t) {
337                e.addSuppressed(t);
338            }
339            socket = null;
340            input = null;
341            output = null;
342        }
343        if (receiver != null) {
344            receiver.interrupt();
345            try {
346                receiver.join(1000);
347            } catch (InterruptedException t) {
348                e.addSuppressed(t);
349            }
350        }
351        responses.values().forEach(f -> f.completeExceptionally(e));
352        responses.clear();
353    }
354
355    String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
356        RuntimeException error = new RuntimeException("Unable to create new sync context");
357        for (int i = 0; i < 2; i++) {
358            try {
359                List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
360                if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
361                    throw new IOException("Unexpected response: " + response);
362                }
363                return response.get(1);
364            } catch (TimeoutException e) {
365                throw e;
366            } catch (Exception e) {
367                close(e);
368                error.addSuppressed(e);
369            }
370        }
371        throw error;
372    }
373
374    void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
375        try {
376            List<String> req = new ArrayList<>(keys.size() + 2);
377            req.add(REQUEST_ACQUIRE);
378            req.add(contextId);
379            req.addAll(keys);
380            List<String> response = send(req, time, unit);
381            if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
382                throw new IOException("Unexpected response: " + response);
383            }
384        } catch (TimeoutException e) {
385            throw e;
386        } catch (Exception e) {
387            close(e);
388            throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
389        }
390    }
391
392    void unlock(String contextId) {
393        try {
394            List<String> response =
395                    send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
396            if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
397                throw new IOException("Unexpected response: " + response);
398            }
399        } catch (Exception e) {
400            close(e);
401            throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
402        }
403    }
404
405    /**
406     * To be used in tests to stop server immediately. Should not be used outside of tests.
407     */
408    void stopServer() {
409        try {
410            List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
411            if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
412                throw new IOException("Unexpected response: " + response);
413            }
414        } catch (Exception e) {
415            close(e);
416            throw new RuntimeException("Unable to stop server", e);
417        }
418    }
419
420    @Override
421    public String toString() {
422        return "IpcClient{"
423                + "lockPath=" + lockPath + ","
424                + "syncServerPath=" + syncPath + ","
425                + "address='" + getAddress() + "'}";
426    }
427
428    private String getAddress() {
429        try {
430            return SocketFamily.toString(socket.getLocalAddress());
431        } catch (IOException e) {
432            return "[not bound]";
433        }
434    }
435}