summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrian Smith <brian@dbsoft.org>2023-09-30 11:19:06 -0500
committerBrian Smith <brian@dbsoft.org>2023-09-30 11:19:06 -0500
commitb7a5d73ba9752ab9d9c62497997592b290f38a15 (patch)
treeda74b60f852830ad1d91b41edbb88e45c49d6af4
parentaa1caf5ade21ed691579ef1245c25c354d2d4707 (diff)
downloaduxp-b7a5d73ba9752ab9d9c62497997592b290f38a15.tar.gz
Issue #1442 - Part 23 - Align FetchStream with Firefox 68ESR.
https://bugzilla.mozilla.org/show_bug.cgi?id=1612308 https://bugzilla.mozilla.org/show_bug.cgi?id=1445587 Partial part 2 implementing synchronization changes.
-rw-r--r--dom/fetch/Fetch.h8
-rw-r--r--dom/fetch/FetchStream.cpp132
-rw-r--r--dom/fetch/FetchStream.h31
3 files changed, 139 insertions, 32 deletions
diff --git a/dom/fetch/Fetch.h b/dom/fetch/Fetch.h
index 0f79a63021..d6fdd84f19 100644
--- a/dom/fetch/Fetch.h
+++ b/dom/fetch/Fetch.h
@@ -101,6 +101,8 @@ public:
virtual void NullifyStream() = 0;
+ virtual void MarkAsRead() = 0;
+
virtual JSObject* ReadableStreamBody() = 0;
};
@@ -237,6 +239,12 @@ public:
return mReadableStreamBody;
}
+ void
+ MarkAsRead() override
+ {
+ mBodyUsed = true;
+ }
+
virtual AbortSignal*
GetSignal() const = 0;
diff --git a/dom/fetch/FetchStream.cpp b/dom/fetch/FetchStream.cpp
index 63e6f150b9..8d2ac7b1df 100644
--- a/dom/fetch/FetchStream.cpp
+++ b/dom/fetch/FetchStream.cpp
@@ -4,6 +4,9 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "FetchStream.h"
+#include "mozilla/CycleCollectedJSContext.h"
+#include "mozilla/dom/ScriptSettings.h"
+#include "mozilla/Maybe.h"
#include "nsNetCID.h"
#include "nsITransport.h"
#include "nsIStreamTransportService.h"
@@ -181,8 +184,12 @@ FetchStream::RequestDataCallback(JSContext* aCx,
MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream));
RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+ stream->AssertIsOnOwningThread();
- MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting ||
+ MutexAutoLock lock(stream->mMutex);
+
+ MOZ_DIAGNOSTIC_ASSERT(stream->mState == eInitializing ||
+ stream->mState == eWaiting ||
stream->mState == eChecking ||
stream->mState == eReading);
@@ -199,6 +206,11 @@ FetchStream::RequestDataCallback(JSContext* aCx,
return;
}
+ if (stream->mState == eInitializing) {
+ // The stream has been used for the first time.
+ stream->mStreamHolder->MarkAsRead();
+ }
+
stream->mState = eReading;
if (!stream->mInputStream) {
@@ -209,7 +221,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
bool nonBlocking = false;
nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking);
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -219,7 +231,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
nsCOMPtr<nsIStreamTransportService> sts =
do_GetService(kStreamTransportServiceCID, &rv);
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -230,7 +242,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
/* aCloseWhenDone */ true,
getter_AddRefs(transport));
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -240,7 +252,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
/* aSegmentCount */ 0,
getter_AddRefs(wrapper));
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -257,7 +269,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
nsresult rv =
stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -277,6 +289,9 @@ FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+ stream->AssertIsOnOwningThread();
+
+ MutexAutoLock lock(stream->mMutex);
MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting);
@@ -286,20 +301,20 @@ FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
nsresult rv =
stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
*aByteWritten = written;
if (written == 0) {
- stream->CloseAndReleaseObjects(aCx, aStream);
+ stream->CloseAndReleaseObjects(aCx, lock, aStream);
return;
}
rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
- stream->ErrorPropagation(aCx, aStream, rv);
+ stream->ErrorPropagation(aCx, lock, aStream, rv);
return;
}
@@ -319,6 +334,7 @@ FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream,
// We are guaranteed that won't happen until the js ReadableStream object
// is finalized.
FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
+ stream->AssertIsOnOwningThread();
if (stream->mInputStream) {
stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
@@ -344,6 +360,24 @@ FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
{
MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
+
+ // This is safe because we created an extra reference in FetchStream::Create()
+ // that won't be released until FetchStream::FinalizeCallback() is called.
+ // We are guaranteed that won't happen until the js ReadableStream object
+ // is finalized.
+ FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
+ stream->AssertIsOnOwningThread();
+
+ if (stream->mState == eInitializing) {
+ // The stream has been used for the first time.
+ stream->mStreamHolder->MarkAsRead();
+ }
+
+ if (stream->mInputStream) {
+ stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+ }
+
+ stream->ReleaseObjects();
}
void
@@ -364,7 +398,8 @@ FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags)
FetchStream::FetchStream(nsIGlobalObject* aGlobal,
FetchStreamHolder* aStreamHolder,
nsIInputStream* aInputStream)
- : mState(eWaiting)
+ : mMutex("FetchStream::mMutex")
+ , mState(eInitializing)
, mGlobal(aGlobal)
, mStreamHolder(aStreamHolder)
, mOriginalInputStream(aInputStream)
@@ -379,9 +414,13 @@ FetchStream::~FetchStream()
}
void
-FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream,
+FetchStream::ErrorPropagation(JSContext* aCx,
+ const MutexAutoLock& aProofOfLock,
+ JS::HandleObject aStream,
nsresult aError)
{
+ AssertIsOnOwningThread();
+
// Nothing to do.
if (mState == eClosed) {
return;
@@ -389,7 +428,7 @@ FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream,
// Let's close the stream.
if (aError == NS_BASE_STREAM_CLOSED) {
- CloseAndReleaseObjects(aCx, aStream);
+ CloseAndReleaseObjects(aCx, aProofOfLock, aStream);
return;
}
@@ -400,31 +439,34 @@ FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream,
JS::Rooted<JS::Value> errorValue(aCx);
if (ToJSValue(aCx, error, &errorValue)) {
+ MutexAutoUnlock unlock(mMutex);
JS::ReadableStreamError(aCx, aStream, errorValue);
}
+
+ ReleaseObjects(aProofOfLock);
}
NS_IMETHODIMP
FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
{
+ AssertIsOnOwningThread();
MOZ_DIAGNOSTIC_ASSERT(aStream);
+ Maybe<MutexAutoLock> lock;
+ lock.emplace(mMutex);
+
// Already closed. We have nothing else to do here.
if (mState == eClosed) {
return NS_OK;
}
+ nsAutoMicroTask mt;
+ AutoEntryScript aes(mGlobal, "fetch body data available");
+
MOZ_DIAGNOSTIC_ASSERT(mInputStream);
MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
-
- AutoJSAPI jsapi;
- if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
- // Without JSContext we are not able to close the stream or to propagate the
- // error.
- return NS_ERROR_FAILURE;
- }
-
- JSContext* cx = jsapi.cx();
+
+ JSContext* cx = aes.cx();
JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody());
uint64_t size = 0;
@@ -437,7 +479,7 @@ FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
// No warning for stream closed.
if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
- ErrorPropagation(cx, stream, rv);
+ ErrorPropagation(cx, *lock, stream, rv);
return NS_OK;
}
@@ -448,10 +490,10 @@ FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
}
mState = eWriting;
- JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
- // The WriteInto callback changes mState to eChecking.
- MOZ_DIAGNOSTIC_ASSERT(mState == eChecking);
+ lock.reset();
+
+ JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
return NS_OK;
}
@@ -465,6 +507,7 @@ FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource,
RefPtr<FetchStream> stream =
static_cast<FetchStream*>(aUnderlyingReadableStreamSource);
+ stream->AssertIsOnOwningThread();
// if mOriginalInputStream is null, the reading already started. We don't want
// to expose the internal inputStream.
@@ -480,28 +523,37 @@ FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource,
void
FetchStream::Close()
{
+ AssertIsOnOwningThread();
+
+ MutexAutoLock lock(mMutex);
+
if (mState == eClosed) {
return;
}
AutoJSAPI jsapi;
if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
- ReleaseObjects();
+ ReleaseObjects(lock);
return;
}
JSContext* cx = jsapi.cx();
JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody());
- CloseAndReleaseObjects(cx, stream);
+ CloseAndReleaseObjects(cx, lock, stream);
}
void
-FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream)
+FetchStream::CloseAndReleaseObjects(JSContext* aCx,
+ const MutexAutoLock& aProofOfLock,
+ JS::HandleObject aStream)
{
+ AssertIsOnOwningThread();
MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
- ReleaseObjects();
+ ReleaseObjects(aProofOfLock);
+
+ MutexAutoUnlock unlock(mMutex);
if (JS::ReadableStreamIsReadable(aStream)) {
JS::ReadableStreamClose(aCx, aStream);
@@ -511,7 +563,20 @@ FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream)
void
FetchStream::ReleaseObjects()
{
+ MutexAutoLock lock(mMutex);
+ ReleaseObjects(lock);
+}
+
+void
+FetchStream::ReleaseObjects(const MutexAutoLock& aProofOfLock)
+{
+ // This method can be called on 2 possible threads: the owning one and a JS
+ // thread used to release resources. If we are on the JS thread, we need to
+ // dispatch a runnable to go back to the owning thread in order to release
+ // resources correctly.
+
if (mState == eClosed) {
+ // Already gone. Nothing to do.
return;
}
@@ -541,6 +606,14 @@ FetchStream::ReleaseObjects()
}
}
+#ifdef DEBUG
+void
+FetchStream::AssertIsOnOwningThread()
+{
+ NS_ASSERT_OWNINGTHREAD(FetchStream);
+}
+#endif
+
// nsIObserver
// -----------
@@ -549,6 +622,7 @@ FetchStream::Observe(nsISupports* aSubject, const char* aTopic,
const char16_t* aData)
{
AssertIsOnMainThread();
+ AssertIsOnOwningThread();
MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0);
diff --git a/dom/fetch/FetchStream.h b/dom/fetch/FetchStream.h
index 80d120616d..66e1de5642 100644
--- a/dom/fetch/FetchStream.h
+++ b/dom/fetch/FetchStream.h
@@ -52,6 +52,14 @@ private:
nsIInputStream* aInputStream);
~FetchStream();
+#ifdef DEBUG
+ void
+ AssertIsOnOwningThread();
+#else
+ void
+ AssertIsOnOwningThread() {}
+#endif
+
static void
RequestDataCallback(JSContext* aCx, JS::HandleObject aStream,
void* aUnderlyingSource, uint8_t aFlags,
@@ -81,10 +89,19 @@ private:
FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags);
void
- ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aRv);
+ ErrorPropagation(JSContext* aCx,
+ const MutexAutoLock& aProofOfLock,
+ JS::HandleObject aStream, nsresult aRv);
+
+ void
+ CloseAndReleaseObjects(JSContext* aCx,
+ const MutexAutoLock& aProofOfLock,
+ JS::HandleObject aSteam);
+
+ class WorkerShutdown;
void
- CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream);
+ ReleaseObjects(const MutexAutoLock& aProofOfLock);
void
ReleaseObjects();
@@ -92,6 +109,9 @@ private:
// Common methods
enum State {
+ // This is the beginning state before any reading operation.
+ eInitializing,
+
// RequestDataCallback has not been called yet. We haven't started to read
// data from the stream yet.
eWaiting,
@@ -111,7 +131,12 @@ private:
eClosed,
};
- // Touched only on the target thread.
+ // We need a mutex because JS engine can release FetchStream on a non-owning
+ // thread. We must be sure that the releasing of resources doesn't trigger
+ // race conditions.
+ Mutex mMutex;
+
+ // Protected by mutex.
State mState;
nsCOMPtr<nsIGlobalObject> mGlobal;