summaryrefslogtreecommitdiff
path: root/dom/fetch
diff options
context:
space:
mode:
authorBrian Smith <brian@dbsoft.org>2023-09-27 19:20:08 -0500
committerBrian Smith <brian@dbsoft.org>2023-09-27 19:20:08 -0500
commit4a179ff8855552d2c19b72e82f1d7f7a84a401fa (patch)
tree3aa2d56896f9d01a80511c253c9f38d35bcb598b /dom/fetch
parent8c5a0f0de9ece809942d8e412246194540d4e2b0 (diff)
downloaduxp-4a179ff8855552d2c19b72e82f1d7f7a84a401fa.tar.gz
Issue #1442 - Part 8: Fetch implementation of streams.
https://bugzilla.mozilla.org/show_bug.cgi?id=1128959 +worker-friendly pref checking for the DOM API.
Diffstat (limited to 'dom/fetch')
-rw-r--r--dom/fetch/Fetch.cpp97
-rw-r--r--dom/fetch/Fetch.h426
-rw-r--r--dom/fetch/FetchStream.cpp336
-rw-r--r--dom/fetch/FetchStream.h107
-rw-r--r--dom/fetch/Request.cpp19
-rw-r--r--dom/fetch/Response.cpp26
-rw-r--r--dom/fetch/moz.build1
7 files changed, 790 insertions, 222 deletions
diff --git a/dom/fetch/Fetch.cpp b/dom/fetch/Fetch.cpp
index 42c9fc74f9..63e2c83eaa 100644
--- a/dom/fetch/Fetch.cpp
+++ b/dom/fetch/Fetch.cpp
@@ -5,6 +5,7 @@
#include "Fetch.h"
#include "FetchConsumer.h"
+#include "FetchStream.h"
#include "nsIDocument.h"
#include "nsIGlobalObject.h"
@@ -22,6 +23,7 @@
#include "mozilla/ErrorResult.h"
#include "mozilla/dom/BodyUtil.h"
+#include "mozilla/dom/DOMError.h"
#include "mozilla/dom/EncodingUtils.h"
#include "mozilla/dom/Exceptions.h"
#include "mozilla/dom/FetchDriver.h"
@@ -35,6 +37,7 @@
#include "mozilla/dom/Response.h"
#include "mozilla/dom/ScriptSettings.h"
#include "mozilla/dom/URLSearchParams.h"
+#include "mozilla/dom/WorkerPrivate.h"
#include "mozilla/dom/workers/ServiceWorkerManager.h"
#include "FetchObserver.h"
@@ -918,6 +921,7 @@ ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUS
template <class Derived>
FetchBody<Derived>::FetchBody()
: mWorkerPrivate(nullptr)
+ , mReadableStreamBody(nullptr)
, mBodyUsed(false)
{
if (!NS_IsMainThread()) {
@@ -938,8 +942,44 @@ FetchBody<Derived>::~FetchBody()
}
template <class Derived>
+bool
+FetchBody<Derived>::BodyUsed() const
+{
+ if (mBodyUsed) {
+ return true;
+ }
+
+ // If this object is disturbed or locked, return false.
+ if (mReadableStreamBody) {
+ AutoJSAPI jsapi;
+ // If we start tracking ownership, we need something like this.
+ //if (!jsapi.Init(mOwner)) {
+ // return true;
+ //}
+
+ JSContext* cx = jsapi.cx();
+
+ JS::Rooted<JSObject*> body(cx, mReadableStreamBody);
+ if (JS::ReadableStreamIsDisturbed(body) ||
+ JS::ReadableStreamIsLocked(body)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+template
+bool
+FetchBody<Request>::BodyUsed() const;
+
+template
+bool
+FetchBody<Response>::BodyUsed() const;
+
+template <class Derived>
already_AddRefed<Promise>
-FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv)
+FetchBody<Derived>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv)
{
RefPtr<AbortSignal> signal = DerivedClass()->GetSignal();
if (signal && signal->Aborted()) {
@@ -954,9 +994,16 @@ FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv)
SetBodyUsed();
+ nsCOMPtr<nsIGlobalObject> global = DerivedClass()->GetParentObject();
+
+ // If we already created a ReadableStreamBody we have to close it now.
+ if (mReadableStreamBody) {
+ JS::Rooted<JSObject*> body(aCx, mReadableStreamBody);
+ JS::ReadableStreamClose(aCx, body);
+ }
+
RefPtr<Promise> promise =
- FetchBodyConsumer<Derived>::Create(DerivedClass()->GetParentObject(),
- this, signal, aType, aRv);
+ FetchBodyConsumer<Derived>::Create(global, this, signal, aType, aRv);
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
@@ -966,11 +1013,11 @@ FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv)
template
already_AddRefed<Promise>
-FetchBody<Request>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
+FetchBody<Request>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv);
template
already_AddRefed<Promise>
-FetchBody<Response>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
+FetchBody<Response>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv);
template <class Derived>
void
@@ -1003,20 +1050,52 @@ FetchBody<Response>::SetMimeType();
template <class Derived>
void
FetchBody<Derived>::GetBody(JSContext* aCx,
- JS::MutableHandle<JSObject*> aMessage)
+ JS::MutableHandle<JSObject*> aBodyOut,
+ ErrorResult& aRv)
{
- // TODO
+ nsCOMPtr<nsIInputStream> inputStream;
+ DerivedClass()->GetBody(getter_AddRefs(inputStream));
+
+ if (!inputStream) {
+ aBodyOut.set(nullptr);
+ return;
+ }
+
+ if (!mReadableStreamBody) {
+ JS::Rooted<JSObject*> body(aCx,
+ FetchStream::Create(aCx,
+ DerivedClass()->GetParentObject(),
+ inputStream,
+ aRv));
+ if (NS_WARN_IF(aRv.Failed())) {
+ return;
+ }
+
+ MOZ_ASSERT(body);
+
+ // If the body has been already consumed, we close the stream.
+ if (BodyUsed() && !JS::ReadableStreamClose(aCx, body)) {
+ aRv.StealExceptionFromJSContext(aCx);
+ return;
+ }
+
+ mReadableStreamBody = body;
+ }
+
+ aBodyOut.set(mReadableStreamBody);
}
template
void
FetchBody<Request>::GetBody(JSContext* aCx,
- JS::MutableHandle<JSObject*> aMessage);
+ JS::MutableHandle<JSObject*> aMessage,
+ ErrorResult& aRv);
template
void
FetchBody<Response>::GetBody(JSContext* aCx,
- JS::MutableHandle<JSObject*> aMessage);
+ JS::MutableHandle<JSObject*> aMessage,
+ ErrorResult& aRv);
} // namespace dom
diff --git a/dom/fetch/Fetch.h b/dom/fetch/Fetch.h
index bc2df62689..c605e45bc5 100644
--- a/dom/fetch/Fetch.h
+++ b/dom/fetch/Fetch.h
@@ -1,211 +1,215 @@
-/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-#ifndef mozilla_dom_Fetch_h
-#define mozilla_dom_Fetch_h
-
-#include "nsAutoPtr.h"
-#include "nsIStreamLoader.h"
-
-#include "nsCOMPtr.h"
-#include "nsError.h"
-#include "nsProxyRelease.h"
-#include "nsString.h"
-
-#include "mozilla/DebugOnly.h"
-#include "mozilla/ErrorResult.h"
-#include "mozilla/dom/Promise.h"
-#include "mozilla/dom/RequestBinding.h"
-
-class nsIGlobalObject;
-
-namespace mozilla {
-namespace dom {
-
-class ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
-class BlobImpl;
-class InternalRequest;
-class OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
-class RequestOrUSVString;
-
-namespace workers {
-class WorkerPrivate;
-} // namespace workers
-
-already_AddRefed<Promise>
-FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
- const RequestInit& aInit, ErrorResult& aRv);
-
-nsresult
-UpdateRequestReferrer(nsIGlobalObject* aGlobal, InternalRequest* aRequest);
-
-/*
- * Creates an nsIInputStream based on the fetch specifications 'extract a byte
- * stream algorithm' - http://fetch.spec.whatwg.org/#concept-bodyinit-extract.
- * Stores content type in out param aContentType.
- */
-nsresult
-ExtractByteStreamFromBody(const OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
- nsIInputStream** aStream,
- nsCString& aContentType,
- uint64_t& aContentLength);
-
-/*
- * Non-owning version.
- */
-nsresult
-ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
- nsIInputStream** aStream,
- nsCString& aContentType,
- uint64_t& aContentLength);
-
-template <class Derived> class FetchBodyConsumer;
-
-enum FetchConsumeType
-{
- CONSUME_ARRAYBUFFER,
- CONSUME_BLOB,
- CONSUME_FORMDATA,
- CONSUME_JSON,
- CONSUME_TEXT,
-};
-
-/*
- * FetchBody's body consumption uses nsIInputStreamPump to read from the
- * underlying stream to a block of memory, which is then adopted by
- * ContinueConsumeBody() and converted to the right type based on the JS
- * function called.
- *
- * Use of the nsIInputStreamPump complicates things on the worker thread.
- * The solution used here is similar to WebSockets.
- * The difference is that we are only interested in completion and not data
- * events, and nsIInputStreamPump can only deliver completion on the main thread.
- *
- * Before starting the pump on the main thread, we addref the FetchBody to keep
- * it alive. Then we add a feature, to track the status of the worker.
- *
- * ContinueConsumeBody() is the function that cleans things up in both success
- * and error conditions and so all callers call it with the appropriate status.
- *
- * Once the read is initiated on the main thread there are two possibilities.
- *
- * 1) Pump finishes before worker has finished Running.
- * In this case we adopt the data and dispatch a runnable to the worker,
- * which derefs FetchBody and removes the feature and resolves the Promise.
- *
- * 2) Pump still working while worker has stopped Running.
- * The feature is Notify()ed and ContinueConsumeBody() is called with
- * NS_BINDING_ABORTED. We first Cancel() the pump using a sync runnable to
- * ensure that mFetchBody remains alive (since mConsumeBodyPump is strongly
- * held by it) until pump->Cancel() is called. OnStreamComplete() will not
- * do anything if the error code is NS_BINDING_ABORTED, so we don't have to
- * worry about keeping anything alive.
- *
- * The pump is always released on the main thread.
- */
-template <class Derived>
-class FetchBody
-{
-public:
- friend class FetchBodyConsumer<Derived>;
-
- NS_IMETHOD_(MozExternalRefCountType) AddRef(void) = 0;
- NS_IMETHOD_(MozExternalRefCountType) Release(void) = 0;
-
- bool
- BodyUsed() const { return mBodyUsed; }
-
- already_AddRefed<Promise>
- ArrayBuffer(ErrorResult& aRv)
- {
- return ConsumeBody(CONSUME_ARRAYBUFFER, aRv);
- }
-
- already_AddRefed<Promise>
- Blob(ErrorResult& aRv)
- {
- return ConsumeBody(CONSUME_BLOB, aRv);
- }
-
- already_AddRefed<Promise>
- FormData(ErrorResult& aRv)
- {
- return ConsumeBody(CONSUME_FORMDATA, aRv);
- }
-
- already_AddRefed<Promise>
- Json(ErrorResult& aRv)
- {
- return ConsumeBody(CONSUME_JSON, aRv);
- }
-
- already_AddRefed<Promise>
- Text(ErrorResult& aRv)
- {
- return ConsumeBody(CONSUME_TEXT, aRv);
- }
-
- void
- GetBody(JSContext* aCx,
- JS::MutableHandle<JSObject*> aMessage);
-
- // Utility public methods accessed by various runnables.
-
- void
- SetBodyUsed()
- {
- mBodyUsed = true;
- }
-
- const nsCString&
- MimeType() const
- {
- return mMimeType;
- }
-
- virtual AbortSignal*
- GetSignal() const = 0;
-
-protected:
- FetchBody();
-
- // Always set whenever the FetchBody is created on the worker thread.
- workers::WorkerPrivate* mWorkerPrivate;
-
- virtual ~FetchBody();
-
- void
- SetMimeType();
-private:
- Derived*
- DerivedClass() const
- {
- return static_cast<Derived*>(const_cast<FetchBody*>(this));
- }
-
- already_AddRefed<Promise>
- ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
-
- bool
- IsOnTargetThread()
- {
- return NS_IsMainThread() == !mWorkerPrivate;
- }
-
- void
- AssertIsOnTargetThread()
- {
- MOZ_ASSERT(IsOnTargetThread());
- }
-
- // Only ever set once, always on target thread.
- bool mBodyUsed;
- nsCString mMimeType;
-};
-
-} // namespace dom
-} // namespace mozilla
-
-#endif // mozilla_dom_Fetch_h
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_dom_Fetch_h
+#define mozilla_dom_Fetch_h
+
+#include "nsAutoPtr.h"
+#include "nsIStreamLoader.h"
+
+#include "nsCOMPtr.h"
+#include "nsError.h"
+#include "nsProxyRelease.h"
+#include "nsString.h"
+
+#include "mozilla/DebugOnly.h"
+#include "mozilla/ErrorResult.h"
+#include "mozilla/dom/Promise.h"
+#include "mozilla/dom/RequestBinding.h"
+
+class nsIGlobalObject;
+
+namespace mozilla {
+namespace dom {
+
+class ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
+class BlobImpl;
+class InternalRequest;
+class OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
+class RequestOrUSVString;
+
+namespace workers {
+class WorkerPrivate;
+} // namespace workers
+
+already_AddRefed<Promise>
+FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
+ const RequestInit& aInit, ErrorResult& aRv);
+
+nsresult
+UpdateRequestReferrer(nsIGlobalObject* aGlobal, InternalRequest* aRequest);
+
+/*
+ * Creates an nsIInputStream based on the fetch specifications 'extract a byte
+ * stream algorithm' - http://fetch.spec.whatwg.org/#concept-bodyinit-extract.
+ * Stores content type in out param aContentType.
+ */
+nsresult
+ExtractByteStreamFromBody(const OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
+ nsIInputStream** aStream,
+ nsCString& aContentType,
+ uint64_t& aContentLength);
+
+/*
+ * Non-owning version.
+ */
+nsresult
+ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
+ nsIInputStream** aStream,
+ nsCString& aContentType,
+ uint64_t& aContentLength);
+
+template <class Derived> class FetchBodyConsumer;
+
+enum FetchConsumeType
+{
+ CONSUME_ARRAYBUFFER,
+ CONSUME_BLOB,
+ CONSUME_FORMDATA,
+ CONSUME_JSON,
+ CONSUME_TEXT,
+};
+
+/*
+ * FetchBody's body consumption uses nsIInputStreamPump to read from the
+ * underlying stream to a block of memory, which is then adopted by
+ * ContinueConsumeBody() and converted to the right type based on the JS
+ * function called.
+ *
+ * Use of the nsIInputStreamPump complicates things on the worker thread.
+ * The solution used here is similar to WebSockets.
+ * The difference is that we are only interested in completion and not data
+ * events, and nsIInputStreamPump can only deliver completion on the main thread.
+ *
+ * Before starting the pump on the main thread, we addref the FetchBody to keep
+ * it alive. Then we add a feature, to track the status of the worker.
+ *
+ * ContinueConsumeBody() is the function that cleans things up in both success
+ * and error conditions and so all callers call it with the appropriate status.
+ *
+ * Once the read is initiated on the main thread there are two possibilities.
+ *
+ * 1) Pump finishes before worker has finished Running.
+ * In this case we adopt the data and dispatch a runnable to the worker,
+ * which derefs FetchBody and removes the feature and resolves the Promise.
+ *
+ * 2) Pump still working while worker has stopped Running.
+ * The feature is Notify()ed and ContinueConsumeBody() is called with
+ * NS_BINDING_ABORTED. We first Cancel() the pump using a sync runnable to
+ * ensure that mFetchBody remains alive (since mConsumeBodyPump is strongly
+ * held by it) until pump->Cancel() is called. OnStreamComplete() will not
+ * do anything if the error code is NS_BINDING_ABORTED, so we don't have to
+ * worry about keeping anything alive.
+ *
+ * The pump is always released on the main thread.
+ */
+template <class Derived>
+class FetchBody
+{
+public:
+ friend class FetchBodyConsumer<Derived>;
+
+ NS_IMETHOD_(MozExternalRefCountType) AddRef(void) = 0;
+ NS_IMETHOD_(MozExternalRefCountType) Release(void) = 0;
+
+ bool
+ BodyUsed() const;
+
+ already_AddRefed<Promise>
+ ArrayBuffer(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_ARRAYBUFFER, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Blob(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_BLOB, aRv);
+ }
+
+ already_AddRefed<Promise>
+ FormData(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_FORMDATA, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Json(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_JSON, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Text(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_TEXT, aRv);
+ }
+
+ void
+ GetBody(JSContext* aCx,
+ JS::MutableHandle<JSObject*> aBodyOut,
+ ErrorResult& aRv);
+
+ // Utility public methods accessed by various runnables.
+
+ void
+ SetBodyUsed()
+ {
+ mBodyUsed = true;
+ }
+
+ const nsCString&
+ MimeType() const
+ {
+ return mMimeType;
+ }
+
+ virtual AbortSignal*
+ GetSignal() const = 0;
+
+protected:
+ FetchBody();
+
+ // Always set whenever the FetchBody is created on the worker thread.
+ workers::WorkerPrivate* mWorkerPrivate;
+
+ // This is the ReadableStream exposed to content. Its underlying source is a FetchStream object.
+ JS::Heap<JSObject*> mReadableStreamBody;
+
+ virtual ~FetchBody();
+
+ void
+ SetMimeType();
+private:
+ Derived*
+ DerivedClass() const
+ {
+ return static_cast<Derived*>(const_cast<FetchBody*>(this));
+ }
+
+ already_AddRefed<Promise>
+ ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv);
+
+ bool
+ IsOnTargetThread()
+ {
+ return NS_IsMainThread() == !mWorkerPrivate;
+ }
+
+ void
+ AssertIsOnTargetThread()
+ {
+ MOZ_ASSERT(IsOnTargetThread());
+ }
+
+ // Only ever set once, always on target thread.
+ bool mBodyUsed;
+ nsCString mMimeType;
+};
+
+} // namespace dom
+} // namespace mozilla
+
+#endif // mozilla_dom_Fetch_h
diff --git a/dom/fetch/FetchStream.cpp b/dom/fetch/FetchStream.cpp
new file mode 100644
index 0000000000..7b165b2a9d
--- /dev/null
+++ b/dom/fetch/FetchStream.cpp
@@ -0,0 +1,336 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "FetchStream.h"
+#include "nsNetCID.h"
+#include "nsITransport.h"
+#include "nsIStreamTransportService.h"
+#include "nsProxyRelease.h"
+
+#include "mozilla/dom/DOMError.h"
+
+#define FETCH_STREAM_FLAG 0
+
+static NS_DEFINE_CID(kStreamTransportServiceCID,
+ NS_STREAMTRANSPORTSERVICE_CID);
+
+namespace mozilla {
+namespace dom {
+
+NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback)
+
+/* static */ JSObject*
+FetchStream::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
+ nsIInputStream* aInputStream, ErrorResult& aRv)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aCx);
+ MOZ_DIAGNOSTIC_ASSERT(aInputStream);
+
+ RefPtr<FetchStream> stream = new FetchStream(aGlobal, aInputStream);
+
+ if (!JS::HasReadableStreamCallbacks(aCx)) {
+ JS::SetReadableStreamCallbacks(aCx,
+ &FetchStream::RequestDataCallback,
+ &FetchStream::WriteIntoReadRequestCallback,
+ &FetchStream::CancelCallback,
+ &FetchStream::ClosedCallback,
+ &FetchStream::ErroredCallback,
+ &FetchStream::FinalizeCallback);
+ }
+
+ JS::Rooted<JSObject*> body(aCx,
+ JS::NewReadableExternalSourceStreamObject(aCx, stream, FETCH_STREAM_FLAG));
+ if (!body) {
+ aRv.StealExceptionFromJSContext(aCx);
+ return nullptr;
+ }
+
+ stream->mReadableStream = body;
+
+ // JS engine will call the finalize callback.
+ NS_ADDREF(stream.get());
+ return body;
+}
+
+/* static */ void
+FetchStream::RequestDataCallback(JSContext* aCx,
+ JS::HandleObject aStream,
+ void* aUnderlyingSource,
+ uint8_t aFlags,
+ size_t aDesiredSize)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+ MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream));
+
+ RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+
+ MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting ||
+ stream->mState == eChecking);
+
+ if (stream->mState == eChecking) {
+ // If we are looking for more data, there is nothing else we should do:
+ // let's move this checking operation in a reading.
+ MOZ_ASSERT(stream->mInputStream);
+ stream->mState = eReading;
+ return;
+ }
+
+ stream->mState = eReading;
+
+ if (!stream->mInputStream) {
+ // This is the first use of the stream. Let's convert the
+ // mOriginalInputStream into an nsIAsyncInputStream.
+ MOZ_ASSERT(stream->mOriginalInputStream);
+
+ bool nonBlocking = false;
+ nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ nsCOMPtr<nsIAsyncInputStream> asyncStream =
+ do_QueryInterface(stream->mOriginalInputStream);
+ if (!nonBlocking || !asyncStream) {
+ nsCOMPtr<nsIStreamTransportService> sts =
+ do_GetService(kStreamTransportServiceCID, &rv);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ nsCOMPtr<nsITransport> transport;
+ rv = sts->CreateInputTransport(stream->mOriginalInputStream,
+ /* aStartOffset */ 0,
+ /* aReadLimit */ -1,
+ /* aCloseWhenDone */ true,
+ getter_AddRefs(transport));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ nsCOMPtr<nsIInputStream> wrapper;
+ rv = transport->OpenInputStream(/* aFlags */ 0,
+ /* aSegmentSize */ 0,
+ /* aSegmentCount */ 0,
+ getter_AddRefs(wrapper));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ asyncStream = do_QueryInterface(wrapper);
+ }
+
+ stream->mInputStream = asyncStream;
+ stream->mOriginalInputStream = nullptr;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
+ MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream);
+
+ nsresult rv =
+ stream->mInputStream->AsyncWait(stream, 0, 0, nullptr);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ // All good.
+}
+
+/* static */ void
+FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
+ JS::HandleObject aStream,
+ void* aUnderlyingSource,
+ uint8_t aFlags, void* aBuffer,
+ size_t aLength, size_t* aByteWritten)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+ MOZ_DIAGNOSTIC_ASSERT(aBuffer);
+ MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
+
+ RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+
+ MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
+ MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting);
+ stream->mState = eChecking;
+
+ uint32_t written;
+ nsresult rv =
+ stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ *aByteWritten = written;
+
+ if (written == 0) {
+ stream->mState = eClosed;
+ JS::ReadableStreamClose(aCx, aStream);
+ return;
+ }
+
+ rv = stream->mInputStream->AsyncWait(stream, 0, 0, nullptr);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ stream->ErrorPropagation(aCx, aStream, rv);
+ return;
+ }
+
+ // All good.
+}
+
+/* static */ JS::Value
+FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ JS::HandleValue aReason)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+
+ RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+
+ if (stream->mInputStream) {
+ stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+ }
+
+ stream->mState = eClosed;
+
+ return JS::UndefinedValue();
+}
+
+/* static */ void
+FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+}
+
+/* static */ void
+FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ JS::HandleValue aReason)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+}
+
+void
+FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+ MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+
+ RefPtr<FetchStream> stream =
+ dont_AddRef(static_cast<FetchStream*>(aUnderlyingSource));
+
+ stream->mState = eClosed;
+ stream->mReadableStream = nullptr;
+}
+
+FetchStream::FetchStream(nsIGlobalObject* aGlobal,
+ nsIInputStream* aInputStream)
+ : mState(eWaiting)
+ , mGlobal(aGlobal)
+ , mOriginalInputStream(aInputStream)
+ , mOwningEventTarget(nullptr)
+ , mReadableStream(nullptr)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aInputStream);
+}
+
+FetchStream::~FetchStream()
+{
+ NS_ProxyRelease(mOwningEventTarget, mGlobal.forget());
+}
+
+void
+FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream,
+ nsresult aError)
+{
+ // Nothing to do.
+ if (mState == eClosed) {
+ return;
+ }
+
+ // We cannot continue with any other operation.
+ mState = eClosed;
+
+ // Let's close the stream.
+ if (aError == NS_BASE_STREAM_CLOSED) {
+ JS::ReadableStreamClose(aCx, aStream);
+ return;
+ }
+
+ nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
+
+ // Let's use a generic error.
+ RefPtr<DOMError> error = new DOMError(window, NS_ERROR_DOM_TYPE_ERR);
+
+ JS::Rooted<JS::Value> errorValue(aCx);
+ if (ToJSValue(aCx, error, &errorValue)) {
+ JS::ReadableStreamError(aCx, aStream, errorValue);
+ }
+}
+
+NS_IMETHODIMP
+FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
+{
+ MOZ_DIAGNOSTIC_ASSERT(aStream);
+
+ // Already closed. We have nothing else to do here.
+ if (mState == eClosed) {
+ return NS_OK;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(mInputStream);
+ MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
+
+ AutoJSAPI jsapi;
+ if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
+ // Without JSContext we are not able to close the stream or to propagate the
+ // error.
+ return NS_ERROR_FAILURE;
+ }
+
+ JSContext* cx = jsapi.cx();
+ JS::Rooted<JSObject*> stream(cx, mReadableStream);
+
+ uint64_t size = 0;
+ nsresult rv = mInputStream->Available(&size);
+ if (NS_SUCCEEDED(rv) && size == 0) {
+ // In theory this should not happen. If size is 0, the stream should be
+ // considered closed.
+ rv = NS_BASE_STREAM_CLOSED;
+ }
+
+ // No warning for stream closed.
+ if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
+ ErrorPropagation(cx, stream, rv);
+ return NS_OK;
+ }
+
+ // This extra checking is completed. Let's wait for the next read request.
+ if (mState == eChecking) {
+ mState = eWaiting;
+ return NS_OK;
+ }
+
+ mState = eWriting;
+ JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
+
+ // The WriteInto callback changes mState to eChecking.
+ MOZ_DIAGNOSTIC_ASSERT(mState == eChecking);
+
+ return NS_OK;
+}
+
+} // dom namespace
+} // mozilla namespace
diff --git a/dom/fetch/FetchStream.h b/dom/fetch/FetchStream.h
new file mode 100644
index 0000000000..ebe94e205b
--- /dev/null
+++ b/dom/fetch/FetchStream.h
@@ -0,0 +1,107 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_dom_FetchStream_h
+#define mozilla_dom_FetchStream_h
+
+#include "Fetch.h"
+#include "jsapi.h"
+#include "nsIAsyncInputStream.h"
+#include "nsISupportsImpl.h"
+
+class nsIGlobalObject;
+
+class nsIInputStream;
+
+namespace mozilla {
+namespace dom {
+
+class FetchStream final : public nsIInputStreamCallback
+{
+public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+ NS_DECL_NSIINPUTSTREAMCALLBACK
+
+ static JSObject*
+ Create(JSContext* aCx, nsIGlobalObject* aGlobal,
+ nsIInputStream* aInputStream, ErrorResult& aRv);
+
+private:
+ FetchStream(nsIGlobalObject* aGlobal, nsIInputStream* aInputStream);
+ ~FetchStream();
+
+ static void
+ RequestDataCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ size_t aDesiredSize);
+
+ static void
+ WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ void* aBuffer, size_t aLength,
+ size_t* aByteWritten);
+
+ static JS::Value
+ CancelCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ JS::HandleValue aReason);
+
+ static void
+ ClosedCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags);
+
+ static void
+ ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
+ void* aUnderlyingSource, uint8_t aFlags,
+ JS::HandleValue reason);
+
+ static void
+ FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags);
+
+ void
+ ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aRv);
+
+ // Common methods
+
+ enum State {
+ // RequestDataCallback has not been called yet. We haven't started to read
+ // data from the stream yet.
+ eWaiting,
+
+ // We are reading data in a separate I/O thread.
+ eReading,
+
+ // We are ready to write something in the JS Buffer.
+ eWriting,
+
+ // After a writing, we want to check if the stream is closed. After the
+ // check, we go back to eWaiting. If a reading request happens in the
+ // meantime, we move to eReading state.
+ eChecking,
+
+ // Operation completed.
+ eClosed,
+ };
+
+ // Touched only on the target thread.
+ State mState;
+
+ nsCOMPtr<nsIGlobalObject> mGlobal;
+
+ // This is the original inputStream received during the CTOR. It will be
+ // converted into an nsIAsyncInputStream and stored into mInputStream at the
+ // first use.
+ nsCOMPtr<nsIInputStream> mOriginalInputStream;
+ nsCOMPtr<nsIAsyncInputStream> mInputStream;
+
+ nsCOMPtr<nsIEventTarget> mOwningEventTarget;
+
+ JS::Heap<JSObject*> mReadableStream;
+};
+
+} // dom namespace
+} // mozilla namespace
+
+#endif // mozilla_dom_FetchStream_h
diff --git a/dom/fetch/Request.cpp b/dom/fetch/Request.cpp
index ab87a3215a..47fb62e885 100644
--- a/dom/fetch/Request.cpp
+++ b/dom/fetch/Request.cpp
@@ -29,7 +29,24 @@ using namespace workers;
NS_IMPL_CYCLE_COLLECTING_ADDREF(Request)
NS_IMPL_CYCLE_COLLECTING_RELEASE(Request)
-NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(Request, mOwner, mHeaders)
+
+NS_IMPL_CYCLE_COLLECTION_CLASS(Request)
+
+NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(Request)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
+NS_IMPL_CYCLE_COLLECTION_UNLINK_END
+
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(Request)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders)
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
+
+NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Request)
+ NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
+ NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
+NS_IMPL_CYCLE_COLLECTION_TRACE_END
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Request)
NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
diff --git a/dom/fetch/Response.cpp b/dom/fetch/Response.cpp
index 42b25ae1d9..20487906ba 100644
--- a/dom/fetch/Response.cpp
+++ b/dom/fetch/Response.cpp
@@ -26,7 +26,27 @@ namespace dom {
NS_IMPL_CYCLE_COLLECTING_ADDREF(Response)
NS_IMPL_CYCLE_COLLECTING_RELEASE(Response)
-NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(Response, mOwner, mHeaders)
+
+NS_IMPL_CYCLE_COLLECTION_CLASS(Response)
+
+NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(Response)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders)
+
+ tmp->mReadableStreamBody = nullptr;
+
+ NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
+NS_IMPL_CYCLE_COLLECTION_UNLINK_END
+
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(Response)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders)
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
+
+NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Response)
+ NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
+ NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
+NS_IMPL_CYCLE_COLLECTION_TRACE_END
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Response)
NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
@@ -42,10 +62,14 @@ Response::Response(nsIGlobalObject* aGlobal, InternalResponse* aInternalResponse
MOZ_ASSERT(aInternalResponse->Headers()->Guard() == HeadersGuardEnum::Immutable ||
aInternalResponse->Headers()->Guard() == HeadersGuardEnum::Response);
SetMimeType();
+
+ // Keep a firm grip!
+ mozilla::HoldJSObjects(this);
}
Response::~Response()
{
+ mozilla::DropJSObjects(this);
}
/* static */ already_AddRefed<Response>
diff --git a/dom/fetch/moz.build b/dom/fetch/moz.build
index d1a8a49323..cd41620f93 100644
--- a/dom/fetch/moz.build
+++ b/dom/fetch/moz.build
@@ -24,6 +24,7 @@ UNIFIED_SOURCES += [
'FetchConsumer.cpp',
'FetchDriver.cpp',
'FetchObserver.cpp',
+ 'FetchStream.cpp',
'FetchUtil.cpp',
'Headers.cpp',
'InternalHeaders.cpp',