1
16
17 package org.springframework.cloud.aws.core.io.s3;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.File;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.URI;
27 import java.net.URISyntaxException;
28 import java.net.URL;
29 import java.net.URLEncoder;
30 import java.nio.charset.StandardCharsets;
31 import java.security.MessageDigest;
32 import java.security.NoSuchAlgorithmException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CompletionService;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorCompletionService;
39 import java.util.concurrent.Future;
40
41 import com.amazonaws.regions.Region;
42 import com.amazonaws.services.s3.AmazonS3;
43 import com.amazonaws.services.s3.AmazonS3Client;
44 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
45 import com.amazonaws.services.s3.model.AmazonS3Exception;
46 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
47 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
48 import com.amazonaws.services.s3.model.GetObjectRequest;
49 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
50 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
51 import com.amazonaws.services.s3.model.ObjectMetadata;
52 import com.amazonaws.services.s3.model.PartETag;
53 import com.amazonaws.services.s3.model.UploadPartRequest;
54 import com.amazonaws.services.s3.model.UploadPartResult;
55 import com.amazonaws.util.BinaryUtils;
56
57 import org.springframework.core.io.AbstractResource;
58 import org.springframework.core.io.WritableResource;
59 import org.springframework.core.task.TaskExecutor;
60 import org.springframework.core.task.support.ExecutorServiceAdapter;
61
62
71 public class SimpleStorageResource extends AbstractResource implements WritableResource {
72
73 private final String bucketName;
74
75 private final String objectName;
76
77 private final String versionId;
78
79 private final AmazonS3 amazonS3;
80
81 private final TaskExecutor taskExecutor;
82
83 private volatile ObjectMetadata objectMetadata;
84
85 public SimpleStorageResource(AmazonS3 amazonS3, String bucketName, String objectName,
86 TaskExecutor taskExecutor) {
87 this(amazonS3, bucketName, objectName, taskExecutor, null);
88 }
89
90 public SimpleStorageResource(AmazonS3 amazonS3, String bucketName, String objectName,
91 TaskExecutor taskExecutor, String versionId) {
92 this.amazonS3 = AmazonS3ProxyFactory.createProxy(amazonS3);
93 this.bucketName = bucketName;
94 this.objectName = objectName;
95 this.taskExecutor = taskExecutor;
96 this.versionId = versionId;
97 }
98
99 @Override
100 public String getDescription() {
101 StringBuilder builder = new StringBuilder("Amazon s3 resource [bucket='");
102 builder.append(this.bucketName);
103 builder.append("' and object='");
104 builder.append(this.objectName);
105 if (this.versionId != null) {
106 builder.append("' and versionId='");
107 builder.append(this.versionId);
108 }
109 builder.append("']");
110 return builder.toString();
111 }
112
113 @Override
114 public InputStream getInputStream() throws IOException {
115 GetObjectRequest getObjectRequest = new GetObjectRequest(this.bucketName,
116 this.objectName);
117 if (this.versionId != null) {
118 getObjectRequest.setVersionId(this.versionId);
119 }
120 return this.amazonS3.getObject(getObjectRequest).getObjectContent();
121 }
122
123 @Override
124 public boolean exists() {
125 return getObjectMetadata() != null;
126 }
127
128 @Override
129 public long contentLength() throws IOException {
130 return getRequiredObjectMetadata().getContentLength();
131 }
132
133 @Override
134 public long lastModified() throws IOException {
135 return getRequiredObjectMetadata().getLastModified().getTime();
136 }
137
138 @Override
139 public String getFilename() throws IllegalStateException {
140 return this.objectName;
141 }
142
143 @Override
144 public URL getURL() throws IOException {
145 Region region = this.amazonS3.getRegion().toAWSRegion();
146 String encodedObjectName = URLEncoder.encode(this.objectName,
147 StandardCharsets.UTF_8.toString());
148 return new URL("https", region.getServiceEndpoint(AmazonS3Client.S3_SERVICE_NAME),
149 "/" + this.bucketName + "/" + encodedObjectName);
150 }
151
152 public URI getS3Uri() {
153 try {
154 return new URI("s3", " + this.bucketName + "/" + this.objectName, null);
155 }
156 catch (URISyntaxException e) {
157 throw new RuntimeException("Failed to resolve s3:, e);
158 }
159 }
160
161 @Override
162 public File getFile() throws IOException {
163 throw new UnsupportedOperationException(
164 "Amazon S3 resource can not be resolved to java.io.File objects.Use "
165 + "getInputStream() to retrieve the contents of the object!");
166 }
167
168 private ObjectMetadata getRequiredObjectMetadata() throws FileNotFoundException {
169 ObjectMetadata metadata = getObjectMetadata();
170 if (metadata == null) {
171 StringBuilder builder = new StringBuilder().append("Resource with bucket='")
172 .append(this.bucketName).append("' and objectName='")
173 .append(this.objectName);
174 if (this.versionId != null) {
175 builder.append("' and versionId='");
176 builder.append(this.versionId);
177 }
178 builder.append("' not found!");
179
180 throw new FileNotFoundException(builder.toString());
181 }
182 return metadata;
183 }
184
185 @Override
186 public boolean isWritable() {
187 return true;
188 }
189
190 @Override
191 public OutputStream getOutputStream() throws IOException {
192 return new SimpleStorageOutputStream();
193 }
194
195 @Override
196 public SimpleStorageResource createRelative(String relativePath) throws IOException {
197 String relativeKey = this.objectName + "/" + relativePath;
198 return new SimpleStorageResource(this.amazonS3, this.bucketName, relativeKey,
199 this.taskExecutor);
200 }
201
202 public AmazonS3 getAmazonS3() {
203 return amazonS3;
204 }
205
206 private ObjectMetadata getObjectMetadata() {
207 if (this.objectMetadata == null) {
208 try {
209 GetObjectMetadataRequest metadataRequest = new GetObjectMetadataRequest(
210 this.bucketName, this.objectName);
211 if (this.versionId != null) {
212 metadataRequest.setVersionId(this.versionId);
213 }
214 this.objectMetadata = this.amazonS3.getObjectMetadata(metadataRequest);
215 }
216 catch (AmazonS3Exception e) {
217
218
219 if (e.getStatusCode() == 404 || e.getStatusCode() == 301) {
220 this.objectMetadata = null;
221 }
222 else {
223 throw e;
224 }
225 }
226 }
227 return this.objectMetadata;
228 }
229
230 private class SimpleStorageOutputStream extends OutputStream {
231
232
233 private static final int BUFFER_SIZE = 1024 * 1024 * 5;
234
235 private final Object monitor = new Object();
236
237 private final CompletionService<UploadPartResult> completionService;
238
239 @SuppressWarnings("FieldMayBeFinal")
240 private ByteArrayOutputStream currentOutputStream = new ByteArrayOutputStream(
241 BUFFER_SIZE);
242
243 private int partNumberCounter = 1;
244
245 private InitiateMultipartUploadResult multiPartUploadResult;
246
247 SimpleStorageOutputStream() {
248 this.completionService = new ExecutorCompletionService<>(
249 new ExecutorServiceAdapter(SimpleStorageResource.this.taskExecutor));
250 }
251
252 @Override
253 public void write(int b) throws IOException {
254 synchronized (this.monitor) {
255 if (this.currentOutputStream.size() == BUFFER_SIZE) {
256 initiateMultiPartIfNeeded();
257 this.completionService.submit(new UploadPartResultCallable(
258 SimpleStorageResource.this.amazonS3,
259 this.currentOutputStream.toByteArray(),
260 this.currentOutputStream.size(),
261 SimpleStorageResource.this.bucketName,
262 SimpleStorageResource.this.objectName,
263 this.multiPartUploadResult.getUploadId(),
264 this.partNumberCounter++, false));
265 this.currentOutputStream.reset();
266 }
267 this.currentOutputStream.write(b);
268 }
269 }
270
271 @Override
272 public void close() throws IOException {
273 synchronized (this.monitor) {
274 if (this.currentOutputStream == null) {
275 return;
276 }
277
278 if (isMultiPartUpload()) {
279 finishMultiPartUpload();
280 }
281 else {
282 finishSimpleUpload();
283 }
284 }
285 }
286
287 private boolean isMultiPartUpload() {
288 return this.multiPartUploadResult != null;
289 }
290
291 private void finishSimpleUpload() {
292 ObjectMetadata objectMetadata = new ObjectMetadata();
293 objectMetadata.setContentLength(this.currentOutputStream.size());
294
295 byte[] content = this.currentOutputStream.toByteArray();
296 try {
297 MessageDigest messageDigest = MessageDigest.getInstance("MD5");
298 String md5Digest = BinaryUtils.toBase64(messageDigest.digest(content));
299 objectMetadata.setContentMD5(md5Digest);
300 }
301 catch (NoSuchAlgorithmException e) {
302 throw new IllegalStateException(
303 "MessageDigest could not be initialized because it uses an unknown algorithm",
304 e);
305 }
306
307 SimpleStorageResource.this.amazonS3.putObject(
308 SimpleStorageResource.this.bucketName,
309 SimpleStorageResource.this.objectName,
310 new ByteArrayInputStream(content), objectMetadata);
311
312
313 this.currentOutputStream = null;
314 }
315
316 private void finishMultiPartUpload() throws IOException {
317 this.completionService.submit(
318 new UploadPartResultCallable(SimpleStorageResource.this.amazonS3,
319 this.currentOutputStream.toByteArray(),
320 this.currentOutputStream.size(),
321 SimpleStorageResource.this.bucketName,
322 SimpleStorageResource.this.objectName,
323 this.multiPartUploadResult.getUploadId(),
324 this.partNumberCounter, true));
325 try {
326 List<PartETag> partETags = getMultiPartsUploadResults();
327 SimpleStorageResource.this.amazonS3
328 .completeMultipartUpload(new CompleteMultipartUploadRequest(
329 this.multiPartUploadResult.getBucketName(),
330 this.multiPartUploadResult.getKey(),
331 this.multiPartUploadResult.getUploadId(), partETags));
332 }
333 catch (ExecutionException e) {
334 abortMultiPartUpload();
335 throw new IOException("Multi part upload failed ", e.getCause());
336 }
337 catch (InterruptedException e) {
338 abortMultiPartUpload();
339 Thread.currentThread().interrupt();
340 }
341 finally {
342 this.currentOutputStream = null;
343 }
344 }
345
346 private void initiateMultiPartIfNeeded() {
347 if (this.multiPartUploadResult == null) {
348 this.multiPartUploadResult = SimpleStorageResource.this.amazonS3
349 .initiateMultipartUpload(new InitiateMultipartUploadRequest(
350 SimpleStorageResource.this.bucketName,
351 SimpleStorageResource.this.objectName));
352 }
353 }
354
355 private void abortMultiPartUpload() {
356 if (isMultiPartUpload()) {
357 SimpleStorageResource.this.amazonS3
358 .abortMultipartUpload(new AbortMultipartUploadRequest(
359 this.multiPartUploadResult.getBucketName(),
360 this.multiPartUploadResult.getKey(),
361 this.multiPartUploadResult.getUploadId()));
362 }
363 }
364
365 private List<PartETag> getMultiPartsUploadResults()
366 throws ExecutionException, InterruptedException {
367 List<PartETag> result = new ArrayList<>(this.partNumberCounter);
368 for (int i = 0; i < this.partNumberCounter; i++) {
369 Future<UploadPartResult> uploadPartResultFuture = this.completionService
370 .take();
371 result.add(uploadPartResultFuture.get().getPartETag());
372 }
373 return result;
374 }
375
376 private final class UploadPartResultCallable
377 implements Callable<UploadPartResult> {
378
379 private final AmazonS3 amazonS3;
380
381 private final int contentLength;
382
383 private final int partNumber;
384
385 private final boolean last;
386
387 private final String bucketName;
388
389 private final String key;
390
391 private final String uploadId;
392
393 @SuppressWarnings("FieldMayBeFinal")
394 private byte[] content;
395
396 private UploadPartResultCallable(AmazonS3 amazon, byte[] content,
397 int writtenDataSize, String bucketName, String key, String uploadId,
398 int partNumber, boolean last) {
399 this.amazonS3 = amazon;
400 this.content = content;
401 this.contentLength = writtenDataSize;
402 this.partNumber = partNumber;
403 this.last = last;
404 this.bucketName = bucketName;
405 this.key = key;
406 this.uploadId = uploadId;
407 }
408
409 @Override
410 public UploadPartResult call() throws Exception {
411 try {
412 return this.amazonS3.uploadPart(new UploadPartRequest()
413 .withBucketName(this.bucketName).withKey(this.key)
414 .withUploadId(this.uploadId)
415 .withInputStream(new ByteArrayInputStream(this.content))
416 .withPartNumber(this.partNumber).withLastPart(this.last)
417 .withPartSize(this.contentLength));
418 }
419 finally {
420
421
422
423 this.content = null;
424 }
425 }
426
427 }
428
429 }
430
431 }
432