diff options
author | Brian Smith <brian@dbsoft.org> | 2023-09-27 19:20:08 -0500 |
---|---|---|
committer | Brian Smith <brian@dbsoft.org> | 2023-09-27 19:20:08 -0500 |
commit | 4a179ff8855552d2c19b72e82f1d7f7a84a401fa (patch) | |
tree | 3aa2d56896f9d01a80511c253c9f38d35bcb598b /dom/fetch | |
parent | 8c5a0f0de9ece809942d8e412246194540d4e2b0 (diff) | |
download | uxp-4a179ff8855552d2c19b72e82f1d7f7a84a401fa.tar.gz |
Issue #1442 - Part 8: Fetch implementation of streams.
https://bugzilla.mozilla.org/show_bug.cgi?id=1128959
+worker-friendly pref checking for the DOM API.
Diffstat (limited to 'dom/fetch')
-rw-r--r-- | dom/fetch/Fetch.cpp | 97 | ||||
-rw-r--r-- | dom/fetch/Fetch.h | 426 | ||||
-rw-r--r-- | dom/fetch/FetchStream.cpp | 336 | ||||
-rw-r--r-- | dom/fetch/FetchStream.h | 107 | ||||
-rw-r--r-- | dom/fetch/Request.cpp | 19 | ||||
-rw-r--r-- | dom/fetch/Response.cpp | 26 | ||||
-rw-r--r-- | dom/fetch/moz.build | 1 |
7 files changed, 790 insertions, 222 deletions
diff --git a/dom/fetch/Fetch.cpp b/dom/fetch/Fetch.cpp index 42c9fc74f9..63e2c83eaa 100644 --- a/dom/fetch/Fetch.cpp +++ b/dom/fetch/Fetch.cpp @@ -5,6 +5,7 @@ #include "Fetch.h" #include "FetchConsumer.h" +#include "FetchStream.h" #include "nsIDocument.h" #include "nsIGlobalObject.h" @@ -22,6 +23,7 @@ #include "mozilla/ErrorResult.h" #include "mozilla/dom/BodyUtil.h" +#include "mozilla/dom/DOMError.h" #include "mozilla/dom/EncodingUtils.h" #include "mozilla/dom/Exceptions.h" #include "mozilla/dom/FetchDriver.h" @@ -35,6 +37,7 @@ #include "mozilla/dom/Response.h" #include "mozilla/dom/ScriptSettings.h" #include "mozilla/dom/URLSearchParams.h" +#include "mozilla/dom/WorkerPrivate.h" #include "mozilla/dom/workers/ServiceWorkerManager.h" #include "FetchObserver.h" @@ -918,6 +921,7 @@ ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUS template <class Derived> FetchBody<Derived>::FetchBody() : mWorkerPrivate(nullptr) + , mReadableStreamBody(nullptr) , mBodyUsed(false) { if (!NS_IsMainThread()) { @@ -938,8 +942,44 @@ FetchBody<Derived>::~FetchBody() } template <class Derived> +bool +FetchBody<Derived>::BodyUsed() const +{ + if (mBodyUsed) { + return true; + } + + // If this object is disturbed or locked, return false. + if (mReadableStreamBody) { + AutoJSAPI jsapi; + // If we start tracking ownership, we need something like this. + //if (!jsapi.Init(mOwner)) { + // return true; + //} + + JSContext* cx = jsapi.cx(); + + JS::Rooted<JSObject*> body(cx, mReadableStreamBody); + if (JS::ReadableStreamIsDisturbed(body) || + JS::ReadableStreamIsLocked(body)) { + return true; + } + } + + return false; +} + +template +bool +FetchBody<Request>::BodyUsed() const; + +template +bool +FetchBody<Response>::BodyUsed() const; + +template <class Derived> already_AddRefed<Promise> -FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv) +FetchBody<Derived>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv) { RefPtr<AbortSignal> signal = DerivedClass()->GetSignal(); if (signal && signal->Aborted()) { @@ -954,9 +994,16 @@ FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv) SetBodyUsed(); + nsCOMPtr<nsIGlobalObject> global = DerivedClass()->GetParentObject(); + + // If we already created a ReadableStreamBody we have to close it now. + if (mReadableStreamBody) { + JS::Rooted<JSObject*> body(aCx, mReadableStreamBody); + JS::ReadableStreamClose(aCx, body); + } + RefPtr<Promise> promise = - FetchBodyConsumer<Derived>::Create(DerivedClass()->GetParentObject(), - this, signal, aType, aRv); + FetchBodyConsumer<Derived>::Create(global, this, signal, aType, aRv); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } @@ -966,11 +1013,11 @@ FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv) template already_AddRefed<Promise> -FetchBody<Request>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv); +FetchBody<Request>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv); template already_AddRefed<Promise> -FetchBody<Response>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv); +FetchBody<Response>::ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv); template <class Derived> void @@ -1003,20 +1050,52 @@ FetchBody<Response>::SetMimeType(); template <class Derived> void FetchBody<Derived>::GetBody(JSContext* aCx, - JS::MutableHandle<JSObject*> aMessage) + JS::MutableHandle<JSObject*> aBodyOut, + ErrorResult& aRv) { - // TODO + nsCOMPtr<nsIInputStream> inputStream; + DerivedClass()->GetBody(getter_AddRefs(inputStream)); + + if (!inputStream) { + aBodyOut.set(nullptr); + return; + } + + if (!mReadableStreamBody) { + JS::Rooted<JSObject*> body(aCx, + FetchStream::Create(aCx, + DerivedClass()->GetParentObject(), + inputStream, + aRv)); + if (NS_WARN_IF(aRv.Failed())) { + return; + } + + MOZ_ASSERT(body); + + // If the body has been already consumed, we close the stream. + if (BodyUsed() && !JS::ReadableStreamClose(aCx, body)) { + aRv.StealExceptionFromJSContext(aCx); + return; + } + + mReadableStreamBody = body; + } + + aBodyOut.set(mReadableStreamBody); } template void FetchBody<Request>::GetBody(JSContext* aCx, - JS::MutableHandle<JSObject*> aMessage); + JS::MutableHandle<JSObject*> aMessage, + ErrorResult& aRv); template void FetchBody<Response>::GetBody(JSContext* aCx, - JS::MutableHandle<JSObject*> aMessage); + JS::MutableHandle<JSObject*> aMessage, + ErrorResult& aRv); } // namespace dom diff --git a/dom/fetch/Fetch.h b/dom/fetch/Fetch.h index bc2df62689..c605e45bc5 100644 --- a/dom/fetch/Fetch.h +++ b/dom/fetch/Fetch.h @@ -1,211 +1,215 @@ -/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -#ifndef mozilla_dom_Fetch_h -#define mozilla_dom_Fetch_h - -#include "nsAutoPtr.h" -#include "nsIStreamLoader.h" - -#include "nsCOMPtr.h" -#include "nsError.h" -#include "nsProxyRelease.h" -#include "nsString.h" - -#include "mozilla/DebugOnly.h" -#include "mozilla/ErrorResult.h" -#include "mozilla/dom/Promise.h" -#include "mozilla/dom/RequestBinding.h" - -class nsIGlobalObject; - -namespace mozilla { -namespace dom { - -class ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams; -class BlobImpl; -class InternalRequest; -class OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams; -class RequestOrUSVString; - -namespace workers { -class WorkerPrivate; -} // namespace workers - -already_AddRefed<Promise> -FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput, - const RequestInit& aInit, ErrorResult& aRv); - -nsresult -UpdateRequestReferrer(nsIGlobalObject* aGlobal, InternalRequest* aRequest); - -/* - * Creates an nsIInputStream based on the fetch specifications 'extract a byte - * stream algorithm' - http://fetch.spec.whatwg.org/#concept-bodyinit-extract. - * Stores content type in out param aContentType. - */ -nsresult -ExtractByteStreamFromBody(const OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit, - nsIInputStream** aStream, - nsCString& aContentType, - uint64_t& aContentLength); - -/* - * Non-owning version. - */ -nsresult -ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit, - nsIInputStream** aStream, - nsCString& aContentType, - uint64_t& aContentLength); - -template <class Derived> class FetchBodyConsumer; - -enum FetchConsumeType -{ - CONSUME_ARRAYBUFFER, - CONSUME_BLOB, - CONSUME_FORMDATA, - CONSUME_JSON, - CONSUME_TEXT, -}; - -/* - * FetchBody's body consumption uses nsIInputStreamPump to read from the - * underlying stream to a block of memory, which is then adopted by - * ContinueConsumeBody() and converted to the right type based on the JS - * function called. - * - * Use of the nsIInputStreamPump complicates things on the worker thread. - * The solution used here is similar to WebSockets. - * The difference is that we are only interested in completion and not data - * events, and nsIInputStreamPump can only deliver completion on the main thread. - * - * Before starting the pump on the main thread, we addref the FetchBody to keep - * it alive. Then we add a feature, to track the status of the worker. - * - * ContinueConsumeBody() is the function that cleans things up in both success - * and error conditions and so all callers call it with the appropriate status. - * - * Once the read is initiated on the main thread there are two possibilities. - * - * 1) Pump finishes before worker has finished Running. - * In this case we adopt the data and dispatch a runnable to the worker, - * which derefs FetchBody and removes the feature and resolves the Promise. - * - * 2) Pump still working while worker has stopped Running. - * The feature is Notify()ed and ContinueConsumeBody() is called with - * NS_BINDING_ABORTED. We first Cancel() the pump using a sync runnable to - * ensure that mFetchBody remains alive (since mConsumeBodyPump is strongly - * held by it) until pump->Cancel() is called. OnStreamComplete() will not - * do anything if the error code is NS_BINDING_ABORTED, so we don't have to - * worry about keeping anything alive. - * - * The pump is always released on the main thread. - */ -template <class Derived> -class FetchBody -{ -public: - friend class FetchBodyConsumer<Derived>; - - NS_IMETHOD_(MozExternalRefCountType) AddRef(void) = 0; - NS_IMETHOD_(MozExternalRefCountType) Release(void) = 0; - - bool - BodyUsed() const { return mBodyUsed; } - - already_AddRefed<Promise> - ArrayBuffer(ErrorResult& aRv) - { - return ConsumeBody(CONSUME_ARRAYBUFFER, aRv); - } - - already_AddRefed<Promise> - Blob(ErrorResult& aRv) - { - return ConsumeBody(CONSUME_BLOB, aRv); - } - - already_AddRefed<Promise> - FormData(ErrorResult& aRv) - { - return ConsumeBody(CONSUME_FORMDATA, aRv); - } - - already_AddRefed<Promise> - Json(ErrorResult& aRv) - { - return ConsumeBody(CONSUME_JSON, aRv); - } - - already_AddRefed<Promise> - Text(ErrorResult& aRv) - { - return ConsumeBody(CONSUME_TEXT, aRv); - } - - void - GetBody(JSContext* aCx, - JS::MutableHandle<JSObject*> aMessage); - - // Utility public methods accessed by various runnables. - - void - SetBodyUsed() - { - mBodyUsed = true; - } - - const nsCString& - MimeType() const - { - return mMimeType; - } - - virtual AbortSignal* - GetSignal() const = 0; - -protected: - FetchBody(); - - // Always set whenever the FetchBody is created on the worker thread. - workers::WorkerPrivate* mWorkerPrivate; - - virtual ~FetchBody(); - - void - SetMimeType(); -private: - Derived* - DerivedClass() const - { - return static_cast<Derived*>(const_cast<FetchBody*>(this)); - } - - already_AddRefed<Promise> - ConsumeBody(FetchConsumeType aType, ErrorResult& aRv); - - bool - IsOnTargetThread() - { - return NS_IsMainThread() == !mWorkerPrivate; - } - - void - AssertIsOnTargetThread() - { - MOZ_ASSERT(IsOnTargetThread()); - } - - // Only ever set once, always on target thread. - bool mBodyUsed; - nsCString mMimeType; -}; - -} // namespace dom -} // namespace mozilla - -#endif // mozilla_dom_Fetch_h +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_dom_Fetch_h
+#define mozilla_dom_Fetch_h
+
+#include "nsAutoPtr.h"
+#include "nsIStreamLoader.h"
+
+#include "nsCOMPtr.h"
+#include "nsError.h"
+#include "nsProxyRelease.h"
+#include "nsString.h"
+
+#include "mozilla/DebugOnly.h"
+#include "mozilla/ErrorResult.h"
+#include "mozilla/dom/Promise.h"
+#include "mozilla/dom/RequestBinding.h"
+
+class nsIGlobalObject;
+
+namespace mozilla {
+namespace dom {
+
+class ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
+class BlobImpl;
+class InternalRequest;
+class OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams;
+class RequestOrUSVString;
+
+namespace workers {
+class WorkerPrivate;
+} // namespace workers
+
+already_AddRefed<Promise>
+FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
+ const RequestInit& aInit, ErrorResult& aRv);
+
+nsresult
+UpdateRequestReferrer(nsIGlobalObject* aGlobal, InternalRequest* aRequest);
+
+/*
+ * Creates an nsIInputStream based on the fetch specifications 'extract a byte
+ * stream algorithm' - http://fetch.spec.whatwg.org/#concept-bodyinit-extract.
+ * Stores content type in out param aContentType.
+ */
+nsresult
+ExtractByteStreamFromBody(const OwningArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
+ nsIInputStream** aStream,
+ nsCString& aContentType,
+ uint64_t& aContentLength);
+
+/*
+ * Non-owning version.
+ */
+nsresult
+ExtractByteStreamFromBody(const ArrayBufferOrArrayBufferViewOrBlobOrFormDataOrUSVStringOrURLSearchParams& aBodyInit,
+ nsIInputStream** aStream,
+ nsCString& aContentType,
+ uint64_t& aContentLength);
+
+template <class Derived> class FetchBodyConsumer;
+
+enum FetchConsumeType
+{
+ CONSUME_ARRAYBUFFER,
+ CONSUME_BLOB,
+ CONSUME_FORMDATA,
+ CONSUME_JSON,
+ CONSUME_TEXT,
+};
+
+/*
+ * FetchBody's body consumption uses nsIInputStreamPump to read from the
+ * underlying stream to a block of memory, which is then adopted by
+ * ContinueConsumeBody() and converted to the right type based on the JS
+ * function called.
+ *
+ * Use of the nsIInputStreamPump complicates things on the worker thread.
+ * The solution used here is similar to WebSockets.
+ * The difference is that we are only interested in completion and not data
+ * events, and nsIInputStreamPump can only deliver completion on the main thread.
+ *
+ * Before starting the pump on the main thread, we addref the FetchBody to keep
+ * it alive. Then we add a feature, to track the status of the worker.
+ *
+ * ContinueConsumeBody() is the function that cleans things up in both success
+ * and error conditions and so all callers call it with the appropriate status.
+ *
+ * Once the read is initiated on the main thread there are two possibilities.
+ *
+ * 1) Pump finishes before worker has finished Running.
+ * In this case we adopt the data and dispatch a runnable to the worker,
+ * which derefs FetchBody and removes the feature and resolves the Promise.
+ *
+ * 2) Pump still working while worker has stopped Running.
+ * The feature is Notify()ed and ContinueConsumeBody() is called with
+ * NS_BINDING_ABORTED. We first Cancel() the pump using a sync runnable to
+ * ensure that mFetchBody remains alive (since mConsumeBodyPump is strongly
+ * held by it) until pump->Cancel() is called. OnStreamComplete() will not
+ * do anything if the error code is NS_BINDING_ABORTED, so we don't have to
+ * worry about keeping anything alive.
+ *
+ * The pump is always released on the main thread.
+ */
+template <class Derived>
+class FetchBody
+{
+public:
+ friend class FetchBodyConsumer<Derived>;
+
+ NS_IMETHOD_(MozExternalRefCountType) AddRef(void) = 0;
+ NS_IMETHOD_(MozExternalRefCountType) Release(void) = 0;
+
+ bool
+ BodyUsed() const;
+
+ already_AddRefed<Promise>
+ ArrayBuffer(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_ARRAYBUFFER, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Blob(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_BLOB, aRv);
+ }
+
+ already_AddRefed<Promise>
+ FormData(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_FORMDATA, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Json(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_JSON, aRv);
+ }
+
+ already_AddRefed<Promise>
+ Text(JSContext* aCx, ErrorResult& aRv)
+ {
+ return ConsumeBody(aCx, CONSUME_TEXT, aRv);
+ }
+
+ void
+ GetBody(JSContext* aCx,
+ JS::MutableHandle<JSObject*> aBodyOut,
+ ErrorResult& aRv);
+
+ // Utility public methods accessed by various runnables.
+
+ void
+ SetBodyUsed()
+ {
+ mBodyUsed = true;
+ }
+
+ const nsCString&
+ MimeType() const
+ {
+ return mMimeType;
+ }
+
+ virtual AbortSignal*
+ GetSignal() const = 0;
+
+protected:
+ FetchBody();
+
+ // Always set whenever the FetchBody is created on the worker thread.
+ workers::WorkerPrivate* mWorkerPrivate;
+
+ // This is the ReadableStream exposed to content. Its underlying source is a FetchStream object.
+ JS::Heap<JSObject*> mReadableStreamBody;
+
+ virtual ~FetchBody();
+
+ void
+ SetMimeType();
+private:
+ Derived*
+ DerivedClass() const
+ {
+ return static_cast<Derived*>(const_cast<FetchBody*>(this));
+ }
+
+ already_AddRefed<Promise>
+ ConsumeBody(JSContext* aCx, FetchConsumeType aType, ErrorResult& aRv);
+
+ bool
+ IsOnTargetThread()
+ {
+ return NS_IsMainThread() == !mWorkerPrivate;
+ }
+
+ void
+ AssertIsOnTargetThread()
+ {
+ MOZ_ASSERT(IsOnTargetThread());
+ }
+
+ // Only ever set once, always on target thread.
+ bool mBodyUsed;
+ nsCString mMimeType;
+};
+
+} // namespace dom
+} // namespace mozilla
+
+#endif // mozilla_dom_Fetch_h
diff --git a/dom/fetch/FetchStream.cpp b/dom/fetch/FetchStream.cpp new file mode 100644 index 0000000000..7b165b2a9d --- /dev/null +++ b/dom/fetch/FetchStream.cpp @@ -0,0 +1,336 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "FetchStream.h" +#include "nsNetCID.h" +#include "nsITransport.h" +#include "nsIStreamTransportService.h" +#include "nsProxyRelease.h" + +#include "mozilla/dom/DOMError.h" + +#define FETCH_STREAM_FLAG 0 + +static NS_DEFINE_CID(kStreamTransportServiceCID, + NS_STREAMTRANSPORTSERVICE_CID); + +namespace mozilla { +namespace dom { + +NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback) + +/* static */ JSObject* +FetchStream::Create(JSContext* aCx, nsIGlobalObject* aGlobal, + nsIInputStream* aInputStream, ErrorResult& aRv) +{ + MOZ_DIAGNOSTIC_ASSERT(aCx); + MOZ_DIAGNOSTIC_ASSERT(aInputStream); + + RefPtr<FetchStream> stream = new FetchStream(aGlobal, aInputStream); + + if (!JS::HasReadableStreamCallbacks(aCx)) { + JS::SetReadableStreamCallbacks(aCx, + &FetchStream::RequestDataCallback, + &FetchStream::WriteIntoReadRequestCallback, + &FetchStream::CancelCallback, + &FetchStream::ClosedCallback, + &FetchStream::ErroredCallback, + &FetchStream::FinalizeCallback); + } + + JS::Rooted<JSObject*> body(aCx, + JS::NewReadableExternalSourceStreamObject(aCx, stream, FETCH_STREAM_FLAG)); + if (!body) { + aRv.StealExceptionFromJSContext(aCx); + return nullptr; + } + + stream->mReadableStream = body; + + // JS engine will call the finalize callback. + NS_ADDREF(stream.get()); + return body; +} + +/* static */ void +FetchStream::RequestDataCallback(JSContext* aCx, + JS::HandleObject aStream, + void* aUnderlyingSource, + uint8_t aFlags, + size_t aDesiredSize) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); + MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream)); + + RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource); + + MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting || + stream->mState == eChecking); + + if (stream->mState == eChecking) { + // If we are looking for more data, there is nothing else we should do: + // let's move this checking operation in a reading. + MOZ_ASSERT(stream->mInputStream); + stream->mState = eReading; + return; + } + + stream->mState = eReading; + + if (!stream->mInputStream) { + // This is the first use of the stream. Let's convert the + // mOriginalInputStream into an nsIAsyncInputStream. + MOZ_ASSERT(stream->mOriginalInputStream); + + bool nonBlocking = false; + nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + nsCOMPtr<nsIAsyncInputStream> asyncStream = + do_QueryInterface(stream->mOriginalInputStream); + if (!nonBlocking || !asyncStream) { + nsCOMPtr<nsIStreamTransportService> sts = + do_GetService(kStreamTransportServiceCID, &rv); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + nsCOMPtr<nsITransport> transport; + rv = sts->CreateInputTransport(stream->mOriginalInputStream, + /* aStartOffset */ 0, + /* aReadLimit */ -1, + /* aCloseWhenDone */ true, + getter_AddRefs(transport)); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + nsCOMPtr<nsIInputStream> wrapper; + rv = transport->OpenInputStream(/* aFlags */ 0, + /* aSegmentSize */ 0, + /* aSegmentCount */ 0, + getter_AddRefs(wrapper)); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + asyncStream = do_QueryInterface(wrapper); + } + + stream->mInputStream = asyncStream; + stream->mOriginalInputStream = nullptr; + } + + MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); + MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream); + + nsresult rv = + stream->mInputStream->AsyncWait(stream, 0, 0, nullptr); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + // All good. +} + +/* static */ void +FetchStream::WriteIntoReadRequestCallback(JSContext* aCx, + JS::HandleObject aStream, + void* aUnderlyingSource, + uint8_t aFlags, void* aBuffer, + size_t aLength, size_t* aByteWritten) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); + MOZ_DIAGNOSTIC_ASSERT(aBuffer); + MOZ_DIAGNOSTIC_ASSERT(aByteWritten); + + RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource); + + MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); + MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting); + stream->mState = eChecking; + + uint32_t written; + nsresult rv = + stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + *aByteWritten = written; + + if (written == 0) { + stream->mState = eClosed; + JS::ReadableStreamClose(aCx, aStream); + return; + } + + rv = stream->mInputStream->AsyncWait(stream, 0, 0, nullptr); + if (NS_WARN_IF(NS_FAILED(rv))) { + stream->ErrorPropagation(aCx, aStream, rv); + return; + } + + // All good. +} + +/* static */ JS::Value +FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + JS::HandleValue aReason) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); + + RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource); + + if (stream->mInputStream) { + stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); + } + + stream->mState = eClosed; + + return JS::UndefinedValue(); +} + +/* static */ void +FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); +} + +/* static */ void +FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + JS::HandleValue aReason) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); +} + +void +FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags) +{ + MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); + MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); + + RefPtr<FetchStream> stream = + dont_AddRef(static_cast<FetchStream*>(aUnderlyingSource)); + + stream->mState = eClosed; + stream->mReadableStream = nullptr; +} + +FetchStream::FetchStream(nsIGlobalObject* aGlobal, + nsIInputStream* aInputStream) + : mState(eWaiting) + , mGlobal(aGlobal) + , mOriginalInputStream(aInputStream) + , mOwningEventTarget(nullptr) + , mReadableStream(nullptr) +{ + MOZ_DIAGNOSTIC_ASSERT(aInputStream); +} + +FetchStream::~FetchStream() +{ + NS_ProxyRelease(mOwningEventTarget, mGlobal.forget()); +} + +void +FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, + nsresult aError) +{ + // Nothing to do. + if (mState == eClosed) { + return; + } + + // We cannot continue with any other operation. + mState = eClosed; + + // Let's close the stream. + if (aError == NS_BASE_STREAM_CLOSED) { + JS::ReadableStreamClose(aCx, aStream); + return; + } + + nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal); + + // Let's use a generic error. + RefPtr<DOMError> error = new DOMError(window, NS_ERROR_DOM_TYPE_ERR); + + JS::Rooted<JS::Value> errorValue(aCx); + if (ToJSValue(aCx, error, &errorValue)) { + JS::ReadableStreamError(aCx, aStream, errorValue); + } +} + +NS_IMETHODIMP +FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) +{ + MOZ_DIAGNOSTIC_ASSERT(aStream); + + // Already closed. We have nothing else to do here. + if (mState == eClosed) { + return NS_OK; + } + + MOZ_DIAGNOSTIC_ASSERT(mInputStream); + MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking); + + AutoJSAPI jsapi; + if (NS_WARN_IF(!jsapi.Init(mGlobal))) { + // Without JSContext we are not able to close the stream or to propagate the + // error. + return NS_ERROR_FAILURE; + } + + JSContext* cx = jsapi.cx(); + JS::Rooted<JSObject*> stream(cx, mReadableStream); + + uint64_t size = 0; + nsresult rv = mInputStream->Available(&size); + if (NS_SUCCEEDED(rv) && size == 0) { + // In theory this should not happen. If size is 0, the stream should be + // considered closed. + rv = NS_BASE_STREAM_CLOSED; + } + + // No warning for stream closed. + if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) { + ErrorPropagation(cx, stream, rv); + return NS_OK; + } + + // This extra checking is completed. Let's wait for the next read request. + if (mState == eChecking) { + mState = eWaiting; + return NS_OK; + } + + mState = eWriting; + JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size); + + // The WriteInto callback changes mState to eChecking. + MOZ_DIAGNOSTIC_ASSERT(mState == eChecking); + + return NS_OK; +} + +} // dom namespace +} // mozilla namespace diff --git a/dom/fetch/FetchStream.h b/dom/fetch/FetchStream.h new file mode 100644 index 0000000000..ebe94e205b --- /dev/null +++ b/dom/fetch/FetchStream.h @@ -0,0 +1,107 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_dom_FetchStream_h +#define mozilla_dom_FetchStream_h + +#include "Fetch.h" +#include "jsapi.h" +#include "nsIAsyncInputStream.h" +#include "nsISupportsImpl.h" + +class nsIGlobalObject; + +class nsIInputStream; + +namespace mozilla { +namespace dom { + +class FetchStream final : public nsIInputStreamCallback +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSIINPUTSTREAMCALLBACK + + static JSObject* + Create(JSContext* aCx, nsIGlobalObject* aGlobal, + nsIInputStream* aInputStream, ErrorResult& aRv); + +private: + FetchStream(nsIGlobalObject* aGlobal, nsIInputStream* aInputStream); + ~FetchStream(); + + static void + RequestDataCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + size_t aDesiredSize); + + static void + WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + void* aBuffer, size_t aLength, + size_t* aByteWritten); + + static JS::Value + CancelCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + JS::HandleValue aReason); + + static void + ClosedCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags); + + static void + ErroredCallback(JSContext* aCx, JS::HandleObject aStream, + void* aUnderlyingSource, uint8_t aFlags, + JS::HandleValue reason); + + static void + FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags); + + void + ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aRv); + + // Common methods + + enum State { + // RequestDataCallback has not been called yet. We haven't started to read + // data from the stream yet. + eWaiting, + + // We are reading data in a separate I/O thread. + eReading, + + // We are ready to write something in the JS Buffer. + eWriting, + + // After a writing, we want to check if the stream is closed. After the + // check, we go back to eWaiting. If a reading request happens in the + // meantime, we move to eReading state. + eChecking, + + // Operation completed. + eClosed, + }; + + // Touched only on the target thread. + State mState; + + nsCOMPtr<nsIGlobalObject> mGlobal; + + // This is the original inputStream received during the CTOR. It will be + // converted into an nsIAsyncInputStream and stored into mInputStream at the + // first use. + nsCOMPtr<nsIInputStream> mOriginalInputStream; + nsCOMPtr<nsIAsyncInputStream> mInputStream; + + nsCOMPtr<nsIEventTarget> mOwningEventTarget; + + JS::Heap<JSObject*> mReadableStream; +}; + +} // dom namespace +} // mozilla namespace + +#endif // mozilla_dom_FetchStream_h diff --git a/dom/fetch/Request.cpp b/dom/fetch/Request.cpp index ab87a3215a..47fb62e885 100644 --- a/dom/fetch/Request.cpp +++ b/dom/fetch/Request.cpp @@ -29,7 +29,24 @@ using namespace workers; NS_IMPL_CYCLE_COLLECTING_ADDREF(Request) NS_IMPL_CYCLE_COLLECTING_RELEASE(Request) -NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(Request, mOwner, mHeaders) + +NS_IMPL_CYCLE_COLLECTION_CLASS(Request) + +NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(Request) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders) + NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER +NS_IMPL_CYCLE_COLLECTION_UNLINK_END + +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(Request) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders) +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END + +NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Request) + NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody) + NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER +NS_IMPL_CYCLE_COLLECTION_TRACE_END NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Request) NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY diff --git a/dom/fetch/Response.cpp b/dom/fetch/Response.cpp index 42b25ae1d9..20487906ba 100644 --- a/dom/fetch/Response.cpp +++ b/dom/fetch/Response.cpp @@ -26,7 +26,27 @@ namespace dom { NS_IMPL_CYCLE_COLLECTING_ADDREF(Response) NS_IMPL_CYCLE_COLLECTING_RELEASE(Response) -NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(Response, mOwner, mHeaders) + +NS_IMPL_CYCLE_COLLECTION_CLASS(Response) + +NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(Response) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders) + + tmp->mReadableStreamBody = nullptr; + + NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER +NS_IMPL_CYCLE_COLLECTION_UNLINK_END + +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(Response) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders) +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END + +NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Response) + NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody) + NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER +NS_IMPL_CYCLE_COLLECTION_TRACE_END NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Response) NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY @@ -42,10 +62,14 @@ Response::Response(nsIGlobalObject* aGlobal, InternalResponse* aInternalResponse MOZ_ASSERT(aInternalResponse->Headers()->Guard() == HeadersGuardEnum::Immutable || aInternalResponse->Headers()->Guard() == HeadersGuardEnum::Response); SetMimeType(); + + // Keep a firm grip! + mozilla::HoldJSObjects(this); } Response::~Response() { + mozilla::DropJSObjects(this); } /* static */ already_AddRefed<Response> diff --git a/dom/fetch/moz.build b/dom/fetch/moz.build index d1a8a49323..cd41620f93 100644 --- a/dom/fetch/moz.build +++ b/dom/fetch/moz.build @@ -24,6 +24,7 @@ UNIFIED_SOURCES += [ 'FetchConsumer.cpp', 'FetchDriver.cpp', 'FetchObserver.cpp', + 'FetchStream.cpp', 'FetchUtil.cpp', 'Headers.cpp', 'InternalHeaders.cpp', |