001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.named.ipc;
020
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.FileLock;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_ACQUIRE;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CLOSE;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_CONTEXT;
import static org.eclipse.aether.named.ipc.IpcMessages.REQUEST_STOP;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_ACQUIRE;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CLOSE;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_CONTEXT;
import static org.eclipse.aether.named.ipc.IpcMessages.RESPONSE_STOP;
066
067/**
068 * Client side implementation.
069 * The client instance is bound to a given maven repository.
070 *
071 * @since 2.0.1
072 */
073public class IpcClient {
074
075    static final boolean IS_WINDOWS =
076            System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
077
078    protected volatile boolean initialized;
079    protected final Path lockPath;
080    protected final Path logPath;
081    protected final Path syncPath;
082    protected final boolean noFork;
083
084    protected SocketChannel socket;
085    protected DataOutputStream output;
086    protected DataInputStream input;
087    protected Thread receiver;
088
089    protected final AtomicInteger requestId = new AtomicInteger();
090    protected final Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
091
092    IpcClient(Path lockPath, Path logPath, Path syncPath) {
093        this.lockPath = lockPath;
094        this.logPath = logPath;
095        this.syncPath = syncPath;
096        this.noFork = Boolean.parseBoolean(
097                System.getProperty(IpcServer.SYSTEM_PROP_NO_FORK, Boolean.toString(IpcServer.DEFAULT_NO_FORK)));
098    }
099
100    void ensureInitialized() throws IOException {
101        if (!initialized) {
102            // caller must block on this method
103            synchronized (this) {
104                if (!initialized) {
105                    socket = createClient();
106                    ByteChannel wrapper = new ByteChannelWrapper(socket);
107                    input = new DataInputStream(Channels.newInputStream(wrapper));
108                    output = new DataOutputStream(Channels.newOutputStream(wrapper));
109                    receiver = new Thread(this::receive);
110                    receiver.setDaemon(true);
111                    receiver.start();
112                    initialized = true;
113                }
114            }
115        }
116    }
117
118    SocketChannel createClient() throws IOException {
119        String familyProp = System.getProperty(IpcServer.SYSTEM_PROP_FAMILY, IpcServer.DEFAULT_FAMILY);
120        SocketFamily family = familyProp != null ? SocketFamily.valueOf(familyProp) : SocketFamily.inet;
121
122        Path lockPath = this.lockPath.toAbsolutePath().normalize();
123        Path lockFile =
124                lockPath.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase(Locale.ENGLISH));
125        if (!Files.isRegularFile(lockFile)) {
126            if (!Files.isDirectory(lockFile.getParent())) {
127                Files.createDirectories(lockFile.getParent());
128            }
129        }
130
131        try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
132            try (FileLock lock = raf.getChannel().lock()) {
133                String line = raf.readLine();
134                if (line != null) {
135                    try {
136                        SocketAddress address = SocketFamily.fromString(line);
137                        return SocketChannel.open(address);
138                    } catch (IOException e) {
139                        // ignore
140                    }
141                }
142
143                ServerSocketChannel ss = family.openServerSocket();
144                String tmpaddr = SocketFamily.toString(ss.getLocalAddress());
145                String rand = Long.toHexString(new Random().nextLong());
146                String nativeName =
147                        System.getProperty(IpcServer.SYSTEM_PROP_NATIVE_NAME, IpcServer.DEFAULT_NATIVE_NAME);
148                String syncCmd = IS_WINDOWS ? nativeName + ".exe" : nativeName;
149
150                boolean debug = Boolean.parseBoolean(
151                        System.getProperty(IpcServer.SYSTEM_PROP_DEBUG, Boolean.toString(IpcServer.DEFAULT_DEBUG)));
152                boolean noNative = Boolean.parseBoolean(System.getProperty(
153                        IpcServer.SYSTEM_PROP_NO_NATIVE, Boolean.toString(IpcServer.DEFAULT_NO_NATIVE)));
154                if (!noNative) {
155                    noNative = !Files.isExecutable(syncPath.resolve(syncCmd));
156                }
157                Closeable close;
158                Path logFile = logPath.resolve("resolver-ipcsync-" + rand + ".log");
159                List<String> args = new ArrayList<>();
160                if (noNative) {
161                    if (noFork) {
162                        IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
163                        close = server::close;
164                    } else {
165                        String javaHome = System.getenv("JAVA_HOME");
166                        if (javaHome == null) {
167                            javaHome = System.getProperty("java.home");
168                        }
169                        String javaCmd = IS_WINDOWS ? "bin\\java.exe" : "bin/java";
170                        String java = Paths.get(javaHome)
171                                .resolve(javaCmd)
172                                .toAbsolutePath()
173                                .toString();
174                        args.add(java);
175                        String classpath = getJarPath(getClass()) + File.pathSeparator + getJarPath(IpcServer.class);
176                        args.add("-cp");
177                        args.add(classpath);
178                        String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
179                        if (timeout != null) {
180                            args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
181                        }
182                        args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
183                        args.add(IpcServer.class.getName());
184                        args.add(family.name());
185                        args.add(tmpaddr);
186                        args.add(rand);
187                        ProcessBuilder processBuilder = new ProcessBuilder();
188                        ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
189                        Files.createDirectories(logPath);
190                        Process process = processBuilder
191                                .directory(lockFile.getParent().toFile())
192                                .command(args)
193                                .redirectOutput(discard)
194                                .redirectError(discard)
195                                .start();
196                        close = process::destroyForcibly;
197                    }
198                } else {
199                    args.add(syncPath.resolve(syncCmd).toString());
200                    String timeout = System.getProperty(IpcServer.SYSTEM_PROP_IDLE_TIMEOUT);
201                    if (timeout != null) {
202                        args.add("-D" + IpcServer.SYSTEM_PROP_IDLE_TIMEOUT + "=" + timeout);
203                    }
204                    args.add("-D" + IpcServer.SYSTEM_PROP_DEBUG + "=" + debug);
205                    args.add(family.name());
206                    args.add(tmpaddr);
207                    args.add(rand);
208                    ProcessBuilder processBuilder = new ProcessBuilder();
209                    ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(logFile.toFile());
210                    Files.createDirectories(logPath);
211                    Process process = processBuilder
212                            .directory(lockFile.getParent().toFile())
213                            .command(args)
214                            .redirectOutput(discard)
215                            .redirectError(discard)
216                            .start();
217                    close = process::destroyForcibly;
218                }
219
220                ExecutorService es = Executors.newSingleThreadExecutor();
221                Future<String[]> future = es.submit(() -> {
222                    SocketChannel s = ss.accept();
223                    DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
224                    String rand2 = dis.readUTF();
225                    String addr2 = dis.readUTF();
226                    return new String[] {rand2, addr2};
227                });
228                String[] res;
229                try {
230                    res = future.get(5, TimeUnit.SECONDS);
231                } catch (Exception e) {
232                    try (PrintWriter writer = new PrintWriter(new FileWriter(logFile.toFile(), true))) {
233                        writer.println("Arguments:");
234                        args.forEach(writer::println);
235                        writer.println();
236                        writer.println("Exception:");
237                        e.printStackTrace(writer);
238                    }
239                    close.close();
240                    throw e;
241                } finally {
242                    es.shutdownNow();
243                    ss.close();
244                }
245                if (!Objects.equals(rand, res[0])) {
246                    close.close();
247                    throw new IllegalStateException("IpcServer did not respond with the correct random");
248                }
249
250                SocketAddress addr = SocketFamily.fromString(res[1]);
251                SocketChannel socket = SocketChannel.open(addr);
252
253                raf.seek(0);
254                raf.writeBytes(res[1] + "\n");
255                return socket;
256            } catch (Exception e) {
257                throw new RuntimeException("Unable to create and connect to lock server", e);
258            }
259        }
260    }
261
262    private String getJarPath(Class<?> clazz) {
263        String classpath;
264        String className = clazz.getName().replace('.', '/') + ".class";
265        String url = clazz.getClassLoader().getResource(className).toString();
266        if (url.startsWith("jar:")) {
267            url = url.substring("jar:".length(), url.indexOf("!/"));
268            if (url.startsWith("file:")) {
269                classpath = url.substring("file:".length());
270            } else {
271                throw new IllegalStateException();
272            }
273        } else if (url.startsWith("file:")) {
274            classpath = url.substring("file:".length(), url.indexOf(className));
275        } else {
276            throw new IllegalStateException();
277        }
278        if (IS_WINDOWS) {
279            if (classpath.startsWith("/")) {
280                classpath = classpath.substring(1);
281            }
282            classpath = classpath.replace('/', '\\');
283        }
284
285        return classpath;
286    }
287
288    void receive() {
289        try {
290            while (true) {
291                int id = input.readInt();
292                int sz = input.readInt();
293                List<String> s = new ArrayList<>(sz);
294                for (int i = 0; i < sz; i++) {
295                    s.add(input.readUTF());
296                }
297                CompletableFuture<List<String>> f = responses.remove(id);
298                if (f == null || s.isEmpty()) {
299                    throw new IllegalStateException("Protocol error");
300                }
301                f.complete(s);
302            }
303        } catch (EOFException e) {
304            // server is stopped; just quit
305        } catch (Exception e) {
306            close(e);
307        }
308    }
309
310    List<String> send(List<String> request, long time, TimeUnit unit) throws TimeoutException, IOException {
311        ensureInitialized();
312        int id = requestId.incrementAndGet();
313        CompletableFuture<List<String>> response = new CompletableFuture<>();
314        responses.put(id, response);
315        synchronized (output) {
316            output.writeInt(id);
317            output.writeInt(request.size());
318            for (String s : request) {
319                output.writeUTF(s);
320            }
321            output.flush();
322        }
323        try {
324            return response.get(time, unit);
325        } catch (InterruptedException e) {
326            throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
327        } catch (ExecutionException e) {
328            throw new IOException("Execution error", e);
329        }
330    }
331
332    void close() {
333        if (noFork) {
334            stopServer();
335        }
336        close(new IOException("Closing"));
337    }
338
339    synchronized void close(Throwable e) {
340        if (socket != null) {
341            try {
342                socket.close();
343            } catch (IOException t) {
344                e.addSuppressed(t);
345            }
346            socket = null;
347            input = null;
348            output = null;
349        }
350        if (receiver != null) {
351            receiver.interrupt();
352            try {
353                receiver.join(1000);
354            } catch (InterruptedException t) {
355                e.addSuppressed(t);
356            }
357        }
358        responses.values().forEach(f -> f.completeExceptionally(e));
359        responses.clear();
360    }
361
362    String newContext(boolean shared, long time, TimeUnit unit) throws TimeoutException {
363        RuntimeException error = new RuntimeException("Unable to create new sync context");
364        for (int i = 0; i < 2; i++) {
365            try {
366                List<String> response = send(Arrays.asList(REQUEST_CONTEXT, Boolean.toString(shared)), time, unit);
367                if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
368                    throw new IOException("Unexpected response: " + response);
369                }
370                return response.get(1);
371            } catch (TimeoutException e) {
372                throw e;
373            } catch (Exception e) {
374                close(e);
375                error.addSuppressed(e);
376            }
377        }
378        throw error;
379    }
380
381    void lock(String contextId, Collection<String> keys, long time, TimeUnit unit) throws TimeoutException {
382        try {
383            List<String> req = new ArrayList<>(keys.size() + 2);
384            req.add(REQUEST_ACQUIRE);
385            req.add(contextId);
386            req.addAll(keys);
387            List<String> response = send(req, time, unit);
388            if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
389                throw new IOException("Unexpected response: " + response);
390            }
391        } catch (TimeoutException e) {
392            throw e;
393        } catch (Exception e) {
394            close(e);
395            throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
396        }
397    }
398
399    void unlock(String contextId) {
400        try {
401            List<String> response =
402                    send(Arrays.asList(REQUEST_CLOSE, contextId), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
403            if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
404                throw new IOException("Unexpected response: " + response);
405            }
406        } catch (Exception e) {
407            close(e);
408            throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
409        }
410    }
411
412    /**
413     * To be used in tests to stop server immediately. Should not be used outside of tests.
414     */
415    void stopServer() {
416        try {
417            List<String> response = send(Arrays.asList(REQUEST_STOP), Long.MAX_VALUE, TimeUnit.MILLISECONDS);
418            if (response.size() != 1 || !RESPONSE_STOP.equals(response.get(0))) {
419                throw new IOException("Unexpected response: " + response);
420            }
421        } catch (Exception e) {
422            close(e);
423            throw new RuntimeException("Unable to stop server", e);
424        }
425    }
426
427    @Override
428    public String toString() {
429        return "IpcClient{"
430                + "lockPath=" + lockPath + ","
431                + "syncServerPath=" + syncPath + ","
432                + "address='" + getAddress() + "'}";
433    }
434
435    private String getAddress() {
436        try {
437            return SocketFamily.toString(socket.getLocalAddress());
438        } catch (IOException e) {
439            return "[not bound]";
440        }
441    }
442}