001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.util.concurrency;
020
021import java.util.concurrent.Callable;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.Semaphore;
027
028/**
029 * Utilities for executors and sizing them.
 * <em>Big fat note:</em> Do not use this class outside of resolver. This and related classes are not meant as a
 * "drop-in replacement" for Java Executors; they are used in a very controlled fashion only.
032 *
033 * @since 2.0.11
034 */
public interface SmartExecutor extends AutoCloseable {
    /**
     * Submits a {@link Runnable} for execution. No handle to the task is returned.
     *
     * @param runnable the task to run
     */
    void submit(Runnable runnable);

    /**
     * Submits a {@link Callable} for execution.
     *
     * @param callable the task to run
     * @param <T> the result type of the task
     * @return a {@link Future} carrying the value returned by the task, or the failure it raised
     */
    <T> Future<T> submit(Callable<T> callable);

    /**
     * Shuts down this instance (ideally used in a try-with-resources construct). Redeclared here so that,
     * unlike {@link AutoCloseable#close()}, no checked exception is declared.
     */
    @Override
    void close();

    /**
     * Direct executor (caller executes): every task runs synchronously on the submitting thread.
     */
    class Direct implements SmartExecutor {
        /**
         * Runs the task immediately on the calling thread; anything it throws propagates to the caller.
         */
        @Override
        public void submit(Runnable runnable) {
            runnable.run();
        }

        /**
         * Calls the task immediately on the calling thread and returns an already completed future.
         * An {@link Exception} thrown by the task is stored in the future; an {@link Error} propagates
         * to the caller.
         */
        @Override
        public <T> CompletableFuture<T> submit(Callable<T> callable) {
            CompletableFuture<T> future = new CompletableFuture<>();
            try {
                future.complete(callable.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
            return future;
        }

        /**
         * Nothing to release.
         */
        @Override
        public void close() {}
    }

    /**
     * Pooled executor backed by {@link ExecutorService}. Tasks run with the context class loader of the
     * submitting thread; the worker thread's previous context class loader is restored afterwards. If the
     * backing service rejects a task, the task is executed on the submitting thread instead.
     */
    class Pooled implements SmartExecutor {
        private final ExecutorService executor;

        Pooled(ExecutorService executor) {
            this.executor = executor;
        }

        /**
         * Hands the task to the backing service, falling back to running it on the calling thread when
         * the service rejects it.
         */
        @Override
        public void submit(Runnable runnable) {
            // captured on the submitting thread so the worker can see the same class loader
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            try {
                executor.submit(() -> {
                    ClassLoader old = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(tccl);
                    try {
                        runnable.run();
                    } finally {
                        Thread.currentThread().setContextClassLoader(old);
                    }
                });
            } catch (RejectedExecutionException rejected) {
                try {
                    runnable.run();
                } catch (RuntimeException | Error ignored) {
                    // swallow to match async submit() semantics where exceptions
                    // are captured by the Future; callers like RunnableErrorForwarder
                    // already record the error before re-throwing
                }
            }
        }

        /**
         * Hands the task to the backing service and returns a future that is completed with the task's
         * outcome. When the service rejects the task it is called on the submitting thread instead.
         * Every {@link Throwable} raised by the task, including {@link Error}s, completes the future
         * exceptionally, so the returned future is always completed once the task has run.
         */
        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            CompletableFuture<T> future = new CompletableFuture<>();
            try {
                executor.submit(() -> {
                    ClassLoader old = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(tccl);
                    try {
                        future.complete(callable.call());
                    } catch (Throwable t) {
                        // Throwable, not Exception: an Error would otherwise vanish into the Future
                        // returned by the backing service and leave our future incomplete forever
                        future.completeExceptionally(t);
                    } finally {
                        Thread.currentThread().setContextClassLoader(old);
                    }
                });
            } catch (RejectedExecutionException rejected) {
                try {
                    future.complete(callable.call());
                } catch (Throwable t) {
                    // same capture rule as the asynchronous path
                    future.completeExceptionally(t);
                }
            }
            return future;
        }

        /**
         * Initiates shutdown of the backing service; does not wait for running tasks.
         */
        @Override
        public void close() {
            executor.shutdown();
        }
    }

    /**
     * Limited executor, where the actual goal is to protect accessed resource, like when virtual threads
     * are being used, so the "pool" itself does not provide any kind of back-pressure. A {@link Semaphore}
     * permit is acquired before each task is handed to the delegate (blocking the submitter when none is
     * free) and released once the task has finished.
     */
    class Limited implements SmartExecutor {
        private final SmartExecutor executor;
        private final Semaphore semaphore;

        Limited(SmartExecutor executor, int limit) {
            this.executor = executor;
            this.semaphore = new Semaphore(limit);
        }

        /**
         * Waits for a permit, then forwards the task to the delegate. If the calling thread is interrupted
         * while waiting, its interrupt flag is restored and the task is not run.
         */
        @Override
        public void submit(Runnable runnable) {
            try {
                semaphore.acquire();
                try {
                    executor.submit(() -> {
                        try {
                            runnable.run();
                        } finally {
                            semaphore.release();
                        }
                    });
                } catch (RejectedExecutionException rejected) {
                    try {
                        runnable.run();
                    } catch (RuntimeException | Error ignored) {
                        // swallow to match async submit() semantics where exceptions
                        // are captured by the Future; callers like RunnableErrorForwarder
                        // already record the error before re-throwing
                    } finally {
                        semaphore.release();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Waits for a permit, then forwards the task to the delegate and returns a future completed with
         * the task's outcome. Every {@link Throwable} raised by the task, including {@link Error}s,
         * completes the future exceptionally. If the calling thread is interrupted while waiting for a
         * permit, its interrupt flag is restored, the task is not run, and the returned future is
         * completed exceptionally with the {@link InterruptedException}.
         */
        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            try {
                semaphore.acquire();
                CompletableFuture<T> future = new CompletableFuture<>();
                try {
                    executor.submit(() -> {
                        try {
                            future.complete(callable.call());
                        } catch (Throwable t) {
                            // Throwable, not Exception: an Error must still complete the future,
                            // otherwise a caller waiting on it would never wake up
                            future.completeExceptionally(t);
                        } finally {
                            semaphore.release();
                        }
                    });
                } catch (RejectedExecutionException rejected) {
                    try {
                        future.complete(callable.call());
                    } catch (Throwable t) {
                        // same capture rule as the asynchronous path
                        future.completeExceptionally(t);
                    } finally {
                        semaphore.release();
                    }
                }
                return future;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }

        /**
         * Closes the delegate.
         */
        @Override
        public void close() {
            executor.close();
        }
    }

    /**
     * Wrapper to prevent closing: submissions are forwarded to the delegate, {@link #close()} is a no-op.
     */
    class NonClosing implements SmartExecutor {
        private final SmartExecutor smartExecutor;

        NonClosing(SmartExecutor smartExecutor) {
            this.smartExecutor = smartExecutor;
        }

        @Override
        public void submit(Runnable runnable) {
            smartExecutor.submit(runnable);
        }

        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            return smartExecutor.submit(callable);
        }

        @Override
        public void close() {
            // nope; delegate is managed
        }
    }
}