001package org.eclipse.aether.util.concurrency;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 * 
012 *  http://www.apache.org/licenses/LICENSE-2.0
013 * 
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import static java.util.Objects.requireNonNull;
023
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.concurrent.locks.LockSupport;
027
028/**
029 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
030 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
031 * 
032 * <pre>
033 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
034 * for ( Runnable task : tasks )
035 * {
036 *     executor.execute( errorForwarder.wrap( task ) );
037 * }
038 * errorForwarder.await();
039 * </pre>
040 */
041public final class RunnableErrorForwarder
042{
043
044    private final Thread thread = Thread.currentThread();
045
046    private final AtomicInteger counter = new AtomicInteger();
047
048    private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
049
050    /**
051     * Creates a new error forwarder for worker threads spawned by the current thread.
052     */
053    public RunnableErrorForwarder()
054    {
055    }
056
057    /**
058     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
059     *
060     * @param runnable The runnable from which to forward errors, must not be {@code null}.
061     * @return The error-forwarding runnable to eventually execute, never {@code null}.
062     */
063    public Runnable wrap( final Runnable runnable )
064    {
065        requireNonNull( runnable, "runnable cannot be null" );
066
067        counter.incrementAndGet();
068
069        return new Runnable()
070        {
071            public void run()
072            {
073                try
074                {
075                    runnable.run();
076                }
077                catch ( RuntimeException e )
078                {
079                    error.compareAndSet( null, e );
080                    throw e;
081                }
082                catch ( Error e )
083                {
084                    error.compareAndSet( null, e );
085                    throw e;
086                }
087                finally
088                {
089                    counter.decrementAndGet();
090                    LockSupport.unpark( thread );
091                }
092            }
093        };
094    }
095
096    /**
097     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
098     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
099     * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
100     * method must be called from the same thread that created this error forwarder instance.
101     */
102    public void await()
103    {
104        awaitTerminationOfAllRunnables();
105
106        Throwable error = this.error.get();
107        if ( error != null )
108        {
109            if ( error instanceof RuntimeException )
110            {
111                throw (RuntimeException) error;
112            }
113            else if ( error instanceof ThreadDeath )
114            {
115                throw new IllegalStateException( error );
116            }
117            else if ( error instanceof Error )
118            {
119                throw (Error) error;
120            }
121            throw new IllegalStateException( error );
122        }
123    }
124
125    private void awaitTerminationOfAllRunnables()
126    {
127        if ( !thread.equals( Thread.currentThread() ) )
128        {
129            throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
130                + Thread.currentThread() );
131        }
132
133        boolean interrupted = false;
134
135        while ( counter.get() > 0 )
136        {
137            LockSupport.park();
138
139            if ( Thread.interrupted() )
140            {
141                interrupted = true;
142            }
143        }
144
145        if ( interrupted )
146        {
147            Thread.currentThread().interrupt();
148        }
149    }
150
151}