1 package org.eclipse.aether.util.concurrency;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import static java.util.Objects.requireNonNull;
23
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.LockSupport;
27
/**
 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
 *
 * <pre>
 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
 * for ( Runnable task : tasks )
 * {
 *     executor.execute( errorForwarder.wrap( task ) );
 * }
 * errorForwarder.await();
 * </pre>
 *
 * The parent thread is the thread that instantiates the forwarder; only that thread may call {@link #await()}.
 */
public final class RunnableErrorForwarder
{

    /**
     * The thread that created this forwarder. It is the thread woken up by finishing runnables and the only thread
     * allowed to call {@link #await()}.
     */
    private final Thread thread = Thread.currentThread();

    /**
     * The number of runnables that have been wrapped but have not yet finished running.
     */
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * The first uncaught {@link RuntimeException} or {@link Error} recorded by any wrapped runnable, or {@code null}
     * if none has failed so far.
     */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /**
     * Creates a new error forwarder for worker threads spawned by the current thread.
     */
    public RunnableErrorForwarder()
    {
    }

    /**
     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors. The
     * runnable is counted as outstanding from the moment it is wrapped until the returned wrapper has finished
     * running, so a wrapper that is never run keeps {@link #await()} waiting.
     *
     * @param runnable The runnable from which to forward errors, must not be {@code null}.
     * @return The error-forwarding runnable to eventually execute, never {@code null}.
     * @throws NullPointerException If the specified runnable is {@code null}.
     */
    public Runnable wrap( final Runnable runnable )
    {
        requireNonNull( runnable, "runnable cannot be null" );

        // Registered at wrap time (not at run time) so that await() also covers runnables that are still queued.
        counter.incrementAndGet();

        return new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    runnable.run();
                }
                catch ( RuntimeException | Error e )
                {
                    // Only the first failure is retained; the failure still propagates in the worker thread.
                    error.compareAndSet( null, e );
                    throw e;
                }
                finally
                {
                    // Decrement before unparking so the woken parent observes the updated count.
                    counter.decrementAndGet();
                    LockSupport.unpark( thread );
                }
            }
        };
    }

    /**
     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
     * case multiple runnables encountered uncaught errors, the first one recorded is re-thrown. The wait itself is not
     * aborted by interruption; an interrupt received while waiting is restored on the current thread before this
     * method completes. <em>Note:</em> This method must be called from the same thread that created this error
     * forwarder instance.
     *
     * @throws IllegalStateException If called from a thread other than the one that created this instance, or if the
     *             recorded failure is a {@link ThreadDeath} (which is wrapped as the cause instead of being re-thrown).
     * @throws RuntimeException If a wrapped runnable failed with this exception.
     * @throws Error If a wrapped runnable failed with this error.
     */
    public void await()
    {
        awaitTerminationOfAllRunnables();

        Throwable failure = error.get();
        if ( failure == null )
        {
            return;
        }
        if ( failure instanceof RuntimeException )
        {
            throw (RuntimeException) failure;
        }
        // ThreadDeath is deliberately not re-thrown as is - presumably so that it cannot silently terminate the
        // waiting thread (TODO confirm intent). It is reported wrapped in an IllegalStateException instead.
        if ( failure instanceof Error && !( failure instanceof ThreadDeath ) )
        {
            throw (Error) failure;
        }
        throw new IllegalStateException( failure );
    }

    /**
     * Blocks the creating thread until the count of outstanding runnables has dropped to zero.
     *
     * @throws IllegalStateException If the calling thread is not the thread that created this instance.
     */
    private void awaitTerminationOfAllRunnables()
    {
        if ( !thread.equals( Thread.currentThread() ) )
        {
            throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
                + Thread.currentThread() );
        }

        boolean interrupted = false;

        // park() may return for several reasons (unpark, interrupt, spuriously), hence the re-check in a loop.
        while ( counter.get() > 0 )
        {
            LockSupport.park();

            // Clear the interrupt status, otherwise park() would return immediately on every further iteration.
            if ( Thread.interrupted() )
            {
                interrupted = true;
            }
        }

        if ( interrupted )
        {
            // Hand the swallowed interrupt back to the caller.
            Thread.currentThread().interrupt();
        }
    }

}