001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.spi.connector.transport;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.eclipse.aether.transfer.TransferCancelledException;
029
030/**
031 * A skeleton implementation for custom transporters.
032 */
033public abstract class AbstractTransporter implements Transporter {
034
035    private final AtomicBoolean closed;
036
037    /**
038     * Enables subclassing.
039     */
040    protected AbstractTransporter() {
041        closed = new AtomicBoolean();
042    }
043
044    public void peek(PeekTask task) throws Exception {
045        Objects.requireNonNull(task, "task cannot be null");
046
047        failIfClosed(task);
048        implPeek(task);
049    }
050
051    /**
052     * Implements {@link #peek(PeekTask)}, gets only called if the transporter has not been closed.
053     *
054     * @param task The existence check to perform, must not be {@code null}.
055     * @throws Exception If the existence of the specified resource could not be confirmed.
056     */
057    protected abstract void implPeek(PeekTask task) throws Exception;
058
059    public void get(GetTask task) throws Exception {
060        Objects.requireNonNull(task, "task cannot be null");
061
062        failIfClosed(task);
063        implGet(task);
064    }
065
066    /**
067     * Implements {@link #get(GetTask)}, gets only called if the transporter has not been closed.
068     *
069     * @param task The download to perform, must not be {@code null}.
070     * @throws Exception If the transfer failed.
071     */
072    protected abstract void implGet(GetTask task) throws Exception;
073
074    /**
075     * Performs stream-based I/O for the specified download task and notifies the configured transport listener.
076     * Subclasses might want to invoke this utility method from within their {@link #implGet(GetTask)} to avoid
077     * boilerplate I/O code.
078     *
079     * @param task The download to perform, must not be {@code null}.
080     * @param is The input stream to download the data from, must not be {@code null}.
081     * @param close {@code true} if the supplied input stream should be automatically closed, {@code false} to leave the
082     *            stream open.
083     * @param length The size in bytes of the downloaded resource or {@code -1} if unknown, not to be confused with the
084     *            length of the supplied input stream which might be smaller if the download is resumed.
085     * @param resume {@code true} if the download resumes from {@link GetTask#getResumeOffset()}, {@code false} if the
086     *            download starts at the first byte of the resource.
087     * @throws IOException If the transfer encountered an I/O error.
088     * @throws TransferCancelledException If the transfer was cancelled.
089     */
090    protected void utilGet(GetTask task, InputStream is, boolean close, long length, boolean resume)
091            throws IOException, TransferCancelledException {
092        try (OutputStream os = task.newOutputStream(resume)) {
093            task.getListener().transportStarted(resume ? task.getResumeOffset() : 0L, length);
094            copy(os, is, task.getListener());
095        } finally {
096            if (close) {
097                is.close();
098            }
099        }
100    }
101
102    public void put(PutTask task) throws Exception {
103        Objects.requireNonNull(task, "task cannot be null");
104
105        failIfClosed(task);
106        implPut(task);
107    }
108
109    /**
110     * Implements {@link #put(PutTask)}, gets only called if the transporter has not been closed.
111     *
112     * @param task The upload to perform, must not be {@code null}.
113     * @throws Exception If the transfer failed.
114     */
115    protected abstract void implPut(PutTask task) throws Exception;
116
117    /**
118     * Performs stream-based I/O for the specified upload task and notifies the configured transport listener.
119     * Subclasses might want to invoke this utility method from within their {@link #implPut(PutTask)} to avoid
120     * boilerplate I/O code.
121     *
122     * @param task The upload to perform, must not be {@code null}.
123     * @param os The output stream to upload the data to, must not be {@code null}.
124     * @param close {@code true} if the supplied output stream should be automatically closed, {@code false} to leave
125     *            the stream open.
126     * @throws IOException If the transfer encountered an I/O error.
127     * @throws TransferCancelledException If the transfer was cancelled.
128     */
129    protected void utilPut(PutTask task, OutputStream os, boolean close)
130            throws IOException, TransferCancelledException {
131        try (InputStream is = task.newInputStream()) {
132            task.getListener().transportStarted(0, task.getDataLength());
133            copy(os, is, task.getListener());
134        } finally {
135            if (close) {
136                os.close();
137            } else {
138                os.flush();
139            }
140        }
141    }
142
143    public void close() {
144        if (closed.compareAndSet(false, true)) {
145            implClose();
146        }
147    }
148
149    /**
150     * Implements {@link #close()}, gets only called if the transporter has not already been closed.
151     */
152    protected abstract void implClose();
153
154    private void failIfClosed(TransportTask task) {
155        if (closed.get()) {
156            throw new IllegalStateException("transporter closed, cannot execute task " + task);
157        }
158    }
159
160    private static void copy(OutputStream os, InputStream is, TransportListener listener)
161            throws IOException, TransferCancelledException {
162        byte[] buffer = new byte[1024 * 32];
163        for (int read = is.read(buffer); read >= 0; read = is.read(buffer)) {
164            os.write(buffer, 0, read);
165            listener.transportProgressed(ByteBuffer.wrap(buffer, 0, read));
166        }
167    }
168}