001package org.eclipse.aether.util.concurrency;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 * 
012 *  http://www.apache.org/licenses/LICENSE-2.0
013 * 
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.concurrent.atomic.AtomicInteger;
023import java.util.concurrent.atomic.AtomicReference;
024import java.util.concurrent.locks.LockSupport;
025
026/**
027 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
028 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
029 * 
030 * <pre>
031 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
032 * for ( Runnable task : tasks )
033 * {
034 *     executor.execute( errorForwarder.wrap( task ) );
035 * }
036 * errorForwarder.await();
037 * </pre>
038 */
039public final class RunnableErrorForwarder
040{
041
042    private final Thread thread = Thread.currentThread();
043
044    private final AtomicInteger counter = new AtomicInteger();
045
046    private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
047
048    /**
049     * Creates a new error forwarder for worker threads spawned by the current thread.
050     */
051    public RunnableErrorForwarder()
052    {
053    }
054
055    /**
056     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
057     * 
058     * @param runnable The runnable from which to forward errors, must not be {@code null}.
059     * @return The error-forwarding runnable to eventually execute, never {@code null}.
060     */
061    public Runnable wrap( final Runnable runnable )
062    {
063        if ( runnable == null )
064        {
065            throw new IllegalArgumentException( "runnable missing" );
066        }
067
068        counter.incrementAndGet();
069
070        return new Runnable()
071        {
072            public void run()
073            {
074                try
075                {
076                    runnable.run();
077                }
078                catch ( RuntimeException e )
079                {
080                    error.compareAndSet( null, e );
081                    throw e;
082                }
083                catch ( Error e )
084                {
085                    error.compareAndSet( null, e );
086                    throw e;
087                }
088                finally
089                {
090                    counter.decrementAndGet();
091                    LockSupport.unpark( thread );
092                }
093            }
094        };
095    }
096
097    /**
098     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
099     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
100     * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
101     * method must be called from the same thread that created this error forwarder instance.
102     */
103    public void await()
104    {
105        awaitTerminationOfAllRunnables();
106
107        Throwable error = this.error.get();
108        if ( error != null )
109        {
110            if ( error instanceof RuntimeException )
111            {
112                throw (RuntimeException) error;
113            }
114            else if ( error instanceof ThreadDeath )
115            {
116                throw new IllegalStateException( error );
117            }
118            else if ( error instanceof Error )
119            {
120                throw (Error) error;
121            }
122            throw new IllegalStateException( error );
123        }
124    }
125
126    private void awaitTerminationOfAllRunnables()
127    {
128        if ( !thread.equals( Thread.currentThread() ) )
129        {
130            throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
131                + Thread.currentThread() );
132        }
133
134        boolean interrupted = false;
135
136        while ( counter.get() > 0 )
137        {
138            LockSupport.park();
139
140            if ( Thread.interrupted() )
141            {
142                interrupted = true;
143            }
144        }
145
146        if ( interrupted )
147        {
148            Thread.currentThread().interrupt();
149        }
150    }
151
152}