1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
|
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/SnappyUncompressInputStream.h"
#include <algorithm>
#include "nsIAsyncInputStream.h"
#include "nsStreamUtils.h"
#include "snappy/snappy.h"
namespace mozilla {
NS_IMPL_ISUPPORTS(SnappyUncompressInputStream,
nsIInputStream);
// Putting kCompressedBufferLength inside a function avoids a static
// constructor.
static size_t CompressedBufferLength()
{
static size_t kCompressedBufferLength =
detail::SnappyFrameUtils::MaxCompressedBufferLength(snappy::kBlockSize);
MOZ_ASSERT(kCompressedBufferLength > 0);
return kCompressedBufferLength;
}
SnappyUncompressInputStream::SnappyUncompressInputStream(nsIInputStream* aBaseStream)
: mBaseStream(aBaseStream)
, mUncompressedBytes(0)
, mNextByte(0)
, mNextChunkType(Unknown)
, mNextChunkDataLength(0)
, mNeedFirstStreamIdentifier(true)
{
// This implementation only supports sync base streams. Verify this in debug
// builds. Note, this is a bit complicated because the streams we support
// advertise different capabilities:
// - nsFileInputStream - blocking and sync
// - nsStringInputStream - non-blocking and sync
// - nsPipeInputStream - can be blocking, but provides async interface
#ifdef DEBUG
bool baseNonBlocking;
nsresult rv = mBaseStream->IsNonBlocking(&baseNonBlocking);
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (baseNonBlocking) {
nsCOMPtr<nsIAsyncInputStream> async = do_QueryInterface(mBaseStream);
MOZ_ASSERT(!async);
}
#endif
}
NS_IMETHODIMP
SnappyUncompressInputStream::Close()
{
if (!mBaseStream) {
return NS_OK;
}
mBaseStream->Close();
mBaseStream = nullptr;
mUncompressedBuffer = nullptr;
mCompressedBuffer = nullptr;
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::Available(uint64_t* aLengthOut)
{
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
// If we have uncompressed bytes, then we are done.
*aLengthOut = UncompressedLength();
if (*aLengthOut > 0) {
return NS_OK;
}
// Otherwise, attempt to uncompress bytes until we get something or the
// underlying stream is drained. We loop here because some chunks can
// be StreamIdentifiers, padding, etc with no data.
uint32_t bytesRead;
do {
nsresult rv = ParseNextChunk(&bytesRead);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
*aLengthOut = UncompressedLength();
} while(*aLengthOut == 0 && bytesRead);
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::Read(char* aBuf, uint32_t aCount,
uint32_t* aBytesReadOut)
{
return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aBytesReadOut);
}
NS_IMETHODIMP
SnappyUncompressInputStream::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure, uint32_t aCount,
uint32_t* aBytesReadOut)
{
*aBytesReadOut = 0;
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
nsresult rv;
// Do not try to use the base stream's ReadSegements here. Its very
// unlikely we will get a single buffer that contains all of the compressed
// data and therefore would have to copy into our own buffer anyways.
// Instead, focus on making efficient use of the Read() interface.
while (aCount > 0) {
// We have some decompressed data in our buffer. Provide it to the
// callers writer function.
if (mUncompressedBytes > 0) {
MOZ_ASSERT(mUncompressedBuffer);
uint32_t remaining = UncompressedLength();
uint32_t numToWrite = std::min(aCount, remaining);
uint32_t numWritten;
rv = aWriter(this, aClosure, &mUncompressedBuffer[mNextByte], *aBytesReadOut,
numToWrite, &numWritten);
// As defined in nsIInputputStream.idl, do not pass writer func errors.
if (NS_FAILED(rv)) {
return NS_OK;
}
// End-of-file
if (numWritten == 0) {
return NS_OK;
}
*aBytesReadOut += numWritten;
mNextByte += numWritten;
MOZ_ASSERT(mNextByte <= mUncompressedBytes);
if (mNextByte == mUncompressedBytes) {
mNextByte = 0;
mUncompressedBytes = 0;
}
aCount -= numWritten;
continue;
}
// Otherwise uncompress the next chunk and loop. Any resulting data
// will set mUncompressedBytes which we check at the top of the loop.
uint32_t bytesRead;
rv = ParseNextChunk(&bytesRead);
if (NS_FAILED(rv)) { return rv; }
// If we couldn't read anything and there is no more data to provide
// to the caller, then this is eof.
if (bytesRead == 0 && mUncompressedBytes == 0) {
return NS_OK;
}
}
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::IsNonBlocking(bool* aNonBlockingOut)
{
*aNonBlockingOut = false;
return NS_OK;
}
SnappyUncompressInputStream::~SnappyUncompressInputStream()
{
Close();
}
nsresult
SnappyUncompressInputStream::ParseNextChunk(uint32_t* aBytesReadOut)
{
// There must not be any uncompressed data already in mUncompressedBuffer.
MOZ_ASSERT(mUncompressedBytes == 0);
MOZ_ASSERT(mNextByte == 0);
nsresult rv;
*aBytesReadOut = 0;
// Lazily create our two buffers so we can report OOM during stream
// operation. These allocations only happens once. The buffers are reused
// until the stream is closed.
if (!mUncompressedBuffer) {
mUncompressedBuffer.reset(new (fallible) char[snappy::kBlockSize]);
if (NS_WARN_IF(!mUncompressedBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
if (!mCompressedBuffer) {
mCompressedBuffer.reset(new (fallible) char[CompressedBufferLength()]);
if (NS_WARN_IF(!mCompressedBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
// We have no decompressed data and we also have not seen the start of stream
// yet. Read and validate the StreamIdentifier chunk. Also read the next
// header to determine the size of the first real data chunk.
if (mNeedFirstStreamIdentifier) {
const uint32_t firstReadLength = kHeaderLength +
kStreamIdentifierDataLength +
kHeaderLength;
MOZ_ASSERT(firstReadLength <= CompressedBufferLength());
rv = ReadAll(mCompressedBuffer.get(), firstReadLength, firstReadLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) { return rv; }
rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
if (NS_WARN_IF(mNextChunkType != StreamIdentifier ||
mNextChunkDataLength != kStreamIdentifierDataLength)) {
return NS_ERROR_CORRUPTED_CONTENT;
}
size_t offset = kHeaderLength;
mNeedFirstStreamIdentifier = false;
size_t numRead;
size_t numWritten;
rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize, mNextChunkType,
&mCompressedBuffer[offset],
mNextChunkDataLength, &numWritten, &numRead);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
MOZ_ASSERT(numWritten == 0);
MOZ_ASSERT(numRead == mNextChunkDataLength);
offset += numRead;
rv = ParseHeader(&mCompressedBuffer[offset], *aBytesReadOut - offset,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
return NS_OK;
}
// We have no compressed data and we don't know how big the next chunk is.
// This happens when we get an EOF pause in the middle of a stream and also
// at the end of the stream. Simply read the next header and return. The
// chunk body will be read on the next entry into this method.
if (mNextChunkType == Unknown) {
rv = ReadAll(mCompressedBuffer.get(), kHeaderLength, kHeaderLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) { return rv; }
rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
return NS_OK;
}
// We have no decompressed data, but we do know the size of the next chunk.
// Read at least that much from the base stream.
uint32_t readLength = mNextChunkDataLength;
MOZ_ASSERT(readLength <= CompressedBufferLength());
// However, if there is enough data in the base stream, also read the next
// chunk header. This helps optimize the stream by avoiding many small reads.
uint64_t avail;
rv = mBaseStream->Available(&avail);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
if (avail >= (readLength + kHeaderLength)) {
readLength += kHeaderLength;
MOZ_ASSERT(readLength <= CompressedBufferLength());
}
rv = ReadAll(mCompressedBuffer.get(), readLength, mNextChunkDataLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) { return rv; }
size_t numRead;
size_t numWritten;
rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize, mNextChunkType,
mCompressedBuffer.get(), mNextChunkDataLength,
&numWritten, &numRead);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
MOZ_ASSERT(numRead == mNextChunkDataLength);
mUncompressedBytes = numWritten;
// If we were unable to directly read the next chunk header, then clear
// our internal state. We will have to perform a small read to get the
// header the next time we enter this method.
if (*aBytesReadOut <= mNextChunkDataLength) {
mNextChunkType = Unknown;
mNextChunkDataLength = 0;
return NS_OK;
}
// We got the next chunk header. Parse it so that we are ready to for the
// next call into this method.
rv = ParseHeader(&mCompressedBuffer[numRead], *aBytesReadOut - numRead,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
return NS_OK;
}
nsresult
SnappyUncompressInputStream::ReadAll(char* aBuf, uint32_t aCount,
uint32_t aMinValidCount,
uint32_t* aBytesReadOut)
{
MOZ_ASSERT(aCount >= aMinValidCount);
*aBytesReadOut = 0;
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
uint32_t offset = 0;
while (aCount > 0) {
uint32_t bytesRead = 0;
nsresult rv = mBaseStream->Read(aBuf + offset, aCount, &bytesRead);
if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
// EOF, but don't immediately return. We need to validate min read bytes
// below.
if (bytesRead == 0) {
break;
}
*aBytesReadOut += bytesRead;
offset += bytesRead;
aCount -= bytesRead;
}
// Reading zero bytes is not an error. Its the expected EOF condition.
// Only compare to the minimum valid count if we read at least one byte.
if (*aBytesReadOut != 0 && *aBytesReadOut < aMinValidCount) {
return NS_ERROR_CORRUPTED_CONTENT;
}
return NS_OK;
}
size_t
SnappyUncompressInputStream::UncompressedLength() const
{
MOZ_ASSERT(mNextByte <= mUncompressedBytes);
return mUncompressedBytes - mNextByte;
}
} // namespace mozilla
|