summaryrefslogtreecommitdiff
path: root/netwerk/test/httpserver/test/test_async_response_sending.js
diff options
context:
space:
mode:
Diffstat (limited to 'netwerk/test/httpserver/test/test_async_response_sending.js')
-rw-r--r--netwerk/test/httpserver/test/test_async_response_sending.js1683
1 files changed, 1683 insertions, 0 deletions
diff --git a/netwerk/test/httpserver/test/test_async_response_sending.js b/netwerk/test/httpserver/test/test_async_response_sending.js
new file mode 100644
index 0000000000..84ec74daf4
--- /dev/null
+++ b/netwerk/test/httpserver/test/test_async_response_sending.js
@@ -0,0 +1,1683 @@
+/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim:set ts=2 sw=2 sts=2 et: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/*
+ * Ensures that data a request handler writes out in response is sent only as
+ * quickly as the client can receive it, without racing ahead and being forced
+ * to block while writing that data.
+ *
+ * NB: These tests are extremely tied to the current implementation, in terms of
+ * when and how stream-ready notifications occur, the amount of data which will
+ * be read or written at each notification, and so on. If the implementation
+ * changes in any way with respect to stream copying, this test will probably
+ * have to change a little at the edges as well.
+ */
+
+gThreadManager = Cc["@mozilla.org/thread-manager;1"].createInstance();
+
+function run_test()
+{
+ do_test_pending();
+ tests.push(function testsComplete(_)
+ {
+ dumpn("******************\n" +
+ "* TESTS COMPLETE *\n" +
+ "******************");
+ do_test_finished();
+ });
+
+ runNextTest();
+}
+
+function runNextTest()
+{
+ testIndex++;
+ dumpn("*** runNextTest(), testIndex: " + testIndex);
+
+ try
+ {
+ var test = tests[testIndex];
+ test(runNextTest);
+ }
+ catch (e)
+ {
+ var msg = "exception running test " + testIndex + ": " + e;
+ if (e && "stack" in e)
+ msg += "\nstack follows:\n" + e.stack;
+ do_throw(msg);
+ }
+}
+
+
+/*************
+ * TEST DATA *
+ *************/
+
+const NOTHING = [];
+
+const FIRST_SEGMENT = [1, 2, 3, 4];
+const SECOND_SEGMENT = [5, 6, 7, 8];
+const THIRD_SEGMENT = [9, 10, 11, 12];
+
+const SEGMENT = FIRST_SEGMENT;
+const TWO_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8];
+const THREE_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
+
+const SEGMENT_AND_HALF = [1, 2, 3, 4, 5, 6];
+
+const QUARTER_SEGMENT = [1];
+const HALF_SEGMENT = [1, 2];
+const SECOND_HALF_SEGMENT = [3, 4];
+const THREE_QUARTER_SEGMENT = [1, 2, 3];
+const EXTRA_HALF_SEGMENT = [5, 6];
+const MIDDLE_HALF_SEGMENT = [2, 3];
+const LAST_QUARTER_SEGMENT = [4];
+const FOURTH_HALF_SEGMENT = [7, 8];
+const HALF_THIRD_SEGMENT = [9, 10];
+const LATTER_HALF_THIRD_SEGMENT = [11, 12];
+
+const TWO_HALF_SEGMENTS = [1, 2, 1, 2];
+
+
+/*********
+ * TESTS *
+ *********/
+
+var tests =
+ [
+ sourceClosedWithoutWrite,
+ writeOneSegmentThenClose,
+ simpleWriteThenRead,
+ writeLittleBeforeReading,
+ writeMultipleSegmentsThenRead,
+ writeLotsBeforeReading,
+ writeLotsBeforeReading2,
+ writeThenReadPartial,
+ manyPartialWrites,
+ partialRead,
+ partialWrite,
+ sinkClosedImmediately,
+ sinkClosedWithReadableData,
+ sinkClosedAfterWrite,
+ sourceAndSinkClosed,
+ sinkAndSourceClosed,
+ sourceAndSinkClosedWithPendingData,
+ sinkAndSourceClosedWithPendingData,
+ ];
+var testIndex = -1;
+
+function sourceClosedWithoutWrite(next)
+{
+ var t = new CopyTest("sourceClosedWithoutWrite", next);
+
+ t.closeSource(Cr.NS_OK);
+ t.expect(Cr.NS_OK, [NOTHING]);
+}
+
+function writeOneSegmentThenClose(next)
+{
+ var t = new CopyTest("writeLittleBeforeReading", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.closeSource(Cr.NS_OK);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.expect(Cr.NS_OK, [SEGMENT]);
+}
+
+function simpleWriteThenRead(next)
+{
+ var t = new CopyTest("simpleWriteThenRead", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.closeSource(Cr.NS_OK);
+ t.expect(Cr.NS_OK, [SEGMENT]);
+}
+
+function writeLittleBeforeReading(next)
+{
+ var t = new CopyTest("writeLittleBeforeReading", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.closeSource(Cr.NS_OK);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.expect(Cr.NS_OK, [SEGMENT, SEGMENT]);
+}
+
+function writeMultipleSegmentsThenRead(next)
+{
+ var t = new CopyTest("writeMultipleSegmentsThenRead", next);
+
+ t.addToSource(TWO_SEGMENTS);
+ t.makeSourceReadable(TWO_SEGMENTS.length);
+ t.makeSinkWritableAndWaitFor(TWO_SEGMENTS.length,
+ [FIRST_SEGMENT, SECOND_SEGMENT]);
+ t.closeSource(Cr.NS_OK);
+ t.expect(Cr.NS_OK, [TWO_SEGMENTS]);
+}
+
+function writeLotsBeforeReading(next)
+{
+ var t = new CopyTest("writeLotsBeforeReading", next);
+
+ t.addToSource(TWO_SEGMENTS);
+ t.makeSourceReadable(TWO_SEGMENTS.length);
+ t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]);
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.closeSource(Cr.NS_OK);
+ t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]);
+ t.expect(Cr.NS_OK, [TWO_SEGMENTS, SEGMENT, SEGMENT]);
+}
+
+function writeLotsBeforeReading2(next)
+{
+ var t = new CopyTest("writeLotsBeforeReading", next);
+
+ t.addToSource(THREE_SEGMENTS);
+ t.makeSourceReadable(THREE_SEGMENTS.length);
+ t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]);
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(THIRD_SEGMENT.length, [THIRD_SEGMENT]);
+ t.closeSource(Cr.NS_OK);
+ t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]);
+ t.expect(Cr.NS_OK, [THREE_SEGMENTS, SEGMENT, SEGMENT]);
+}
+
+function writeThenReadPartial(next)
+{
+ var t = new CopyTest("writeThenReadPartial", next);
+
+ t.addToSource(SEGMENT_AND_HALF);
+ t.makeSourceReadable(SEGMENT_AND_HALF.length);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.closeSource(Cr.NS_OK);
+ t.makeSinkWritableAndWaitFor(EXTRA_HALF_SEGMENT.length, [EXTRA_HALF_SEGMENT]);
+ t.expect(Cr.NS_OK, [SEGMENT_AND_HALF]);
+}
+
+function manyPartialWrites(next)
+{
+ var t = new CopyTest("manyPartialWrites", next);
+
+ t.addToSource(HALF_SEGMENT);
+ t.makeSourceReadable(HALF_SEGMENT.length);
+
+ t.addToSource(HALF_SEGMENT);
+ t.makeSourceReadable(HALF_SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(2 * HALF_SEGMENT.length, [TWO_HALF_SEGMENTS]);
+ t.closeSource(Cr.NS_OK);
+ t.expect(Cr.NS_OK, [TWO_HALF_SEGMENTS]);
+}
+
+function partialRead(next)
+{
+ var t = new CopyTest("partialRead", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.addToSource(HALF_SEGMENT);
+ t.makeSourceReadable(HALF_SEGMENT.length);
+ t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
+ t.closeSourceAndWaitFor(Cr.NS_OK, HALF_SEGMENT.length, [HALF_SEGMENT]);
+ t.expect(Cr.NS_OK, [SEGMENT, HALF_SEGMENT]);
+}
+
+function partialWrite(next)
+{
+ var t = new CopyTest("partialWrite", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length,
+ [QUARTER_SEGMENT,
+ MIDDLE_HALF_SEGMENT,
+ LAST_QUARTER_SEGMENT]);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length,
+ [HALF_SEGMENT, SECOND_HALF_SEGMENT]);
+
+ t.addToSource(THREE_SEGMENTS);
+ t.makeSourceReadable(THREE_SEGMENTS.length);
+ t.makeSinkWritableByIncrementsAndWaitFor(THREE_SEGMENTS.length,
+ [HALF_SEGMENT, SECOND_HALF_SEGMENT,
+ SECOND_SEGMENT,
+ HALF_THIRD_SEGMENT,
+ LATTER_HALF_THIRD_SEGMENT]);
+
+ t.closeSource(Cr.NS_OK);
+ t.expect(Cr.NS_OK, [SEGMENT, SEGMENT, THREE_SEGMENTS]);
+}
+
+function sinkClosedImmediately(next)
+{
+ var t = new CopyTest("sinkClosedImmediately", next);
+
+ t.closeSink(Cr.NS_OK);
+ t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]);
+}
+
+function sinkClosedWithReadableData(next)
+{
+ var t = new CopyTest("sinkClosedWithReadableData", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+ t.closeSink(Cr.NS_OK);
+ t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]);
+}
+
+function sinkClosedAfterWrite(next)
+{
+ var t = new CopyTest("sinkClosedAfterWrite", next);
+
+ t.addToSource(TWO_SEGMENTS);
+ t.makeSourceReadable(TWO_SEGMENTS.length);
+ t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);
+ t.closeSink(Cr.NS_OK);
+ t.expect(Cr.NS_ERROR_UNEXPECTED, [FIRST_SEGMENT]);
+}
+
+function sourceAndSinkClosed(next)
+{
+ var t = new CopyTest("sourceAndSinkClosed", next);
+
+ t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK);
+ t.expect(Cr.NS_OK, []);
+}
+
+function sinkAndSourceClosed(next)
+{
+ var t = new CopyTest("sinkAndSourceClosed", next);
+
+ t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK);
+
+ // sink notify received first, hence error
+ t.expect(Cr.NS_ERROR_UNEXPECTED, []);
+}
+
+function sourceAndSinkClosedWithPendingData(next)
+{
+ var t = new CopyTest("sourceAndSinkClosedWithPendingData", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+
+ t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK);
+
+ // not all data from source copied, so error
+ t.expect(Cr.NS_ERROR_UNEXPECTED, []);
+}
+
+function sinkAndSourceClosedWithPendingData(next)
+{
+ var t = new CopyTest("sinkAndSourceClosedWithPendingData", next);
+
+ t.addToSource(SEGMENT);
+ t.makeSourceReadable(SEGMENT.length);
+
+ t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK);
+
+ // not all data from source copied, plus sink notify received first, so error
+ t.expect(Cr.NS_ERROR_UNEXPECTED, []);
+}
+
+
+/*************
+ * UTILITIES *
+ *************/
+
+/** Returns the sum of the elements in arr. */
+function sum(arr)
+{
+ var sum = 0;
+ for (var i = 0, sz = arr.length; i < sz; i++)
+ sum += arr[i];
+ return sum;
+}
+
+/**
+ * Returns a constructor for an input or output stream callback that will wrap
+ * the one provided to it as an argument.
+ *
+ * @param wrapperCallback : (nsIInputStreamCallback | nsIOutputStreamCallback) : void
+ * the original callback object (not a function!) being wrapped
+ * @param name : string
+ * either "onInputStreamReady" if we're wrapping an input stream callback or
+ * "onOutputStreamReady" if we're wrapping an output stream callback
+ * @returns function(nsIInputStreamCallback | nsIOutputStreamCallback) : (nsIInputStreamCallback | nsIOutputStreamCallback)
+ * a constructor function which constructs a callback object (not function!)
+ * which, when called, first calls the original callback provided to it and
+ * then calls wrapperCallback
+ */
+function createStreamReadyInterceptor(wrapperCallback, name)
+{
+ return function StreamReadyInterceptor(callback)
+ {
+ this.wrappedCallback = callback;
+ this[name] = function streamReadyInterceptor(stream)
+ {
+ dumpn("*** StreamReadyInterceptor." + name);
+
+ try
+ {
+ dumpn("*** calling original " + name + "...");
+ callback[name](stream);
+ }
+ catch (e)
+ {
+ dumpn("!!! error running inner callback: " + e);
+ throw e;
+ }
+ finally
+ {
+ dumpn("*** calling wrapper " + name + "...");
+ wrapperCallback[name](stream);
+ }
+ }
+ };
+}
+
+/**
+ * Print out a banner with the given message, uppercased, for debugging
+ * purposes.
+ */
+function note(m)
+{
+ m = m.toUpperCase();
+ var asterisks = Array(m.length + 1 + 4).join("*");
+ dumpn(asterisks + "\n* " + m + " *\n" + asterisks);
+}
+
+
+/***********
+ * MOCKERY *
+ ***********/
+
+/*
+ * Blatantly violate abstractions in the name of testability. THIS IS NOT
+ * PUBLIC API! If you use any of these I will knowingly break your code by
+ * changing the names of variables and properties.
+ */
+var BinaryInputStream = function BIS(stream) { return stream; };
+var BinaryOutputStream = function BOS(stream) { return stream; };
+Response.SEGMENT_SIZE = SEGMENT.length;
+
+/**
+ * Roughly mocks an nsIPipe, presenting non-blocking input and output streams
+ * that appear to also be binary streams and whose readability and writability
+ * amounts are configurable. Only the methods used in this test have been
+ * implemented -- these aren't exact mocks (can't be, actually, because input
+ * streams have unscriptable methods).
+ *
+ * @param name : string
+ * a name for this pipe, used in debugging output
+ */
+function CustomPipe(name)
+{
+ var self = this;
+
+ /** Data read from input that's buffered until it can be written to output. */
+ this._data = [];
+
+ /**
+ * The status of this pipe, which is to say the error result the ends of this
+ * pipe will return when attempts are made to use them. This value is always
+ * an error result when copying has finished, because success codes are
+ * converted to NS_BASE_STREAM_CLOSED.
+ */
+ this._status = Cr.NS_OK;
+
+ /** The input end of this pipe. */
+ var input = this.inputStream =
+ {
+ /** A name for this stream, used in debugging output. */
+ name: name + " input",
+
+ /**
+ * The number of bytes of data available to be read from this pipe, or
+ * Infinity if any amount of data in this pipe is made readable as soon as
+ * it is written to the pipe output.
+ */
+ _readable: 0,
+
+ /**
+ * Data regarding a pending stream-ready callback on this, or null if no
+ * callback is currently waiting to be called.
+ */
+ _waiter: null,
+
+ /**
+ * The event currently dispatched to make a stream-ready callback, if any
+ * such callback is currently ready to be made and not already in
+ * progress, or null when no callback is waiting to happen.
+ */
+ _event: null,
+
+ /**
+ * A stream-ready constructor to wrap an existing callback to intercept
+ * stream-ready notifications, or null if notifications shouldn't be
+ * wrapped at all.
+ */
+ _streamReadyInterceptCreator: null,
+
+ /**
+ * Registers a stream-ready wrapper creator function so that a
+ * stream-ready callback made in the future can be wrapped.
+ */
+ interceptStreamReadyCallbacks: function(streamReadyInterceptCreator)
+ {
+ dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks");
+
+ do_check_true(this._streamReadyInterceptCreator === null,
+ "intercepting twice");
+ this._streamReadyInterceptCreator = streamReadyInterceptCreator;
+ if (this._waiter)
+ {
+ this._waiter.callback =
+ new streamReadyInterceptCreator(this._waiter.callback);
+ }
+ },
+
+ /**
+ * Removes a previously-registered stream-ready wrapper creator function,
+ * also clearing any current wrapping.
+ */
+ removeStreamReadyInterceptor: function()
+ {
+ dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()");
+
+ do_check_true(this._streamReadyInterceptCreator !== null,
+ "removing interceptor when none present?");
+ this._streamReadyInterceptCreator = null;
+ if (this._waiter)
+ this._waiter.callback = this._waiter.callback.wrappedCallback;
+ },
+
+ //
+ // see nsIAsyncInputStream.asyncWait
+ //
+ asyncWait: function asyncWait(callback, flags, requestedCount, target)
+ {
+ dumpn("*** [" + this.name + "].asyncWait");
+
+ do_check_true(callback && typeof callback !== "function");
+
+ var closureOnly =
+ (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0;
+
+ do_check_true(this._waiter === null ||
+ (this._waiter.closureOnly && !closureOnly),
+ "asyncWait already called with a non-closure-only " +
+ "callback? unexpected!");
+
+ this._waiter =
+ {
+ callback:
+ this._streamReadyInterceptCreator
+ ? new this._streamReadyInterceptCreator(callback)
+ : callback,
+ closureOnly: closureOnly,
+ requestedCount: requestedCount,
+ eventTarget: target
+ };
+
+ if (!Components.isSuccessCode(self._status) ||
+ (!closureOnly && this._readable >= requestedCount &&
+ self._data.length >= requestedCount))
+ {
+ this._notify();
+ }
+ },
+
+ //
+ // see nsIAsyncInputStream.closeWithStatus
+ //
+ closeWithStatus: function closeWithStatus(status)
+ {
+ dumpn("*** [" + this.name + "].closeWithStatus" +
+ "(" + status + ")");
+
+ if (!Components.isSuccessCode(self._status))
+ {
+ dumpn("*** ignoring second closure of [input " + this.name + "] " +
+ "(status " + self._status + ")");
+ return;
+ }
+
+ if (Components.isSuccessCode(status))
+ status = Cr.NS_BASE_STREAM_CLOSED;
+
+ self._status = status;
+
+ if (this._waiter)
+ this._notify();
+ if (output._waiter)
+ output._notify();
+ },
+
+ //
+ // see nsIBinaryInputStream.readByteArray
+ //
+ readByteArray: function readByteArray(count)
+ {
+ dumpn("*** [" + this.name + "].readByteArray(" + count + ")");
+
+ if (self._data.length === 0)
+ {
+ throw Components.isSuccessCode(self._status)
+ ? Cr.NS_BASE_STREAM_WOULD_BLOCK
+ : self._status;
+ }
+
+ do_check_true(this._readable <= self._data.length ||
+ this._readable === Infinity,
+ "consistency check");
+
+ if (this._readable < count || self._data.length < count)
+ throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
+ this._readable -= count;
+ return self._data.splice(0, count);
+ },
+
+ /**
+ * Makes the given number of additional bytes of data previously written
+ * to the pipe's output stream available for reading, triggering future
+ * notifications when required.
+ *
+ * @param count : uint
+ * the number of bytes of additional data to make available; must not be
+ * greater than the number of bytes already buffered but not made
+ * available by previous makeReadable calls
+ */
+ makeReadable: function makeReadable(count)
+ {
+ dumpn("*** [" + this.name + "].makeReadable(" + count + ")");
+
+ do_check_true(Components.isSuccessCode(self._status), "errant call");
+ do_check_true(this._readable + count <= self._data.length ||
+ this._readable === Infinity,
+ "increasing readable beyond written amount");
+
+ this._readable += count;
+
+ dumpn("readable: " + this._readable + ", data: " + self._data);
+
+ var waiter = this._waiter;
+ if (waiter !== null)
+ {
+ if (waiter.requestedCount <= this._readable && !waiter.closureOnly)
+ this._notify();
+ }
+ },
+
+ /**
+ * Disables the readability limit on this stream, meaning that as soon as
+ * *any* amount of data is written to output it becomes available from
+ * this stream and a stream-ready event is dispatched (if any stream-ready
+ * callback is currently set).
+ */
+ disableReadabilityLimit: function disableReadabilityLimit()
+ {
+ dumpn("*** [" + this.name + "].disableReadabilityLimit()");
+
+ this._readable = Infinity;
+ },
+
+ //
+ // see nsIInputStream.available
+ //
+ available: function available()
+ {
+ dumpn("*** [" + this.name + "].available()");
+
+ if (self._data.length === 0 && !Components.isSuccessCode(self._status))
+ throw self._status;
+
+ return Math.min(this._readable, self._data.length);
+ },
+
+ /**
+ * Dispatches a pending stream-ready event ahead of schedule, rather than
+ * waiting for it to be dispatched in response to normal writes. This is
+ * useful when writing to the output has completed, and we need to have
+ * read all data written to this stream. If the output isn't closed and
+ * the reading of data from this races ahead of the last write to output,
+ * we need a notification to know when everything that's been written has
+ * been read. This ordinarily might be supplied by closing output, but
+ * in some cases it's not desirable to close output, so this supplies an
+ * alternative method to get notified when the last write has occurred.
+ */
+ maybeNotifyFinally: function maybeNotifyFinally()
+ {
+ dumpn("*** [" + this.name + "].maybeNotifyFinally()");
+
+ do_check_true(this._waiter !== null, "must be waiting now");
+
+ if (self._data.length > 0)
+ {
+ dumpn("*** data still pending, normal notifications will signal " +
+ "completion");
+ return;
+ }
+
+ // No data waiting to be written, so notify. We could just close the
+ // stream, but that's less faithful to the server's behavior (it doesn't
+ // close the stream, and we're pretending to impersonate the server as
+ // much as we can here), so instead we're going to notify when no data
+ // can be read. The CopyTest has already been flagged as complete, so
+ // the stream listener will detect that this is a wrap-it-up notify and
+ // invoke the next test.
+ this._notify();
+ },
+
+ /**
+ * Dispatches an event to call a previously-registered stream-ready
+ * callback.
+ */
+ _notify: function _notify()
+ {
+ dumpn("*** [" + this.name + "]._notify()");
+
+ var waiter = this._waiter;
+ do_check_true(waiter !== null, "no waiter?");
+
+ if (this._event === null)
+ {
+ var event = this._event =
+ {
+ run: function run()
+ {
+ input._waiter = null;
+ input._event = null;
+ try
+ {
+ do_check_true(!Components.isSuccessCode(self._status) ||
+ input._readable >= waiter.requestedCount);
+ waiter.callback.onInputStreamReady(input);
+ }
+ catch (e)
+ {
+ do_throw("error calling onInputStreamReady: " + e);
+ }
+ }
+ };
+ waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
+ }
+ },
+
+ QueryInterface: function QueryInterface(iid)
+ {
+ if (iid.equals(Ci.nsIAsyncInputStream) ||
+ iid.equals(Ci.nsIInputStream) ||
+ iid.equals(Ci.nsISupports))
+ {
+ return this;
+ }
+
+ throw Cr.NS_ERROR_NO_INTERFACE;
+ }
+ };
+
+ /** The output end of this pipe. */
+ var output = this.outputStream =
+ {
+ /** A name for this stream, used in debugging output. */
+ name: name + " output",
+
+ /**
+ * The number of bytes of data which may be written to this pipe without
+ * blocking.
+ */
+ _writable: 0,
+
+ /**
+ * The increments in which pending data should be written, rather than
+ * simply defaulting to the amount requested (which, given that
+ * input.asyncWait precisely respects the requestedCount argument, will
+ * ordinarily always be writable in that amount), as an array whose
+ * elements from start to finish are the number of bytes to write each
+ * time write() or writeByteArray() is subsequently called. The sum of
+ * the values in this array, if this array is not empty, is always equal
+ * to this._writable.
+ */
+ _writableAmounts: [],
+
+ /**
+ * Data regarding a pending stream-ready callback on this, or null if no
+ * callback is currently waiting to be called.
+ */
+ _waiter: null,
+
+ /**
+ * The event currently dispatched to make a stream-ready callback, if any
+ * such callback is currently ready to be made and not already in
+ * progress, or null when no callback is waiting to happen.
+ */
+ _event: null,
+
+ /**
+ * A stream-ready constructor to wrap an existing callback to intercept
+ * stream-ready notifications, or null if notifications shouldn't be
+ * wrapped at all.
+ */
+ _streamReadyInterceptCreator: null,
+
+ /**
+ * Registers a stream-ready wrapper creator function so that a
+ * stream-ready callback made in the future can be wrapped.
+ */
+ interceptStreamReadyCallbacks: function(streamReadyInterceptCreator)
+ {
+ dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks");
+
+ do_check_true(this._streamReadyInterceptCreator !== null,
+ "intercepting onOutputStreamReady twice");
+ this._streamReadyInterceptCreator = streamReadyInterceptCreator;
+ if (this._waiter)
+ {
+ this._waiter.callback =
+ new streamReadyInterceptCreator(this._waiter.callback);
+ }
+ },
+
+ /**
+ * Removes a previously-registered stream-ready wrapper creator function,
+ * also clearing any current wrapping.
+ */
+ removeStreamReadyInterceptor: function()
+ {
+ dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()");
+
+ do_check_true(this._streamReadyInterceptCreator !== null,
+ "removing interceptor when none present?");
+ this._streamReadyInterceptCreator = null;
+ if (this._waiter)
+ this._waiter.callback = this._waiter.callback.wrappedCallback;
+ },
+
+ //
+ // see nsIAsyncOutputStream.asyncWait
+ //
+ asyncWait: function asyncWait(callback, flags, requestedCount, target)
+ {
+ dumpn("*** [" + this.name + "].asyncWait");
+
+ do_check_true(callback && typeof callback !== "function");
+
+ var closureOnly =
+ (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0;
+
+ do_check_true(this._waiter === null ||
+ (this._waiter.closureOnly && !closureOnly),
+ "asyncWait already called with a non-closure-only " +
+ "callback? unexpected!");
+
+ this._waiter =
+ {
+ callback:
+ this._streamReadyInterceptCreator
+ ? new this._streamReadyInterceptCreator(callback)
+ : callback,
+ closureOnly: closureOnly,
+ requestedCount: requestedCount,
+ eventTarget: target,
+ toString: function toString()
+ {
+ return "waiter(" + (closureOnly ? "closure only, " : "") +
+ "requestedCount: " + requestedCount + ", target: " +
+ target + ")";
+ }
+ };
+
+ if ((!closureOnly && this._writable >= requestedCount) ||
+ !Components.isSuccessCode(this.status))
+ {
+ this._notify();
+ }
+ },
+
+ //
+ // see nsIAsyncOutputStream.closeWithStatus
+ //
+ closeWithStatus: function closeWithStatus(status)
+ {
+ dumpn("*** [" + this.name + "].closeWithStatus(" + status + ")");
+
+ if (!Components.isSuccessCode(self._status))
+ {
+ dumpn("*** ignoring redundant closure of [input " + this.name + "] " +
+ "because it's already closed (status " + self._status + ")");
+ return;
+ }
+
+ if (Components.isSuccessCode(status))
+ status = Cr.NS_BASE_STREAM_CLOSED;
+
+ self._status = status;
+
+ if (input._waiter)
+ input._notify();
+ if (this._waiter)
+ this._notify();
+ },
+
+ //
+ // see nsIBinaryOutputStream.writeByteArray
+ //
+ writeByteArray: function writeByteArray(bytes, length)
+ {
+ dumpn("*** [" + this.name + "].writeByteArray" +
+ "([" + bytes + "], " + length + ")");
+
+ do_check_eq(bytes.length, length, "sanity");
+ if (!Components.isSuccessCode(self._status))
+ throw self._status;
+
+ do_check_eq(this._writableAmounts.length, 0,
+ "writeByteArray can't support specified-length writes");
+
+ if (this._writable < length)
+ throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
+
+ self._data.push.apply(self._data, bytes);
+ this._writable -= length;
+
+ if (input._readable === Infinity && input._waiter &&
+ !input._waiter.closureOnly)
+ {
+ input._notify();
+ }
+ },
+
+ //
+ // see nsIOutputStream.write
+ //
+ write: function write(str, length)
+ {
+ dumpn("*** [" + this.name + "].write");
+
+ do_check_eq(str.length, length, "sanity");
+ if (!Components.isSuccessCode(self._status))
+ throw self._status;
+ if (this._writable === 0)
+ throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
+
+ var actualWritten;
+ if (this._writableAmounts.length === 0)
+ {
+ actualWritten = Math.min(this._writable, length);
+ }
+ else
+ {
+ do_check_true(this._writable >= this._writableAmounts[0],
+ "writable amounts value greater than writable data?");
+ do_check_eq(this._writable, sum(this._writableAmounts),
+ "total writable amount not equal to sum of writable " +
+ "increments");
+ actualWritten = this._writableAmounts.shift();
+ }
+
+ var bytes = str.substring(0, actualWritten)
+ .split("")
+ .map(function(v) { return v.charCodeAt(0); });
+
+ self._data.push.apply(self._data, bytes);
+ this._writable -= actualWritten;
+
+ if (input._readable === Infinity && input._waiter &&
+ !input._waiter.closureOnly)
+ {
+ input._notify();
+ }
+
+ return actualWritten;
+ },
+
+ /**
+ * Increase the amount of data that can be written without blocking by the
+ * given number of bytes, triggering future notifications when required.
+ *
+ * @param count : uint
+ * the number of bytes of additional data to make writable
+ */
+ makeWritable: function makeWritable(count)
+ {
+ dumpn("*** [" + this.name + "].makeWritable(" + count + ")");
+
+ do_check_true(Components.isSuccessCode(self._status));
+
+ this._writable += count;
+
+ var waiter = this._waiter;
+ if (waiter && !waiter.closureOnly &&
+ waiter.requestedCount <= this._writable)
+ {
+ this._notify();
+ }
+ },
+
+ /**
+ * Increase the amount of data that can be written without blocking, but
+ * do so by specifying a number of bytes that will be written each time
+ * a write occurs, even as asyncWait notifications are initially triggered
+ * as usual. Thus, rather than writes eagerly writing everything possible
+ * at each step, attempts to write out data by segment devolve into a
+ * partial segment write, then another, and so on until the amount of data
+ * specified as permitted to be written, has been written.
+ *
+ * Note that the writeByteArray method is incompatible with the previous
+ * calling of this method, in that, until all increments provided to this
+ * method have been consumed, writeByteArray cannot be called. Once all
+ * increments have been consumed, writeByteArray may again be called.
+ *
+ * @param increments : [uint]
+ * an array whose elements are positive numbers of bytes to permit to be
+ * written each time write() is subsequently called on this, ignoring
+ * the total amount of writable space specified by the sum of all
+ * increments
+ */
+ makeWritableByIncrements: function makeWritableByIncrements(increments)
+ {
+ dumpn("*** [" + this.name + "].makeWritableByIncrements" +
+ "([" + increments.join(", ") + "])");
+
+ do_check_true(increments.length > 0, "bad increments");
+ do_check_true(increments.every(function(v) { return v > 0; }),
+ "zero increment?");
+
+ do_check_true(Components.isSuccessCode(self._status));
+
+ this._writable += sum(increments);
+ this._writableAmounts = increments;
+
+ var waiter = this._waiter;
+ if (waiter && !waiter.closureOnly &&
+ waiter.requestedCount <= this._writable)
+ {
+ this._notify();
+ }
+ },
+
+ /**
+ * Dispatches an event to call a previously-registered stream-ready
+ * callback.
+ */
+ _notify: function _notify()
+ {
+ dumpn("*** [" + this.name + "]._notify()");
+
+ var waiter = this._waiter;
+ do_check_true(waiter !== null, "no waiter?");
+
+ if (this._event === null)
+ {
+ var event = this._event =
+ {
+ run: function run()
+ {
+ output._waiter = null;
+ output._event = null;
+
+ try
+ {
+ waiter.callback.onOutputStreamReady(output);
+ }
+ catch (e)
+ {
+ do_throw("error calling onOutputStreamReady: " + e);
+ }
+ }
+ };
+ waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
+ }
+ },
+
+ QueryInterface: function QueryInterface(iid)
+ {
+ if (iid.equals(Ci.nsIAsyncOutputStream) ||
+ iid.equals(Ci.nsIOutputStream) ||
+ iid.equals(Ci.nsISupports))
+ {
+ return this;
+ }
+
+ throw Cr.NS_ERROR_NO_INTERFACE;
+ }
+ };
+}
+
+/**
+ * Represents a sequence of interactions to perform with a copier, in a given
+ * order and at the desired time intervals.
+ *
+ * @param name : string
+ * test name, used in debugging output
+ */
+function CopyTest(name, next)
+{
+ /** Name used in debugging output. */
+ this.name = name;
+
+ /** A function called when the test completes. */
+ this._done = next;
+
+ var sourcePipe = new CustomPipe(name + "-source");
+
+ /** The source of data for the copier to copy. */
+ this._source = sourcePipe.inputStream;
+
+ /**
+ * The sink to which to write data which will appear in the copier's source.
+ */
+ this._copyableDataStream = sourcePipe.outputStream;
+
+ var sinkPipe = new CustomPipe(name + "-sink");
+
+ /** The sink to which the copier copies data. */
+ this._sink = sinkPipe.outputStream;
+
+ /** Input stream from which to read data the copier's written to its sink. */
+ this._copiedDataStream = sinkPipe.inputStream;
+
+ this._copiedDataStream.disableReadabilityLimit();
+
+ /**
+ * True if there's a callback waiting to read data written by the copier to
+ * its output, from the input end of the pipe representing the copier's sink.
+ */
+ this._waitingForData = false;
+
+ /**
+ * An array of the bytes of data expected to be written to output by the
+ * copier when this test runs.
+ */
+ this._expectedData = undefined;
+
+ /** Array of bytes of data received so far. */
+ this._receivedData = [];
+
+ /** The expected final status returned by the copier. */
+ this._expectedStatus = -1;
+
+ /** The actual final status returned by the copier. */
+ this._actualStatus = -1;
+
+ /** The most recent sequence of bytes written to output by the copier. */
+ this._lastQuantum = [];
+
+ /**
+ * True iff we've received the last quantum of data written to the sink by the
+ * copier.
+ */
+ this._allDataWritten = false;
+
+ /**
+ * True iff the copier has notified its associated stream listener of
+ * completion.
+ */
+ this._copyingFinished = false;
+
+ /** Index of the next task to execute while driving the copier. */
+ this._currentTask = 0;
+
+ /** Array containing all tasks to run. */
+ this._tasks = [];
+
+ /** The copier used by this test. */
+ this._copier =
+ new WriteThroughCopier(this._source, this._sink, this, null);
+
+ // Start watching for data written by the copier to the sink.
+ this._waitForWrittenData();
+}
+CopyTest.prototype =
+{
+ /**
+ * Adds the given array of bytes to data in the copier's source.
+ *
+ * @param bytes : [uint]
+ * array of bytes of data to add to the source for the copier
+ */
+ addToSource: function addToSource(bytes)
+ {
+ var self = this;
+ this._addToTasks(function addToSourceTask()
+ {
+ note("addToSourceTask");
+
+ try
+ {
+ self._copyableDataStream.makeWritable(bytes.length);
+ self._copyableDataStream.writeByteArray(bytes, bytes.length);
+ }
+ finally
+ {
+ self._stageNextTask();
+ }
+ });
+ },
+
+ /**
+ * Makes bytes of data previously added to the source available to be read by
+ * the copier.
+ *
+ * @param count : uint
+ * number of bytes to make available for reading
+ */
+ makeSourceReadable: function makeSourceReadable(count)
+ {
+ var self = this;
+ this._addToTasks(function makeSourceReadableTask()
+ {
+ note("makeSourceReadableTask");
+
+ self._source.makeReadable(count);
+ self._stageNextTask();
+ });
+ },
+
+ /**
+ * Increases available space in the sink by the given amount, waits for the
+ * given series of arrays of bytes to be written to sink by the copier, and
+ * causes execution to asynchronously continue to the next task when the last
+ * of those arrays of bytes is received.
+ *
+ * @param bytes : uint
+ * number of bytes of space to make available in the sink
+ * @param dataQuantums : [[uint]]
+ * array of byte arrays to expect to be written in sequence to the sink
+ */
+ makeSinkWritableAndWaitFor:
+ function makeSinkWritableAndWaitFor(bytes, dataQuantums)
+ {
+ var self = this;
+
+ do_check_eq(bytes,
+ dataQuantums.reduce(function(partial, current)
+ {
+ return partial + current.length;
+ }, 0),
+ "bytes/quantums mismatch");
+
+ function increaseSinkSpaceTask()
+ {
+ /* Now do the actual work to trigger the interceptor. */
+ self._sink.makeWritable(bytes);
+ }
+
+ this._waitForHelper("increaseSinkSpaceTask",
+ dataQuantums, increaseSinkSpaceTask);
+ },
+
+ /**
+ * Increases available space in the sink by the given amount, waits for the
+ * given series of arrays of bytes to be written to sink by the copier, and
+ * causes execution to asynchronously continue to the next task when the last
+ * of those arrays of bytes is received.
+ *
+ * @param bytes : uint
+ * number of bytes of space to make available in the sink
+ * @param dataQuantums : [[uint]]
+ * array of byte arrays to expect to be written in sequence to the sink
+ */
+ makeSinkWritableByIncrementsAndWaitFor:
+ function makeSinkWritableByIncrementsAndWaitFor(bytes, dataQuantums)
+ {
+ var self = this;
+
+ var desiredAmounts = dataQuantums.map(function(v) { return v.length; });
+ do_check_eq(bytes, sum(desiredAmounts), "bytes/quantums mismatch");
+
+ function increaseSinkSpaceByIncrementsTask()
+ {
+ /* Now do the actual work to trigger the interceptor incrementally. */
+ self._sink.makeWritableByIncrements(desiredAmounts);
+ }
+
+ this._waitForHelper("increaseSinkSpaceByIncrementsTask",
+ dataQuantums, increaseSinkSpaceByIncrementsTask);
+ },
+
+ /**
+ * Close the copier's source stream, then asynchronously continue to the next
+ * task.
+ *
+ * @param status : nsresult
+ * the status to provide when closing the copier's source stream
+ */
+ closeSource: function closeSource(status)
+ {
+ var self = this;
+
+ this._addToTasks(function closeSourceTask()
+ {
+ note("closeSourceTask");
+
+ self._source.closeWithStatus(status);
+ self._stageNextTask();
+ });
+ },
+
+ /**
+ * Close the copier's source stream, then wait for the given number of bytes
+ * and for the given series of arrays of bytes to be written to the sink, then
+ * asynchronously continue to the next task.
+ *
+ * @param status : nsresult
+ * the status to provide when closing the copier's source stream
+ * @param bytes : uint
+ * number of bytes of space to make available in the sink
+ * @param dataQuantums : [[uint]]
+ * array of byte arrays to expect to be written in sequence to the sink
+ */
+ closeSourceAndWaitFor:
+ function closeSourceAndWaitFor(status, bytes, dataQuantums)
+ {
+ var self = this;
+
+ do_check_eq(bytes, sum(dataQuantums.map(function(v) { return v.length; })),
+ "bytes/quantums mismatch");
+
+ function closeSourceAndWaitForTask()
+ {
+ self._sink.makeWritable(bytes);
+ self._copyableDataStream.closeWithStatus(status);
+ }
+
+ this._waitForHelper("closeSourceAndWaitForTask",
+ dataQuantums, closeSourceAndWaitForTask);
+ },
+
+ /**
+ * Closes the copier's sink stream, providing the given status, then
+ * asynchronously continue to the next task.
+ *
+ * @param status : nsresult
+ * the status to provide when closing the copier's sink stream
+ */
+ closeSink: function closeSink(status)
+ {
+ var self = this;
+ this._addToTasks(function closeSinkTask()
+ {
+ note("closeSinkTask");
+
+ self._sink.closeWithStatus(status);
+ self._stageNextTask();
+ });
+ },
+
+ /**
+ * Closes the copier's source stream, then immediately closes the copier's
+ * sink stream, then asynchronously continues to the next task.
+ *
+ * @param sourceStatus : nsresult
+ * the status to provide when closing the copier's source stream
+ * @param sinkStatus : nsresult
+ * the status to provide when closing the copier's sink stream
+ */
+ closeSourceThenSink: function closeSourceThenSink(sourceStatus, sinkStatus)
+ {
+ var self = this;
+ this._addToTasks(function closeSourceThenSinkTask()
+ {
+ note("closeSourceThenSinkTask");
+
+ self._source.closeWithStatus(sourceStatus);
+ self._sink.closeWithStatus(sinkStatus);
+ self._stageNextTask();
+ });
+ },
+
+ /**
+ * Closes the copier's sink stream, then immediately closes the copier's
+ * source stream, then asynchronously continues to the next task.
+ *
+ * @param sinkStatus : nsresult
+ * the status to provide when closing the copier's sink stream
+ * @param sourceStatus : nsresult
+ * the status to provide when closing the copier's source stream
+ */
+ closeSinkThenSource: function closeSinkThenSource(sinkStatus, sourceStatus)
+ {
+ var self = this;
+ this._addToTasks(function closeSinkThenSourceTask()
+ {
+ note("closeSinkThenSource");
+
+ self._sink.closeWithStatus(sinkStatus);
+ self._source.closeWithStatus(sourceStatus);
+ self._stageNextTask();
+ });
+ },
+
+ /**
+ * Indicates that the given status is expected to be returned when the stream
+ * listener for the copy indicates completion, that the expected data copied
+ * by the copier to sink are the concatenation of the arrays of bytes in
+ * receivedData, and kicks off the tasks in this test.
+ *
+ * @param expectedStatus : nsresult
+ * the status expected to be returned by the copier at completion
+ * @param receivedData : [[uint]]
+ * an array containing arrays of bytes whose concatenation constitutes the
+ * expected copied data
+ */
+ expect: function expect(expectedStatus, receivedData)
+ {
+ this._expectedStatus = expectedStatus;
+ this._expectedData = [];
+ for (var i = 0, sz = receivedData.length; i < sz; i++)
+ this._expectedData.push.apply(this._expectedData, receivedData[i]);
+
+ this._stageNextTask();
+ },
+
+ /**
+ * Sets up a stream interceptor that will verify that each piece of data
+ * written to the sink by the copier corresponds to the currently expected
+ * pieces of data, calls the trigger, then waits for those pieces of data to
+ * be received. Once all have been received, the interceptor is removed and
+ * the next task is asynchronously executed.
+ *
+ * @param name : string
+ * name of the task created by this, used in debugging output
+ * @param dataQuantums : [[uint]]
+ * array of expected arrays of bytes to be written to the sink by the copier
+ * @param trigger : function() : void
+ * function to call after setting up the interceptor to wait for
+ * notifications (which will be generated as a result of this function's
+ * actions)
+ */
+ _waitForHelper: function _waitForHelper(name, dataQuantums, trigger)
+ {
+ var self = this;
+ this._addToTasks(function waitForHelperTask()
+ {
+ note(name);
+
+ var quantumIndex = 0;
+
+ /*
+ * Intercept all data-available notifications so we can continue when all
+ * the ones we expect have been received.
+ */
+ var streamReadyCallback =
+ {
+ onInputStreamReady: function wrapperOnInputStreamReady(input)
+ {
+ dumpn("*** streamReadyCallback.onInputStreamReady" +
+ "(" + input.name + ")");
+
+ do_check_eq(this, streamReadyCallback, "sanity");
+
+ try
+ {
+ if (quantumIndex < dataQuantums.length)
+ {
+ var quantum = dataQuantums[quantumIndex++];
+ var sz = quantum.length;
+ do_check_eq(self._lastQuantum.length, sz,
+ "different quantum lengths");
+ for (var i = 0; i < sz; i++)
+ {
+ do_check_eq(self._lastQuantum[i], quantum[i],
+ "bad data at " + i);
+ }
+
+ dumpn("*** waiting to check remaining " +
+ (dataQuantums.length - quantumIndex) + " quantums...");
+ }
+ }
+ finally
+ {
+ if (quantumIndex === dataQuantums.length)
+ {
+ dumpn("*** data checks completed! next task...");
+ self._copiedDataStream.removeStreamReadyInterceptor();
+ self._stageNextTask();
+ }
+ }
+ }
+ };
+
+ var interceptor =
+ createStreamReadyInterceptor(streamReadyCallback, "onInputStreamReady");
+ self._copiedDataStream.interceptStreamReadyCallbacks(interceptor);
+
+ /* Do the deed. */
+ trigger();
+ });
+ },
+
+ /**
+ * Initiates asynchronous waiting for data written to the copier's sink to be
+ * available for reading from the input end of the sink's pipe. The callback
+ * stores the received data for comparison in the interceptor used in the
+ * callback added by _waitForHelper and signals test completion when it
+ * receives a zero-data-available notification (if the copier has notified
+ * that it is finished; otherwise allows execution to continue until that has
+ * occurred).
+ */
+ _waitForWrittenData: function _waitForWrittenData()
+ {
+ dumpn("*** _waitForWrittenData (" + this.name + ")");
+
+ var self = this;
+ var outputWrittenWatcher =
+ {
+ onInputStreamReady: function onInputStreamReady(input)
+ {
+ dumpn("*** outputWrittenWatcher.onInputStreamReady" +
+ "(" + input.name + ")");
+
+ if (self._allDataWritten)
+ {
+ do_throw("ruh-roh! why are we getting notified of more data " +
+ "after we should have received all of it?");
+ }
+
+ self._waitingForData = false;
+
+ try
+ {
+ var avail = input.available();
+ }
+ catch (e)
+ {
+ dumpn("*** available() threw! error: " + e);
+ if (self._completed)
+ {
+ dumpn("*** NB: this isn't a problem, because we've copied " +
+ "completely now, and this notify may have been expedited " +
+ "by maybeNotifyFinally such that we're being called when " +
+ "we can *guarantee* nothing is available any more");
+ }
+ avail = 0;
+ }
+
+ if (avail > 0)
+ {
+ var data = input.readByteArray(avail);
+ do_check_eq(data.length, avail,
+ "readByteArray returned wrong number of bytes?");
+ self._lastQuantum = data;
+ self._receivedData.push.apply(self._receivedData, data);
+ }
+
+ if (avail === 0)
+ {
+ dumpn("*** all data received!");
+
+ self._allDataWritten = true;
+
+ if (self._copyingFinished)
+ {
+ dumpn("*** copying already finished, continuing to next test");
+ self._testComplete();
+ }
+ else
+ {
+ dumpn("*** copying not finished, waiting for that to happen");
+ }
+
+ return;
+ }
+
+ self._waitForWrittenData();
+ }
+ };
+
+ this._copiedDataStream.asyncWait(outputWrittenWatcher, 0, 1,
+ gThreadManager.currentThread);
+ this._waitingForData = true;
+ },
+
+ /**
+ * Indicates this test is complete, does the final data-received and copy
+ * status comparisons, and calls the test-completion function provided when
+ * this test was first created.
+ */
+ _testComplete: function _testComplete()
+ {
+ dumpn("*** CopyTest(" + this.name + ") complete! " +
+ "On to the next test...");
+
+ try
+ {
+ do_check_true(this._allDataWritten, "expect all data written now!");
+ do_check_true(this._copyingFinished, "expect copying finished now!");
+
+ do_check_eq(this._actualStatus, this._expectedStatus,
+ "wrong final status");
+
+ var expected = this._expectedData, received = this._receivedData;
+ dumpn("received: [" + received + "], expected: [" + expected + "]");
+ do_check_eq(received.length, expected.length, "wrong data");
+ for (var i = 0, sz = expected.length; i < sz; i++)
+ do_check_eq(received[i], expected[i], "bad data at " + i);
+ }
+ catch (e)
+ {
+ dumpn("!!! ERROR PERFORMING FINAL " + this.name + " CHECKS! " + e);
+ throw e;
+ }
+ finally
+ {
+ dumpn("*** CopyTest(" + this.name + ") complete! " +
+ "Invoking test-completion callback...");
+ this._done();
+ }
+ },
+
+ /** Dispatches an event at this thread which will run the next task. */
+ _stageNextTask: function _stageNextTask()
+ {
+ dumpn("*** CopyTest(" + this.name + ")._stageNextTask()");
+
+ if (this._currentTask === this._tasks.length)
+ {
+ dumpn("*** CopyTest(" + this.name + ") tasks complete!");
+ return;
+ }
+
+ var task = this._tasks[this._currentTask++];
+ var self = this;
+ var event =
+ {
+ run: function run()
+ {
+ try
+ {
+ task();
+ }
+ catch (e)
+ {
+ do_throw("exception thrown running task: " + e);
+ }
+ }
+ };
+ gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
+ },
+
+ /**
+ * Adds the given function as a task to be run at a later time.
+ *
+ * @param task : function() : void
+ * the function to call as a task
+ */
+ _addToTasks: function _addToTasks(task)
+ {
+ this._tasks.push(task);
+ },
+
+ //
+ // see nsIRequestObserver.onStartRequest
+ //
+ onStartRequest: function onStartRequest(self, _)
+ {
+ dumpn("*** CopyTest.onStartRequest (" + self.name + ")");
+
+ do_check_true(_ === null);
+ do_check_eq(this._receivedData.length, 0);
+ do_check_eq(this._lastQuantum.length, 0);
+ },
+
+ //
+ // see nsIRequestObserver.onStopRequest
+ //
+ onStopRequest: function onStopRequest(self, _, status)
+ {
+ dumpn("*** CopyTest.onStopRequest (" + self.name + ", " + status + ")");
+
+ do_check_true(_ === null);
+ this._actualStatus = status;
+
+ this._copyingFinished = true;
+
+ if (this._allDataWritten)
+ {
+ dumpn("*** all data written, continuing with remaining tests...");
+ this._testComplete();
+ }
+ else
+ {
+ /*
+ * Everything's copied as far as the copier is concerned. However, there
+ * may be a backup transferring from the output end of the copy sink to
+ * the input end where we can actually verify that the expected data was
+ * written as expected, because that transfer occurs asynchronously. If
+ * we do final data-received checks now, we'll miss still-pending data.
+ * Therefore, to wrap up this copy test we still need to asynchronously
+ * wait on the input end of the sink until we hit end-of-stream or some
+ * error condition. Then we know we're done and can continue with the
+ * next test.
+ */
+ dumpn("*** not all data copied, waiting for that to happen...");
+
+ if (!this._waitingForData)
+ this._waitForWrittenData();
+
+ this._copiedDataStream.maybeNotifyFinally();
+ }
+ }
+};