001package org.eclipse.aether.util.concurrency;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 * 
012 *  http://www.apache.org/licenses/LICENSE-2.0
013 * 
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import static java.util.Objects.requireNonNull;
023
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.concurrent.locks.LockSupport;
027
/**
 * Relays an uncaught {@link RuntimeException} or {@link Error} raised by a {@link Runnable} on a worker thread to the
 * thread that created the forwarder. Typical usage:
 *
 * <pre>
 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
 * for ( Runnable task : tasks )
 * {
 *     executor.execute( errorForwarder.wrap( task ) );
 * }
 * errorForwarder.await();
 * </pre>
 */
public final class RunnableErrorForwarder
{

    /** Thread that constructed this instance; the only thread accepted by {@link #await()}. */
    private final Thread owner = Thread.currentThread();

    /** Incremented by {@link #wrap(Runnable)} and decremented each time a wrapped runnable finishes running. */
    private final AtomicInteger pending = new AtomicInteger();

    /** First failure recorded by any wrapped runnable, or {@code null} while none has failed. */
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /**
     * Creates an error forwarder bound to the calling thread, which becomes the thread that has to call
     * {@link #await()}.
     */
    public RunnableErrorForwarder()
    {
    }

    /**
     * Decorates the given runnable so that an uncaught {@link RuntimeException} or {@link Error} is remembered for
     * {@link #await()} before it is rethrown on the executing thread.
     *
     * @param runnable The runnable whose errors should be forwarded, must not be {@code null}.
     * @return The decorated runnable to hand to an executor, never {@code null}.
     */
    public Runnable wrap( final Runnable runnable )
    {
        requireNonNull( runnable, "runnable cannot be null" );

        // Counted at wrap time, so await() also waits for runnables that have not started yet.
        pending.incrementAndGet();

        return () -> runAndRecord( runnable );
    }

    /**
     * Runs the task, records the first uncaught failure and always signals the owning thread afterwards.
     *
     * @param task The runnable to execute, must not be {@code null}.
     */
    private void runAndRecord( final Runnable task )
    {
        try
        {
            task.run();
        }
        catch ( RuntimeException | Error problem )
        {
            // Only the first failure is kept; later ones lose the compare-and-set.
            failure.compareAndSet( null, problem );
            throw problem;
        }
        finally
        {
            pending.decrementAndGet();
            LockSupport.unpark( owner );
        }
    }

    /**
     * Blocks the calling thread until every runnable previously passed to {@link #wrap(Runnable)} has finished, then
     * rethrows a recorded failure, if any. When several runnables failed, only the first recorded failure is
     * reported. A {@link RuntimeException} or {@link Error} is rethrown as is, except for {@link ThreadDeath} which,
     * like any other throwable, is wrapped into an {@link IllegalStateException}. <em>Note:</em> This method must be
     * called from the thread that created this error forwarder instance.
     */
    public void await()
    {
        parkUntilNothingPending();

        final Throwable cause = failure.get();
        if ( cause == null )
        {
            return;
        }
        if ( cause instanceof RuntimeException )
        {
            throw (RuntimeException) cause;
        }
        // ThreadDeath is an Error but is wrapped below instead of being rethrown directly.
        if ( cause instanceof Error && !( cause instanceof ThreadDeath ) )
        {
            throw (Error) cause;
        }
        throw new IllegalStateException( cause );
    }

    /**
     * Parks the owning thread until the pending counter is no longer positive, keeping track of interruptions so the
     * interrupt status can be restored once waiting is over.
     */
    private void parkUntilNothingPending()
    {
        final Thread caller = Thread.currentThread();
        if ( !owner.equals( caller ) )
        {
            throw new IllegalStateException( "wrong caller thread, expected " + owner + " and not "
                + caller );
        }

        boolean sawInterrupt = false;
        while ( pending.get() > 0 )
        {
            LockSupport.park();

            // Clear the interrupt status while waiting (park() returns immediately when it is set); it is
            // restored after the loop.
            sawInterrupt |= Thread.interrupted();
        }

        if ( sawInterrupt )
        {
            caller.interrupt();
        }
    }

}