View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugins.invoker;
20  
21  import java.util.Comparator;
22  import java.util.LinkedHashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.stream.Collectors;
29  
30  import org.apache.maven.plugins.invoker.model.BuildJob;
31  
32  /**
33   * Execute build jobs with parallel.
34   *
35   * @author Slawomir Jaranowski
36   */
37  class JobExecutor {
38      interface ThrowableJobConsumer {
39          void accept(BuildJob t) throws Throwable;
40      }
41  
42      private final List<BuildJob> jobs;
43      private final int threadsCount;
44  
45      JobExecutor(List<BuildJob> jobs, int threadsCount) {
46          this.jobs = jobs;
47          this.threadsCount = threadsCount;
48      }
49  
50      public void forEach(ThrowableJobConsumer jobConsumer) {
51          // group and sort jobs by ordinal
52          Map<Integer, List<BuildJob>> groupedJobs = jobs.stream()
53                  .sorted(Comparator.comparing(BuildJob::getOrdinal).reversed())
54                  .collect(Collectors.groupingBy(BuildJob::getOrdinal, LinkedHashMap::new, Collectors.toList()));
55  
56          ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
57  
58          groupedJobs.forEach((key, value) -> {
59              // prepare list of callable tasks
60              List<Callable<Void>> callableJobs = value.stream()
61                      .map(buildJob -> (Callable<Void>) () -> {
62                          try {
63                              jobConsumer.accept(buildJob);
64                          } catch (Throwable e) {
65                              buildJob.setResult(BuildJob.Result.ERROR);
66                              buildJob.setFailureMessage(String.valueOf(e));
67                          }
68                          return null;
69                      })
70                      .collect(Collectors.toList());
71  
72              try {
73                  executorService.invokeAll(callableJobs);
74              } catch (InterruptedException e) {
75                  Thread.currentThread().interrupt();
76                  throw new RuntimeException(e);
77              }
78          });
79  
80          // all task are finished here
81          executorService.shutdownNow();
82      }
83  }