1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15
16 package software.amazon.awssdk.core.sync;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.ByteBuffer;
23 import java.nio.file.DirectoryNotEmptyException;
24 import java.nio.file.FileAlreadyExistsException;
25 import java.nio.file.Files;
26 import java.nio.file.Path;
27 import software.amazon.awssdk.annotations.SdkPublicApi;
28 import software.amazon.awssdk.core.ResponseBytes;
29 import software.amazon.awssdk.core.ResponseInputStream;
30 import software.amazon.awssdk.core.exception.RetryableException;
31 import software.amazon.awssdk.core.exception.SdkClientException;
32 import software.amazon.awssdk.core.exception.SdkException;
33 import software.amazon.awssdk.core.internal.http.InterruptMonitor;
34 import software.amazon.awssdk.core.retry.RetryPolicy;
35 import software.amazon.awssdk.http.AbortableInputStream;
36 import software.amazon.awssdk.utils.IoUtils;
37 import software.amazon.awssdk.utils.Logger;
38
39 /**
40  * Interface for processing a streaming response from a service in a synchronous fashion. This interfaces gives
41  * access to the unmarshalled response POJO which may contain metadata about the streamed contents. It also provides access
42  * to the content via an {@link AbortableInputStream}. Callers do not need to worry about calling {@link InputStream#close()}
43  * on the content, but if they wish to stop reading data from the stream {@link AbortableInputStream#abort()} may be called
44  * to kill the underlying HTTP connection. This is generally not recommended and should only be done when the cost of reading
45  * the rest of the data exceeds the cost of establishing a new connection. If callers do not call abort and do not read all
46  * of the data in the stream, then the content will be drained by the SDK and the underlying HTTP connection will be returned to
47  * the connection pool (if applicable).
48  * <h3>Retries</h3>
49  * <p>
50  * Exceptions thrown from the transformer's {@link #transform(Object, AbortableInputStream)} method are not automatically retried
51  * by the RetryPolicy of the client. Since we can't know if a transformer implementation is idempotent or safe to retry, if you
52  * wish to retry on the event of a failure you must throw a {@link SdkException} with retryable set to true from the transformer.
53  * This exception can wrap the original exception that was thrown. Note that throwing a {@link
54  * SdkException} that is marked retryable from the transformer does not guarantee the request will be retried,
55  * retries are still limited by the max retry attempts and retry throttling
56  * feature of the {@link RetryPolicy}.
57  *
58  * <h3>Thread Interrupts</h3>
59  * <p>
60  * Implementations should have proper handling of Thread interrupts. For long running, non-interruptible tasks, it is recommended
61  * to check the thread interrupt status periodically and throw an {@link InterruptedException} if set. When an {@link
62  * InterruptedException} is thrown from a interruptible task, you should either re-interrupt the current thread and return or
63  * throw that {@link InterruptedException} from the {@link #transform(Object, AbortableInputStream)} method. Failure to do these
64  * things may prevent the SDK from stopping the request in a timely manner in the event the thread is interrupted externally.
65  *
66  * @param <ResponseT> Type of unmarshalled POJO response.
67  * @param <ReturnT>   Return type of the {@link #transform(Object, AbortableInputStream)} method. Implementations are free to
68  * perform whatever transformations are appropriate.
69  */

70 @FunctionalInterface
71 @SdkPublicApi
72 public interface ResponseTransformer<ResponseT, ReturnT> {
73     /**
74      * Process the response contents.
75      *
76      * @param response    Unmarshalled POJO response
77      * @param inputStream Input stream of streamed data.
78      * @return Transformed type.
79      * @throws Exception if any error occurs during processing of the response. This will be re-thrown by the SDK, possibly
80      *                   wrapped in an {@link SdkClientException}.
81      */

82     ReturnT transform(ResponseT response, AbortableInputStream inputStream) throws Exception;
83
84     /**
85      * Hook to allow connection to be left open after the SDK returns a response. Useful for returning the InputStream to
86      * the response content from the transformer.
87      *
88      * @return True if connection (and InputStream) should be left open after the SDK returns a response, false otherwise.
89      */

90     default boolean needsConnectionLeftOpen() {
91         return false;
92     }
93
94     /**
95      * Creates a response transformer that writes all response content to the specified file. If the file already exists
96      * then a {@link java.nio.file.FileAlreadyExistsException} will be thrown.
97      *
98      * @param path        Path to file to write to.
99      * @param <ResponseT> Type of unmarshalled response POJO.
100      * @return ResponseTransformer instance.
101      */

102     static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toFile(Path path) {
103         return (resp, in) -> {
104             try {
105                 InterruptMonitor.checkInterrupted();
106                 Files.copy(in, path);
107                 return resp;
108             } catch (IOException copyException) {
109                 String copyError = "Failed to read response into file: " + path;
110
111                 // If the write failed because of the state of the file, don't retry the request.
112                 if (copyException instanceof FileAlreadyExistsException || copyException instanceof DirectoryNotEmptyException) {
113                     throw new IOException(copyError, copyException);
114                 }
115
116                 // Try to clean up the file so that we can retry the request. If we can't delete it, don't retry the request.
117                 try {
118                     Files.deleteIfExists(path);
119                 } catch (IOException deletionException) {
120                     Logger.loggerFor(ResponseTransformer.class)
121                           .error(() -> "Failed to delete destination file '" + path +
122                                        "' after reading the service response " +
123                                        "failed.", deletionException);
124
125                     throw new IOException(copyError + ". Additionally, the file could not be cleaned up (" +
126                                           deletionException.getMessage() + "), so the request will not be retried.",
127                                           copyException);
128                 }
129
130                 // Retry the request
131                 throw RetryableException.builder().message(copyError).cause(copyException).build();
132             }
133         };
134     }
135
136     /**
137      * Creates a response transformer that writes all response content to the specified file. If the file already exists
138      * then a {@link java.nio.file.FileAlreadyExistsException} will be thrown.
139      *
140      * @param file        File to write to.
141      * @param <ResponseT> Type of unmarshalled response POJO.
142      * @return ResponseTransformer instance.
143      */

144     static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toFile(File file) {
145         return toFile(file.toPath());
146     }
147
148     /**
149      * Creates a response transformer that writes all response content to the given {@link OutputStream}. Note that
150      * the {@link OutputStream} is not closed or flushed after writing.
151      *
152      * @param outputStream Output stream to write data to.
153      * @param <ResponseT>  Type of unmarshalled response POJO.
154      * @return ResponseTransformer instance.
155      */

156     static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toOutputStream(OutputStream outputStream) {
157         return (resp, in) -> {
158             InterruptMonitor.checkInterrupted();
159             IoUtils.copy(in, outputStream);
160             return resp;
161         };
162     }
163
164     /**
165      * Creates a response transformer that loads all response content into memory, exposed as {@link ResponseBytes}. This allows
166      * for conversion into a {@link String}, {@link ByteBuffer}, etc.
167      *
168      * @param <ResponseT> Type of unmarshalled response POJO.
169      * @return The streaming response transformer that can be used on the client streaming method.
170      */

171     static <ResponseT> ResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() {
172         return (response, inputStream) -> {
173             try {
174                 InterruptMonitor.checkInterrupted();
175                 return ResponseBytes.fromByteArray(response, IoUtils.toByteArray(inputStream));
176             } catch (IOException e) {
177                 throw RetryableException.builder().message("Failed to read response.").cause(e).build();
178             }
179         };
180     }
181
182     /**
183      * Creates a response transformer that returns an unmanaged input stream with the response content. This input stream must
184      * be explicitly closed to release the connection. The unmarshalled response object can be obtained via the {@link
185      * ResponseInputStream#response} method.
186      * <p>
187      * Note that the returned stream is not subject to the retry policy or timeout settings (except for socket timeout)
188      * of the client. No retries will be performed in the event of a socket read failure or connection reset.
189      *
190      * @param <ResponseT> Type of unmarshalled response POJO.
191      * @return ResponseTransformer instance.
192      */

193     static <ResponseT> ResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toInputStream() {
194         return unmanaged(ResponseInputStream::new);
195     }
196
197     /**
198      * Static helper method to create a response transformer that allows the connection to be left open. Useful for creating a
199      * {@link ResponseTransformer} with a lambda or method reference rather than an anonymous inner class.
200      *
201      * @param transformer     Transformer to wrap.
202      * @param <ResponseT> Type of unmarshalled response POJO.
203      * @param <ReturnT>   Return type of transformer.
204      * @return New {@link ResponseTransformer} which does not close the connection afterwards.
205      */

206     static <ResponseT, ReturnT> ResponseTransformer<ResponseT, ReturnT> unmanaged(
207         ResponseTransformer<ResponseT, ReturnT> transformer) {
208         return new ResponseTransformer<ResponseT, ReturnT>() {
209             @Override
210             public ReturnT transform(ResponseT response, AbortableInputStream inputStream) throws Exception {
211                 InterruptMonitor.checkInterrupted();
212                 return transformer.transform(response, inputStream);
213             }
214
215             @Override
216             public boolean needsConnectionLeftOpen() {
217                 return true;
218             }
219         };
220
221     }
222 }
223