001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.util.concurrency; 020 021import java.util.concurrent.Callable; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Future; 025import java.util.concurrent.RejectedExecutionException; 026import java.util.concurrent.Semaphore; 027 028/** 029 * Utilities for executors and sizing them. 030 * <em>Big fat note:</em> Do not use this class outside of resolver. This and related classes are not meant as "drop 031 * in replacement" for Jave Executors, is used in very controlled fashion only. 032 * 033 * @since 2.0.11 034 */ 035public interface SmartExecutor extends AutoCloseable { 036 /** 037 * Submits a {@link Runnable} to execution. 038 */ 039 void submit(Runnable runnable); 040 041 /** 042 * Submits a {@link Callable} to execution, returns a {@link CompletableFuture}. 043 */ 044 <T> Future<T> submit(Callable<T> callable); 045 046 /** 047 * Shut down this instance (ideally used in try-with-resource construct). 048 */ 049 void close(); 050 051 /** 052 * Direct executor (caller executes). 053 */ 054 class Direct implements SmartExecutor { 055 @Override 056 public void submit(Runnable runnable) { 057 runnable.run(); 058 } 059 060 @Override 061 public <T> CompletableFuture<T> submit(Callable<T> callable) { 062 CompletableFuture<T> future = new CompletableFuture<>(); 063 try { 064 future.complete(callable.call()); 065 } catch (Exception e) { 066 future.completeExceptionally(e); 067 } 068 return future; 069 } 070 071 @Override 072 public void close() {} 073 } 074 075 /** 076 * Pooled executor backed by {@link ExecutorService}. 077 */ 078 class Pooled implements SmartExecutor { 079 private final ExecutorService executor; 080 081 Pooled(ExecutorService executor) { 082 this.executor = executor; 083 } 084 085 @Override 086 public void submit(Runnable runnable) { 087 ClassLoader tccl = Thread.currentThread().getContextClassLoader(); 088 try { 089 executor.submit(() -> { 090 ClassLoader old = Thread.currentThread().getContextClassLoader(); 091 Thread.currentThread().setContextClassLoader(tccl); 092 try { 093 runnable.run(); 094 } finally { 095 Thread.currentThread().setContextClassLoader(old); 096 } 097 }); 098 } catch (RejectedExecutionException e) { 099 try { 100 runnable.run(); 101 } catch (RuntimeException | Error t) { 102 // swallow to match async submit() semantics where exceptions 103 // are captured by the Future; callers like RunnableErrorForwarder 104 // already record the error before re-throwing 105 } 106 } 107 } 108 109 @Override 110 public <T> Future<T> submit(Callable<T> callable) { 111 ClassLoader tccl = Thread.currentThread().getContextClassLoader(); 112 CompletableFuture<T> future = new CompletableFuture<>(); 113 try { 114 executor.submit(() -> { 115 ClassLoader old = Thread.currentThread().getContextClassLoader(); 116 Thread.currentThread().setContextClassLoader(tccl); 117 try { 118 future.complete(callable.call()); 119 } catch (Exception e) { 120 future.completeExceptionally(e); 121 } finally { 122 Thread.currentThread().setContextClassLoader(old); 123 } 124 }); 125 } catch (RejectedExecutionException e) { 126 try { 127 future.complete(callable.call()); 128 } catch (Exception ex) { 129 future.completeExceptionally(ex); 130 } 131 } 132 return future; 133 } 134 135 @Override 136 public void close() { 137 executor.shutdown(); 138 } 139 } 140 141 /** 142 * Limited executor, where the actual goal is to protect accessed resource, like when virtual threads 143 * are being used, so the "pool" itself does not provide any kind of back-pressure. 144 */ 145 class Limited implements SmartExecutor { 146 private final SmartExecutor executor; 147 private final Semaphore semaphore; 148 149 Limited(SmartExecutor executor, int limit) { 150 this.executor = executor; 151 this.semaphore = new Semaphore(limit); 152 } 153 154 @Override 155 public void submit(Runnable runnable) { 156 try { 157 semaphore.acquire(); 158 try { 159 executor.submit(() -> { 160 try { 161 runnable.run(); 162 } finally { 163 semaphore.release(); 164 } 165 }); 166 } catch (RejectedExecutionException e) { 167 try { 168 runnable.run(); 169 } catch (RuntimeException | Error t) { 170 // swallow to match async submit() semantics where exceptions 171 // are captured by the Future; callers like RunnableErrorForwarder 172 // already record the error before re-throwing 173 } finally { 174 semaphore.release(); 175 } 176 } 177 } catch (InterruptedException e) { 178 Thread.currentThread().interrupt(); 179 } 180 } 181 182 @Override 183 public <T> Future<T> submit(Callable<T> callable) { 184 try { 185 semaphore.acquire(); 186 CompletableFuture<T> future = new CompletableFuture<>(); 187 try { 188 executor.submit(() -> { 189 try { 190 future.complete(callable.call()); 191 } catch (Exception e) { 192 future.completeExceptionally(e); 193 } finally { 194 semaphore.release(); 195 } 196 }); 197 } catch (RejectedExecutionException e) { 198 try { 199 future.complete(callable.call()); 200 } catch (Exception ex) { 201 future.completeExceptionally(ex); 202 } finally { 203 semaphore.release(); 204 } 205 } 206 return future; 207 } catch (InterruptedException e) { 208 Thread.currentThread().interrupt(); 209 CompletableFuture<T> failed = new CompletableFuture<>(); 210 failed.completeExceptionally(e); 211 return failed; 212 } 213 } 214 215 @Override 216 public void close() { 217 executor.close(); 218 } 219 } 220 221 /** 222 * Wrapper to prevent closing. 223 */ 224 class NonClosing implements SmartExecutor { 225 private final SmartExecutor smartExecutor; 226 227 NonClosing(SmartExecutor smartExecutor) { 228 this.smartExecutor = smartExecutor; 229 } 230 231 @Override 232 public void submit(Runnable runnable) { 233 smartExecutor.submit(runnable); 234 } 235 236 @Override 237 public <T> Future<T> submit(Callable<T> callable) { 238 return smartExecutor.submit(callable); 239 } 240 241 @Override 242 public void close() { 243 // nope; delegate is managed 244 } 245 } 246}