View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.internal.impl.util;
20  
21  import java.util.concurrent.Executor;
22  import java.util.concurrent.ExecutorService;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.ReentrantLock;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  /**
33   * The phasing executor allows executing tasks in parallel and waiting for all tasks
34   * to be executed before fully closing the executor. Tasks can be submitted even after
35   * the close method has been called, allowing for use with try-with-resources.
36   * The {@link #phase()} method can be used to submit tasks and wait for them to be
37   * executed without closing the executor.
38   *
39   * <p>Example usage:
40   * <pre>
41   * try (PhasingExecutor executor = createExecutor()) {
42   *     try (var phase = executor.phase()) {
43   *         executor.execute(() -> { /* task 1 *&#47; });
44   *         executor.execute(() -> { /* task 2 *&#47; });
45   *         More tasks...
46   *     } This will wait for all tasks in this phase to complete
47   *
48   *     You can have multiple phases
49   *     try (var anotherPhase = executor.phase()) {
50   *         executor.execute(() -> { /* another task *&#47; });
51   *     }
52   * } The executor will wait for all tasks to complete before shutting down
53   * </pre>
54   */
55  public class PhasingExecutor implements Executor, AutoCloseable {
56      private static final AtomicInteger ID = new AtomicInteger(0);
57      private static final Logger LOGGER = LoggerFactory.getLogger(PhasingExecutor.class);
58  
59      private final ExecutorService executor;
60      private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
61      private final AtomicBoolean inPhase = new AtomicBoolean(false);
62      private final AtomicInteger activeTaskCount = new AtomicInteger(0);
63      private final AtomicInteger completedTaskCount = new AtomicInteger(0);
64      private final int id = ID.incrementAndGet();
65      private final ReentrantLock lock = new ReentrantLock();
66      private final Condition taskCompletionCondition = lock.newCondition();
67  
68      public PhasingExecutor(ExecutorService executor) {
69          this.executor = executor;
70          log("[{}][general] PhasingExecutor created.");
71      }
72  
73      @Override
74      public void execute(Runnable command) {
75          activeTaskCount.incrementAndGet();
76          log("[{}][task] Task submitted. Active tasks: {}", activeTaskCount.get());
77          executor.execute(() -> {
78              try {
79                  log("[{}][task] Task executing. Active tasks: {}", activeTaskCount.get());
80                  command.run();
81              } finally {
82                  lock.lock();
83                  try {
84                      completedTaskCount.incrementAndGet();
85                      activeTaskCount.decrementAndGet();
86                      log("[{}][task] Task completed. Active tasks: {}", activeTaskCount.get());
87                      taskCompletionCondition.signalAll();
88                      if (activeTaskCount.get() == 0 && shutdownInitiated.get()) {
89                          log("[{}][task] Last task completed. Initiating executor shutdown.");
90                          executor.shutdown();
91                      }
92                  } finally {
93                      lock.unlock();
94                  }
95              }
96          });
97      }
98  
99      public AutoCloseable phase() {
100         if (inPhase.getAndSet(true)) {
101             throw new IllegalStateException("Already in a phase");
102         }
103         int phaseNumber = completedTaskCount.get();
104         log("[{}][phase] Entering phase {}. Active tasks: {}", phaseNumber, activeTaskCount.get());
105         return () -> {
106             try {
107                 int tasksAtPhaseStart = completedTaskCount.get();
108                 log("[{}][phase] Closing phase {}. Waiting for all tasks to complete.", phaseNumber);
109                 lock.lock();
110                 try {
111                     while (activeTaskCount.get() > 0
112                             && completedTaskCount.get() - tasksAtPhaseStart < activeTaskCount.get()) {
113                         taskCompletionCondition.await(100, TimeUnit.MILLISECONDS);
114                     }
115                 } finally {
116                     lock.unlock();
117                 }
118                 log("[{}][phase] Phase {} completed. Total completed tasks: {}", phaseNumber, completedTaskCount.get());
119             } catch (InterruptedException e) {
120                 log("[{}][phase] Phase {} was interrupted.", phaseNumber);
121                 Thread.currentThread().interrupt();
122                 throw new RuntimeException("Phase interrupted", e);
123             } finally {
124                 inPhase.set(false);
125             }
126         };
127     }
128 
129     @Override
130     public void close() {
131         log("[{}][close] Closing PhasingExecutor. Active tasks: {}", activeTaskCount.get());
132         if (shutdownInitiated.getAndSet(true)) {
133             log("[{}][close] Shutdown already initiated. Returning.");
134             return;
135         }
136 
137         lock.lock();
138         try {
139             while (activeTaskCount.get() > 0) {
140                 log("[{}][close] Waiting for {} active tasks to complete.", activeTaskCount.get());
141                 taskCompletionCondition.await(100, TimeUnit.MILLISECONDS);
142             }
143         } catch (InterruptedException e) {
144             log("[{}][close] Interrupted while waiting for tasks to complete.");
145             Thread.currentThread().interrupt();
146         } finally {
147             lock.unlock();
148             log("[{}][close] All tasks completed. Shutting down executor.");
149             executor.shutdown();
150         }
151         log("[{}][close] PhasingExecutor closed. Total completed tasks: {}", completedTaskCount.get());
152     }
153 
154     private void log(String message) {
155         LOGGER.debug(message, id);
156     }
157 
158     private void log(String message, Object o1) {
159         LOGGER.debug(message, id, o1);
160     }
161 
162     private void log(String message, Object o1, Object o2) {
163         LOGGER.debug(message, id, o1, o2);
164     }
165 }