diff options
author | Brian Smith <brian@dbsoft.org> | 2023-09-30 11:19:06 -0500 |
---|---|---|
committer | Brian Smith <brian@dbsoft.org> | 2023-09-30 11:19:06 -0500 |
commit | b7a5d73ba9752ab9d9c62497997592b290f38a15 (patch) | |
tree | da74b60f852830ad1d91b41edbb88e45c49d6af4 | |
parent | aa1caf5ade21ed691579ef1245c25c354d2d4707 (diff) | |
download | uxp-b7a5d73ba9752ab9d9c62497997592b290f38a15.tar.gz |
Issue #1442 - Part 23 - Align FetchStream with Firefox 68ESR.
https://bugzilla.mozilla.org/show_bug.cgi?id=1612308
https://bugzilla.mozilla.org/show_bug.cgi?id=1445587
Partial part 2 implementing synchronization changes.
-rw-r--r-- | dom/fetch/Fetch.h | 8 | ||||
-rw-r--r-- | dom/fetch/FetchStream.cpp | 132 | ||||
-rw-r--r-- | dom/fetch/FetchStream.h | 31 |
3 files changed, 139 insertions, 32 deletions
diff --git a/dom/fetch/Fetch.h b/dom/fetch/Fetch.h index 0f79a63021..d6fdd84f19 100644 --- a/dom/fetch/Fetch.h +++ b/dom/fetch/Fetch.h @@ -101,6 +101,8 @@ public: virtual void NullifyStream() = 0;
+ virtual void MarkAsRead() = 0;
+
virtual JSObject* ReadableStreamBody() = 0;
};
@@ -237,6 +239,12 @@ public: return mReadableStreamBody;
}
+ void
+ MarkAsRead() override
+ {
+ mBodyUsed = true;
+ }
+
virtual AbortSignal*
GetSignal() const = 0;
diff --git a/dom/fetch/FetchStream.cpp b/dom/fetch/FetchStream.cpp index 63e6f150b9..8d2ac7b1df 100644 --- a/dom/fetch/FetchStream.cpp +++ b/dom/fetch/FetchStream.cpp @@ -4,6 +4,9 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "FetchStream.h" +#include "mozilla/CycleCollectedJSContext.h" +#include "mozilla/dom/ScriptSettings.h" +#include "mozilla/Maybe.h" #include "nsNetCID.h" #include "nsITransport.h" #include "nsIStreamTransportService.h" @@ -181,8 +184,12 @@ FetchStream::RequestDataCallback(JSContext* aCx, MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream)); RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource); + stream->AssertIsOnOwningThread(); - MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting || + MutexAutoLock lock(stream->mMutex); + + MOZ_DIAGNOSTIC_ASSERT(stream->mState == eInitializing || + stream->mState == eWaiting || stream->mState == eChecking || stream->mState == eReading); @@ -199,6 +206,11 @@ FetchStream::RequestDataCallback(JSContext* aCx, return; } + if (stream->mState == eInitializing) { + // The stream has been used for the first time. + stream->mStreamHolder->MarkAsRead(); + } + stream->mState = eReading; if (!stream->mInputStream) { @@ -209,7 +221,7 @@ FetchStream::RequestDataCallback(JSContext* aCx, bool nonBlocking = false; nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -219,7 +231,7 @@ FetchStream::RequestDataCallback(JSContext* aCx, nsCOMPtr<nsIStreamTransportService> sts = do_GetService(kStreamTransportServiceCID, &rv); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -230,7 +242,7 @@ FetchStream::RequestDataCallback(JSContext* aCx, /* aCloseWhenDone */ true, getter_AddRefs(transport)); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -240,7 +252,7 @@ FetchStream::RequestDataCallback(JSContext* aCx, /* aSegmentCount */ 0, getter_AddRefs(wrapper)); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -257,7 +269,7 @@ FetchStream::RequestDataCallback(JSContext* aCx, nsresult rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -277,6 +289,9 @@ FetchStream::WriteIntoReadRequestCallback(JSContext* aCx, MOZ_DIAGNOSTIC_ASSERT(aByteWritten); RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource); + stream->AssertIsOnOwningThread(); + + MutexAutoLock lock(stream->mMutex); MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting); @@ -286,20 +301,20 @@ FetchStream::WriteIntoReadRequestCallback(JSContext* aCx, nsresult rv = stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } *aByteWritten = written; if (written == 0) { - stream->CloseAndReleaseObjects(aCx, aStream); + stream->CloseAndReleaseObjects(aCx, lock, aStream); return; } rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) { - stream->ErrorPropagation(aCx, aStream, rv); + stream->ErrorPropagation(aCx, lock, aStream, rv); return; } @@ -319,6 +334,7 @@ FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream, // We are guaranteed that won't happen until the js ReadableStream object // is finalized. FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource); + stream->AssertIsOnOwningThread(); if (stream->mInputStream) { stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); @@ -344,6 +360,24 @@ FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream, { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); + + // This is safe because we created an extra reference in FetchStream::Create() + // that won't be released until FetchStream::FinalizeCallback() is called. + // We are guaranteed that won't happen until the js ReadableStream object + // is finalized. + FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource); + stream->AssertIsOnOwningThread(); + + if (stream->mState == eInitializing) { + // The stream has been used for the first time. + stream->mStreamHolder->MarkAsRead(); + } + + if (stream->mInputStream) { + stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); + } + + stream->ReleaseObjects(); } void @@ -364,7 +398,8 @@ FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags) FetchStream::FetchStream(nsIGlobalObject* aGlobal, FetchStreamHolder* aStreamHolder, nsIInputStream* aInputStream) - : mState(eWaiting) + : mMutex("FetchStream::mMutex") + , mState(eInitializing) , mGlobal(aGlobal) , mStreamHolder(aStreamHolder) , mOriginalInputStream(aInputStream) @@ -379,9 +414,13 @@ FetchStream::~FetchStream() } void -FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, +FetchStream::ErrorPropagation(JSContext* aCx, + const MutexAutoLock& aProofOfLock, + JS::HandleObject aStream, nsresult aError) { + AssertIsOnOwningThread(); + // Nothing to do. if (mState == eClosed) { return; @@ -389,7 +428,7 @@ FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, // Let's close the stream. if (aError == NS_BASE_STREAM_CLOSED) { - CloseAndReleaseObjects(aCx, aStream); + CloseAndReleaseObjects(aCx, aProofOfLock, aStream); return; } @@ -400,31 +439,34 @@ FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, JS::Rooted<JS::Value> errorValue(aCx); if (ToJSValue(aCx, error, &errorValue)) { + MutexAutoUnlock unlock(mMutex); JS::ReadableStreamError(aCx, aStream, errorValue); } + + ReleaseObjects(aProofOfLock); } NS_IMETHODIMP FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { + AssertIsOnOwningThread(); MOZ_DIAGNOSTIC_ASSERT(aStream); + Maybe<MutexAutoLock> lock; + lock.emplace(mMutex); + // Already closed. We have nothing else to do here. if (mState == eClosed) { return NS_OK; } + nsAutoMicroTask mt; + AutoEntryScript aes(mGlobal, "fetch body data available"); + MOZ_DIAGNOSTIC_ASSERT(mInputStream); MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking); - - AutoJSAPI jsapi; - if (NS_WARN_IF(!jsapi.Init(mGlobal))) { - // Without JSContext we are not able to close the stream or to propagate the - // error. - return NS_ERROR_FAILURE; - } - - JSContext* cx = jsapi.cx(); + + JSContext* cx = aes.cx(); JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody()); uint64_t size = 0; @@ -437,7 +479,7 @@ FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) // No warning for stream closed. if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) { - ErrorPropagation(cx, stream, rv); + ErrorPropagation(cx, *lock, stream, rv); return NS_OK; } @@ -448,10 +490,10 @@ FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) } mState = eWriting; - JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size); - // The WriteInto callback changes mState to eChecking. - MOZ_DIAGNOSTIC_ASSERT(mState == eChecking); + lock.reset(); + + JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size); return NS_OK; } @@ -465,6 +507,7 @@ FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource, RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingReadableStreamSource); + stream->AssertIsOnOwningThread(); // if mOriginalInputStream is null, the reading already started. We don't want // to expose the internal inputStream. @@ -480,28 +523,37 @@ FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource, void FetchStream::Close() { + AssertIsOnOwningThread(); + + MutexAutoLock lock(mMutex); + if (mState == eClosed) { return; } AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(mGlobal))) { - ReleaseObjects(); + ReleaseObjects(lock); return; } JSContext* cx = jsapi.cx(); JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody()); - CloseAndReleaseObjects(cx, stream); + CloseAndReleaseObjects(cx, lock, stream); } void -FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream) +FetchStream::CloseAndReleaseObjects(JSContext* aCx, + const MutexAutoLock& aProofOfLock, + JS::HandleObject aStream) { + AssertIsOnOwningThread(); MOZ_DIAGNOSTIC_ASSERT(mState != eClosed); - ReleaseObjects(); + ReleaseObjects(aProofOfLock); + + MutexAutoUnlock unlock(mMutex); if (JS::ReadableStreamIsReadable(aStream)) { JS::ReadableStreamClose(aCx, aStream); @@ -511,7 +563,20 @@ FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream) void FetchStream::ReleaseObjects() { + MutexAutoLock lock(mMutex); + ReleaseObjects(lock); +} + +void +FetchStream::ReleaseObjects(const MutexAutoLock& aProofOfLock) +{ + // This method can be called on 2 possible threads: the owning one and a JS + // thread used to release resources. If we are on the JS thread, we need to + // dispatch a runnable to go back to the owning thread in order to release + // resources correctly. + if (mState == eClosed) { + // Already gone. Nothing to do. return; } @@ -541,6 +606,14 @@ FetchStream::ReleaseObjects() } } +#ifdef DEBUG +void +FetchStream::AssertIsOnOwningThread() +{ + NS_ASSERT_OWNINGTHREAD(FetchStream); +} +#endif + // nsIObserver // ----------- @@ -549,6 +622,7 @@ FetchStream::Observe(nsISupports* aSubject, const char* aTopic, const char16_t* aData) { AssertIsOnMainThread(); + AssertIsOnOwningThread(); MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0); diff --git a/dom/fetch/FetchStream.h b/dom/fetch/FetchStream.h index 80d120616d..66e1de5642 100644 --- a/dom/fetch/FetchStream.h +++ b/dom/fetch/FetchStream.h @@ -52,6 +52,14 @@ private: nsIInputStream* aInputStream); ~FetchStream(); +#ifdef DEBUG + void + AssertIsOnOwningThread(); +#else + void + AssertIsOnOwningThread() {} +#endif + static void RequestDataCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, @@ -81,10 +89,19 @@ private: FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags); void - ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aRv); + ErrorPropagation(JSContext* aCx, + const MutexAutoLock& aProofOfLock, + JS::HandleObject aStream, nsresult aRv); + + void + CloseAndReleaseObjects(JSContext* aCx, + const MutexAutoLock& aProofOfLock, + JS::HandleObject aSteam); + + class WorkerShutdown; void - CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream); + ReleaseObjects(const MutexAutoLock& aProofOfLock); void ReleaseObjects(); @@ -92,6 +109,9 @@ private: // Common methods enum State { + // This is the beginning state before any reading operation. + eInitializing, + // RequestDataCallback has not been called yet. We haven't started to read // data from the stream yet. eWaiting, @@ -111,7 +131,12 @@ private: eClosed, }; - // Touched only on the target thread. + // We need a mutex because JS engine can release FetchStream on a non-owning + // thread. We must be sure that the releasing of resources doesn't trigger + // race conditions. + Mutex mMutex; + + // Protected by mutex. State mState; nsCOMPtr<nsIGlobalObject> mGlobal; |