diff options
Diffstat (limited to 'dom/fetch/Fetch.cpp')
-rw-r--r-- | dom/fetch/Fetch.cpp | 289 |
1 files changed, 259 insertions, 30 deletions
diff --git a/dom/fetch/Fetch.cpp b/dom/fetch/Fetch.cpp index f944352e3..191f4cfc3 100644 --- a/dom/fetch/Fetch.cpp +++ b/dom/fetch/Fetch.cpp @@ -39,6 +39,7 @@ #include "mozilla/dom/URLSearchParams.h" #include "mozilla/dom/workers/ServiceWorkerManager.h" +#include "FetchObserver.h" #include "InternalRequest.h" #include "InternalResponse.h" @@ -52,38 +53,141 @@ namespace dom { using namespace workers; +// This class helps the proxying of AbortSignal changes cross threads. +class AbortSignalProxy final : public AbortSignal::Follower +{ + // This is created and released on the main-thread. + RefPtr<AbortSignal> mSignalMainThread; + + // This value is used only for the creation of AbortSignal on the + // main-thread. They are not updated. + const bool mAborted; + + // This runnable propagates changes from the AbortSignal on workers to the + // AbortSignal on main-thread. + class AbortSignalProxyRunnable final : public Runnable + { + RefPtr<AbortSignalProxy> mProxy; + + public: + explicit AbortSignalProxyRunnable(AbortSignalProxy* aProxy) + : mProxy(aProxy) + {} + + NS_IMETHOD + Run() override + { + MOZ_ASSERT(NS_IsMainThread()); + AbortSignal* signal = mProxy->GetOrCreateSignalForMainThread(); + signal->Abort(); + return NS_OK; + } + }; + +public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbortSignalProxy) + + explicit AbortSignalProxy(AbortSignal* aSignal) + : mAborted(aSignal->Aborted()) + { + Follow(aSignal); + } + + void + Aborted() override + { + RefPtr<AbortSignalProxyRunnable> runnable = + new AbortSignalProxyRunnable(this); + NS_DispatchToMainThread(runnable); + } + + AbortSignal* + GetOrCreateSignalForMainThread() + { + MOZ_ASSERT(NS_IsMainThread()); + if (!mSignalMainThread) { + mSignalMainThread = new AbortSignal(mAborted); + } + return mSignalMainThread; + } + + void + Shutdown() + { + Unfollow(); + } + +private: + ~AbortSignalProxy() + { + NS_ReleaseOnMainThread(mSignalMainThread.forget()); + } +}; + class WorkerFetchResolver final : public FetchDriverObserver { friend class MainThreadFetchRunnable; + friend class WorkerDataAvailableRunnable; + friend class WorkerFetchResponseEndBase; friend class WorkerFetchResponseEndRunnable; friend class WorkerFetchResponseRunnable; RefPtr<PromiseWorkerProxy> mPromiseProxy; + RefPtr<AbortSignalProxy> mSignalProxy; + RefPtr<FetchObserver> mFetchObserver; + public: // Returns null if worker is shutting down. static already_AddRefed<WorkerFetchResolver> - Create(workers::WorkerPrivate* aWorkerPrivate, Promise* aPromise) + Create(workers::WorkerPrivate* aWorkerPrivate, Promise* aPromise, + AbortSignal* aSignal, FetchObserver* aObserver) { MOZ_ASSERT(aWorkerPrivate); aWorkerPrivate->AssertIsOnWorkerThread(); - RefPtr<PromiseWorkerProxy> proxy = PromiseWorkerProxy::Create(aWorkerPrivate, aPromise); + RefPtr<PromiseWorkerProxy> proxy = + PromiseWorkerProxy::Create(aWorkerPrivate, aPromise); if (!proxy) { return nullptr; } - RefPtr<WorkerFetchResolver> r = new WorkerFetchResolver(proxy); + RefPtr<AbortSignalProxy> signalProxy; + if (aSignal) { + signalProxy = new AbortSignalProxy(aSignal); + } + + RefPtr<WorkerFetchResolver> r = + new WorkerFetchResolver(proxy, signalProxy, aObserver); return r.forget(); } + AbortSignal* + GetAbortSignal() + { + MOZ_ASSERT(NS_IsMainThread()); + + if (!mSignalProxy) { + return nullptr; + } + + return mSignalProxy->GetOrCreateSignalForMainThread(); + } + void OnResponseAvailableInternal(InternalResponse* aResponse) override; void - OnResponseEnd() override; + OnResponseEnd(FetchDriverObserver::EndReason eReason) override; + + void + OnDataAvailable() override; private: - explicit WorkerFetchResolver(PromiseWorkerProxy* aProxy) + WorkerFetchResolver(PromiseWorkerProxy* aProxy, + AbortSignalProxy* aSignalProxy, + FetchObserver* aObserver) : mPromiseProxy(aProxy) + , mSignalProxy(aSignalProxy) + , mFetchObserver(aObserver) { MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(mPromiseProxy); @@ -100,12 +204,16 @@ class MainThreadFetchResolver final : public FetchDriverObserver { RefPtr<Promise> mPromise; RefPtr<Response> mResponse; + RefPtr<FetchObserver> mFetchObserver; nsCOMPtr<nsIDocument> mDocument; NS_DECL_OWNINGTHREAD public: - explicit MainThreadFetchResolver(Promise* aPromise); + MainThreadFetchResolver(Promise* aPromise, FetchObserver* aObserver) + : mPromise(aPromise) + , mFetchObserver(aObserver) + {} void OnResponseAvailableInternal(InternalResponse* aResponse) override; @@ -115,11 +223,20 @@ public: mDocument = aDocument; } - virtual void OnResponseEnd() override + void OnResponseEnd(FetchDriverObserver::EndReason aReason) override { + if (aReason == eAborted) { + mPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR); + } + + mFetchObserver = nullptr; + FlushConsoleReport(); } + void + OnDataAvailable() override; + private: ~MainThreadFetchResolver(); @@ -170,9 +287,11 @@ public: fetch->SetWorkerScript(spec); } + RefPtr<AbortSignal> signal = mResolver->GetAbortSignal(); + // ...but release it before calling Fetch, because mResolver's callback can // be called synchronously and they want the mutex, too. - return fetch->Fetch(mResolver); + return fetch->Fetch(signal, mResolver); } }; @@ -210,6 +329,23 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput, RefPtr<InternalRequest> r = request->GetInternalRequest(); + RefPtr<AbortSignal> signal; + if (aInit.mSignal.WasPassed()) { + signal = &aInit.mSignal.Value(); + } + + if (signal && signal->Aborted()) { + // An already aborted signal should reject immediately. + aRv.Throw(NS_ERROR_DOM_ABORT_ERR); + return nullptr; + } + + RefPtr<FetchObserver> observer; + if (aInit.mObserve.WasPassed()) { + observer = new FetchObserver(aGlobal, signal); + aInit.mObserve.Value().HandleEvent(*observer); + } + if (NS_IsMainThread()) { nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(aGlobal); nsCOMPtr<nsIDocument> doc; @@ -236,11 +372,12 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput, } } - RefPtr<MainThreadFetchResolver> resolver = new MainThreadFetchResolver(p); + RefPtr<MainThreadFetchResolver> resolver = + new MainThreadFetchResolver(p, observer); RefPtr<FetchDriver> fetch = new FetchDriver(r, principal, loadGroup); fetch->SetDocument(doc); resolver->SetDocument(doc); - aRv = fetch->Fetch(resolver); + aRv = fetch->Fetch(signal, resolver); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } @@ -252,7 +389,8 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput, r->SetSkipServiceWorker(); } - RefPtr<WorkerFetchResolver> resolver = WorkerFetchResolver::Create(worker, p); + RefPtr<WorkerFetchResolver> resolver = + WorkerFetchResolver::Create(worker, p, signal, observer); if (!resolver) { NS_WARNING("Could not add WorkerFetchResolver workerHolder to worker"); aRv.Throw(NS_ERROR_DOM_ABORT_ERR); @@ -266,11 +404,6 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput, return p.forget(); } -MainThreadFetchResolver::MainThreadFetchResolver(Promise* aPromise) - : mPromise(aPromise) -{ -} - void MainThreadFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse) { @@ -278,16 +411,39 @@ MainThreadFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse AssertIsOnMainThread(); if (aResponse->Type() != ResponseType::Error) { + if (mFetchObserver) { + mFetchObserver->SetState(FetchState::Complete); + } + nsCOMPtr<nsIGlobalObject> go = mPromise->GetParentObject(); mResponse = new Response(go, aResponse); mPromise->MaybeResolve(mResponse); } else { + if (mFetchObserver) { + mFetchObserver->SetState(FetchState::Errored); + } + ErrorResult result; result.ThrowTypeError<MSG_FETCH_FAILED>(); mPromise->MaybeReject(result); } } +void +MainThreadFetchResolver::OnDataAvailable() +{ + NS_ASSERT_OWNINGTHREAD(MainThreadFetchResolver); + AssertIsOnMainThread(); + + if (!mFetchObserver) { + return; + } + + if (mFetchObserver->State() == FetchState::Requesting) { + mFetchObserver->SetState(FetchState::Responding); + } +} + MainThreadFetchResolver::~MainThreadFetchResolver() { NS_ASSERT_OWNINGTHREAD(MainThreadFetchResolver); @@ -306,6 +462,7 @@ public: , mResolver(aResolver) , mInternalResponse(aResponse) { + MOZ_ASSERT(mResolver); } bool @@ -317,10 +474,18 @@ public: RefPtr<Promise> promise = mResolver->mPromiseProxy->WorkerPromise(); if (mInternalResponse->Type() != ResponseType::Error) { + if (mResolver->mFetchObserver) { + mResolver->mFetchObserver->SetState(FetchState::Complete); + } + RefPtr<nsIGlobalObject> global = aWorkerPrivate->GlobalScope(); RefPtr<Response> response = new Response(global, mInternalResponse); promise->MaybeResolve(response); } else { + if (mResolver->mFetchObserver) { + mResolver->mFetchObserver->SetState(FetchState::Errored); + } + ErrorResult result; result.ThrowTypeError<MSG_FETCH_FAILED>(); promise->MaybeReject(result); @@ -329,14 +494,42 @@ public: } }; +class WorkerDataAvailableRunnable final : public MainThreadWorkerRunnable +{ + RefPtr<WorkerFetchResolver> mResolver; +public: + WorkerDataAvailableRunnable(WorkerPrivate* aWorkerPrivate, + WorkerFetchResolver* aResolver) + : MainThreadWorkerRunnable(aWorkerPrivate) + , mResolver(aResolver) + { + } + + bool + WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override + { + MOZ_ASSERT(aWorkerPrivate); + aWorkerPrivate->AssertIsOnWorkerThread(); + + if (mResolver->mFetchObserver && + mResolver->mFetchObserver->State() == FetchState::Requesting) { + mResolver->mFetchObserver->SetState(FetchState::Responding); + } + + return true; + } +}; + class WorkerFetchResponseEndBase { - RefPtr<PromiseWorkerProxy> mPromiseProxy; +protected: + RefPtr<WorkerFetchResolver> mResolver; + public: - explicit WorkerFetchResponseEndBase(PromiseWorkerProxy* aPromiseProxy) - : mPromiseProxy(aPromiseProxy) + explicit WorkerFetchResponseEndBase(WorkerFetchResolver* aResolver) + : mResolver(aResolver) { - MOZ_ASSERT(mPromiseProxy); + MOZ_ASSERT(aResolver); } void @@ -344,23 +537,41 @@ public: { MOZ_ASSERT(aWorkerPrivate); aWorkerPrivate->AssertIsOnWorkerThread(); - mPromiseProxy->CleanUp(); + + mResolver->mPromiseProxy->CleanUp(); + + mResolver->mFetchObserver = nullptr; + + if (mResolver->mSignalProxy) { + mResolver->mSignalProxy->Shutdown(); + mResolver->mSignalProxy = nullptr; + } } }; class WorkerFetchResponseEndRunnable final : public MainThreadWorkerRunnable , public WorkerFetchResponseEndBase { + FetchDriverObserver::EndReason mReason; + public: - explicit WorkerFetchResponseEndRunnable(PromiseWorkerProxy* aPromiseProxy) - : MainThreadWorkerRunnable(aPromiseProxy->GetWorkerPrivate()) - , WorkerFetchResponseEndBase(aPromiseProxy) + WorkerFetchResponseEndRunnable(WorkerPrivate* aWorkerPrivate, + WorkerFetchResolver* aResolver, + FetchDriverObserver::EndReason aReason) + : MainThreadWorkerRunnable(aWorkerPrivate) + , WorkerFetchResponseEndBase(aResolver) + , mReason(aReason) { } bool WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override { + if (mReason == FetchDriverObserver::eAborted) { + RefPtr<Promise> promise = mResolver->mPromiseProxy->WorkerPromise(); + promise->MaybeReject(NS_ERROR_DOM_ABORT_ERR); + } + WorkerRunInternal(aWorkerPrivate); return true; } @@ -379,9 +590,10 @@ class WorkerFetchResponseEndControlRunnable final : public MainThreadWorkerContr , public WorkerFetchResponseEndBase { public: - explicit WorkerFetchResponseEndControlRunnable(PromiseWorkerProxy* aPromiseProxy) - : MainThreadWorkerControlRunnable(aPromiseProxy->GetWorkerPrivate()) - , WorkerFetchResponseEndBase(aPromiseProxy) + WorkerFetchResponseEndControlRunnable(WorkerPrivate* aWorkerPrivate, + WorkerFetchResolver* aResolver) + : MainThreadWorkerControlRunnable(aWorkerPrivate) + , WorkerFetchResponseEndBase(aResolver) { } @@ -415,7 +627,22 @@ WorkerFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse) } void -WorkerFetchResolver::OnResponseEnd() +WorkerFetchResolver::OnDataAvailable() +{ + AssertIsOnMainThread(); + + MutexAutoLock lock(mPromiseProxy->Lock()); + if (mPromiseProxy->CleanedUp()) { + return; + } + + RefPtr<WorkerDataAvailableRunnable> r = + new WorkerDataAvailableRunnable(mPromiseProxy->GetWorkerPrivate(), this); + Unused << r->Dispatch(); +} + +void +WorkerFetchResolver::OnResponseEnd(FetchDriverObserver::EndReason aReason) { AssertIsOnMainThread(); MutexAutoLock lock(mPromiseProxy->Lock()); @@ -426,11 +653,13 @@ WorkerFetchResolver::OnResponseEnd() FlushConsoleReport(); RefPtr<WorkerFetchResponseEndRunnable> r = - new WorkerFetchResponseEndRunnable(mPromiseProxy); + new WorkerFetchResponseEndRunnable(mPromiseProxy->GetWorkerPrivate(), + this, aReason); if (!r->Dispatch()) { RefPtr<WorkerFetchResponseEndControlRunnable> cr = - new WorkerFetchResponseEndControlRunnable(mPromiseProxy); + new WorkerFetchResponseEndControlRunnable(mPromiseProxy->GetWorkerPrivate(), + this); // This can fail if the worker thread is canceled or killed causing // the PromiseWorkerProxy to give up its WorkerHolder immediately, // allowing the worker thread to become Dead. |