/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "FetchStream.h"
#include "mozilla/CycleCollectedJSContext.h"
#include "mozilla/dom/ScriptSettings.h"
#include "mozilla/Maybe.h"
#include "nsNetCID.h"
#include "nsITransport.h"
#include "nsIStreamTransportService.h"
#include "nsProxyRelease.h"
#include "WorkerPrivate.h"
#include "WorkerRunnable.h"
#include "Workers.h"
#include "mozilla/dom/DOMError.h"

// Flag value handed to the JS engine when the external-source ReadableStream
// is created; every callback asserts that it receives this same value back.
#define FETCH_STREAM_FLAG 0

static NS_DEFINE_CID(kStreamTransportServiceCID,
                     NS_STREAMTRANSPORTSERVICE_CID);

namespace mozilla {
namespace dom {

using namespace workers;

namespace {

// Worker holder installed by FetchStream::Create() on worker threads. The
// first time the worker notifies a status change, the associated FetchStream
// is closed; later notifications are ignored.
class FetchStreamWorkerHolder final : public WorkerHolder
{
public:
  explicit FetchStreamWorkerHolder(FetchStream* aStream)
    : WorkerHolder()
    , mStream(aStream)
    , mWasNotified(false)
  {}

  bool Notify(Status aStatus) override
  {
    if (!mWasNotified) {
      mWasNotified = true;
      mStream->Close();
    }

    return true;
  }

  WorkerPrivate* GetWorkerPrivate() const
  {
    return mWorkerPrivate;
  }

private:
  // Strong reference: together with FetchStream::mWorkerHolder this forms the
  // ref-cycle described in FetchStream::Create().
  RefPtr<FetchStream> mStream;
  bool mWasNotified;
};

// Control runnable used by FetchStream::ReleaseObjects() to drop the worker
// holder, the global and the stream holder from within WorkerRun(), i.e. when
// the runnable is executed for the worker.
class FetchStreamWorkerHolderShutdown final : public WorkerControlRunnable
{
public:
  FetchStreamWorkerHolderShutdown(WorkerPrivate* aWorkerPrivate,
                                  UniquePtr<WorkerHolder>&& aHolder,
                                  nsCOMPtr<nsIGlobalObject>&& aGlobal,
                                  RefPtr<FetchStreamHolder>&& aStreamHolder)
    : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount)
    , mHolder(Move(aHolder))
    , mGlobal(Move(aGlobal))
    , mStreamHolder(Move(aStreamHolder))
  {}

  bool WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
  {
    mHolder = nullptr;
    mGlobal = nullptr;

    mStreamHolder->NullifyStream();
    mStreamHolder = nullptr;

    return true;
  }

  // This runnable starts from a JS Thread. We need to disable a couple of
  // assertions by overriding the following methods.

  bool PreDispatch(WorkerPrivate* aWorkerPrivate) override
  {
    return true;
  }

  void PostDispatch(WorkerPrivate* aWorkerPrivate,
                    bool aDispatchResult) override
  {}

private:
  UniquePtr<WorkerHolder> mHolder;
  nsCOMPtr<nsIGlobalObject> mGlobal;
  RefPtr<FetchStreamHolder> mStreamHolder;
};

} // anonymous

NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback, nsIObserver,
                  nsISupportsWeakReference)

// Creates a FetchStream wrapping aInputStream and exposes it to JS as an
// external-source ReadableStream object, returned through aStream.
//
// On the main thread the new stream is registered as a (weak) observer of
// DOM_WINDOW_DESTROYED_TOPIC; on any other thread a FetchStreamWorkerHolder is
// installed instead. Failures are reported through aRv and leave aStream
// untouched.
/* static */ void
FetchStream::Create(JSContext* aCx, FetchStreamHolder* aStreamHolder,
                    nsIGlobalObject* aGlobal, nsIInputStream* aInputStream,
                    JS::MutableHandle<JSObject*> aStream, ErrorResult& aRv)
{
  MOZ_DIAGNOSTIC_ASSERT(aCx);
  MOZ_DIAGNOSTIC_ASSERT(aInputStream);
  MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);

  RefPtr<FetchStream> stream =
    new FetchStream(aGlobal, aStreamHolder, aInputStream);

  if (NS_IsMainThread()) {
    nsCOMPtr<nsIObserverService> os = mozilla::services::GetObserverService();
    if (NS_WARN_IF(!os)) {
      aRv.Throw(NS_ERROR_FAILURE);
      return;
    }

    aRv = os->AddObserver(stream, DOM_WINDOW_DESTROYED_TOPIC, true);
    if (NS_WARN_IF(aRv.Failed())) {
      return;
    }
  } else {
    WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
    MOZ_ASSERT(workerPrivate);

    UniquePtr<FetchStreamWorkerHolder> holder(
      new FetchStreamWorkerHolder(stream));
    if (NS_WARN_IF(!holder->HoldWorker(workerPrivate, Closing))) {
      aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
      return;
    }

    // Note, this will create a ref-cycle between the holder and the stream.
    // The cycle is broken when the stream is closed or the worker begins
    // shutting down.
    stream->mWorkerHolder = Move(holder);
  }

  // The callbacks are installed only if this context does not have them yet.
  if (!JS::HasReadableStreamCallbacks(aCx)) {
    JS::SetReadableStreamCallbacks(aCx,
                                   &FetchStream::RequestDataCallback,
                                   &FetchStream::WriteIntoReadRequestCallback,
                                   &FetchStream::CancelCallback,
                                   &FetchStream::ClosedCallback,
                                   &FetchStream::ErroredCallback,
                                   &FetchStream::FinalizeCallback);
  }

  JS::Rooted<JSObject*> body(aCx,
    JS::NewReadableExternalSourceStreamObject(aCx, stream, FETCH_STREAM_FLAG));
  if (!body) {
    aRv.StealExceptionFromJSContext(aCx);
    return;
  }

  // This will be released in FetchStream::FinalizeCallback(). We are
  // guaranteed the jsapi will call FinalizeCallback when ReadableStream
  // js object is finalized.
  NS_ADDREF(stream.get());

  aStream.set(body);
}

// Callback invoked when the JS ReadableStream wants data from the underlying
// source. Moves the state machine to eReading and, on first use, converts
// mOriginalInputStream into an nsIAsyncInputStream (going through the stream
// transport service when the original stream is blocking or not async), then
// schedules an AsyncWait() on the owning event target.
/* static */ void
FetchStream::RequestDataCallback(JSContext* aCx,
                                 JS::HandleObject aStream,
                                 void* aUnderlyingSource,
                                 uint8_t aFlags,
                                 size_t aDesiredSize)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
  MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream));

  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
  stream->AssertIsOnOwningThread();

  MutexAutoLock lock(stream->mMutex);

  MOZ_DIAGNOSTIC_ASSERT(stream->mState == eInitializing ||
                        stream->mState == eWaiting ||
                        stream->mState == eChecking ||
                        stream->mState == eReading);

  if (stream->mState == eReading) {
    // We are already reading data.
    return;
  }

  if (stream->mState == eChecking) {
    // If we are looking for more data, there is nothing else we should do:
    // let's move this checking operation in a reading.
    MOZ_ASSERT(stream->mInputStream);
    stream->mState = eReading;
    return;
  }

  if (stream->mState == eInitializing) {
    // The stream has been used for the first time.
    stream->mStreamHolder->MarkAsRead();
  }

  stream->mState = eReading;

  if (!stream->mInputStream) {
    // This is the first use of the stream. Let's convert the
    // mOriginalInputStream into an nsIAsyncInputStream.
    MOZ_ASSERT(stream->mOriginalInputStream);

    bool nonBlocking = false;
    nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking);
    if (NS_WARN_IF(NS_FAILED(rv))) {
      stream->ErrorPropagation(aCx, lock, aStream, rv);
      return;
    }

    nsCOMPtr<nsIAsyncInputStream> asyncStream =
      do_QueryInterface(stream->mOriginalInputStream);
    if (!nonBlocking || !asyncStream) {
      nsCOMPtr<nsIStreamTransportService> sts =
        do_GetService(kStreamTransportServiceCID, &rv);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        stream->ErrorPropagation(aCx, lock, aStream, rv);
        return;
      }

      nsCOMPtr<nsITransport> transport;
      rv = sts->CreateInputTransport(stream->mOriginalInputStream,
                                     /* aStartOffset */ 0,
                                     /* aReadLimit */ -1,
                                     /* aCloseWhenDone */ true,
                                     getter_AddRefs(transport));
      if (NS_WARN_IF(NS_FAILED(rv))) {
        stream->ErrorPropagation(aCx, lock, aStream, rv);
        return;
      }

      nsCOMPtr<nsIInputStream> wrapper;
      rv = transport->OpenInputStream(/* aFlags */ 0,
                                      /* aSegmentSize */ 0,
                                      /* aSegmentCount */ 0,
                                      getter_AddRefs(wrapper));
      if (NS_WARN_IF(NS_FAILED(rv))) {
        stream->ErrorPropagation(aCx, lock, aStream, rv);
        return;
      }

      asyncStream = do_QueryInterface(wrapper);
      // The stream is dereferenced right below; fail cleanly instead of
      // relying on a diagnostic-only assertion if the wrapper is not async.
      if (NS_WARN_IF(!asyncStream)) {
        stream->ErrorPropagation(aCx, lock, aStream, NS_ERROR_NO_INTERFACE);
        return;
      }
    }

    stream->mInputStream = asyncStream;
    stream->mOriginalInputStream = nullptr;
  }

  MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream);

  nsresult rv =
    stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    stream->ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  // All good.
}

// Callback invoked to copy available data into aBuffer (at most aLength
// bytes). The number of bytes actually read is stored in *aByteWritten; it is
// 0 when the read fails or when the stream has reached its end, in which case
// the stream is closed. Otherwise another AsyncWait() is scheduled to check
// for more data.
/* static */ void
FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
                                          JS::HandleObject aStream,
                                          void* aUnderlyingSource,
                                          uint8_t aFlags,
                                          void* aBuffer,
                                          size_t aLength,
                                          size_t* aByteWritten)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
  MOZ_DIAGNOSTIC_ASSERT(aBuffer);
  MOZ_DIAGNOSTIC_ASSERT(aByteWritten);

  // Make sure the out-param is always defined, including on the error path
  // below where nothing has been written.
  *aByteWritten = 0;

  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
  stream->AssertIsOnOwningThread();

  MutexAutoLock lock(stream->mMutex);

  MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting);
  stream->mState = eChecking;

  uint32_t written;
  nsresult rv =
    stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    stream->ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  *aByteWritten = written;

  if (written == 0) {
    stream->CloseAndReleaseObjects(aCx, lock, aStream);
    return;
  }

  rv = stream->mInputStream->AsyncWait(stream, 0, 0,
                                       stream->mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    stream->ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  // All good.
}

// Callback invoked when the JS ReadableStream is cancelled: closes the async
// input stream (if one was created) and releases the held objects. Always
// returns undefined.
/* static */ JS::Value
FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream,
                            void* aUnderlyingSource, uint8_t aFlags,
                            JS::HandleValue aReason)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);

  // This is safe because we created an extra reference in FetchStream::Create()
  // that won't be released until FetchStream::FinalizeCallback() is called.
  // We are guaranteed that won't happen until the js ReadableStream object
  // is finalized.
  FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
  stream->AssertIsOnOwningThread();

  if (stream->mInputStream) {
    stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
  }

  stream->ReleaseObjects();
  return JS::UndefinedValue();
}

// Callback invoked when the JS ReadableStream is closed. Nothing to do here
// besides validating the arguments.
/* static */ void
FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream,
                            void* aUnderlyingSource, uint8_t aFlags)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
}

// Callback invoked when the JS ReadableStream is errored: marks the stream
// holder as read if the stream was never used, closes the async input stream
// (if any) and releases the held objects.
/* static */ void
FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
                             void* aUnderlyingSource, uint8_t aFlags,
                             JS::HandleValue aReason)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);

  // This is safe because we created an extra reference in FetchStream::Create()
  // that won't be released until FetchStream::FinalizeCallback() is called.
  // We are guaranteed that won't happen until the js ReadableStream object
  // is finalized.
  FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
  stream->AssertIsOnOwningThread();

  if (stream->mState == eInitializing) {
    // The stream has been used for the first time.
    stream->mStreamHolder->MarkAsRead();
  }

  if (stream->mInputStream) {
    stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
  }

  stream->ReleaseObjects();
}

// Callback invoked when the JS ReadableStream object is finalized. Drops the
// extra reference taken in FetchStream::Create() after releasing the held
// objects.
void
FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags)
{
  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
  MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);

  // This can be called in any thread.

  // This takes ownership of the ref created in FetchStream::Create().
  RefPtr<FetchStream> stream =
    dont_AddRef(static_cast<FetchStream*>(aUnderlyingSource));

  stream->ReleaseObjects();
}

// Stores the global, the stream holder and the original input stream; the
// state starts at eInitializing and the current thread is recorded as the
// owning event target.
FetchStream::FetchStream(nsIGlobalObject* aGlobal,
                         FetchStreamHolder* aStreamHolder,
                         nsIInputStream* aInputStream)
  : mMutex("FetchStream::mMutex")
  , mState(eInitializing)
  , mGlobal(aGlobal)
  , mStreamHolder(aStreamHolder)
  , mOriginalInputStream(aInputStream)
  // TODO: Replace with mGlobal->EventTargetFor(TaskCategory::Other)
  // When we have the Dispatcher API in the tree, see Issue #1442
  , mOwningEventTarget(NS_GetCurrentThread())
{
  MOZ_DIAGNOSTIC_ASSERT(aInputStream);
  MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
}

FetchStream::~FetchStream()
{
}

// Handles a failure coming from the input stream. Must be called with mMutex
// held (aProofOfLock). NS_BASE_STREAM_CLOSED closes the JS stream normally;
// any other error puts the JS stream in the errored state with a generic
// NS_ERROR_DOM_TYPE_ERR DOMError. In both cases the held objects are released.
void
FetchStream::ErrorPropagation(JSContext* aCx,
                              const MutexAutoLock& aProofOfLock,
                              JS::HandleObject aStream,
                              nsresult aError)
{
  AssertIsOnOwningThread();

  // Nothing to do.
  if (mState == eClosed) {
    return;
  }

  // Let's close the stream.
  if (aError == NS_BASE_STREAM_CLOSED) {
    CloseAndReleaseObjects(aCx, aProofOfLock, aStream);
    return;
  }

  nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);

  // Let's use a generic error.
  RefPtr<DOMError> error = new DOMError(window, NS_ERROR_DOM_TYPE_ERR);

  JS::Rooted<JS::Value> errorValue(aCx);
  if (ToJSValue(aCx, error, &errorValue)) {
    // The mutex is dropped while calling into the JS API.
    MutexAutoUnlock unlock(mMutex);
    JS::ReadableStreamError(aCx, aStream, errorValue);
  }

  ReleaseObjects(aProofOfLock);
}

// nsIInputStreamCallback: invoked on the owning thread once the AsyncWait()
// scheduled on mInputStream fires. Depending on the current state this either
// records that data is available for the next read request (eChecking ->
// eWaiting) or tells the JS stream how many bytes can be pulled (eReading ->
// eWriting).
NS_IMETHODIMP
FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
{
  AssertIsOnOwningThread();
  MOZ_DIAGNOSTIC_ASSERT(aStream);

  // The lock lives in a Maybe<> so that it can be dropped explicitly before
  // calling back into the JS API at the end of this method.
  Maybe<MutexAutoLock> lock;
  lock.emplace(mMutex);

  // Already closed. We have nothing else to do here.
  if (mState == eClosed) {
    return NS_OK;
  }

  nsAutoMicroTask mt;
  AutoEntryScript aes(mGlobal, "fetch body data available");

  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);

  JSContext* cx = aes.cx();
  JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody());

  uint64_t size = 0;
  nsresult rv = mInputStream->Available(&size);
  if (NS_SUCCEEDED(rv) && size == 0) {
    // In theory this should not happen. If size is 0, the stream should be
    // considered closed.
    rv = NS_BASE_STREAM_CLOSED;
  }

  // No warning for stream closed.
  if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
    ErrorPropagation(cx, *lock, stream, rv);
    return NS_OK;
  }

  // This extra checking is completed. Let's wait for the next read request.
  if (mState == eChecking) {
    mState = eWaiting;
    return NS_OK;
  }

  mState = eWriting;

  lock.reset();

  JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);

  return NS_OK;
}

// Hands out the original (not yet consumed) input stream of the FetchStream
// behind aUnderlyingReadableStreamSource. Fails with
// NS_ERROR_DOM_INVALID_STATE_ERR once reading has started.
/* static */ nsresult
FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource,
                                 nsIInputStream** aInputStream)
{
  MOZ_ASSERT(aUnderlyingReadableStreamSource);
  MOZ_ASSERT(aInputStream);

  RefPtr<FetchStream> stream =
    static_cast<FetchStream*>(aUnderlyingReadableStreamSource);
  stream->AssertIsOnOwningThread();

  // if mOriginalInputStream is null, the reading already started. We don't want
  // to expose the internal inputStream.
  if (NS_WARN_IF(!stream->mOriginalInputStream)) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  nsCOMPtr<nsIInputStream> inputStream = stream->mOriginalInputStream;
  inputStream.forget(aInputStream);
  return NS_OK;
}

// Closes the JS stream and releases the held objects. If the JS API cannot be
// initialized for mGlobal, only the objects are released. No-op when already
// closed.
void
FetchStream::Close()
{
  AssertIsOnOwningThread();

  MutexAutoLock lock(mMutex);

  if (mState == eClosed) {
    return;
  }

  AutoJSAPI jsapi;
  if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
    ReleaseObjects(lock);
    return;
  }

  JSContext* cx = jsapi.cx();
  JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody());
  CloseAndReleaseObjects(cx, lock, stream);
}

// Releases the held objects and then, with mMutex temporarily dropped, closes
// aStream if it is still readable. Must be called with mMutex held and with a
// state different from eClosed.
void
FetchStream::CloseAndReleaseObjects(JSContext* aCx,
                                    const MutexAutoLock& aProofOfLock,
                                    JS::HandleObject aStream)
{
  AssertIsOnOwningThread();
  MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);

  ReleaseObjects(aProofOfLock);

  MutexAutoUnlock unlock(mMutex);
  if (JS::ReadableStreamIsReadable(aStream)) {
    JS::ReadableStreamClose(aCx, aStream);
  }
}

// Convenience overload: acquires mMutex and releases the held objects.
void
FetchStream::ReleaseObjects()
{
  MutexAutoLock lock(mMutex);
  ReleaseObjects(lock);
}

// Marks the stream as closed and schedules the release of the global, the
// stream holder and (on workers) the worker holder. Safe to call more than
// once: only the first call does any work.
void
FetchStream::ReleaseObjects(const MutexAutoLock& aProofOfLock)
{
  // This method can be called on 2 possible threads: the owning one and a JS
  // thread used to release resources. If we are on the JS thread, we need to
  // dispatch a runnable to go back to the owning thread in order to release
  // resources correctly.

  if (mState == eClosed) {
    // Already gone. Nothing to do.
    return;
  }

  mState = eClosed;

  if (mWorkerHolder) {
    // Worker case: ownership of the holder, the global and the stream holder
    // is moved into a control runnable dispatched to the worker.
    RefPtr<FetchStreamWorkerHolderShutdown> r =
      new FetchStreamWorkerHolderShutdown(
        static_cast<FetchStreamWorkerHolder*>(
          mWorkerHolder.get())->GetWorkerPrivate(),
        Move(mWorkerHolder), Move(mGlobal), Move(mStreamHolder));
    r->Dispatch();
  } else {
    // Main-thread case: the runnable keeps this object alive until the
    // observer has been removed and the members have been cleared.
    RefPtr<FetchStream> self = this;
    RefPtr<Runnable> r = NS_NewRunnableFunction(
      [self] () {
        nsCOMPtr<nsIObserverService> os =
          mozilla::services::GetObserverService();
        if (os) {
          os->RemoveObserver(self, DOM_WINDOW_DESTROYED_TOPIC);
        }
        self->mGlobal = nullptr;

        self->mStreamHolder->NullifyStream();
        self->mStreamHolder = nullptr;
      });
    NS_DispatchToMainThread(r);
  }
}

#ifdef DEBUG
void
FetchStream::AssertIsOnOwningThread()
{
  NS_ASSERT_OWNINGTHREAD(FetchStream);
}
#endif

// nsIObserver
// -----------

// Closes the stream when the window being destroyed (aSubject) is the one
// backing mGlobal. Only DOM_WINDOW_DESTROYED_TOPIC is expected here.
NS_IMETHODIMP
FetchStream::Observe(nsISupports* aSubject, const char* aTopic,
                     const char16_t* aData)
{
  AssertIsOnMainThread();
  AssertIsOnOwningThread();

  MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0);

  nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
  if (SameCOMIdentity(aSubject, window)) {
    Close();
  }

  return NS_OK;
}

} // dom namespace
} // mozilla namespace