diff options
Diffstat (limited to 'netwerk/test/httpserver/test/test_async_response_sending.js')
-rw-r--r-- | netwerk/test/httpserver/test/test_async_response_sending.js | 1683 |
1 files changed, 1683 insertions, 0 deletions
diff --git a/netwerk/test/httpserver/test/test_async_response_sending.js b/netwerk/test/httpserver/test/test_async_response_sending.js new file mode 100644 index 0000000000..84ec74daf4 --- /dev/null +++ b/netwerk/test/httpserver/test/test_async_response_sending.js @@ -0,0 +1,1683 @@ +/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */ +/* vim:set ts=2 sw=2 sts=2 et: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/* + * Ensures that data a request handler writes out in response is sent only as + * quickly as the client can receive it, without racing ahead and being forced + * to block while writing that data. + * + * NB: These tests are extremely tied to the current implementation, in terms of + * when and how stream-ready notifications occur, the amount of data which will + * be read or written at each notification, and so on. If the implementation + * changes in any way with respect to stream copying, this test will probably + * have to change a little at the edges as well. + */ + +gThreadManager = Cc["@mozilla.org/thread-manager;1"].createInstance(); + +function run_test() +{ + do_test_pending(); + tests.push(function testsComplete(_) + { + dumpn("******************\n" + + "* TESTS COMPLETE *\n" + + "******************"); + do_test_finished(); + }); + + runNextTest(); +} + +function runNextTest() +{ + testIndex++; + dumpn("*** runNextTest(), testIndex: " + testIndex); + + try + { + var test = tests[testIndex]; + test(runNextTest); + } + catch (e) + { + var msg = "exception running test " + testIndex + ": " + e; + if (e && "stack" in e) + msg += "\nstack follows:\n" + e.stack; + do_throw(msg); + } +} + + +/************* + * TEST DATA * + *************/ + +const NOTHING = []; + +const FIRST_SEGMENT = [1, 2, 3, 4]; +const SECOND_SEGMENT = [5, 6, 7, 8]; +const THIRD_SEGMENT = [9, 10, 11, 12]; + +const SEGMENT = FIRST_SEGMENT; +const TWO_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8]; +const THREE_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; + +const SEGMENT_AND_HALF = [1, 2, 3, 4, 5, 6]; + +const QUARTER_SEGMENT = [1]; +const HALF_SEGMENT = [1, 2]; +const SECOND_HALF_SEGMENT = [3, 4]; +const THREE_QUARTER_SEGMENT = [1, 2, 3]; +const EXTRA_HALF_SEGMENT = [5, 6]; +const MIDDLE_HALF_SEGMENT = [2, 3]; +const LAST_QUARTER_SEGMENT = [4]; +const FOURTH_HALF_SEGMENT = [7, 8]; +const HALF_THIRD_SEGMENT = [9, 10]; +const LATTER_HALF_THIRD_SEGMENT = [11, 12]; + +const TWO_HALF_SEGMENTS = [1, 2, 1, 2]; + + +/********* + * TESTS * + *********/ + +var tests = + [ + sourceClosedWithoutWrite, + writeOneSegmentThenClose, + simpleWriteThenRead, + writeLittleBeforeReading, + writeMultipleSegmentsThenRead, + writeLotsBeforeReading, + writeLotsBeforeReading2, + writeThenReadPartial, + manyPartialWrites, + partialRead, + partialWrite, + sinkClosedImmediately, + sinkClosedWithReadableData, + sinkClosedAfterWrite, + sourceAndSinkClosed, + sinkAndSourceClosed, + sourceAndSinkClosedWithPendingData, + sinkAndSourceClosedWithPendingData, + ]; +var testIndex = -1; + +function sourceClosedWithoutWrite(next) +{ + var t = new CopyTest("sourceClosedWithoutWrite", next); + + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [NOTHING]); +} + +function writeOneSegmentThenClose(next) +{ + var t = new CopyTest("writeLittleBeforeReading", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT]); +} + +function simpleWriteThenRead(next) +{ + var t = new CopyTest("simpleWriteThenRead", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [SEGMENT]); +} + +function writeLittleBeforeReading(next) +{ + var t = new CopyTest("writeLittleBeforeReading", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT, SEGMENT]); +} + +function writeMultipleSegmentsThenRead(next) +{ + var t = new CopyTest("writeMultipleSegmentsThenRead", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(TWO_SEGMENTS.length, + [FIRST_SEGMENT, SECOND_SEGMENT]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [TWO_SEGMENTS]); +} + +function writeLotsBeforeReading(next) +{ + var t = new CopyTest("writeLotsBeforeReading", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]); + t.expect(Cr.NS_OK, [TWO_SEGMENTS, SEGMENT, SEGMENT]); +} + +function writeLotsBeforeReading2(next) +{ + var t = new CopyTest("writeLotsBeforeReading", next); + + t.addToSource(THREE_SEGMENTS); + t.makeSourceReadable(THREE_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(THIRD_SEGMENT.length, [THIRD_SEGMENT]); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]); + t.expect(Cr.NS_OK, [THREE_SEGMENTS, SEGMENT, SEGMENT]); +} + +function writeThenReadPartial(next) +{ + var t = new CopyTest("writeThenReadPartial", next); + + t.addToSource(SEGMENT_AND_HALF); + t.makeSourceReadable(SEGMENT_AND_HALF.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(EXTRA_HALF_SEGMENT.length, [EXTRA_HALF_SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT_AND_HALF]); +} + +function manyPartialWrites(next) +{ + var t = new CopyTest("manyPartialWrites", next); + + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + t.makeSinkWritableAndWaitFor(2 * HALF_SEGMENT.length, [TWO_HALF_SEGMENTS]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [TWO_HALF_SEGMENTS]); +} + +function partialRead(next) +{ + var t = new CopyTest("partialRead", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSourceAndWaitFor(Cr.NS_OK, HALF_SEGMENT.length, [HALF_SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT, HALF_SEGMENT]); +} + +function partialWrite(next) +{ + var t = new CopyTest("partialWrite", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length, + [QUARTER_SEGMENT, + MIDDLE_HALF_SEGMENT, + LAST_QUARTER_SEGMENT]); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length, + [HALF_SEGMENT, SECOND_HALF_SEGMENT]); + + t.addToSource(THREE_SEGMENTS); + t.makeSourceReadable(THREE_SEGMENTS.length); + t.makeSinkWritableByIncrementsAndWaitFor(THREE_SEGMENTS.length, + [HALF_SEGMENT, SECOND_HALF_SEGMENT, + SECOND_SEGMENT, + HALF_THIRD_SEGMENT, + LATTER_HALF_THIRD_SEGMENT]); + + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [SEGMENT, SEGMENT, THREE_SEGMENTS]); +} + +function sinkClosedImmediately(next) +{ + var t = new CopyTest("sinkClosedImmediately", next); + + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]); +} + +function sinkClosedWithReadableData(next) +{ + var t = new CopyTest("sinkClosedWithReadableData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]); +} + +function sinkClosedAfterWrite(next) +{ + var t = new CopyTest("sinkClosedAfterWrite", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [FIRST_SEGMENT]); +} + +function sourceAndSinkClosed(next) +{ + var t = new CopyTest("sourceAndSinkClosed", next); + + t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK); + t.expect(Cr.NS_OK, []); +} + +function sinkAndSourceClosed(next) +{ + var t = new CopyTest("sinkAndSourceClosed", next); + + t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK); + + // sink notify received first, hence error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + +function sourceAndSinkClosedWithPendingData(next) +{ + var t = new CopyTest("sourceAndSinkClosedWithPendingData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + + t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK); + + // not all data from source copied, so error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + +function sinkAndSourceClosedWithPendingData(next) +{ + var t = new CopyTest("sinkAndSourceClosedWithPendingData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + + t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK); + + // not all data from source copied, plus sink notify received first, so error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + + +/************* + * UTILITIES * + *************/ + +/** Returns the sum of the elements in arr. */ +function sum(arr) +{ + var sum = 0; + for (var i = 0, sz = arr.length; i < sz; i++) + sum += arr[i]; + return sum; +} + +/** + * Returns a constructor for an input or output stream callback that will wrap + * the one provided to it as an argument. + * + * @param wrapperCallback : (nsIInputStreamCallback | nsIOutputStreamCallback) : void + * the original callback object (not a function!) being wrapped + * @param name : string + * either "onInputStreamReady" if we're wrapping an input stream callback or + * "onOutputStreamReady" if we're wrapping an output stream callback + * @returns function(nsIInputStreamCallback | nsIOutputStreamCallback) : (nsIInputStreamCallback | nsIOutputStreamCallback) + * a constructor function which constructs a callback object (not function!) + * which, when called, first calls the original callback provided to it and + * then calls wrapperCallback + */ +function createStreamReadyInterceptor(wrapperCallback, name) +{ + return function StreamReadyInterceptor(callback) + { + this.wrappedCallback = callback; + this[name] = function streamReadyInterceptor(stream) + { + dumpn("*** StreamReadyInterceptor." + name); + + try + { + dumpn("*** calling original " + name + "..."); + callback[name](stream); + } + catch (e) + { + dumpn("!!! error running inner callback: " + e); + throw e; + } + finally + { + dumpn("*** calling wrapper " + name + "..."); + wrapperCallback[name](stream); + } + } + }; +} + +/** + * Print out a banner with the given message, uppercased, for debugging + * purposes. + */ +function note(m) +{ + m = m.toUpperCase(); + var asterisks = Array(m.length + 1 + 4).join("*"); + dumpn(asterisks + "\n* " + m + " *\n" + asterisks); +} + + +/*********** + * MOCKERY * + ***********/ + +/* + * Blatantly violate abstractions in the name of testability. THIS IS NOT + * PUBLIC API! If you use any of these I will knowingly break your code by + * changing the names of variables and properties. + */ +var BinaryInputStream = function BIS(stream) { return stream; }; +var BinaryOutputStream = function BOS(stream) { return stream; }; +Response.SEGMENT_SIZE = SEGMENT.length; + +/** + * Roughly mocks an nsIPipe, presenting non-blocking input and output streams + * that appear to also be binary streams and whose readability and writability + * amounts are configurable. Only the methods used in this test have been + * implemented -- these aren't exact mocks (can't be, actually, because input + * streams have unscriptable methods). + * + * @param name : string + * a name for this pipe, used in debugging output + */ +function CustomPipe(name) +{ + var self = this; + + /** Data read from input that's buffered until it can be written to output. */ + this._data = []; + + /** + * The status of this pipe, which is to say the error result the ends of this + * pipe will return when attempts are made to use them. This value is always + * an error result when copying has finished, because success codes are + * converted to NS_BASE_STREAM_CLOSED. + */ + this._status = Cr.NS_OK; + + /** The input end of this pipe. */ + var input = this.inputStream = + { + /** A name for this stream, used in debugging output. */ + name: name + " input", + + /** + * The number of bytes of data available to be read from this pipe, or + * Infinity if any amount of data in this pipe is made readable as soon as + * it is written to the pipe output. + */ + _readable: 0, + + /** + * Data regarding a pending stream-ready callback on this, or null if no + * callback is currently waiting to be called. + */ + _waiter: null, + + /** + * The event currently dispatched to make a stream-ready callback, if any + * such callback is currently ready to be made and not already in + * progress, or null when no callback is waiting to happen. + */ + _event: null, + + /** + * A stream-ready constructor to wrap an existing callback to intercept + * stream-ready notifications, or null if notifications shouldn't be + * wrapped at all. + */ + _streamReadyInterceptCreator: null, + + /** + * Registers a stream-ready wrapper creator function so that a + * stream-ready callback made in the future can be wrapped. + */ + interceptStreamReadyCallbacks: function(streamReadyInterceptCreator) + { + dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks"); + + do_check_true(this._streamReadyInterceptCreator === null, + "intercepting twice"); + this._streamReadyInterceptCreator = streamReadyInterceptCreator; + if (this._waiter) + { + this._waiter.callback = + new streamReadyInterceptCreator(this._waiter.callback); + } + }, + + /** + * Removes a previously-registered stream-ready wrapper creator function, + * also clearing any current wrapping. + */ + removeStreamReadyInterceptor: function() + { + dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "removing interceptor when none present?"); + this._streamReadyInterceptCreator = null; + if (this._waiter) + this._waiter.callback = this._waiter.callback.wrappedCallback; + }, + + // + // see nsIAsyncInputStream.asyncWait + // + asyncWait: function asyncWait(callback, flags, requestedCount, target) + { + dumpn("*** [" + this.name + "].asyncWait"); + + do_check_true(callback && typeof callback !== "function"); + + var closureOnly = + (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0; + + do_check_true(this._waiter === null || + (this._waiter.closureOnly && !closureOnly), + "asyncWait already called with a non-closure-only " + + "callback? unexpected!"); + + this._waiter = + { + callback: + this._streamReadyInterceptCreator + ? new this._streamReadyInterceptCreator(callback) + : callback, + closureOnly: closureOnly, + requestedCount: requestedCount, + eventTarget: target + }; + + if (!Components.isSuccessCode(self._status) || + (!closureOnly && this._readable >= requestedCount && + self._data.length >= requestedCount)) + { + this._notify(); + } + }, + + // + // see nsIAsyncInputStream.closeWithStatus + // + closeWithStatus: function closeWithStatus(status) + { + dumpn("*** [" + this.name + "].closeWithStatus" + + "(" + status + ")"); + + if (!Components.isSuccessCode(self._status)) + { + dumpn("*** ignoring second closure of [input " + this.name + "] " + + "(status " + self._status + ")"); + return; + } + + if (Components.isSuccessCode(status)) + status = Cr.NS_BASE_STREAM_CLOSED; + + self._status = status; + + if (this._waiter) + this._notify(); + if (output._waiter) + output._notify(); + }, + + // + // see nsIBinaryInputStream.readByteArray + // + readByteArray: function readByteArray(count) + { + dumpn("*** [" + this.name + "].readByteArray(" + count + ")"); + + if (self._data.length === 0) + { + throw Components.isSuccessCode(self._status) + ? Cr.NS_BASE_STREAM_WOULD_BLOCK + : self._status; + } + + do_check_true(this._readable <= self._data.length || + this._readable === Infinity, + "consistency check"); + + if (this._readable < count || self._data.length < count) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + this._readable -= count; + return self._data.splice(0, count); + }, + + /** + * Makes the given number of additional bytes of data previously written + * to the pipe's output stream available for reading, triggering future + * notifications when required. + * + * @param count : uint + * the number of bytes of additional data to make available; must not be + * greater than the number of bytes already buffered but not made + * available by previous makeReadable calls + */ + makeReadable: function makeReadable(count) + { + dumpn("*** [" + this.name + "].makeReadable(" + count + ")"); + + do_check_true(Components.isSuccessCode(self._status), "errant call"); + do_check_true(this._readable + count <= self._data.length || + this._readable === Infinity, + "increasing readable beyond written amount"); + + this._readable += count; + + dumpn("readable: " + this._readable + ", data: " + self._data); + + var waiter = this._waiter; + if (waiter !== null) + { + if (waiter.requestedCount <= this._readable && !waiter.closureOnly) + this._notify(); + } + }, + + /** + * Disables the readability limit on this stream, meaning that as soon as + * *any* amount of data is written to output it becomes available from + * this stream and a stream-ready event is dispatched (if any stream-ready + * callback is currently set). + */ + disableReadabilityLimit: function disableReadabilityLimit() + { + dumpn("*** [" + this.name + "].disableReadabilityLimit()"); + + this._readable = Infinity; + }, + + // + // see nsIInputStream.available + // + available: function available() + { + dumpn("*** [" + this.name + "].available()"); + + if (self._data.length === 0 && !Components.isSuccessCode(self._status)) + throw self._status; + + return Math.min(this._readable, self._data.length); + }, + + /** + * Dispatches a pending stream-ready event ahead of schedule, rather than + * waiting for it to be dispatched in response to normal writes. This is + * useful when writing to the output has completed, and we need to have + * read all data written to this stream. If the output isn't closed and + * the reading of data from this races ahead of the last write to output, + * we need a notification to know when everything that's been written has + * been read. This ordinarily might be supplied by closing output, but + * in some cases it's not desirable to close output, so this supplies an + * alternative method to get notified when the last write has occurred. + */ + maybeNotifyFinally: function maybeNotifyFinally() + { + dumpn("*** [" + this.name + "].maybeNotifyFinally()"); + + do_check_true(this._waiter !== null, "must be waiting now"); + + if (self._data.length > 0) + { + dumpn("*** data still pending, normal notifications will signal " + + "completion"); + return; + } + + // No data waiting to be written, so notify. We could just close the + // stream, but that's less faithful to the server's behavior (it doesn't + // close the stream, and we're pretending to impersonate the server as + // much as we can here), so instead we're going to notify when no data + // can be read. The CopyTest has already been flagged as complete, so + // the stream listener will detect that this is a wrap-it-up notify and + // invoke the next test. + this._notify(); + }, + + /** + * Dispatches an event to call a previously-registered stream-ready + * callback. + */ + _notify: function _notify() + { + dumpn("*** [" + this.name + "]._notify()"); + + var waiter = this._waiter; + do_check_true(waiter !== null, "no waiter?"); + + if (this._event === null) + { + var event = this._event = + { + run: function run() + { + input._waiter = null; + input._event = null; + try + { + do_check_true(!Components.isSuccessCode(self._status) || + input._readable >= waiter.requestedCount); + waiter.callback.onInputStreamReady(input); + } + catch (e) + { + do_throw("error calling onInputStreamReady: " + e); + } + } + }; + waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + } + }, + + QueryInterface: function QueryInterface(iid) + { + if (iid.equals(Ci.nsIAsyncInputStream) || + iid.equals(Ci.nsIInputStream) || + iid.equals(Ci.nsISupports)) + { + return this; + } + + throw Cr.NS_ERROR_NO_INTERFACE; + } + }; + + /** The output end of this pipe. */ + var output = this.outputStream = + { + /** A name for this stream, used in debugging output. */ + name: name + " output", + + /** + * The number of bytes of data which may be written to this pipe without + * blocking. + */ + _writable: 0, + + /** + * The increments in which pending data should be written, rather than + * simply defaulting to the amount requested (which, given that + * input.asyncWait precisely respects the requestedCount argument, will + * ordinarily always be writable in that amount), as an array whose + * elements from start to finish are the number of bytes to write each + * time write() or writeByteArray() is subsequently called. The sum of + * the values in this array, if this array is not empty, is always equal + * to this._writable. + */ + _writableAmounts: [], + + /** + * Data regarding a pending stream-ready callback on this, or null if no + * callback is currently waiting to be called. + */ + _waiter: null, + + /** + * The event currently dispatched to make a stream-ready callback, if any + * such callback is currently ready to be made and not already in + * progress, or null when no callback is waiting to happen. + */ + _event: null, + + /** + * A stream-ready constructor to wrap an existing callback to intercept + * stream-ready notifications, or null if notifications shouldn't be + * wrapped at all. + */ + _streamReadyInterceptCreator: null, + + /** + * Registers a stream-ready wrapper creator function so that a + * stream-ready callback made in the future can be wrapped. + */ + interceptStreamReadyCallbacks: function(streamReadyInterceptCreator) + { + dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "intercepting onOutputStreamReady twice"); + this._streamReadyInterceptCreator = streamReadyInterceptCreator; + if (this._waiter) + { + this._waiter.callback = + new streamReadyInterceptCreator(this._waiter.callback); + } + }, + + /** + * Removes a previously-registered stream-ready wrapper creator function, + * also clearing any current wrapping. + */ + removeStreamReadyInterceptor: function() + { + dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "removing interceptor when none present?"); + this._streamReadyInterceptCreator = null; + if (this._waiter) + this._waiter.callback = this._waiter.callback.wrappedCallback; + }, + + // + // see nsIAsyncOutputStream.asyncWait + // + asyncWait: function asyncWait(callback, flags, requestedCount, target) + { + dumpn("*** [" + this.name + "].asyncWait"); + + do_check_true(callback && typeof callback !== "function"); + + var closureOnly = + (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0; + + do_check_true(this._waiter === null || + (this._waiter.closureOnly && !closureOnly), + "asyncWait already called with a non-closure-only " + + "callback? unexpected!"); + + this._waiter = + { + callback: + this._streamReadyInterceptCreator + ? new this._streamReadyInterceptCreator(callback) + : callback, + closureOnly: closureOnly, + requestedCount: requestedCount, + eventTarget: target, + toString: function toString() + { + return "waiter(" + (closureOnly ? "closure only, " : "") + + "requestedCount: " + requestedCount + ", target: " + + target + ")"; + } + }; + + if ((!closureOnly && this._writable >= requestedCount) || + !Components.isSuccessCode(this.status)) + { + this._notify(); + } + }, + + // + // see nsIAsyncOutputStream.closeWithStatus + // + closeWithStatus: function closeWithStatus(status) + { + dumpn("*** [" + this.name + "].closeWithStatus(" + status + ")"); + + if (!Components.isSuccessCode(self._status)) + { + dumpn("*** ignoring redundant closure of [input " + this.name + "] " + + "because it's already closed (status " + self._status + ")"); + return; + } + + if (Components.isSuccessCode(status)) + status = Cr.NS_BASE_STREAM_CLOSED; + + self._status = status; + + if (input._waiter) + input._notify(); + if (this._waiter) + this._notify(); + }, + + // + // see nsIBinaryOutputStream.writeByteArray + // + writeByteArray: function writeByteArray(bytes, length) + { + dumpn("*** [" + this.name + "].writeByteArray" + + "([" + bytes + "], " + length + ")"); + + do_check_eq(bytes.length, length, "sanity"); + if (!Components.isSuccessCode(self._status)) + throw self._status; + + do_check_eq(this._writableAmounts.length, 0, + "writeByteArray can't support specified-length writes"); + + if (this._writable < length) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + + self._data.push.apply(self._data, bytes); + this._writable -= length; + + if (input._readable === Infinity && input._waiter && + !input._waiter.closureOnly) + { + input._notify(); + } + }, + + // + // see nsIOutputStream.write + // + write: function write(str, length) + { + dumpn("*** [" + this.name + "].write"); + + do_check_eq(str.length, length, "sanity"); + if (!Components.isSuccessCode(self._status)) + throw self._status; + if (this._writable === 0) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + + var actualWritten; + if (this._writableAmounts.length === 0) + { + actualWritten = Math.min(this._writable, length); + } + else + { + do_check_true(this._writable >= this._writableAmounts[0], + "writable amounts value greater than writable data?"); + do_check_eq(this._writable, sum(this._writableAmounts), + "total writable amount not equal to sum of writable " + + "increments"); + actualWritten = this._writableAmounts.shift(); + } + + var bytes = str.substring(0, actualWritten) + .split("") + .map(function(v) { return v.charCodeAt(0); }); + + self._data.push.apply(self._data, bytes); + this._writable -= actualWritten; + + if (input._readable === Infinity && input._waiter && + !input._waiter.closureOnly) + { + input._notify(); + } + + return actualWritten; + }, + + /** + * Increase the amount of data that can be written without blocking by the + * given number of bytes, triggering future notifications when required. + * + * @param count : uint + * the number of bytes of additional data to make writable + */ + makeWritable: function makeWritable(count) + { + dumpn("*** [" + this.name + "].makeWritable(" + count + ")"); + + do_check_true(Components.isSuccessCode(self._status)); + + this._writable += count; + + var waiter = this._waiter; + if (waiter && !waiter.closureOnly && + waiter.requestedCount <= this._writable) + { + this._notify(); + } + }, + + /** + * Increase the amount of data that can be written without blocking, but + * do so by specifying a number of bytes that will be written each time + * a write occurs, even as asyncWait notifications are initially triggered + * as usual. Thus, rather than writes eagerly writing everything possible + * at each step, attempts to write out data by segment devolve into a + * partial segment write, then another, and so on until the amount of data + * specified as permitted to be written, has been written. + * + * Note that the writeByteArray method is incompatible with the previous + * calling of this method, in that, until all increments provided to this + * method have been consumed, writeByteArray cannot be called. Once all + * increments have been consumed, writeByteArray may again be called. + * + * @param increments : [uint] + * an array whose elements are positive numbers of bytes to permit to be + * written each time write() is subsequently called on this, ignoring + * the total amount of writable space specified by the sum of all + * increments + */ + makeWritableByIncrements: function makeWritableByIncrements(increments) + { + dumpn("*** [" + this.name + "].makeWritableByIncrements" + + "([" + increments.join(", ") + "])"); + + do_check_true(increments.length > 0, "bad increments"); + do_check_true(increments.every(function(v) { return v > 0; }), + "zero increment?"); + + do_check_true(Components.isSuccessCode(self._status)); + + this._writable += sum(increments); + this._writableAmounts = increments; + + var waiter = this._waiter; + if (waiter && !waiter.closureOnly && + waiter.requestedCount <= this._writable) + { + this._notify(); + } + }, + + /** + * Dispatches an event to call a previously-registered stream-ready + * callback. + */ + _notify: function _notify() + { + dumpn("*** [" + this.name + "]._notify()"); + + var waiter = this._waiter; + do_check_true(waiter !== null, "no waiter?"); + + if (this._event === null) + { + var event = this._event = + { + run: function run() + { + output._waiter = null; + output._event = null; + + try + { + waiter.callback.onOutputStreamReady(output); + } + catch (e) + { + do_throw("error calling onOutputStreamReady: " + e); + } + } + }; + waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + } + }, + + QueryInterface: function QueryInterface(iid) + { + if (iid.equals(Ci.nsIAsyncOutputStream) || + iid.equals(Ci.nsIOutputStream) || + iid.equals(Ci.nsISupports)) + { + return this; + } + + throw Cr.NS_ERROR_NO_INTERFACE; + } + }; +} + +/** + * Represents a sequence of interactions to perform with a copier, in a given + * order and at the desired time intervals. + * + * @param name : string + * test name, used in debugging output + */ +function CopyTest(name, next) +{ + /** Name used in debugging output. */ + this.name = name; + + /** A function called when the test completes. */ + this._done = next; + + var sourcePipe = new CustomPipe(name + "-source"); + + /** The source of data for the copier to copy. */ + this._source = sourcePipe.inputStream; + + /** + * The sink to which to write data which will appear in the copier's source. + */ + this._copyableDataStream = sourcePipe.outputStream; + + var sinkPipe = new CustomPipe(name + "-sink"); + + /** The sink to which the copier copies data. */ + this._sink = sinkPipe.outputStream; + + /** Input stream from which to read data the copier's written to its sink. */ + this._copiedDataStream = sinkPipe.inputStream; + + this._copiedDataStream.disableReadabilityLimit(); + + /** + * True if there's a callback waiting to read data written by the copier to + * its output, from the input end of the pipe representing the copier's sink. + */ + this._waitingForData = false; + + /** + * An array of the bytes of data expected to be written to output by the + * copier when this test runs. + */ + this._expectedData = undefined; + + /** Array of bytes of data received so far. */ + this._receivedData = []; + + /** The expected final status returned by the copier. */ + this._expectedStatus = -1; + + /** The actual final status returned by the copier. */ + this._actualStatus = -1; + + /** The most recent sequence of bytes written to output by the copier. */ + this._lastQuantum = []; + + /** + * True iff we've received the last quantum of data written to the sink by the + * copier. + */ + this._allDataWritten = false; + + /** + * True iff the copier has notified its associated stream listener of + * completion. + */ + this._copyingFinished = false; + + /** Index of the next task to execute while driving the copier. */ + this._currentTask = 0; + + /** Array containing all tasks to run. */ + this._tasks = []; + + /** The copier used by this test. */ + this._copier = + new WriteThroughCopier(this._source, this._sink, this, null); + + // Start watching for data written by the copier to the sink. + this._waitForWrittenData(); +} +CopyTest.prototype = +{ + /** + * Adds the given array of bytes to data in the copier's source. + * + * @param bytes : [uint] + * array of bytes of data to add to the source for the copier + */ + addToSource: function addToSource(bytes) + { + var self = this; + this._addToTasks(function addToSourceTask() + { + note("addToSourceTask"); + + try + { + self._copyableDataStream.makeWritable(bytes.length); + self._copyableDataStream.writeByteArray(bytes, bytes.length); + } + finally + { + self._stageNextTask(); + } + }); + }, + + /** + * Makes bytes of data previously added to the source available to be read by + * the copier. + * + * @param count : uint + * number of bytes to make available for reading + */ + makeSourceReadable: function makeSourceReadable(count) + { + var self = this; + this._addToTasks(function makeSourceReadableTask() + { + note("makeSourceReadableTask"); + + self._source.makeReadable(count); + self._stageNextTask(); + }); + }, + + /** + * Increases available space in the sink by the given amount, waits for the + * given series of arrays of bytes to be written to sink by the copier, and + * causes execution to asynchronously continue to the next task when the last + * of those arrays of bytes is received. + * + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + makeSinkWritableAndWaitFor: + function makeSinkWritableAndWaitFor(bytes, dataQuantums) + { + var self = this; + + do_check_eq(bytes, + dataQuantums.reduce(function(partial, current) + { + return partial + current.length; + }, 0), + "bytes/quantums mismatch"); + + function increaseSinkSpaceTask() + { + /* Now do the actual work to trigger the interceptor. */ + self._sink.makeWritable(bytes); + } + + this._waitForHelper("increaseSinkSpaceTask", + dataQuantums, increaseSinkSpaceTask); + }, + + /** + * Increases available space in the sink by the given amount, waits for the + * given series of arrays of bytes to be written to sink by the copier, and + * causes execution to asynchronously continue to the next task when the last + * of those arrays of bytes is received. + * + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + makeSinkWritableByIncrementsAndWaitFor: + function makeSinkWritableByIncrementsAndWaitFor(bytes, dataQuantums) + { + var self = this; + + var desiredAmounts = dataQuantums.map(function(v) { return v.length; }); + do_check_eq(bytes, sum(desiredAmounts), "bytes/quantums mismatch"); + + function increaseSinkSpaceByIncrementsTask() + { + /* Now do the actual work to trigger the interceptor incrementally. */ + self._sink.makeWritableByIncrements(desiredAmounts); + } + + this._waitForHelper("increaseSinkSpaceByIncrementsTask", + dataQuantums, increaseSinkSpaceByIncrementsTask); + }, + + /** + * Close the copier's source stream, then asynchronously continue to the next + * task. + * + * @param status : nsresult + * the status to provide when closing the copier's source stream + */ + closeSource: function closeSource(status) + { + var self = this; + + this._addToTasks(function closeSourceTask() + { + note("closeSourceTask"); + + self._source.closeWithStatus(status); + self._stageNextTask(); + }); + }, + + /** + * Close the copier's source stream, then wait for the given number of bytes + * and for the given series of arrays of bytes to be written to the sink, then + * asynchronously continue to the next task. + * + * @param status : nsresult + * the status to provide when closing the copier's source stream + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + closeSourceAndWaitFor: + function closeSourceAndWaitFor(status, bytes, dataQuantums) + { + var self = this; + + do_check_eq(bytes, sum(dataQuantums.map(function(v) { return v.length; })), + "bytes/quantums mismatch"); + + function closeSourceAndWaitForTask() + { + self._sink.makeWritable(bytes); + self._copyableDataStream.closeWithStatus(status); + } + + this._waitForHelper("closeSourceAndWaitForTask", + dataQuantums, closeSourceAndWaitForTask); + }, + + /** + * Closes the copier's sink stream, providing the given status, then + * asynchronously continue to the next task. + * + * @param status : nsresult + * the status to provide when closing the copier's sink stream + */ + closeSink: function closeSink(status) + { + var self = this; + this._addToTasks(function closeSinkTask() + { + note("closeSinkTask"); + + self._sink.closeWithStatus(status); + self._stageNextTask(); + }); + }, + + /** + * Closes the copier's source stream, then immediately closes the copier's + * sink stream, then asynchronously continues to the next task. + * + * @param sourceStatus : nsresult + * the status to provide when closing the copier's source stream + * @param sinkStatus : nsresult + * the status to provide when closing the copier's sink stream + */ + closeSourceThenSink: function closeSourceThenSink(sourceStatus, sinkStatus) + { + var self = this; + this._addToTasks(function closeSourceThenSinkTask() + { + note("closeSourceThenSinkTask"); + + self._source.closeWithStatus(sourceStatus); + self._sink.closeWithStatus(sinkStatus); + self._stageNextTask(); + }); + }, + + /** + * Closes the copier's sink stream, then immediately closes the copier's + * source stream, then asynchronously continues to the next task. + * + * @param sinkStatus : nsresult + * the status to provide when closing the copier's sink stream + * @param sourceStatus : nsresult + * the status to provide when closing the copier's source stream + */ + closeSinkThenSource: function closeSinkThenSource(sinkStatus, sourceStatus) + { + var self = this; + this._addToTasks(function closeSinkThenSourceTask() + { + note("closeSinkThenSource"); + + self._sink.closeWithStatus(sinkStatus); + self._source.closeWithStatus(sourceStatus); + self._stageNextTask(); + }); + }, + + /** + * Indicates that the given status is expected to be returned when the stream + * listener for the copy indicates completion, that the expected data copied + * by the copier to sink are the concatenation of the arrays of bytes in + * receivedData, and kicks off the tasks in this test. + * + * @param expectedStatus : nsresult + * the status expected to be returned by the copier at completion + * @param receivedData : [[uint]] + * an array containing arrays of bytes whose concatenation constitutes the + * expected copied data + */ + expect: function expect(expectedStatus, receivedData) + { + this._expectedStatus = expectedStatus; + this._expectedData = []; + for (var i = 0, sz = receivedData.length; i < sz; i++) + this._expectedData.push.apply(this._expectedData, receivedData[i]); + + this._stageNextTask(); + }, + + /** + * Sets up a stream interceptor that will verify that each piece of data + * written to the sink by the copier corresponds to the currently expected + * pieces of data, calls the trigger, then waits for those pieces of data to + * be received. Once all have been received, the interceptor is removed and + * the next task is asynchronously executed. + * + * @param name : string + * name of the task created by this, used in debugging output + * @param dataQuantums : [[uint]] + * array of expected arrays of bytes to be written to the sink by the copier + * @param trigger : function() : void + * function to call after setting up the interceptor to wait for + * notifications (which will be generated as a result of this function's + * actions) + */ + _waitForHelper: function _waitForHelper(name, dataQuantums, trigger) + { + var self = this; + this._addToTasks(function waitForHelperTask() + { + note(name); + + var quantumIndex = 0; + + /* + * Intercept all data-available notifications so we can continue when all + * the ones we expect have been received. + */ + var streamReadyCallback = + { + onInputStreamReady: function wrapperOnInputStreamReady(input) + { + dumpn("*** streamReadyCallback.onInputStreamReady" + + "(" + input.name + ")"); + + do_check_eq(this, streamReadyCallback, "sanity"); + + try + { + if (quantumIndex < dataQuantums.length) + { + var quantum = dataQuantums[quantumIndex++]; + var sz = quantum.length; + do_check_eq(self._lastQuantum.length, sz, + "different quantum lengths"); + for (var i = 0; i < sz; i++) + { + do_check_eq(self._lastQuantum[i], quantum[i], + "bad data at " + i); + } + + dumpn("*** waiting to check remaining " + + (dataQuantums.length - quantumIndex) + " quantums..."); + } + } + finally + { + if (quantumIndex === dataQuantums.length) + { + dumpn("*** data checks completed! next task..."); + self._copiedDataStream.removeStreamReadyInterceptor(); + self._stageNextTask(); + } + } + } + }; + + var interceptor = + createStreamReadyInterceptor(streamReadyCallback, "onInputStreamReady"); + self._copiedDataStream.interceptStreamReadyCallbacks(interceptor); + + /* Do the deed. */ + trigger(); + }); + }, + + /** + * Initiates asynchronous waiting for data written to the copier's sink to be + * available for reading from the input end of the sink's pipe. The callback + * stores the received data for comparison in the interceptor used in the + * callback added by _waitForHelper and signals test completion when it + * receives a zero-data-available notification (if the copier has notified + * that it is finished; otherwise allows execution to continue until that has + * occurred). + */ + _waitForWrittenData: function _waitForWrittenData() + { + dumpn("*** _waitForWrittenData (" + this.name + ")"); + + var self = this; + var outputWrittenWatcher = + { + onInputStreamReady: function onInputStreamReady(input) + { + dumpn("*** outputWrittenWatcher.onInputStreamReady" + + "(" + input.name + ")"); + + if (self._allDataWritten) + { + do_throw("ruh-roh! why are we getting notified of more data " + + "after we should have received all of it?"); + } + + self._waitingForData = false; + + try + { + var avail = input.available(); + } + catch (e) + { + dumpn("*** available() threw! error: " + e); + if (self._completed) + { + dumpn("*** NB: this isn't a problem, because we've copied " + + "completely now, and this notify may have been expedited " + + "by maybeNotifyFinally such that we're being called when " + + "we can *guarantee* nothing is available any more"); + } + avail = 0; + } + + if (avail > 0) + { + var data = input.readByteArray(avail); + do_check_eq(data.length, avail, + "readByteArray returned wrong number of bytes?"); + self._lastQuantum = data; + self._receivedData.push.apply(self._receivedData, data); + } + + if (avail === 0) + { + dumpn("*** all data received!"); + + self._allDataWritten = true; + + if (self._copyingFinished) + { + dumpn("*** copying already finished, continuing to next test"); + self._testComplete(); + } + else + { + dumpn("*** copying not finished, waiting for that to happen"); + } + + return; + } + + self._waitForWrittenData(); + } + }; + + this._copiedDataStream.asyncWait(outputWrittenWatcher, 0, 1, + gThreadManager.currentThread); + this._waitingForData = true; + }, + + /** + * Indicates this test is complete, does the final data-received and copy + * status comparisons, and calls the test-completion function provided when + * this test was first created. + */ + _testComplete: function _testComplete() + { + dumpn("*** CopyTest(" + this.name + ") complete! " + + "On to the next test..."); + + try + { + do_check_true(this._allDataWritten, "expect all data written now!"); + do_check_true(this._copyingFinished, "expect copying finished now!"); + + do_check_eq(this._actualStatus, this._expectedStatus, + "wrong final status"); + + var expected = this._expectedData, received = this._receivedData; + dumpn("received: [" + received + "], expected: [" + expected + "]"); + do_check_eq(received.length, expected.length, "wrong data"); + for (var i = 0, sz = expected.length; i < sz; i++) + do_check_eq(received[i], expected[i], "bad data at " + i); + } + catch (e) + { + dumpn("!!! ERROR PERFORMING FINAL " + this.name + " CHECKS! " + e); + throw e; + } + finally + { + dumpn("*** CopyTest(" + this.name + ") complete! " + + "Invoking test-completion callback..."); + this._done(); + } + }, + + /** Dispatches an event at this thread which will run the next task. */ + _stageNextTask: function _stageNextTask() + { + dumpn("*** CopyTest(" + this.name + ")._stageNextTask()"); + + if (this._currentTask === this._tasks.length) + { + dumpn("*** CopyTest(" + this.name + ") tasks complete!"); + return; + } + + var task = this._tasks[this._currentTask++]; + var self = this; + var event = + { + run: function run() + { + try + { + task(); + } + catch (e) + { + do_throw("exception thrown running task: " + e); + } + } + }; + gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + }, + + /** + * Adds the given function as a task to be run at a later time. + * + * @param task : function() : void + * the function to call as a task + */ + _addToTasks: function _addToTasks(task) + { + this._tasks.push(task); + }, + + // + // see nsIRequestObserver.onStartRequest + // + onStartRequest: function onStartRequest(self, _) + { + dumpn("*** CopyTest.onStartRequest (" + self.name + ")"); + + do_check_true(_ === null); + do_check_eq(this._receivedData.length, 0); + do_check_eq(this._lastQuantum.length, 0); + }, + + // + // see nsIRequestObserver.onStopRequest + // + onStopRequest: function onStopRequest(self, _, status) + { + dumpn("*** CopyTest.onStopRequest (" + self.name + ", " + status + ")"); + + do_check_true(_ === null); + this._actualStatus = status; + + this._copyingFinished = true; + + if (this._allDataWritten) + { + dumpn("*** all data written, continuing with remaining tests..."); + this._testComplete(); + } + else + { + /* + * Everything's copied as far as the copier is concerned. However, there + * may be a backup transferring from the output end of the copy sink to + * the input end where we can actually verify that the expected data was + * written as expected, because that transfer occurs asynchronously. If + * we do final data-received checks now, we'll miss still-pending data. + * Therefore, to wrap up this copy test we still need to asynchronously + * wait on the input end of the sink until we hit end-of-stream or some + * error condition. Then we know we're done and can continue with the + * next test. + */ + dumpn("*** not all data copied, waiting for that to happen..."); + + if (!this._waitingForData) + this._waitForWrittenData(); + + this._copiedDataStream.maybeNotifyFinally(); + } + } +}; |