001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.eclipse.aether.util.concurrency; 020 021import java.util.concurrent.Callable; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Future; 025import java.util.concurrent.Semaphore; 026 027/** 028 * Utilities for executors and sizing them. 029 * <em>Big fat note:</em> Do not use this class outside of resolver. This and related classes are not meant as "drop 030 * in replacement" for Jave Executors, is used in very controlled fashion only. 031 * 032 * @since 2.0.11 033 */ 034public interface SmartExecutor extends AutoCloseable { 035 /** 036 * Submits a {@link Runnable} to execution. 037 */ 038 void submit(Runnable runnable); 039 040 /** 041 * Submits a {@link Callable} to execution, returns a {@link CompletableFuture}. 042 */ 043 <T> Future<T> submit(Callable<T> callable); 044 045 /** 046 * Shut down this instance (ideally used in try-with-resource construct). 047 */ 048 void close(); 049 050 /** 051 * Direct executor (caller executes). 052 */ 053 class Direct implements SmartExecutor { 054 @Override 055 public void submit(Runnable runnable) { 056 runnable.run(); 057 } 058 059 @Override 060 public <T> CompletableFuture<T> submit(Callable<T> callable) { 061 CompletableFuture<T> future = new CompletableFuture<>(); 062 try { 063 future.complete(callable.call()); 064 } catch (Exception e) { 065 future.completeExceptionally(e); 066 } 067 return future; 068 } 069 070 @Override 071 public void close() {} 072 } 073 074 /** 075 * Pooled executor backed by {@link ExecutorService}. 076 */ 077 class Pooled implements SmartExecutor { 078 private final ExecutorService executor; 079 080 Pooled(ExecutorService executor) { 081 this.executor = executor; 082 } 083 084 @Override 085 public void submit(Runnable runnable) { 086 ClassLoader tccl = Thread.currentThread().getContextClassLoader(); 087 executor.submit(() -> { 088 ClassLoader old = Thread.currentThread().getContextClassLoader(); 089 Thread.currentThread().setContextClassLoader(tccl); 090 try { 091 runnable.run(); 092 } finally { 093 Thread.currentThread().setContextClassLoader(old); 094 } 095 }); 096 } 097 098 @Override 099 public <T> Future<T> submit(Callable<T> callable) { 100 ClassLoader tccl = Thread.currentThread().getContextClassLoader(); 101 CompletableFuture<T> future = new CompletableFuture<>(); 102 executor.submit(() -> { 103 ClassLoader old = Thread.currentThread().getContextClassLoader(); 104 Thread.currentThread().setContextClassLoader(tccl); 105 try { 106 future.complete(callable.call()); 107 } catch (Exception e) { 108 future.completeExceptionally(e); 109 } finally { 110 Thread.currentThread().setContextClassLoader(old); 111 } 112 }); 113 return future; 114 } 115 116 @Override 117 public void close() { 118 executor.shutdown(); 119 } 120 } 121 122 /** 123 * Limited executor, where the actual goal is to protect accessed resource, like when virtual threads 124 * are being used, so the "pool" itself does not provide any kind of back-pressure. 125 */ 126 class Limited implements SmartExecutor { 127 private final SmartExecutor executor; 128 private final Semaphore semaphore; 129 130 Limited(SmartExecutor executor, int limit) { 131 this.executor = executor; 132 this.semaphore = new Semaphore(limit); 133 } 134 135 @Override 136 public void submit(Runnable runnable) { 137 try { 138 semaphore.acquire(); 139 executor.submit(() -> { 140 try { 141 runnable.run(); 142 } finally { 143 semaphore.release(); 144 } 145 }); 146 } catch (InterruptedException e) { 147 Thread.currentThread().interrupt(); 148 } 149 } 150 151 @Override 152 public <T> Future<T> submit(Callable<T> callable) { 153 try { 154 semaphore.acquire(); 155 CompletableFuture<T> future = new CompletableFuture<>(); 156 executor.submit(() -> { 157 try { 158 future.complete(callable.call()); 159 } catch (Exception e) { 160 future.completeExceptionally(e); 161 } finally { 162 semaphore.release(); 163 } 164 }); 165 return future; 166 } catch (InterruptedException e) { 167 Thread.currentThread().interrupt(); 168 CompletableFuture<T> failed = new CompletableFuture<>(); 169 failed.completeExceptionally(e); 170 return failed; 171 } 172 } 173 174 @Override 175 public void close() { 176 executor.close(); 177 } 178 } 179 180 /** 181 * Wrapper to prevent closing. 182 */ 183 class NonClosing implements SmartExecutor { 184 private final SmartExecutor smartExecutor; 185 186 NonClosing(SmartExecutor smartExecutor) { 187 this.smartExecutor = smartExecutor; 188 } 189 190 @Override 191 public void submit(Runnable runnable) { 192 smartExecutor.submit(runnable); 193 } 194 195 @Override 196 public <T> Future<T> submit(Callable<T> callable) { 197 return smartExecutor.submit(callable); 198 } 199 200 @Override 201 public void close() { 202 // nope; delegate is managed 203 } 204 } 205}