1 /*
2  * Copyright 2014-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */

15 package com.amazonaws.event;
16
17 import com.amazonaws.AmazonWebServiceRequest;
18 import com.amazonaws.annotation.NotThreadSafe;
19 import com.amazonaws.annotation.SdkInternalApi;
20 import com.amazonaws.internal.SdkFilterInputStream;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 /**
26  * Used for input stream progress tracking purposes.
27  */

28 @NotThreadSafe
29 public abstract class ProgressInputStream extends SdkFilterInputStream {
30     /**
31      * Returns an input stream for request progress tracking purposes. If request/response progress
32      * tracking is not enabled, this method simply return the given input stream as is.
33      *
34      * @param is the request content input stream
35      * @deprecated
36      */

37     @Deprecated
38     public static InputStream inputStreamForRequest(InputStream is,
39             AmazonWebServiceRequest req) {
40         return req == null
41              ? is
42              : inputStreamForRequest(is, req.getGeneralProgressListener());
43     }
44
45     /**
46      * @param is               the request content input stream
47      * @param progressListener Optional progress listener
48      * @return If the progress listener is non null returns a new input stream decorated with
49      * progress reporting functionality. If progress listener is null it returns the same input
50      * stream.
51      */

52     @SdkInternalApi
53     public static InputStream inputStreamForRequest(InputStream is, ProgressListener progressListener) {
54         return progressListener == null
55                 ? is
56                 : new RequestProgressInputStream(is, progressListener);
57     }
58
59     /**
60      * Returns an input stream for response progress tracking purposes. If
61      * request/response progress tracking is not enabled, this method simply
62      * return the given input stream as is.
63      * 
64      * @param is the response content input stream
65      */

66     public static InputStream inputStreamForResponse(InputStream is, AmazonWebServiceRequest req) {
67         return req == null
68              ? is
69              : new ResponseProgressInputStream(is, req.getGeneralProgressListener());
70     }
71
72     /**
73      * Returns an input stream for response progress tracking purposes. If request/response progress tracking is not enabled, this
74      * method simply return the given input stream as is.
75      *
76      * @param is               the response content input stream
77      * @param progressListener Optional progress listener
78      * @return If the progress listener is non null returns a new input stream decorated with progress reporting functionality. If
79      * progress listener is null it returns the same input stream.
80      */

81     public static InputStream inputStreamForResponse(InputStream is, ProgressListener progressListener) {
82         return progressListener == null
83                 ? is
84                 : new ResponseProgressInputStream(is, progressListener);
85     }
86
87     /** The threshold of bytes between notifications. */
88     private static final int DEFAULT_NOTIFICATION_THRESHOLD = 8 * 1024;
89
90     private final ProgressListener listener;
91     private final int notifyThresHold;
92     /** The number of bytes read that the listener hasn't been notified about yet. */
93     private int unnotifiedByteCount;
94     private boolean hasBeenRead;
95     private boolean doneEOF;
96     private long notifiedByteCount;
97
98     public ProgressInputStream(InputStream is, ProgressListener listener) {
99         this(is, listener, DEFAULT_NOTIFICATION_THRESHOLD);
100     }
101
102     public ProgressInputStream(InputStream is, ProgressListener listener, int notifyThresHold) {
103         super(is);
104         if (is == null || listener == null)
105             throw new IllegalArgumentException();
106         this.notifyThresHold = notifyThresHold;
107         this.listener = listener;
108     }
109
110     /**
111      * The read method is called for the very first time.
112      * Defaults to do nothing.
113      */

114     protected void onFirstRead() {}
115     /**
116      * An end-of-file event is to be notified.
117      * Defaults to do nothing.
118      */

119     protected void onEOF() {}
120
121     /**
122      * Defaults to behave the same as {@link #onEOF()}.
123      */

124     protected void onClose() {
125         eof();
126     }
127     /**
128      * A reset event is to be notified.  Default to do nothing.
129      */

130     protected void onReset() {}
131     /**
132      * Upon notification of the number of bytes transferred since last
133      * notification.  Default to do nothing.
134      */

135     protected void onNotifyBytesRead() {}
136
137     /**
138      * Upon reading the given number of bytes.
139      * The default behavior is to accumulate the byte count and only fire off
140      * a notification by invoking {@link #onNotifyBytesRead()} if the count
141      * has exceeded the threshold.
142      */

143     private void onBytesRead(int bytesRead) {
144         unnotifiedByteCount += bytesRead;
145         if (unnotifiedByteCount >= notifyThresHold) {
146             onNotifyBytesRead();
147             notifiedByteCount += unnotifiedByteCount;
148             unnotifiedByteCount = 0;
149         }
150     }
151
152     @Override
153     public int read() throws IOException {
154         if (!hasBeenRead) {
155             onFirstRead();
156             hasBeenRead = true;
157         }
158         int ch = super.read();
159         if (ch == -1)
160             eof();
161         else
162             onBytesRead(1);
163         return ch;
164     }
165
166     @Override
167     public void reset() throws IOException {
168         super.reset();
169         onReset();
170         unnotifiedByteCount = 0;
171         notifiedByteCount = 0;
172     }
173
174     @Override
175     public int read(byte[] b, int off, int len) throws IOException {
176         if (!hasBeenRead) {
177             onFirstRead();
178             hasBeenRead = true;
179         }
180         int bytesRead = super.read(b, off, len);
181         if (bytesRead == -1)
182             eof();
183         else
184             onBytesRead(bytesRead);
185         return bytesRead;
186     }
187
188     private void eof() {
189         if (doneEOF)
190             return;
191         onEOF();
192         unnotifiedByteCount = 0;
193         doneEOF = true;
194     }
195
196     public final InputStream getWrappedInputStream() {
197         return in;
198     }
199
200     protected final int getUnnotifiedByteCount() {
201         return unnotifiedByteCount;
202     }
203
204     protected final long getNotifiedByteCount() {
205         return notifiedByteCount;
206     }
207
208     @Override
209     public void close() throws IOException {
210         onClose(); // report any left over bytes not yet reported
211         super.close();
212     }
213
214     public final ProgressListener getListener() {
215         return listener;
216     }
217 }
218