1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */

17 package org.apache.commons.compress.archivers.zip;
18
19 import java.io.Closeable;
20 import java.io.DataOutput;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.SeekableByteChannel;
26 import java.util.zip.CRC32;
27 import java.util.zip.Deflater;
28 import java.util.zip.ZipEntry;
29
30 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
31
32 /**
33  * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams. Currently {@link java.util.zip.ZipEntry#DEFLATED} and
34  * {@link java.util.zip.ZipEntry#STORED} are the only supported compression methods.
35  *
36  * @since 1.10
37  */

38 public abstract class StreamCompressor implements Closeable {
39
40     private static final class DataOutputCompressor extends StreamCompressor {
41         private final DataOutput raf;
42
43         DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
44             super(deflater);
45             this.raf = raf;
46         }
47
48         @Override
49         protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
50             raf.write(data, offset, length);
51         }
52     }
53
54     private static final class OutputStreamCompressor extends StreamCompressor {
55         private final OutputStream os;
56
57         OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
58             super(deflater);
59             this.os = os;
60         }
61
62         @Override
63         protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
64             os.write(data, offset, length);
65         }
66     }
67
68     private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
69         private final ScatterGatherBackingStore bs;
70
71         ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
72             super(deflater);
73             this.bs = bs;
74         }
75
76         @Override
77         protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
78             bs.writeOut(data, offset, length);
79         }
80     }
81
82     private static final class SeekableByteChannelCompressor extends StreamCompressor {
83         private final SeekableByteChannel channel;
84
85         SeekableByteChannelCompressor(final Deflater deflater, final SeekableByteChannel channel) {
86             super(deflater);
87             this.channel = channel;
88         }
89
90         @Override
91         protected void writeOut(final byte[] data, final int offset, final int length) throws IOException {
92             channel.write(ByteBuffer.wrap(data, offset, length));
93         }
94     }
95
96     /*
97      * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs when it gets handed a huge buffer. See
98      * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
99      *
100      * Using a buffer size of 8 kB proved to be a good compromise
101      */

102     private static final int DEFLATER_BLOCK_SIZE = 8192;
103     private static final int BUFFER_SIZE = 4096;
104
105     /**
106      * Creates a stream compressor with the given compression level.
107      *
108      * @param os       The DataOutput to receive output
109      * @param deflater The deflater to use for the compressor
110      * @return A stream compressor
111      */

112     static StreamCompressor create(final DataOutput os, final Deflater deflater) {
113         return new DataOutputCompressor(deflater, os);
114     }
115
116     /**
117      * Creates a stream compressor with the given compression level.
118      *
119      * @param compressionLevel The {@link Deflater} compression level
120      * @param bs               The ScatterGatherBackingStore to receive output
121      * @return A stream compressor
122      */

123     public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
124         final Deflater deflater = new Deflater(compressionLevel, true);
125         return new ScatterGatherBackingStoreCompressor(deflater, bs);
126     }
127
128     /**
129      * Creates a stream compressor with the default compression level.
130      *
131      * @param os The stream to receive output
132      * @return A stream compressor
133      */

134     static StreamCompressor create(final OutputStream os) {
135         return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
136     }
137
138     /**
139      * Creates a stream compressor with the given compression level.
140      *
141      * @param os       The stream to receive output
142      * @param deflater The deflater to use
143      * @return A stream compressor
144      */

145     static StreamCompressor create(final OutputStream os, final Deflater deflater) {
146         return new OutputStreamCompressor(deflater, os);
147     }
148
149     /**
150      * Creates a stream compressor with the default compression level.
151      *
152      * @param bs The ScatterGatherBackingStore to receive output
153      * @return A stream compressor
154      */

155     public static StreamCompressor create(final ScatterGatherBackingStore bs) {
156         return create(Deflater.DEFAULT_COMPRESSION, bs);
157     }
158
159     /**
160      * Creates a stream compressor with the given compression level.
161      *
162      * @param os       The SeekableByteChannel to receive output
163      * @param deflater The deflater to use for the compressor
164      * @return A stream compressor
165      * @since 1.13
166      */

167     static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
168         return new SeekableByteChannelCompressor(deflater, os);
169     }
170
171     private final Deflater def;
172
173     private final CRC32 crc = new CRC32();
174
175     private long writtenToOutputStreamForLastEntry;
176
177     private long sourcePayloadLength;
178
179     private long totalWrittenToOutputStream;
180
181     private final byte[] outputBuffer = new byte[BUFFER_SIZE];
182
183     private final byte[] readerBuf = new byte[BUFFER_SIZE];
184
185     StreamCompressor(final Deflater deflater) {
186         this.def = deflater;
187     }
188
189     @Override
190     public void close() throws IOException {
191         def.end();
192     }
193
194     void deflate() throws IOException {
195         final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
196         if (len > 0) {
197             writeCounted(outputBuffer, 0, len);
198         }
199     }
200
201     /**
202      * Deflate the given source using the supplied compression method
203      *
204      * @param source The source to compress
205      * @param method The #ZipArchiveEntry compression method
206      * @throws IOException When failures happen
207      */

208
209     public void deflate(final InputStream source, final int method) throws IOException {
210         reset();
211         int length;
212
213         while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
214             write(readerBuf, 0, length, method);
215         }
216         if (method == ZipEntry.DEFLATED) {
217             flushDeflater();
218         }
219     }
220
221     private void deflateUntilInputIsNeeded() throws IOException {
222         while (!def.needsInput()) {
223             deflate();
224         }
225     }
226
227     void flushDeflater() throws IOException {
228         def.finish();
229         while (!def.finished()) {
230             deflate();
231         }
232     }
233
234     /**
235      * Gets the number of bytes read from the source stream
236      *
237      * @return The number of bytes read, never negative
238      */

239     public long getBytesRead() {
240         return sourcePayloadLength;
241     }
242
243     /**
244      * The number of bytes written to the output for the last entry
245      *
246      * @return The number of bytes, never negative
247      */

248     public long getBytesWrittenForLastEntry() {
249         return writtenToOutputStreamForLastEntry;
250     }
251
252     /**
253      * The crc32 of the last deflated file
254      *
255      * @return the crc32
256      */

257
258     public long getCrc32() {
259         return crc.getValue();
260     }
261
262     /**
263      * The total number of bytes written to the output for all files
264      *
265      * @return The number of bytes, never negative
266      */

267     public long getTotalBytesWritten() {
268         return totalWrittenToOutputStream;
269     }
270
271     void reset() {
272         crc.reset();
273         def.reset();
274         sourcePayloadLength = 0;
275         writtenToOutputStreamForLastEntry = 0;
276     }
277
278     /**
279      * Writes bytes to ZIP entry.
280      *
281      * @param b      the byte array to write
282      * @param offset the start position to write from
283      * @param length the number of bytes to write
284      * @param method the comrpession method to use
285      * @return the number of bytes written to the stream this time
286      * @throws IOException on error
287      */

288     long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
289         final long current = writtenToOutputStreamForLastEntry;
290         crc.update(b, offset, length);
291         if (method == ZipEntry.DEFLATED) {
292             writeDeflated(b, offset, length);
293         } else {
294             writeCounted(b, offset, length);
295         }
296         sourcePayloadLength += length;
297         return writtenToOutputStreamForLastEntry - current;
298     }
299
300     public void writeCounted(final byte[] data) throws IOException {
301         writeCounted(data, 0, data.length);
302     }
303
304     public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
305         writeOut(data, offset, length);
306         writtenToOutputStreamForLastEntry += length;
307         totalWrittenToOutputStream += length;
308     }
309
310     private void writeDeflated(final byte[] b, final int offset, final int length) throws IOException {
311         if (length > 0 && !def.finished()) {
312             if (length <= DEFLATER_BLOCK_SIZE) {
313                 def.setInput(b, offset, length);
314                 deflateUntilInputIsNeeded();
315             } else {
316                 final int fullblocks = length / DEFLATER_BLOCK_SIZE;
317                 for (int i = 0; i < fullblocks; i++) {
318                     def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE, DEFLATER_BLOCK_SIZE);
319                     deflateUntilInputIsNeeded();
320                 }
321                 final int done = fullblocks * DEFLATER_BLOCK_SIZE;
322                 if (done < length) {
323                     def.setInput(b, offset + done, length - done);
324                     deflateUntilInputIsNeeded();
325                 }
326             }
327         }
328     }
329
330     protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
331 }
332