1 /*
2 * Copyright 2014-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15 package com.amazonaws.event;
16
17 import com.amazonaws.AmazonWebServiceRequest;
18 import com.amazonaws.annotation.NotThreadSafe;
19 import com.amazonaws.annotation.SdkInternalApi;
20 import com.amazonaws.internal.SdkFilterInputStream;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 /**
26 * Used for input stream progress tracking purposes.
27 */
28 @NotThreadSafe
29 public abstract class ProgressInputStream extends SdkFilterInputStream {
30 /**
31 * Returns an input stream for request progress tracking purposes. If request/response progress
32 * tracking is not enabled, this method simply return the given input stream as is.
33 *
34 * @param is the request content input stream
35 * @deprecated
36 */
37 @Deprecated
38 public static InputStream inputStreamForRequest(InputStream is,
39 AmazonWebServiceRequest req) {
40 return req == null
41 ? is
42 : inputStreamForRequest(is, req.getGeneralProgressListener());
43 }
44
45 /**
46 * @param is the request content input stream
47 * @param progressListener Optional progress listener
48 * @return If the progress listener is non null returns a new input stream decorated with
49 * progress reporting functionality. If progress listener is null it returns the same input
50 * stream.
51 */
52 @SdkInternalApi
53 public static InputStream inputStreamForRequest(InputStream is, ProgressListener progressListener) {
54 return progressListener == null
55 ? is
56 : new RequestProgressInputStream(is, progressListener);
57 }
58
59 /**
60 * Returns an input stream for response progress tracking purposes. If
61 * request/response progress tracking is not enabled, this method simply
62 * return the given input stream as is.
63 *
64 * @param is the response content input stream
65 */
66 public static InputStream inputStreamForResponse(InputStream is, AmazonWebServiceRequest req) {
67 return req == null
68 ? is
69 : new ResponseProgressInputStream(is, req.getGeneralProgressListener());
70 }
71
72 /**
73 * Returns an input stream for response progress tracking purposes. If request/response progress tracking is not enabled, this
74 * method simply return the given input stream as is.
75 *
76 * @param is the response content input stream
77 * @param progressListener Optional progress listener
78 * @return If the progress listener is non null returns a new input stream decorated with progress reporting functionality. If
79 * progress listener is null it returns the same input stream.
80 */
81 public static InputStream inputStreamForResponse(InputStream is, ProgressListener progressListener) {
82 return progressListener == null
83 ? is
84 : new ResponseProgressInputStream(is, progressListener);
85 }
86
87 /** The threshold of bytes between notifications. */
88 private static final int DEFAULT_NOTIFICATION_THRESHOLD = 8 * 1024;
89
90 private final ProgressListener listener;
91 private final int notifyThresHold;
92 /** The number of bytes read that the listener hasn't been notified about yet. */
93 private int unnotifiedByteCount;
94 private boolean hasBeenRead;
95 private boolean doneEOF;
96 private long notifiedByteCount;
97
98 public ProgressInputStream(InputStream is, ProgressListener listener) {
99 this(is, listener, DEFAULT_NOTIFICATION_THRESHOLD);
100 }
101
102 public ProgressInputStream(InputStream is, ProgressListener listener, int notifyThresHold) {
103 super(is);
104 if (is == null || listener == null)
105 throw new IllegalArgumentException();
106 this.notifyThresHold = notifyThresHold;
107 this.listener = listener;
108 }
109
110 /**
111 * The read method is called for the very first time.
112 * Defaults to do nothing.
113 */
114 protected void onFirstRead() {}
115 /**
116 * An end-of-file event is to be notified.
117 * Defaults to do nothing.
118 */
119 protected void onEOF() {}
120
121 /**
122 * Defaults to behave the same as {@link #onEOF()}.
123 */
124 protected void onClose() {
125 eof();
126 }
127 /**
128 * A reset event is to be notified. Default to do nothing.
129 */
130 protected void onReset() {}
131 /**
132 * Upon notification of the number of bytes transferred since last
133 * notification. Default to do nothing.
134 */
135 protected void onNotifyBytesRead() {}
136
137 /**
138 * Upon reading the given number of bytes.
139 * The default behavior is to accumulate the byte count and only fire off
140 * a notification by invoking {@link #onNotifyBytesRead()} if the count
141 * has exceeded the threshold.
142 */
143 private void onBytesRead(int bytesRead) {
144 unnotifiedByteCount += bytesRead;
145 if (unnotifiedByteCount >= notifyThresHold) {
146 onNotifyBytesRead();
147 notifiedByteCount += unnotifiedByteCount;
148 unnotifiedByteCount = 0;
149 }
150 }
151
152 @Override
153 public int read() throws IOException {
154 if (!hasBeenRead) {
155 onFirstRead();
156 hasBeenRead = true;
157 }
158 int ch = super.read();
159 if (ch == -1)
160 eof();
161 else
162 onBytesRead(1);
163 return ch;
164 }
165
166 @Override
167 public void reset() throws IOException {
168 super.reset();
169 onReset();
170 unnotifiedByteCount = 0;
171 notifiedByteCount = 0;
172 }
173
174 @Override
175 public int read(byte[] b, int off, int len) throws IOException {
176 if (!hasBeenRead) {
177 onFirstRead();
178 hasBeenRead = true;
179 }
180 int bytesRead = super.read(b, off, len);
181 if (bytesRead == -1)
182 eof();
183 else
184 onBytesRead(bytesRead);
185 return bytesRead;
186 }
187
188 private void eof() {
189 if (doneEOF)
190 return;
191 onEOF();
192 unnotifiedByteCount = 0;
193 doneEOF = true;
194 }
195
196 public final InputStream getWrappedInputStream() {
197 return in;
198 }
199
200 protected final int getUnnotifiedByteCount() {
201 return unnotifiedByteCount;
202 }
203
204 protected final long getNotifiedByteCount() {
205 return notifiedByteCount;
206 }
207
208 @Override
209 public void close() throws IOException {
210 onClose(); // report any left over bytes not yet reported
211 super.close();
212 }
213
214 public final ProgressListener getListener() {
215 return listener;
216 }
217 }
218