View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.cli.transfer;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.ArrayBlockingQueue;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.function.Consumer;
27  
28  import org.eclipse.aether.transfer.AbstractTransferListener;
29  import org.eclipse.aether.transfer.TransferCancelledException;
30  import org.eclipse.aether.transfer.TransferEvent;
31  import org.eclipse.aether.transfer.TransferListener;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import static java.util.Objects.requireNonNull;
36  
37  /**
38   * A {@link TransferListener} implementation that wraps another delegate {@link TransferListener} but makes it run
39   * on single thread, keeping the listener logic simple. This listener also blocks on last transfer event to allow
40   * output to perform possible cleanup. It spawns a daemon thread to consume queued events that may fall in even
41   * concurrently.
42   *
43   * @since 4.0.0
44   */
45  public final class SimplexTransferListener extends AbstractTransferListener {
46      private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
47      private static final int QUEUE_SIZE = 1024;
48      private static final int BATCH_MAX_SIZE = 500;
49      private final TransferListener delegate;
50      private final int batchMaxSize;
51      private final boolean blockOnLastEvent;
52      private final ArrayBlockingQueue<Exchange> eventQueue;
53  
54      /**
55       * Constructor that makes passed in delegate run on single thread, and will block on last event.
56       */
57      public SimplexTransferListener(TransferListener delegate) {
58          this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true);
59      }
60  
61      /**
62       * Constructor that may alter behaviour of this listener.
63       *
64       * @param delegate The delegate that should run on single thread.
65       * @param queueSize The event queue size (default {@code 1024}).
66       * @param batchMaxSize The maximum batch size delegate should receive (default {@code 500}).
67       * @param blockOnLastEvent Should this listener block on last transfer end (completed or corrupted) block? (default {@code true}).
68       */
69      public SimplexTransferListener(
70              TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) {
71          this.delegate = requireNonNull(delegate);
72          if (queueSize < 1 || batchMaxSize < 1) {
73              throw new IllegalArgumentException("Queue and batch sizes must be greater than 1");
74          }
75          this.batchMaxSize = batchMaxSize;
76          this.blockOnLastEvent = blockOnLastEvent;
77  
78          this.eventQueue = new ArrayBlockingQueue<>(queueSize);
79          Thread updater = new Thread(this::feedConsumer);
80          updater.setDaemon(true);
81          updater.start();
82      }
83  
84      public TransferListener getDelegate() {
85          return delegate;
86      }
87  
88      private void feedConsumer() {
89          final ArrayList<Exchange> batch = new ArrayList<>(batchMaxSize);
90          try {
91              while (true) {
92                  batch.clear();
93                  if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) {
94                      batch.add(eventQueue.take());
95                  }
96                  demux(batch);
97              }
98          } catch (InterruptedException e) {
99              throw new RuntimeException(e);
100         }
101     }
102 
103     private void demux(List<Exchange> exchanges) {
104         for (Exchange exchange : exchanges) {
105             exchange.process(transferEvent -> {
106                 TransferEvent.EventType type = transferEvent.getType();
107                 try {
108                     switch (type) {
109                         case INITIATED:
110                             delegate.transferInitiated(transferEvent);
111                             break;
112                         case STARTED:
113                             delegate.transferStarted(transferEvent);
114                             break;
115                         case PROGRESSED:
116                             delegate.transferProgressed(transferEvent);
117                             break;
118                         case CORRUPTED:
119                             delegate.transferCorrupted(transferEvent);
120                             break;
121                         case SUCCEEDED:
122                             delegate.transferSucceeded(transferEvent);
123                             break;
124                         case FAILED:
125                             delegate.transferFailed(transferEvent);
126                             break;
127                         default:
128                             LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
129                     }
130                 } catch (TransferCancelledException e) {
131                     ongoing.put(new TransferResourceIdentifier(transferEvent.getResource()), Boolean.FALSE);
132                 }
133             });
134         }
135     }
136 
137     private void put(TransferEvent event, boolean last) {
138         try {
139             Exchange exchange;
140             if (blockOnLastEvent && last) {
141                 exchange = new BlockingExchange(event);
142             } else {
143                 exchange = new Exchange(event);
144             }
145             eventQueue.put(exchange);
146             exchange.waitForProcessed();
147         } catch (InterruptedException e) {
148             throw new RuntimeException(e);
149         }
150     }
151 
152     private final ConcurrentHashMap<TransferResourceIdentifier, Boolean> ongoing = new ConcurrentHashMap<>();
153 
154     @Override
155     public void transferInitiated(TransferEvent event) {
156         ongoing.putIfAbsent(new TransferResourceIdentifier(event.getResource()), Boolean.TRUE);
157         put(event, false);
158     }
159 
160     @Override
161     public void transferStarted(TransferEvent event) throws TransferCancelledException {
162         if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
163             throw new TransferCancelledException();
164         }
165         put(event, false);
166     }
167 
168     @Override
169     public void transferProgressed(TransferEvent event) throws TransferCancelledException {
170         if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
171             throw new TransferCancelledException();
172         }
173         put(event, false);
174     }
175 
176     @Override
177     public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
178         if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
179             throw new TransferCancelledException();
180         }
181         put(event, false);
182     }
183 
184     @Override
185     public void transferSucceeded(TransferEvent event) {
186         ongoing.remove(new TransferResourceIdentifier(event.getResource()));
187         put(event, ongoing.isEmpty());
188     }
189 
190     @Override
191     public void transferFailed(TransferEvent event) {
192         ongoing.remove(new TransferResourceIdentifier(event.getResource()));
193         put(event, ongoing.isEmpty());
194     }
195 
196     private static class Exchange {
197         private final TransferEvent event;
198 
199         private Exchange(TransferEvent event) {
200             this.event = event;
201         }
202 
203         public void process(Consumer<TransferEvent> consumer) {
204             consumer.accept(event);
205         }
206 
207         public void waitForProcessed() throws InterruptedException {
208             // nothing, is async
209         }
210     }
211 
212     private static class BlockingExchange extends Exchange {
213         private final CountDownLatch latch = new CountDownLatch(1);
214 
215         private BlockingExchange(TransferEvent event) {
216             super(event);
217         }
218 
219         @Override
220         public void process(Consumer<TransferEvent> consumer) {
221             super.process(consumer);
222             latch.countDown();
223         }
224 
225         @Override
226         public void waitForProcessed() throws InterruptedException {
227             latch.await();
228         }
229     }
230 }