1 /*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16 package software.amazon.awssdk.core.sync;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.ByteBuffer;
23 import java.nio.file.DirectoryNotEmptyException;
24 import java.nio.file.FileAlreadyExistsException;
25 import java.nio.file.Files;
26 import java.nio.file.Path;
27 import software.amazon.awssdk.annotations.SdkPublicApi;
28 import software.amazon.awssdk.core.ResponseBytes;
29 import software.amazon.awssdk.core.ResponseInputStream;
30 import software.amazon.awssdk.core.exception.RetryableException;
31 import software.amazon.awssdk.core.exception.SdkClientException;
32 import software.amazon.awssdk.core.exception.SdkException;
33 import software.amazon.awssdk.core.internal.http.InterruptMonitor;
34 import software.amazon.awssdk.core.retry.RetryPolicy;
35 import software.amazon.awssdk.http.AbortableInputStream;
36 import software.amazon.awssdk.utils.IoUtils;
37 import software.amazon.awssdk.utils.Logger;
38
39 /**
40 * Interface for processing a streaming response from a service in a synchronous fashion. This interfaces gives
41 * access to the unmarshalled response POJO which may contain metadata about the streamed contents. It also provides access
42 * to the content via an {@link AbortableInputStream}. Callers do not need to worry about calling {@link InputStream#close()}
43 * on the content, but if they wish to stop reading data from the stream {@link AbortableInputStream#abort()} may be called
44 * to kill the underlying HTTP connection. This is generally not recommended and should only be done when the cost of reading
45 * the rest of the data exceeds the cost of establishing a new connection. If callers do not call abort and do not read all
46 * of the data in the stream, then the content will be drained by the SDK and the underlying HTTP connection will be returned to
47 * the connection pool (if applicable).
48 * <h3>Retries</h3>
49 * <p>
50 * Exceptions thrown from the transformer's {@link #transform(Object, AbortableInputStream)} method are not automatically retried
51 * by the RetryPolicy of the client. Since we can't know if a transformer implementation is idempotent or safe to retry, if you
52 * wish to retry on the event of a failure you must throw a {@link SdkException} with retryable set to true from the transformer.
53 * This exception can wrap the original exception that was thrown. Note that throwing a {@link
54 * SdkException} that is marked retryable from the transformer does not guarantee the request will be retried,
55 * retries are still limited by the max retry attempts and retry throttling
56 * feature of the {@link RetryPolicy}.
57 *
58 * <h3>Thread Interrupts</h3>
59 * <p>
60 * Implementations should have proper handling of Thread interrupts. For long running, non-interruptible tasks, it is recommended
61 * to check the thread interrupt status periodically and throw an {@link InterruptedException} if set. When an {@link
62 * InterruptedException} is thrown from a interruptible task, you should either re-interrupt the current thread and return or
63 * throw that {@link InterruptedException} from the {@link #transform(Object, AbortableInputStream)} method. Failure to do these
64 * things may prevent the SDK from stopping the request in a timely manner in the event the thread is interrupted externally.
65 *
66 * @param <ResponseT> Type of unmarshalled POJO response.
67 * @param <ReturnT> Return type of the {@link #transform(Object, AbortableInputStream)} method. Implementations are free to
68 * perform whatever transformations are appropriate.
69 */
70 @FunctionalInterface
71 @SdkPublicApi
72 public interface ResponseTransformer<ResponseT, ReturnT> {
73 /**
74 * Process the response contents.
75 *
76 * @param response Unmarshalled POJO response
77 * @param inputStream Input stream of streamed data.
78 * @return Transformed type.
79 * @throws Exception if any error occurs during processing of the response. This will be re-thrown by the SDK, possibly
80 * wrapped in an {@link SdkClientException}.
81 */
82 ReturnT transform(ResponseT response, AbortableInputStream inputStream) throws Exception;
83
84 /**
85 * Hook to allow connection to be left open after the SDK returns a response. Useful for returning the InputStream to
86 * the response content from the transformer.
87 *
88 * @return True if connection (and InputStream) should be left open after the SDK returns a response, false otherwise.
89 */
90 default boolean needsConnectionLeftOpen() {
91 return false;
92 }
93
94 /**
95 * Creates a response transformer that writes all response content to the specified file. If the file already exists
96 * then a {@link java.nio.file.FileAlreadyExistsException} will be thrown.
97 *
98 * @param path Path to file to write to.
99 * @param <ResponseT> Type of unmarshalled response POJO.
100 * @return ResponseTransformer instance.
101 */
102 static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toFile(Path path) {
103 return (resp, in) -> {
104 try {
105 InterruptMonitor.checkInterrupted();
106 Files.copy(in, path);
107 return resp;
108 } catch (IOException copyException) {
109 String copyError = "Failed to read response into file: " + path;
110
111 // If the write failed because of the state of the file, don't retry the request.
112 if (copyException instanceof FileAlreadyExistsException || copyException instanceof DirectoryNotEmptyException) {
113 throw new IOException(copyError, copyException);
114 }
115
116 // Try to clean up the file so that we can retry the request. If we can't delete it, don't retry the request.
117 try {
118 Files.deleteIfExists(path);
119 } catch (IOException deletionException) {
120 Logger.loggerFor(ResponseTransformer.class)
121 .error(() -> "Failed to delete destination file '" + path +
122 "' after reading the service response " +
123 "failed.", deletionException);
124
125 throw new IOException(copyError + ". Additionally, the file could not be cleaned up (" +
126 deletionException.getMessage() + "), so the request will not be retried.",
127 copyException);
128 }
129
130 // Retry the request
131 throw RetryableException.builder().message(copyError).cause(copyException).build();
132 }
133 };
134 }
135
136 /**
137 * Creates a response transformer that writes all response content to the specified file. If the file already exists
138 * then a {@link java.nio.file.FileAlreadyExistsException} will be thrown.
139 *
140 * @param file File to write to.
141 * @param <ResponseT> Type of unmarshalled response POJO.
142 * @return ResponseTransformer instance.
143 */
144 static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toFile(File file) {
145 return toFile(file.toPath());
146 }
147
148 /**
149 * Creates a response transformer that writes all response content to the given {@link OutputStream}. Note that
150 * the {@link OutputStream} is not closed or flushed after writing.
151 *
152 * @param outputStream Output stream to write data to.
153 * @param <ResponseT> Type of unmarshalled response POJO.
154 * @return ResponseTransformer instance.
155 */
156 static <ResponseT> ResponseTransformer<ResponseT, ResponseT> toOutputStream(OutputStream outputStream) {
157 return (resp, in) -> {
158 InterruptMonitor.checkInterrupted();
159 IoUtils.copy(in, outputStream);
160 return resp;
161 };
162 }
163
164 /**
165 * Creates a response transformer that loads all response content into memory, exposed as {@link ResponseBytes}. This allows
166 * for conversion into a {@link String}, {@link ByteBuffer}, etc.
167 *
168 * @param <ResponseT> Type of unmarshalled response POJO.
169 * @return The streaming response transformer that can be used on the client streaming method.
170 */
171 static <ResponseT> ResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() {
172 return (response, inputStream) -> {
173 try {
174 InterruptMonitor.checkInterrupted();
175 return ResponseBytes.fromByteArray(response, IoUtils.toByteArray(inputStream));
176 } catch (IOException e) {
177 throw RetryableException.builder().message("Failed to read response.").cause(e).build();
178 }
179 };
180 }
181
182 /**
183 * Creates a response transformer that returns an unmanaged input stream with the response content. This input stream must
184 * be explicitly closed to release the connection. The unmarshalled response object can be obtained via the {@link
185 * ResponseInputStream#response} method.
186 * <p>
187 * Note that the returned stream is not subject to the retry policy or timeout settings (except for socket timeout)
188 * of the client. No retries will be performed in the event of a socket read failure or connection reset.
189 *
190 * @param <ResponseT> Type of unmarshalled response POJO.
191 * @return ResponseTransformer instance.
192 */
193 static <ResponseT> ResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toInputStream() {
194 return unmanaged(ResponseInputStream::new);
195 }
196
197 /**
198 * Static helper method to create a response transformer that allows the connection to be left open. Useful for creating a
199 * {@link ResponseTransformer} with a lambda or method reference rather than an anonymous inner class.
200 *
201 * @param transformer Transformer to wrap.
202 * @param <ResponseT> Type of unmarshalled response POJO.
203 * @param <ReturnT> Return type of transformer.
204 * @return New {@link ResponseTransformer} which does not close the connection afterwards.
205 */
206 static <ResponseT, ReturnT> ResponseTransformer<ResponseT, ReturnT> unmanaged(
207 ResponseTransformer<ResponseT, ReturnT> transformer) {
208 return new ResponseTransformer<ResponseT, ReturnT>() {
209 @Override
210 public ReturnT transform(ResponseT response, AbortableInputStream inputStream) throws Exception {
211 InterruptMonitor.checkInterrupted();
212 return transformer.transform(response, inputStream);
213 }
214
215 @Override
216 public boolean needsConnectionLeftOpen() {
217 return true;
218 }
219 };
220
221 }
222 }
223