1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.internal.impl.util;
20
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class PhasingExecutor implements Executor, AutoCloseable {
56 private static final AtomicInteger ID = new AtomicInteger(0);
57 private static final Logger LOGGER = LoggerFactory.getLogger(PhasingExecutor.class);
58
59 private final ExecutorService executor;
60 private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
61 private final AtomicBoolean inPhase = new AtomicBoolean(false);
62 private final AtomicInteger activeTaskCount = new AtomicInteger(0);
63 private final AtomicInteger completedTaskCount = new AtomicInteger(0);
64 private final int id = ID.incrementAndGet();
65 private final ReentrantLock lock = new ReentrantLock();
66 private final Condition taskCompletionCondition = lock.newCondition();
67
68 public PhasingExecutor(ExecutorService executor) {
69 this.executor = executor;
70 log("[{}][general] PhasingExecutor created.");
71 }
72
73 @Override
74 public void execute(Runnable command) {
75 activeTaskCount.incrementAndGet();
76 log("[{}][task] Task submitted. Active tasks: {}", activeTaskCount.get());
77 executor.execute(() -> {
78 try {
79 log("[{}][task] Task executing. Active tasks: {}", activeTaskCount.get());
80 command.run();
81 } finally {
82 lock.lock();
83 try {
84 completedTaskCount.incrementAndGet();
85 activeTaskCount.decrementAndGet();
86 log("[{}][task] Task completed. Active tasks: {}", activeTaskCount.get());
87 taskCompletionCondition.signalAll();
88 if (activeTaskCount.get() == 0 && shutdownInitiated.get()) {
89 log("[{}][task] Last task completed. Initiating executor shutdown.");
90 executor.shutdown();
91 }
92 } finally {
93 lock.unlock();
94 }
95 }
96 });
97 }
98
99 public AutoCloseable phase() {
100 if (inPhase.getAndSet(true)) {
101 throw new IllegalStateException("Already in a phase");
102 }
103 int phaseNumber = completedTaskCount.get();
104 log("[{}][phase] Entering phase {}. Active tasks: {}", phaseNumber, activeTaskCount.get());
105 return () -> {
106 try {
107 int tasksAtPhaseStart = completedTaskCount.get();
108 log("[{}][phase] Closing phase {}. Waiting for all tasks to complete.", phaseNumber);
109 lock.lock();
110 try {
111 while (activeTaskCount.get() > 0
112 && completedTaskCount.get() - tasksAtPhaseStart < activeTaskCount.get()) {
113 taskCompletionCondition.await(100, TimeUnit.MILLISECONDS);
114 }
115 } finally {
116 lock.unlock();
117 }
118 log("[{}][phase] Phase {} completed. Total completed tasks: {}", phaseNumber, completedTaskCount.get());
119 } catch (InterruptedException e) {
120 log("[{}][phase] Phase {} was interrupted.", phaseNumber);
121 Thread.currentThread().interrupt();
122 throw new RuntimeException("Phase interrupted", e);
123 } finally {
124 inPhase.set(false);
125 }
126 };
127 }
128
129 @Override
130 public void close() {
131 log("[{}][close] Closing PhasingExecutor. Active tasks: {}", activeTaskCount.get());
132 if (shutdownInitiated.getAndSet(true)) {
133 log("[{}][close] Shutdown already initiated. Returning.");
134 return;
135 }
136
137 lock.lock();
138 try {
139 while (activeTaskCount.get() > 0) {
140 log("[{}][close] Waiting for {} active tasks to complete.", activeTaskCount.get());
141 taskCompletionCondition.await(100, TimeUnit.MILLISECONDS);
142 }
143 } catch (InterruptedException e) {
144 log("[{}][close] Interrupted while waiting for tasks to complete.");
145 Thread.currentThread().interrupt();
146 } finally {
147 lock.unlock();
148 log("[{}][close] All tasks completed. Shutting down executor.");
149 executor.shutdown();
150 }
151 log("[{}][close] PhasingExecutor closed. Total completed tasks: {}", completedTaskCount.get());
152 }
153
154 private void log(String message) {
155 LOGGER.debug(message, id);
156 }
157
158 private void log(String message, Object o1) {
159 LOGGER.debug(message, id, o1);
160 }
161
162 private void log(String message, Object o1, Object o2) {
163 LOGGER.debug(message, id, o1, o2);
164 }
165 }