Chromium Code Reviews| Index: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
| diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..fb92230d038cb2ecb3a7de59a56cc13f29811ae5 |
| --- /dev/null |
| +++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
| @@ -0,0 +1,282 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "config.h" |
| +#include "modules/fetch/ReadableStreamDataConsumerHandle.h" |
| + |
| +#include "bindings/core/v8/ExceptionState.h" |
| +#include "bindings/core/v8/ReadableStreamOperations.h" |
| +#include "bindings/core/v8/ScriptFunction.h" |
| +#include "bindings/core/v8/ScriptState.h" |
| +#include "bindings/core/v8/ScriptValue.h" |
| +#include "bindings/core/v8/V8BindingMacros.h" |
| +#include "bindings/core/v8/V8IteratorResultValue.h" |
| +#include "bindings/core/v8/V8RecursionScope.h" |
| +#include "bindings/core/v8/V8Uint8Array.h" |
| +#include "core/dom/DOMTypedArray.h" |
| +#include "public/platform/Platform.h" |
| +#include "public/platform/WebTaskRunner.h" |
| +#include "public/platform/WebThread.h" |
| +#include "public/platform/WebTraceLocation.h" |
| +#include "wtf/Assertions.h" |
| +#include "wtf/Functional.h" |
| +#include "wtf/RefCounted.h" |
| +#include <algorithm> |
| +#include <string.h> |
| +#include <v8.h> |
| + |
| +namespace blink { |
| + |
| +using Result = WebDataConsumerHandle::Result; |
| +using Flags = WebDataConsumerHandle::Flags; |
| + |
| +// This context is not yet thread-safe. |
| +class ReadableStreamDataConsumerHandle::Context final : public RefCounted<Context> { |
| + WTF_MAKE_NONCOPYABLE(Context); |
| +public: |
| + class OnFulfilled final : public ScriptFunction { |
| + public: |
| + static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<Context> context) |
| + { |
| + return (new OnFulfilled(scriptState, context))->bindToV8Function(); |
| + } |
| + |
| + ScriptValue call(ScriptValue v) override |
| + { |
| + bool done; |
| + v8::Local<v8::Value> value; |
| + v8::Local<v8::Value> item = v.v8Value(); |
| + if (!item->IsObject() || !v8Call(v8IterationItemUnpack(v.scriptState(), item.As<v8::Object>(), &done), value)) { |
| + m_context->onRejected(); |
| + return ScriptValue(); |
| + } |
| + if (done) { |
| + m_context->onReadDone(); |
| + return v; |
| + } |
| + if (!V8Uint8Array::hasInstance(value, v.isolate())) { |
| + m_context->onRejected(); |
| + return ScriptValue(); |
| + } |
| + m_context->onRead(V8Uint8Array::toImpl(value.As<v8::Object>())); |
| + return v; |
| + } |
| + |
| + private: |
| + OnFulfilled(ScriptState* scriptState, PassRefPtr<Context> context) |
| + : ScriptFunction(scriptState), m_context(context) {} |
| + |
| + RefPtr<Context> m_context; |
| + }; |
| + |
| + class OnRejected final : public ScriptFunction { |
| + public: |
| + static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<Context> context) |
| + { |
| + return (new OnRejected(scriptState, context))->bindToV8Function(); |
| + } |
| + |
| + ScriptValue call(ScriptValue v) override |
| + { |
| + m_context->onRejected(); |
| + return v; |
| + } |
| + |
| + private: |
| + OnRejected(ScriptState* scriptState, PassRefPtr<Context> context) |
| + : ScriptFunction(scriptState), m_context(context) {} |
| + |
| + RefPtr<Context> m_context; |
| + }; |
| + |
| + class ReaderImpl final : public FetchDataConsumerHandle::Reader { |
| + public: |
| + ReaderImpl(PassRefPtr<Context> context, Client* client) |
| + : m_context(context) |
| + { |
| + m_context->attachReader(client); |
| + } |
| + ~ReaderImpl() override |
| + { |
| + m_context->detachReader(); |
| + } |
| + |
| + Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override |
| + { |
| + *readSize = 0; |
| + const void* src = nullptr; |
| + size_t available; |
| + Result r = beginRead(&src, flags, &available); |
| + if (r != WebDataConsumerHandle::Ok) |
| + return r; |
| + *readSize = std::min(available, size); |
| + memcpy(buffer, src, *readSize); |
| + return endRead(*readSize); |
| + } |
| + |
| + Result beginRead(const void** buffer, Flags, size_t* available) override |
| + { |
| + return m_context->beginRead(buffer, available); |
| + } |
| + |
| + Result endRead(size_t readSize) override |
| + { |
| + return m_context->endRead(readSize); |
| + } |
| + |
| + private: |
| + RefPtr<Context> m_context; |
| + }; |
| + |
| + static PassRefPtr<Context> create(ScriptState* scriptState, v8::Local<v8::Value> stream) |
| + { |
| + return adoptRef(new Context(scriptState, stream)); |
| + } |
| + |
| + void attachReader(WebDataConsumerHandle::Client* client) |
| + { |
| + m_client = client; |
| + notifyLater(); |
| + } |
| + |
| + void detachReader() |
| + { |
| + m_client = nullptr; |
| + } |
| + |
| + Result beginRead(const void** buffer, size_t* available) |
| + { |
| + *buffer = nullptr; |
| + *available = 0; |
| + if (m_hasError) |
| + return WebDataConsumerHandle::UnexpectedError; |
| + if (m_isDone) |
| + return WebDataConsumerHandle::Done; |
| + |
| + if (m_pendingBuffer) { |
| + ASSERT(m_pendingOffset < m_pendingBuffer->length()); |
| + *buffer = m_pendingBuffer->data() + m_pendingOffset; |
| + *available = m_pendingBuffer->length() - m_pendingOffset; |
| + return WebDataConsumerHandle::Ok; |
| + } |
| + ASSERT(!m_reader.isEmpty()); |
| + m_isInRecursion = true; |
| + if (!m_isReading) { |
| + m_isReading = true; |
| + ScriptState::Scope scope(m_reader.scriptState()); |
| + V8RecursionScope recursionScope(m_reader.isolate()); |
| + ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then( |
| + OnFulfilled::createFunction(m_reader.scriptState(), this), |
| + OnRejected::createFunction(m_reader.scriptState(), this)); |
| + // Note: Microtasks may run here. |
| + } |
| + m_isInRecursion = false; |
| + return WebDataConsumerHandle::ShouldWait; |
| + } |
| + |
| + Result endRead(size_t readSize) |
| + { |
| + ASSERT(m_pendingBuffer); |
| + ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length()); |
| + m_pendingOffset += readSize; |
| + if (m_pendingOffset == m_pendingBuffer->length()) { |
| + m_pendingBuffer = nullptr; |
| + m_pendingOffset = 0; |
| + } |
| + return WebDataConsumerHandle::Ok; |
| + } |
| + |
| + void onRead(DOMUint8Array* buffer) |
| + { |
| + ASSERT(m_isReading); |
| + ASSERT(buffer); |
| + ASSERT(!m_pendingBuffer); |
| + ASSERT(!m_pendingOffset); |
| + m_isReading = false; |
| + m_pendingBuffer = buffer; |
| + notify(); |
| + } |
| + |
| + void onReadDone() |
| + { |
| + ASSERT(m_isReading); |
| + ASSERT(!m_pendingBuffer); |
| + m_isReading = false; |
| + m_isDone = true; |
| + m_reader.clear(); |
| + notify(); |
| + } |
| + |
| + void onRejected() |
| + { |
| + ASSERT(m_isReading); |
| + ASSERT(!m_pendingBuffer); |
| + m_hasError = true; |
| + m_isReading = false; |
| + m_reader.clear(); |
| + notify(); |
| + } |
| + |
| + void notify() |
| + { |
| + if (!m_client) |
| + return; |
| + if (m_isInRecursion) { |
| + notifyLater(); |
| + return; |
| + } |
| + m_client->didGetReadable(); |
| + } |
| + |
| + void notifyLater() |
| + { |
| + ASSERT(m_client); |
| + Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&Context::notify, this)); |
| + } |
| + |
| +private: |
| + Context(ScriptState* scriptState, v8::Local<v8::Value> stream) |
| + : m_client(nullptr) |
| + , m_pendingOffset(0) |
| + , m_isReading(false) |
| + , m_isDone(false) |
| + , m_hasError(false) |
| + , m_isInRecursion(false) |
| + { |
| + if (!ReadableStreamOperations::isLocked(scriptState, stream)) { |
| + TrackExceptionState es; |
|
tyoshino (SeeGerritForStatus)
2015/12/10 10:49:51
please write the reason you can ignore exceptions
yhirano
2015/12/10 11:18:10
Done.
|
| + m_reader = ReadableStreamOperations::getReader(scriptState, stream, es); |
| + } |
| + m_hasError = m_reader.isEmpty(); |
| + } |
| + |
| + // This ScriptValue is leaky because it stores a strong reference to a |
| + // JavaScript object. |
| + // TODO(yhirano): Fix it. |
| + // |
| + // Holding a ScriptValue here is safe in terms of cross-world wrapper |
| + // leakage because we read only Uint8Array chunks from the reader. |
| + ScriptValue m_reader; |
| + WebDataConsumerHandle::Client* m_client; |
| + RefPtr<DOMUint8Array> m_pendingBuffer; |
| + size_t m_pendingOffset; |
| + bool m_isReading; |
| + bool m_isDone; |
| + bool m_hasError; |
| + bool m_isInRecursion; |
| +}; |
| + |
| +ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream) |
| + : m_context(Context::create(scriptState, stream)) |
| +{ |
| +} |
| +ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default; |
| + |
| +FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client) |
| +{ |
| + return new Context::ReaderImpl(m_context, client); |
| +} |
| + |
| +} // namespace blink |
| + |