001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.spi.connector.transport;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.eclipse.aether.transfer.TransferCancelledException;
028
029import static java.util.Objects.requireNonNull;
030
031/**
032 * A skeleton implementation for custom transporters.
033 */
034public abstract class AbstractTransporter implements Transporter {
035
036    private final AtomicBoolean closed;
037
038    /**
039     * Enables subclassing.
040     */
041    protected AbstractTransporter() {
042        closed = new AtomicBoolean();
043    }
044
045    public void peek(PeekTask task) throws Exception {
046        requireNonNull(task, "task cannot be null");
047
048        failIfClosed(task);
049        implPeek(task);
050    }
051
052    /**
053     * Implements {@link #peek(PeekTask)}, gets only called if the transporter has not been closed.
054     *
055     * @param task The existence check to perform, must not be {@code null}.
056     * @throws Exception If the existence of the specified resource could not be confirmed.
057     */
058    protected abstract void implPeek(PeekTask task) throws Exception;
059
060    public void get(GetTask task) throws Exception {
061        requireNonNull(task, "task cannot be null");
062
063        failIfClosed(task);
064        implGet(task);
065    }
066
067    /**
068     * Implements {@link #get(GetTask)}, gets only called if the transporter has not been closed.
069     *
070     * @param task The download to perform, must not be {@code null}.
071     * @throws Exception If the transfer failed.
072     */
073    protected abstract void implGet(GetTask task) throws Exception;
074
075    /**
076     * Performs stream-based I/O for the specified download task and notifies the configured transport listener.
077     * Subclasses might want to invoke this utility method from within their {@link #implGet(GetTask)} to avoid
078     * boilerplate I/O code.
079     *
080     * @param task The download to perform, must not be {@code null}.
081     * @param is The input stream to download the data from, must not be {@code null}.
082     * @param close {@code true} if the supplied input stream should be automatically closed, {@code false} to leave the
083     *            stream open.
084     * @param length The size in bytes of the downloaded resource or {@code -1} if unknown, not to be confused with the
085     *            length of the supplied input stream which might be smaller if the download is resumed.
086     * @param resume {@code true} if the download resumes from {@link GetTask#getResumeOffset()}, {@code false} if the
087     *            download starts at the first byte of the resource.
088     * @throws IOException If the transfer encountered an I/O error.
089     * @throws TransferCancelledException If the transfer was cancelled.
090     */
091    protected void utilGet(GetTask task, InputStream is, boolean close, long length, boolean resume)
092            throws IOException, TransferCancelledException {
093        try (OutputStream os = task.newOutputStream(resume)) {
094            task.getListener().transportStarted(resume ? task.getResumeOffset() : 0L, length);
095            copy(os, is, task.getListener());
096        } finally {
097            if (close) {
098                is.close();
099            }
100        }
101    }
102
103    public void put(PutTask task) throws Exception {
104        requireNonNull(task, "task cannot be null");
105
106        failIfClosed(task);
107        implPut(task);
108    }
109
110    /**
111     * Implements {@link #put(PutTask)}, gets only called if the transporter has not been closed.
112     *
113     * @param task The upload to perform, must not be {@code null}.
114     * @throws Exception If the transfer failed.
115     */
116    protected abstract void implPut(PutTask task) throws Exception;
117
118    /**
119     * Performs stream-based I/O for the specified upload task and notifies the configured transport listener.
120     * Subclasses might want to invoke this utility method from within their {@link #implPut(PutTask)} to avoid
121     * boilerplate I/O code.
122     *
123     * @param task The upload to perform, must not be {@code null}.
124     * @param os The output stream to upload the data to, must not be {@code null}.
125     * @param close {@code true} if the supplied output stream should be automatically closed, {@code false} to leave
126     *            the stream open.
127     * @throws IOException If the transfer encountered an I/O error.
128     * @throws TransferCancelledException If the transfer was cancelled.
129     */
130    protected void utilPut(PutTask task, OutputStream os, boolean close)
131            throws IOException, TransferCancelledException {
132        try (InputStream is = task.newInputStream()) {
133            task.getListener().transportStarted(0, task.getDataLength());
134            copy(os, is, task.getListener());
135        } finally {
136            if (close) {
137                os.close();
138            } else {
139                os.flush();
140            }
141        }
142    }
143
144    public void close() {
145        if (closed.compareAndSet(false, true)) {
146            implClose();
147        }
148    }
149
150    /**
151     * Implements {@link #close()}, gets only called if the transporter has not already been closed.
152     */
153    protected abstract void implClose();
154
155    private void failIfClosed(TransportTask task) {
156        if (closed.get()) {
157            throw new IllegalStateException("transporter closed, cannot execute task " + task);
158        }
159    }
160
161    private static void copy(OutputStream os, InputStream is, TransportListener listener)
162            throws IOException, TransferCancelledException {
163        byte[] buffer = new byte[1024 * 32];
164        for (int read = is.read(buffer); read >= 0; read = is.read(buffer)) {
165            os.write(buffer, 0, read);
166            listener.transportProgressed(ByteBuffer.wrap(buffer, 0, read));
167        }
168    }
169}