View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.util.concurrency;
20  
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.Semaphore;
26  
27  /**
28   * Utilities for executors and sizing them.
29   * <em>Big fat note:</em> Do not use this class outside of resolver. This and related classes are not meant as "drop
30   * in replacement" for Jave Executors, is used in very controlled fashion only.
31   *
32   * @since 2.0.11
33   */
34  public interface SmartExecutor extends AutoCloseable {
35      /**
36       * Submits a {@link Runnable} to execution.
37       */
38      void submit(Runnable runnable);
39  
40      /**
41       * Submits a {@link Callable} to execution, returns a {@link CompletableFuture}.
42       */
43      <T> Future<T> submit(Callable<T> callable);
44  
45      /**
46       * Shut down this instance (ideally used in try-with-resource construct).
47       */
48      void close();
49  
50      /**
51       * Direct executor (caller executes).
52       */
53      class Direct implements SmartExecutor {
54          @Override
55          public void submit(Runnable runnable) {
56              runnable.run();
57          }
58  
59          @Override
60          public <T> CompletableFuture<T> submit(Callable<T> callable) {
61              CompletableFuture<T> future = new CompletableFuture<>();
62              try {
63                  future.complete(callable.call());
64              } catch (Exception e) {
65                  future.completeExceptionally(e);
66              }
67              return future;
68          }
69  
70          @Override
71          public void close() {}
72      }
73  
74      /**
75       * Pooled executor backed by {@link ExecutorService}.
76       */
77      class Pooled implements SmartExecutor {
78          private final ExecutorService executor;
79  
80          Pooled(ExecutorService executor) {
81              this.executor = executor;
82          }
83  
84          @Override
85          public void submit(Runnable runnable) {
86              ClassLoader tccl = Thread.currentThread().getContextClassLoader();
87              executor.submit(() -> {
88                  ClassLoader old = Thread.currentThread().getContextClassLoader();
89                  Thread.currentThread().setContextClassLoader(tccl);
90                  try {
91                      runnable.run();
92                  } finally {
93                      Thread.currentThread().setContextClassLoader(old);
94                  }
95              });
96          }
97  
98          @Override
99          public <T> Future<T> submit(Callable<T> callable) {
100             ClassLoader tccl = Thread.currentThread().getContextClassLoader();
101             CompletableFuture<T> future = new CompletableFuture<>();
102             executor.submit(() -> {
103                 ClassLoader old = Thread.currentThread().getContextClassLoader();
104                 Thread.currentThread().setContextClassLoader(tccl);
105                 try {
106                     future.complete(callable.call());
107                 } catch (Exception e) {
108                     future.completeExceptionally(e);
109                 } finally {
110                     Thread.currentThread().setContextClassLoader(old);
111                 }
112             });
113             return future;
114         }
115 
116         @Override
117         public void close() {
118             executor.shutdown();
119         }
120     }
121 
122     /**
123      * Limited executor, where the actual goal is to protect accessed resource, like when virtual threads
124      * are being used, so the "pool" itself does not provide any kind of back-pressure.
125      */
126     class Limited implements SmartExecutor {
127         private final SmartExecutor executor;
128         private final Semaphore semaphore;
129 
130         Limited(SmartExecutor executor, int limit) {
131             this.executor = executor;
132             this.semaphore = new Semaphore(limit);
133         }
134 
135         @Override
136         public void submit(Runnable runnable) {
137             try {
138                 semaphore.acquire();
139                 executor.submit(() -> {
140                     try {
141                         runnable.run();
142                     } finally {
143                         semaphore.release();
144                     }
145                 });
146             } catch (InterruptedException e) {
147                 Thread.currentThread().interrupt();
148             }
149         }
150 
151         @Override
152         public <T> Future<T> submit(Callable<T> callable) {
153             try {
154                 semaphore.acquire();
155                 CompletableFuture<T> future = new CompletableFuture<>();
156                 executor.submit(() -> {
157                     try {
158                         future.complete(callable.call());
159                     } catch (Exception e) {
160                         future.completeExceptionally(e);
161                     } finally {
162                         semaphore.release();
163                     }
164                 });
165                 return future;
166             } catch (InterruptedException e) {
167                 Thread.currentThread().interrupt();
168                 CompletableFuture<T> failed = new CompletableFuture<>();
169                 failed.completeExceptionally(e);
170                 return failed;
171             }
172         }
173 
174         @Override
175         public void close() {
176             executor.close();
177         }
178     }
179 
180     /**
181      * Wrapper to prevent closing.
182      */
183     class NonClosing implements SmartExecutor {
184         private final SmartExecutor smartExecutor;
185 
186         NonClosing(SmartExecutor smartExecutor) {
187             this.smartExecutor = smartExecutor;
188         }
189 
190         @Override
191         public void submit(Runnable runnable) {
192             smartExecutor.submit(runnable);
193         }
194 
195         @Override
196         public <T> Future<T> submit(Callable<T> callable) {
197             return smartExecutor.submit(callable);
198         }
199 
200         @Override
201         public void close() {
202             // nope; delegate is managed
203         }
204     }
205 }