Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(688)

Unified Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp

Issue 1506023003: Response construction with a ReadableStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..ace6a73db1f2ed353f1f11822294bbc6cee899c2
--- /dev/null
+++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
@@ -0,0 +1,289 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "config.h"
+#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
+
+#include "bindings/core/v8/ExceptionState.h"
+#include "bindings/core/v8/ReadableStreamOperations.h"
+#include "bindings/core/v8/ScriptFunction.h"
+#include "bindings/core/v8/ScriptState.h"
+#include "bindings/core/v8/ScriptValue.h"
+#include "bindings/core/v8/V8BindingMacros.h"
+#include "bindings/core/v8/V8IteratorResultValue.h"
+#include "bindings/core/v8/V8RecursionScope.h"
+#include "bindings/core/v8/V8Uint8Array.h"
+#include "core/dom/DOMTypedArray.h"
+#include "public/platform/Platform.h"
+#include "public/platform/WebTaskRunner.h"
+#include "public/platform/WebThread.h"
+#include "public/platform/WebTraceLocation.h"
+#include "wtf/Assertions.h"
+#include "wtf/Functional.h"
+#include "wtf/RefCounted.h"
+#include "wtf/WeakPtr.h"
+#include <algorithm>
+#include <string.h>
+#include <v8.h>
+
+namespace blink {
+
+using Result = WebDataConsumerHandle::Result;
+using Flags = WebDataConsumerHandle::Flags;
+
+// This context is not yet thread-safe.
+class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted<ReadingContext> {
+ WTF_MAKE_NONCOPYABLE(ReadingContext);
+public:
+ class OnFulfilled final : public ScriptFunction {
+ public:
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
+ {
+ return (new OnFulfilled(scriptState, context))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override
+ {
+ RefPtr<ReadingContext> readingContext(m_readingContext.get());
+ if (!readingContext)
+ return v;
+ bool done;
+ v8::Local<v8::Value> item = v.v8Value();
+ ASSERT(item->IsObject());
+ v8::Local<v8::Value> value = v8CallOrCrash(v8UnpackIteratorResult(v.scriptState(), item.As<v8::Object>(), &done));
+ if (done) {
+ readingContext->onReadDone();
+ return v;
+ }
+ if (!V8Uint8Array::hasInstance(value, v.isolate())) {
+ readingContext->onRejected();
+ return ScriptValue();
+ }
+ readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
+ return v;
+ }
+
+ private:
+ OnFulfilled(ScriptState* scriptState, WeakPtr<ReadingContext> context)
+ : ScriptFunction(scriptState), m_readingContext(context) {}
+
+ WeakPtr<ReadingContext> m_readingContext;
+ };
+
+ class OnRejected final : public ScriptFunction {
+ public:
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
+ {
+ return (new OnRejected(scriptState, context))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override
+ {
+ RefPtr<ReadingContext> readingContext(m_readingContext.get());
+ if (!readingContext)
+ return v;
+ readingContext->onRejected();
+ return v;
+ }
+
+ private:
+ OnRejected(ScriptState* scriptState, WeakPtr<ReadingContext> context)
+ : ScriptFunction(scriptState), m_readingContext(context) {}
+
+ WeakPtr<ReadingContext> m_readingContext;
+ };
+
+ class ReaderImpl final : public FetchDataConsumerHandle::Reader {
+ public:
+ ReaderImpl(PassRefPtr<ReadingContext> context, Client* client)
+ : m_readingContext(context)
+ {
+ m_readingContext->attachReader(client);
+ }
+ ~ReaderImpl() override
+ {
+ m_readingContext->detachReader();
+ }
+
+ Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override
+ {
+ *readSize = 0;
+ const void* src = nullptr;
+ size_t available;
+ Result r = beginRead(&src, flags, &available);
+ if (r != WebDataConsumerHandle::Ok)
+ return r;
+ *readSize = std::min(available, size);
+ memcpy(buffer, src, *readSize);
+ return endRead(*readSize);
+ }
+
+ Result beginRead(const void** buffer, Flags, size_t* available) override
+ {
+ return m_readingContext->beginRead(buffer, available);
+ }
+
+ Result endRead(size_t readSize) override
+ {
+ return m_readingContext->endRead(readSize);
+ }
+
+ private:
+ RefPtr<ReadingContext> m_readingContext;
+ };
+
+ static PassRefPtr<ReadingContext> create(ScriptState* scriptState, v8::Local<v8::Value> stream)
+ {
+ return adoptRef(new ReadingContext(scriptState, stream));
+ }
+
+ void attachReader(WebDataConsumerHandle::Client* client)
+ {
+ m_client = client;
+ notifyLater();
+ }
+
+ void detachReader()
+ {
+ m_client = nullptr;
+ }
+
+ Result beginRead(const void** buffer, size_t* available)
+ {
+ *buffer = nullptr;
+ *available = 0;
+ if (m_hasError)
+ return WebDataConsumerHandle::UnexpectedError;
+ if (m_isDone)
+ return WebDataConsumerHandle::Done;
+
+ if (m_pendingBuffer) {
+ ASSERT(m_pendingOffset < m_pendingBuffer->length());
+ *buffer = m_pendingBuffer->data() + m_pendingOffset;
+ *available = m_pendingBuffer->length() - m_pendingOffset;
+ return WebDataConsumerHandle::Ok;
+ }
+ ASSERT(!m_reader.isEmpty());
+ m_isInRecursion = true;
+ if (!m_isReading) {
+ m_isReading = true;
+ ScriptState::Scope scope(m_reader.scriptState());
+ V8RecursionScope recursionScope(m_reader.isolate());
+ ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then(
+ OnFulfilled::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()),
+ OnRejected::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()));
+ // Note: Microtasks may run here.
+ }
+ m_isInRecursion = false;
+ return WebDataConsumerHandle::ShouldWait;
+ }
+
+ Result endRead(size_t readSize)
+ {
+ ASSERT(m_pendingBuffer);
+ ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length());
+ m_pendingOffset += readSize;
+ if (m_pendingOffset == m_pendingBuffer->length()) {
+ m_pendingBuffer = nullptr;
+ m_pendingOffset = 0;
+ }
+ return WebDataConsumerHandle::Ok;
+ }
+
+ void onRead(DOMUint8Array* buffer)
+ {
+ ASSERT(m_isReading);
+ ASSERT(buffer);
+ ASSERT(!m_pendingBuffer);
+ ASSERT(!m_pendingOffset);
+ m_isReading = false;
+ m_pendingBuffer = buffer;
+ notify();
+ }
+
+ void onReadDone()
+ {
+ ASSERT(m_isReading);
+ ASSERT(!m_pendingBuffer);
+ m_isReading = false;
+ m_isDone = true;
+ m_reader.clear();
+ notify();
+ }
+
+ void onRejected()
+ {
+ ASSERT(m_isReading);
+ ASSERT(!m_pendingBuffer);
+ m_hasError = true;
+ m_isReading = false;
+ m_reader.clear();
+ notify();
+ }
+
+ void notify()
+ {
+ if (!m_client)
+ return;
+ if (m_isInRecursion) {
+ notifyLater();
+ return;
+ }
+ m_client->didGetReadable();
+ }
+
+ void notifyLater()
+ {
+ ASSERT(m_client);
+ Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this)));
+ }
+
+private:
+ ReadingContext(ScriptState* scriptState, v8::Local<v8::Value> stream)
+ : m_client(nullptr)
+ , m_weakPtrFactory(this)
+ , m_pendingOffset(0)
+ , m_isReading(false)
+ , m_isDone(false)
+ , m_hasError(false)
+ , m_isInRecursion(false)
+ {
+ if (!ReadableStreamOperations::isLocked(scriptState, stream)) {
+ // Here the stream implementation must not throw an exception.
+ NonThrowableExceptionState es;
+ m_reader = ReadableStreamOperations::getReader(scriptState, stream, es);
+ }
+ m_hasError = m_reader.isEmpty();
+ }
+
+ // This ScriptValue is leaky because it stores a strong reference to a
+ // JavaScript object.
+ // TODO(yhirano): Fix it.
+ //
+ // Holding a ScriptValue here is safe in terms of cross-world wrapper
+ // leakage because we read only Uint8Array chunks from the reader.
+ ScriptValue m_reader;
+ WebDataConsumerHandle::Client* m_client;
+ RefPtr<DOMUint8Array> m_pendingBuffer;
+ WeakPtrFactory<ReadingContext> m_weakPtrFactory;
+ size_t m_pendingOffset;
+ bool m_isReading;
+ bool m_isDone;
+ bool m_hasError;
+ bool m_isInRecursion;
+};
+
+ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream)
+ : m_readingContext(ReadingContext::create(scriptState, stream))
+{
+}
+ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default;
+
+FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client)
+{
+ return new ReadingContext::ReaderImpl(m_readingContext, client);
+}
+
+} // namespace blink
+

Powered by Google App Engine
This is Rietveld 408576698