001package org.eclipse.aether.util.concurrency; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.util.concurrent.atomic.AtomicInteger; 023import java.util.concurrent.atomic.AtomicReference; 024import java.util.concurrent.locks.LockSupport; 025 026/** 027 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in 028 * a worker thread back to the parent thread. The simplified usage pattern looks like this: 029 * 030 * <pre> 031 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder(); 032 * for ( Runnable task : tasks ) 033 * { 034 * executor.execute( errorForwarder.wrap( task ) ); 035 * } 036 * errorForwarder.await(); 037 * </pre> 038 */ 039public final class RunnableErrorForwarder 040{ 041 042 private final Thread thread = Thread.currentThread(); 043 044 private final AtomicInteger counter = new AtomicInteger(); 045 046 private final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); 047 048 /** 049 * Creates a new error forwarder for worker threads spawned by the current thread. 050 */ 051 public RunnableErrorForwarder() 052 { 053 } 054 055 /** 056 * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors. 057 * 058 * @param runnable The runnable from which to forward errors, must not be {@code null}. 059 * @return The error-forwarding runnable to eventually execute, never {@code null}. 060 */ 061 public Runnable wrap( final Runnable runnable ) 062 { 063 if ( runnable == null ) 064 { 065 throw new IllegalArgumentException( "runnable missing" ); 066 } 067 068 counter.incrementAndGet(); 069 070 return new Runnable() 071 { 072 public void run() 073 { 074 try 075 { 076 runnable.run(); 077 } 078 catch ( RuntimeException e ) 079 { 080 error.compareAndSet( null, e ); 081 throw e; 082 } 083 catch ( Error e ) 084 { 085 error.compareAndSet( null, e ); 086 throw e; 087 } 088 finally 089 { 090 counter.decrementAndGet(); 091 LockSupport.unpark( thread ); 092 } 093 } 094 }; 095 } 096 097 /** 098 * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated 099 * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In 100 * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This 101 * method must be called from the same thread that created this error forwarder instance. 102 */ 103 public void await() 104 { 105 awaitTerminationOfAllRunnables(); 106 107 Throwable error = this.error.get(); 108 if ( error != null ) 109 { 110 if ( error instanceof RuntimeException ) 111 { 112 throw (RuntimeException) error; 113 } 114 else if ( error instanceof ThreadDeath ) 115 { 116 throw new IllegalStateException( error ); 117 } 118 else if ( error instanceof Error ) 119 { 120 throw (Error) error; 121 } 122 throw new IllegalStateException( error ); 123 } 124 } 125 126 private void awaitTerminationOfAllRunnables() 127 { 128 if ( !thread.equals( Thread.currentThread() ) ) 129 { 130 throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not " 131 + Thread.currentThread() ); 132 } 133 134 boolean interrupted = false; 135 136 while ( counter.get() > 0 ) 137 { 138 LockSupport.park(); 139 140 if ( Thread.interrupted() ) 141 { 142 interrupted = true; 143 } 144 } 145 146 if ( interrupted ) 147 { 148 Thread.currentThread().interrupt(); 149 } 150 } 151 152}