View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.util.concurrency;
20  
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.Semaphore;
27  
/**
 * Utilities for executors and sizing them.
 * <em>Big fat note:</em> Do not use this class outside of resolver. This and related classes are not meant as
 * "drop in replacement" for Java Executors; they are used in a very controlled fashion only.
 *
 * @since 2.0.11
 */
public interface SmartExecutor extends AutoCloseable {
    /**
     * Submits a {@link Runnable} for execution. No handle is returned, so the outcome of the task is not
     * observable through this method.
     *
     * @param runnable the task to run
     */
    void submit(Runnable runnable);

    /**
     * Submits a {@link Callable} for execution and returns a {@link Future} that is completed with the result
     * of the callable, or completed exceptionally with the failure it threw.
     *
     * @param callable the task to run
     * @param <T> the result type of the task
     * @return the future carrying the outcome of the task
     */
    <T> Future<T> submit(Callable<T> callable);

    /**
     * Shut down this instance (ideally used in try-with-resource construct).
     */
    @Override
    void close();

    /**
     * Direct executor (caller executes).
     */
    class Direct implements SmartExecutor {
        /**
         * Runs the task immediately on the calling thread; anything it throws propagates to the caller.
         */
        @Override
        public void submit(Runnable runnable) {
            runnable.run();
        }

        /**
         * Invokes the callable immediately on the calling thread and returns an already completed future.
         * An {@link Exception} thrown by the callable completes the future exceptionally; an {@link Error}
         * is not caught here and propagates to the caller.
         */
        @Override
        public <T> CompletableFuture<T> submit(Callable<T> callable) {
            CompletableFuture<T> future = new CompletableFuture<>();
            try {
                future.complete(callable.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
            return future;
        }

        /**
         * Nothing to release.
         */
        @Override
        public void close() {}
    }

    /**
     * Pooled executor backed by {@link ExecutorService}. Tasks run with the context class loader of the
     * submitting thread; the worker thread's previous context class loader is restored afterwards.
     */
    class Pooled implements SmartExecutor {
        private final ExecutorService executor;

        Pooled(ExecutorService executor) {
            this.executor = executor;
        }

        /**
         * Hands the task to the backing executor. If the executor rejects it, the task is run on the
         * calling thread instead and any {@link RuntimeException} or {@link Error} it throws is swallowed.
         */
        @Override
        public void submit(Runnable runnable) {
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            try {
                executor.submit(() -> {
                    ClassLoader old = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(tccl);
                    try {
                        runnable.run();
                    } finally {
                        Thread.currentThread().setContextClassLoader(old);
                    }
                });
            } catch (RejectedExecutionException e) {
                try {
                    runnable.run();
                } catch (RuntimeException | Error ignored) {
                    // swallow to match async submit() semantics where exceptions
                    // are captured by the Future; callers like RunnableErrorForwarder
                    // already record the error before re-throwing
                }
            }
        }

        /**
         * Hands the callable to the backing executor and returns a future for its outcome. If the executor
         * rejects the task, the callable is invoked on the calling thread instead.
         */
        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            CompletableFuture<T> future = new CompletableFuture<>();
            try {
                executor.submit(() -> {
                    ClassLoader old = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(tccl);
                    try {
                        future.complete(callable.call());
                    } catch (Exception e) {
                        future.completeExceptionally(e);
                    } catch (Error e) {
                        // The returned future is the only handle the caller holds: without completing it
                        // here, an Error thrown by the callable would leave Future.get() blocked forever.
                        future.completeExceptionally(e);
                        throw e;
                    } finally {
                        Thread.currentThread().setContextClassLoader(old);
                    }
                });
            } catch (RejectedExecutionException e) {
                // Rejected by the backing executor: fall back to running on the calling thread.
                try {
                    future.complete(callable.call());
                } catch (Exception ex) {
                    future.completeExceptionally(ex);
                }
            }
            return future;
        }

        /**
         * Initiates shutdown of the backing executor; does not wait for submitted tasks to finish.
         */
        @Override
        public void close() {
            executor.shutdown();
        }
    }

    /**
     * Limited executor, where the actual goal is to protect accessed resource, like when virtual threads
     * are being used, so the "pool" itself does not provide any kind of back-pressure. Each submission
     * acquires a semaphore permit (blocking the submitter while none is available) and releases it when
     * the task has finished.
     */
    class Limited implements SmartExecutor {
        private final SmartExecutor executor;
        private final Semaphore semaphore;

        Limited(SmartExecutor executor, int limit) {
            this.executor = executor;
            this.semaphore = new Semaphore(limit);
        }

        /**
         * Acquires a permit, then hands the task to the delegate; the permit is released once the task has
         * run. If the delegate rejects the task, it is run on the calling thread and any
         * {@link RuntimeException} or {@link Error} it throws is swallowed. If the calling thread is
         * interrupted while waiting for a permit, the interrupt flag is restored and the task is not run.
         */
        @Override
        public void submit(Runnable runnable) {
            try {
                semaphore.acquire();
                try {
                    executor.submit(() -> {
                        try {
                            runnable.run();
                        } finally {
                            semaphore.release();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    try {
                        runnable.run();
                    } catch (RuntimeException | Error ignored) {
                        // swallow to match async submit() semantics where exceptions
                        // are captured by the Future; callers like RunnableErrorForwarder
                        // already record the error before re-throwing
                    } finally {
                        semaphore.release();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Acquires a permit, then hands the callable to the delegate and returns a future for its outcome;
         * the permit is released once the callable has run. If the delegate rejects the task, the callable
         * is invoked on the calling thread. If the calling thread is interrupted while waiting for a permit,
         * the interrupt flag is restored and a future failed with the {@link InterruptedException} is
         * returned.
         */
        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            try {
                semaphore.acquire();
                CompletableFuture<T> future = new CompletableFuture<>();
                try {
                    executor.submit(() -> {
                        try {
                            future.complete(callable.call());
                        } catch (Exception e) {
                            future.completeExceptionally(e);
                        } catch (Error e) {
                            // Complete the future before propagating, otherwise a caller waiting on it
                            // would never be released when the callable fails with an Error.
                            future.completeExceptionally(e);
                            throw e;
                        } finally {
                            semaphore.release();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Rejected by the delegate: fall back to running on the calling thread.
                    try {
                        future.complete(callable.call());
                    } catch (Exception ex) {
                        future.completeExceptionally(ex);
                    } finally {
                        semaphore.release();
                    }
                }
                return future;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }

        /**
         * Closes the delegate executor.
         */
        @Override
        public void close() {
            executor.close();
        }
    }

    /**
     * Wrapper to prevent closing: submissions are forwarded to the delegate, {@link #close()} is a no-op.
     */
    class NonClosing implements SmartExecutor {
        private final SmartExecutor smartExecutor;

        NonClosing(SmartExecutor smartExecutor) {
            this.smartExecutor = smartExecutor;
        }

        @Override
        public void submit(Runnable runnable) {
            smartExecutor.submit(runnable);
        }

        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            return smartExecutor.submit(callable);
        }

        @Override
        public void close() {
            // nope; delegate is managed
        }
    }
}