1 package org.eclipse.aether.util.concurrency;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicReference;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
28 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
29 *
30 * <pre>
31 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
32 * for ( Runnable task : tasks )
33 * {
34 * executor.execute( errorForwarder.wrap( task ) );
35 * }
36 * errorForwarder.await();
37 * </pre>
38 */
39 public final class RunnableErrorForwarder
40 {
41
42 private final Thread thread = Thread.currentThread();
43
44 private final AtomicInteger counter = new AtomicInteger();
45
46 private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
47
48 /**
49 * Creates a new error forwarder for worker threads spawned by the current thread.
50 */
51 public RunnableErrorForwarder()
52 {
53 }
54
55 /**
56 * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
57 *
58 * @param runnable The runnable from which to forward errors, must not be {@code null}.
59 * @return The error-forwarding runnable to eventually execute, never {@code null}.
60 */
61 public Runnable wrap( final Runnable runnable )
62 {
63 if ( runnable == null )
64 {
65 throw new IllegalArgumentException( "runnable missing" );
66 }
67
68 counter.incrementAndGet();
69
70 return new Runnable()
71 {
72 public void run()
73 {
74 try
75 {
76 runnable.run();
77 }
78 catch ( RuntimeException e )
79 {
80 error.compareAndSet( null, e );
81 throw e;
82 }
83 catch ( Error e )
84 {
85 error.compareAndSet( null, e );
86 throw e;
87 }
88 finally
89 {
90 counter.decrementAndGet();
91 LockSupport.unpark( thread );
92 }
93 }
94 };
95 }
96
97 /**
98 * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
99 * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
100 * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
101 * method must be called from the same thread that created this error forwarder instance.
102 */
103 public void await()
104 {
105 awaitTerminationOfAllRunnables();
106
107 Throwable error = this.error.get();
108 if ( error != null )
109 {
110 if ( error instanceof RuntimeException )
111 {
112 throw (RuntimeException) error;
113 }
114 else if ( error instanceof ThreadDeath )
115 {
116 throw new IllegalStateException( error );
117 }
118 else if ( error instanceof Error )
119 {
120 throw (Error) error;
121 }
122 throw new IllegalStateException( error );
123 }
124 }
125
126 private void awaitTerminationOfAllRunnables()
127 {
128 if ( !thread.equals( Thread.currentThread() ) )
129 {
130 throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
131 + Thread.currentThread() );
132 }
133
134 boolean interrupted = false;
135
136 while ( counter.get() > 0 )
137 {
138 LockSupport.park();
139
140 if ( Thread.interrupted() )
141 {
142 interrupted = true;
143 }
144 }
145
146 if ( interrupted )
147 {
148 Thread.currentThread().interrupt();
149 }
150 }
151
152 }