Index: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ace6a73db1f2ed353f1f11822294bbc6cee899c2 |
--- /dev/null |
+++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
@@ -0,0 +1,289 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "config.h" |
+#include "modules/fetch/ReadableStreamDataConsumerHandle.h" |
+ |
+#include "bindings/core/v8/ExceptionState.h" |
+#include "bindings/core/v8/ReadableStreamOperations.h" |
+#include "bindings/core/v8/ScriptFunction.h" |
+#include "bindings/core/v8/ScriptState.h" |
+#include "bindings/core/v8/ScriptValue.h" |
+#include "bindings/core/v8/V8BindingMacros.h" |
+#include "bindings/core/v8/V8IteratorResultValue.h" |
+#include "bindings/core/v8/V8RecursionScope.h" |
+#include "bindings/core/v8/V8Uint8Array.h" |
+#include "core/dom/DOMTypedArray.h" |
+#include "public/platform/Platform.h" |
+#include "public/platform/WebTaskRunner.h" |
+#include "public/platform/WebThread.h" |
+#include "public/platform/WebTraceLocation.h" |
+#include "wtf/Assertions.h" |
+#include "wtf/Functional.h" |
+#include "wtf/RefCounted.h" |
+#include "wtf/WeakPtr.h" |
+#include <algorithm> |
+#include <string.h> |
+#include <v8.h> |
+ |
+namespace blink { |
+ |
+using Result = WebDataConsumerHandle::Result; |
+using Flags = WebDataConsumerHandle::Flags; |
+ |
+// This context is not yet thread-safe. |
+class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted<ReadingContext> { |
+ WTF_MAKE_NONCOPYABLE(ReadingContext); |
+public: |
+ class OnFulfilled final : public ScriptFunction { |
+ public: |
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context) |
+ { |
+ return (new OnFulfilled(scriptState, context))->bindToV8Function(); |
+ } |
+ |
+ ScriptValue call(ScriptValue v) override |
+ { |
+ RefPtr<ReadingContext> readingContext(m_readingContext.get()); |
+ if (!readingContext) |
+ return v; |
+ bool done; |
+ v8::Local<v8::Value> item = v.v8Value(); |
+ ASSERT(item->IsObject()); |
+ v8::Local<v8::Value> value = v8CallOrCrash(v8UnpackIteratorResult(v.scriptState(), item.As<v8::Object>(), &done)); |
+ if (done) { |
+ readingContext->onReadDone(); |
+ return v; |
+ } |
+ if (!V8Uint8Array::hasInstance(value, v.isolate())) { |
+ readingContext->onRejected(); |
+ return ScriptValue(); |
+ } |
+ readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>())); |
+ return v; |
+ } |
+ |
+ private: |
+ OnFulfilled(ScriptState* scriptState, WeakPtr<ReadingContext> context) |
+ : ScriptFunction(scriptState), m_readingContext(context) {} |
+ |
+ WeakPtr<ReadingContext> m_readingContext; |
+ }; |
+ |
+ class OnRejected final : public ScriptFunction { |
+ public: |
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context) |
+ { |
+ return (new OnRejected(scriptState, context))->bindToV8Function(); |
+ } |
+ |
+ ScriptValue call(ScriptValue v) override |
+ { |
+ RefPtr<ReadingContext> readingContext(m_readingContext.get()); |
+ if (!readingContext) |
+ return v; |
+ readingContext->onRejected(); |
+ return v; |
+ } |
+ |
+ private: |
+ OnRejected(ScriptState* scriptState, WeakPtr<ReadingContext> context) |
+ : ScriptFunction(scriptState), m_readingContext(context) {} |
+ |
+ WeakPtr<ReadingContext> m_readingContext; |
+ }; |
+ |
+ class ReaderImpl final : public FetchDataConsumerHandle::Reader { |
+ public: |
+ ReaderImpl(PassRefPtr<ReadingContext> context, Client* client) |
+ : m_readingContext(context) |
+ { |
+ m_readingContext->attachReader(client); |
+ } |
+ ~ReaderImpl() override |
+ { |
+ m_readingContext->detachReader(); |
+ } |
+ |
+ Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override |
+ { |
+ *readSize = 0; |
+ const void* src = nullptr; |
+ size_t available; |
+ Result r = beginRead(&src, flags, &available); |
+ if (r != WebDataConsumerHandle::Ok) |
+ return r; |
+ *readSize = std::min(available, size); |
+ memcpy(buffer, src, *readSize); |
+ return endRead(*readSize); |
+ } |
+ |
+ Result beginRead(const void** buffer, Flags, size_t* available) override |
+ { |
+ return m_readingContext->beginRead(buffer, available); |
+ } |
+ |
+ Result endRead(size_t readSize) override |
+ { |
+ return m_readingContext->endRead(readSize); |
+ } |
+ |
+ private: |
+ RefPtr<ReadingContext> m_readingContext; |
+ }; |
+ |
+ static PassRefPtr<ReadingContext> create(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ { |
+ return adoptRef(new ReadingContext(scriptState, stream)); |
+ } |
+ |
+ void attachReader(WebDataConsumerHandle::Client* client) |
+ { |
+ m_client = client; |
+ notifyLater(); |
+ } |
+ |
+ void detachReader() |
+ { |
+ m_client = nullptr; |
+ } |
+ |
+ Result beginRead(const void** buffer, size_t* available) |
+ { |
+ *buffer = nullptr; |
+ *available = 0; |
+ if (m_hasError) |
+ return WebDataConsumerHandle::UnexpectedError; |
+ if (m_isDone) |
+ return WebDataConsumerHandle::Done; |
+ |
+ if (m_pendingBuffer) { |
+ ASSERT(m_pendingOffset < m_pendingBuffer->length()); |
+ *buffer = m_pendingBuffer->data() + m_pendingOffset; |
+ *available = m_pendingBuffer->length() - m_pendingOffset; |
+ return WebDataConsumerHandle::Ok; |
+ } |
+ ASSERT(!m_reader.isEmpty()); |
+ m_isInRecursion = true; |
+ if (!m_isReading) { |
+ m_isReading = true; |
+ ScriptState::Scope scope(m_reader.scriptState()); |
+ V8RecursionScope recursionScope(m_reader.isolate()); |
+ ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then( |
+ OnFulfilled::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()), |
+ OnRejected::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr())); |
+ // Note: Microtasks may run here. |
+ } |
+ m_isInRecursion = false; |
+ return WebDataConsumerHandle::ShouldWait; |
+ } |
+ |
+ Result endRead(size_t readSize) |
+ { |
+ ASSERT(m_pendingBuffer); |
+ ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length()); |
+ m_pendingOffset += readSize; |
+ if (m_pendingOffset == m_pendingBuffer->length()) { |
+ m_pendingBuffer = nullptr; |
+ m_pendingOffset = 0; |
+ } |
+ return WebDataConsumerHandle::Ok; |
+ } |
+ |
+ void onRead(DOMUint8Array* buffer) |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(buffer); |
+ ASSERT(!m_pendingBuffer); |
+ ASSERT(!m_pendingOffset); |
+ m_isReading = false; |
+ m_pendingBuffer = buffer; |
+ notify(); |
+ } |
+ |
+ void onReadDone() |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(!m_pendingBuffer); |
+ m_isReading = false; |
+ m_isDone = true; |
+ m_reader.clear(); |
+ notify(); |
+ } |
+ |
+ void onRejected() |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(!m_pendingBuffer); |
+ m_hasError = true; |
+ m_isReading = false; |
+ m_reader.clear(); |
+ notify(); |
+ } |
+ |
+ void notify() |
+ { |
+ if (!m_client) |
+ return; |
+ if (m_isInRecursion) { |
+ notifyLater(); |
+ return; |
+ } |
+ m_client->didGetReadable(); |
+ } |
+ |
+ void notifyLater() |
+ { |
+ ASSERT(m_client); |
+ Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this))); |
+ } |
+ |
+private: |
+ ReadingContext(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ : m_client(nullptr) |
+ , m_weakPtrFactory(this) |
+ , m_pendingOffset(0) |
+ , m_isReading(false) |
+ , m_isDone(false) |
+ , m_hasError(false) |
+ , m_isInRecursion(false) |
+ { |
+ if (!ReadableStreamOperations::isLocked(scriptState, stream)) { |
+ // Here the stream implementation must not throw an exception. |
+ NonThrowableExceptionState es; |
+ m_reader = ReadableStreamOperations::getReader(scriptState, stream, es); |
+ } |
+ m_hasError = m_reader.isEmpty(); |
+ } |
+ |
+ // This ScriptValue is leaky because it stores a strong reference to a |
+ // JavaScript object. |
+ // TODO(yhirano): Fix it. |
+ // |
+ // Holding a ScriptValue here is safe in terms of cross-world wrapper |
+ // leakage because we read only Uint8Array chunks from the reader. |
+ ScriptValue m_reader; |
+ WebDataConsumerHandle::Client* m_client; |
+ RefPtr<DOMUint8Array> m_pendingBuffer; |
+ WeakPtrFactory<ReadingContext> m_weakPtrFactory; |
+ size_t m_pendingOffset; |
+ bool m_isReading; |
+ bool m_isDone; |
+ bool m_hasError; |
+ bool m_isInRecursion; |
+}; |
+ |
+ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ : m_readingContext(ReadingContext::create(scriptState, stream)) |
+{ |
+} |
+ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default; |
+ |
+FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client) |
+{ |
+ return new ReadingContext::ReaderImpl(m_readingContext, client); |
+} |
+ |
+} // namespace blink |
+ |