1 /*
2  * Copyright 2013-2019 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.cloud.aws.core.io.s3;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.File;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.URI;
27 import java.net.URISyntaxException;
28 import java.net.URL;
29 import java.net.URLEncoder;
30 import java.nio.charset.StandardCharsets;
31 import java.security.MessageDigest;
32 import java.security.NoSuchAlgorithmException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CompletionService;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorCompletionService;
39 import java.util.concurrent.Future;
40
41 import com.amazonaws.regions.Region;
42 import com.amazonaws.services.s3.AmazonS3;
43 import com.amazonaws.services.s3.AmazonS3Client;
44 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
45 import com.amazonaws.services.s3.model.AmazonS3Exception;
46 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
47 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
48 import com.amazonaws.services.s3.model.GetObjectRequest;
49 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
50 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
51 import com.amazonaws.services.s3.model.ObjectMetadata;
52 import com.amazonaws.services.s3.model.PartETag;
53 import com.amazonaws.services.s3.model.UploadPartRequest;
54 import com.amazonaws.services.s3.model.UploadPartResult;
55 import com.amazonaws.util.BinaryUtils;
56
57 import org.springframework.core.io.AbstractResource;
58 import org.springframework.core.io.WritableResource;
59 import org.springframework.core.task.TaskExecutor;
60 import org.springframework.core.task.support.ExecutorServiceAdapter;
61
62 /**
63  * {@link org.springframework.core.io.Resource} implementation for
64  * {@code com.amazonaws.services.s3.model.S3Object} handles. Implements the extended
65  * {@link WritableResource} interface.
66  *
67  * @author Agim Emruli
68  * @author Alain Sahli
69  * @since 1.0
70  */

71 public class SimpleStorageResource extends AbstractResource implements WritableResource {
72
73     private final String bucketName;
74
75     private final String objectName;
76
77     private final String versionId;
78
79     private final AmazonS3 amazonS3;
80
81     private final TaskExecutor taskExecutor;
82
83     private volatile ObjectMetadata objectMetadata;
84
85     public SimpleStorageResource(AmazonS3 amazonS3, String bucketName, String objectName,
86             TaskExecutor taskExecutor) {
87         this(amazonS3, bucketName, objectName, taskExecutor, null);
88     }
89
90     public SimpleStorageResource(AmazonS3 amazonS3, String bucketName, String objectName,
91             TaskExecutor taskExecutor, String versionId) {
92         this.amazonS3 = AmazonS3ProxyFactory.createProxy(amazonS3);
93         this.bucketName = bucketName;
94         this.objectName = objectName;
95         this.taskExecutor = taskExecutor;
96         this.versionId = versionId;
97     }
98
99     @Override
100     public String getDescription() {
101         StringBuilder builder = new StringBuilder("Amazon s3 resource [bucket='");
102         builder.append(this.bucketName);
103         builder.append("' and object='");
104         builder.append(this.objectName);
105         if (this.versionId != null) {
106             builder.append("' and versionId='");
107             builder.append(this.versionId);
108         }
109         builder.append("']");
110         return builder.toString();
111     }
112
113     @Override
114     public InputStream getInputStream() throws IOException {
115         GetObjectRequest getObjectRequest = new GetObjectRequest(this.bucketName,
116                 this.objectName);
117         if (this.versionId != null) {
118             getObjectRequest.setVersionId(this.versionId);
119         }
120         return this.amazonS3.getObject(getObjectRequest).getObjectContent();
121     }
122
123     @Override
124     public boolean exists() {
125         return getObjectMetadata() != null;
126     }
127
128     @Override
129     public long contentLength() throws IOException {
130         return getRequiredObjectMetadata().getContentLength();
131     }
132
133     @Override
134     public long lastModified() throws IOException {
135         return getRequiredObjectMetadata().getLastModified().getTime();
136     }
137
138     @Override
139     public String getFilename() throws IllegalStateException {
140         return this.objectName;
141     }
142
143     @Override
144     public URL getURL() throws IOException {
145         Region region = this.amazonS3.getRegion().toAWSRegion();
146         String encodedObjectName = URLEncoder.encode(this.objectName,
147                 StandardCharsets.UTF_8.toString());
148         return new URL("https", region.getServiceEndpoint(AmazonS3Client.S3_SERVICE_NAME),
149                 "/" + this.bucketName + "/" + encodedObjectName);
150     }
151
152     public URI getS3Uri() {
153         try {
154             return new URI("s3""//" + this.bucketName + "/" + this.objectName, null);
155         }
156         catch (URISyntaxException e) {
157             throw new RuntimeException("Failed to resolve s3:// uri", e);
158         }
159     }
160
161     @Override
162     public File getFile() throws IOException {
163         throw new UnsupportedOperationException(
164                 "Amazon S3 resource can not be resolved to java.io.File objects.Use "
165                         + "getInputStream() to retrieve the contents of the object!");
166     }
167
168     private ObjectMetadata getRequiredObjectMetadata() throws FileNotFoundException {
169         ObjectMetadata metadata = getObjectMetadata();
170         if (metadata == null) {
171             StringBuilder builder = new StringBuilder().append("Resource with bucket='")
172                     .append(this.bucketName).append("' and objectName='")
173                     .append(this.objectName);
174             if (this.versionId != null) {
175                 builder.append("' and versionId='");
176                 builder.append(this.versionId);
177             }
178             builder.append("' not found!");
179
180             throw new FileNotFoundException(builder.toString());
181         }
182         return metadata;
183     }
184
185     @Override
186     public boolean isWritable() {
187         return true;
188     }
189
190     @Override
191     public OutputStream getOutputStream() throws IOException {
192         return new SimpleStorageOutputStream();
193     }
194
195     @Override
196     public SimpleStorageResource createRelative(String relativePath) throws IOException {
197         String relativeKey = this.objectName + "/" + relativePath;
198         return new SimpleStorageResource(this.amazonS3, this.bucketName, relativeKey,
199                 this.taskExecutor);
200     }
201
202     public AmazonS3 getAmazonS3() {
203         return amazonS3;
204     }
205
206     private ObjectMetadata getObjectMetadata() {
207         if (this.objectMetadata == null) {
208             try {
209                 GetObjectMetadataRequest metadataRequest = new GetObjectMetadataRequest(
210                         this.bucketName, this.objectName);
211                 if (this.versionId != null) {
212                     metadataRequest.setVersionId(this.versionId);
213                 }
214                 this.objectMetadata = this.amazonS3.getObjectMetadata(metadataRequest);
215             }
216             catch (AmazonS3Exception e) {
217                 // Catch 404 (object not found) and 301 (bucket not found, moved
218                 // permanently)
219                 if (e.getStatusCode() == 404 || e.getStatusCode() == 301) {
220                     this.objectMetadata = null;
221                 }
222                 else {
223                     throw e;
224                 }
225             }
226         }
227         return this.objectMetadata;
228     }
229
230     private class SimpleStorageOutputStream extends OutputStream {
231
232         // The minimum size for a multi part is 5 MB, hence the buffer size of 5 MB
233         private static final int BUFFER_SIZE = 1024 * 1024 * 5;
234
235         private final Object monitor = new Object();
236
237         private final CompletionService<UploadPartResult> completionService;
238
239         @SuppressWarnings("FieldMayBeFinal")
240         private ByteArrayOutputStream currentOutputStream = new ByteArrayOutputStream(
241                 BUFFER_SIZE);
242
243         private int partNumberCounter = 1;
244
245         private InitiateMultipartUploadResult multiPartUploadResult;
246
247         SimpleStorageOutputStream() {
248             this.completionService = new ExecutorCompletionService<>(
249                     new ExecutorServiceAdapter(SimpleStorageResource.this.taskExecutor));
250         }
251
252         @Override
253         public void write(int b) throws IOException {
254             synchronized (this.monitor) {
255                 if (this.currentOutputStream.size() == BUFFER_SIZE) {
256                     initiateMultiPartIfNeeded();
257                     this.completionService.submit(new UploadPartResultCallable(
258                             SimpleStorageResource.this.amazonS3,
259                             this.currentOutputStream.toByteArray(),
260                             this.currentOutputStream.size(),
261                             SimpleStorageResource.this.bucketName,
262                             SimpleStorageResource.this.objectName,
263                             this.multiPartUploadResult.getUploadId(),
264                             this.partNumberCounter++, false));
265                     this.currentOutputStream.reset();
266                 }
267                 this.currentOutputStream.write(b);
268             }
269         }
270
271         @Override
272         public void close() throws IOException {
273             synchronized (this.monitor) {
274                 if (this.currentOutputStream == null) {
275                     return;
276                 }
277
278                 if (isMultiPartUpload()) {
279                     finishMultiPartUpload();
280                 }
281                 else {
282                     finishSimpleUpload();
283                 }
284             }
285         }
286
287         private boolean isMultiPartUpload() {
288             return this.multiPartUploadResult != null;
289         }
290
291         private void finishSimpleUpload() {
292             ObjectMetadata objectMetadata = new ObjectMetadata();
293             objectMetadata.setContentLength(this.currentOutputStream.size());
294
295             byte[] content = this.currentOutputStream.toByteArray();
296             try {
297                 MessageDigest messageDigest = MessageDigest.getInstance("MD5");
298                 String md5Digest = BinaryUtils.toBase64(messageDigest.digest(content));
299                 objectMetadata.setContentMD5(md5Digest);
300             }
301             catch (NoSuchAlgorithmException e) {
302                 throw new IllegalStateException(
303                         "MessageDigest could not be initialized because it uses an unknown algorithm",
304                         e);
305             }
306
307             SimpleStorageResource.this.amazonS3.putObject(
308                     SimpleStorageResource.this.bucketName,
309                     SimpleStorageResource.this.objectName,
310                     new ByteArrayInputStream(content), objectMetadata);
311
312             // Release the memory early
313             this.currentOutputStream = null;
314         }
315
316         private void finishMultiPartUpload() throws IOException {
317             this.completionService.submit(
318                     new UploadPartResultCallable(SimpleStorageResource.this.amazonS3,
319                             this.currentOutputStream.toByteArray(),
320                             this.currentOutputStream.size(),
321                             SimpleStorageResource.this.bucketName,
322                             SimpleStorageResource.this.objectName,
323                             this.multiPartUploadResult.getUploadId(),
324                             this.partNumberCounter, true));
325             try {
326                 List<PartETag> partETags = getMultiPartsUploadResults();
327                 SimpleStorageResource.this.amazonS3
328                         .completeMultipartUpload(new CompleteMultipartUploadRequest(
329                                 this.multiPartUploadResult.getBucketName(),
330                                 this.multiPartUploadResult.getKey(),
331                                 this.multiPartUploadResult.getUploadId(), partETags));
332             }
333             catch (ExecutionException e) {
334                 abortMultiPartUpload();
335                 throw new IOException("Multi part upload failed ", e.getCause());
336             }
337             catch (InterruptedException e) {
338                 abortMultiPartUpload();
339                 Thread.currentThread().interrupt();
340             }
341             finally {
342                 this.currentOutputStream = null;
343             }
344         }
345
346         private void initiateMultiPartIfNeeded() {
347             if (this.multiPartUploadResult == null) {
348                 this.multiPartUploadResult = SimpleStorageResource.this.amazonS3
349                         .initiateMultipartUpload(new InitiateMultipartUploadRequest(
350                                 SimpleStorageResource.this.bucketName,
351                                 SimpleStorageResource.this.objectName));
352             }
353         }
354
355         private void abortMultiPartUpload() {
356             if (isMultiPartUpload()) {
357                 SimpleStorageResource.this.amazonS3
358                         .abortMultipartUpload(new AbortMultipartUploadRequest(
359                                 this.multiPartUploadResult.getBucketName(),
360                                 this.multiPartUploadResult.getKey(),
361                                 this.multiPartUploadResult.getUploadId()));
362             }
363         }
364
365         private List<PartETag> getMultiPartsUploadResults()
366                 throws ExecutionException, InterruptedException {
367             List<PartETag> result = new ArrayList<>(this.partNumberCounter);
368             for (int i = 0; i < this.partNumberCounter; i++) {
369                 Future<UploadPartResult> uploadPartResultFuture = this.completionService
370                         .take();
371                 result.add(uploadPartResultFuture.get().getPartETag());
372             }
373             return result;
374         }
375
376         private final class UploadPartResultCallable
377                 implements Callable<UploadPartResult> {
378
379             private final AmazonS3 amazonS3;
380
381             private final int contentLength;
382
383             private final int partNumber;
384
385             private final boolean last;
386
387             private final String bucketName;
388
389             private final String key;
390
391             private final String uploadId;
392
393             @SuppressWarnings("FieldMayBeFinal")
394             private byte[] content;
395
396             private UploadPartResultCallable(AmazonS3 amazon, byte[] content,
397                     int writtenDataSize, String bucketName, String key, String uploadId,
398                     int partNumber, boolean last) {
399                 this.amazonS3 = amazon;
400                 this.content = content;
401                 this.contentLength = writtenDataSize;
402                 this.partNumber = partNumber;
403                 this.last = last;
404                 this.bucketName = bucketName;
405                 this.key = key;
406                 this.uploadId = uploadId;
407             }
408
409             @Override
410             public UploadPartResult call() throws Exception {
411                 try {
412                     return this.amazonS3.uploadPart(new UploadPartRequest()
413                             .withBucketName(this.bucketName).withKey(this.key)
414                             .withUploadId(this.uploadId)
415                             .withInputStream(new ByteArrayInputStream(this.content))
416                             .withPartNumber(this.partNumber).withLastPart(this.last)
417                             .withPartSize(this.contentLength));
418                 }
419                 finally {
420                     // Release the memory, as the callable may still live inside the
421                     // CompletionService which would cause
422                     // an exhaustive memory usage
423                     this.content = null;
424                 }
425             }
426
427         }
428
429     }
430
431 }
432