001package org.eclipse.aether.util.concurrency;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 * 
012 *  http://www.apache.org/licenses/LICENSE-2.0
013 * 
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import static java.util.Objects.requireNonNull;
023
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.concurrent.locks.LockSupport;
027
028/**
029 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
030 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
031 * 
032 * <pre>
033 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
034 * for ( Runnable task : tasks )
035 * {
036 *     executor.execute( errorForwarder.wrap( task ) );
037 * }
038 * errorForwarder.await();
039 * </pre>
040 */
041public final class RunnableErrorForwarder
042{
043
044    private final Thread thread = Thread.currentThread();
045
046    private final AtomicInteger counter = new AtomicInteger();
047
048    private final AtomicReference<Throwable> error = new AtomicReference<>();
049
050    /**
051     * Creates a new error forwarder for worker threads spawned by the current thread.
052     */
053    public RunnableErrorForwarder()
054    {
055    }
056
057    /**
058     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
059     *
060     * @param runnable The runnable from which to forward errors, must not be {@code null}.
061     * @return The error-forwarding runnable to eventually execute, never {@code null}.
062     */
063    public Runnable wrap( final Runnable runnable )
064    {
065        requireNonNull( runnable, "runnable cannot be null" );
066
067        counter.incrementAndGet();
068
069        return new Runnable()
070        {
071            public void run()
072            {
073                try
074                {
075                    runnable.run();
076                }
077                catch ( RuntimeException | Error e )
078                {
079                    error.compareAndSet( null, e );
080                    throw e;
081                }
082                finally
083                {
084                    counter.decrementAndGet();
085                    LockSupport.unpark( thread );
086                }
087            }
088        };
089    }
090
091    /**
092     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
093     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
094     * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
095     * method must be called from the same thread that created this error forwarder instance.
096     */
097    public void await()
098    {
099        awaitTerminationOfAllRunnables();
100
101        Throwable error = this.error.get();
102        if ( error != null )
103        {
104            if ( error instanceof RuntimeException )
105            {
106                throw (RuntimeException) error;
107            }
108            else if ( error instanceof ThreadDeath )
109            {
110                throw new IllegalStateException( error );
111            }
112            else if ( error instanceof Error )
113            {
114                throw (Error) error;
115            }
116            throw new IllegalStateException( error );
117        }
118    }
119
120    private void awaitTerminationOfAllRunnables()
121    {
122        if ( !thread.equals( Thread.currentThread() ) )
123        {
124            throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
125                + Thread.currentThread() );
126        }
127
128        boolean interrupted = false;
129
130        while ( counter.get() > 0 )
131        {
132            LockSupport.park();
133
134            if ( Thread.interrupted() )
135            {
136                interrupted = true;
137            }
138        }
139
140        if ( interrupted )
141        {
142            Thread.currentThread().interrupt();
143        }
144    }
145
146}