1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugins.invoker;
20
21 import java.util.Comparator;
22 import java.util.LinkedHashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.stream.Collectors;
29
30 import org.apache.maven.plugins.invoker.model.BuildJob;
31
32
33
34
35
36
37 class JobExecutor {
38 interface ThrowableJobConsumer {
39 void accept(BuildJob t) throws Throwable;
40 }
41
42 private final List<BuildJob> jobs;
43 private final int threadsCount;
44
45 JobExecutor(List<BuildJob> jobs, int threadsCount) {
46 this.jobs = jobs;
47 this.threadsCount = threadsCount;
48 }
49
50 public void forEach(ThrowableJobConsumer jobConsumer) {
51
52 Map<Integer, List<BuildJob>> groupedJobs = jobs.stream()
53 .sorted(Comparator.comparing(BuildJob::getOrdinal).reversed())
54 .collect(Collectors.groupingBy(BuildJob::getOrdinal, LinkedHashMap::new, Collectors.toList()));
55
56 ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
57
58 groupedJobs.forEach((key, value) -> {
59
60 List<Callable<Void>> callableJobs = value.stream()
61 .map(buildJob -> (Callable<Void>) () -> {
62 try {
63 jobConsumer.accept(buildJob);
64 } catch (Throwable e) {
65 buildJob.setResult(BuildJob.Result.ERROR);
66 buildJob.setFailureMessage(String.valueOf(e));
67 }
68 return null;
69 })
70 .collect(Collectors.toList());
71
72 try {
73 executorService.invokeAll(callableJobs);
74 } catch (InterruptedException e) {
75 Thread.currentThread().interrupt();
76 throw new RuntimeException(e);
77 }
78 });
79
80
81 executorService.shutdownNow();
82 }
83 }