| Index: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
|
| diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ace6a73db1f2ed353f1f11822294bbc6cee899c2
|
| --- /dev/null
|
| +++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp
|
| @@ -0,0 +1,289 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "config.h"
|
| +#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
|
| +
|
| +#include "bindings/core/v8/ExceptionState.h"
|
| +#include "bindings/core/v8/ReadableStreamOperations.h"
|
| +#include "bindings/core/v8/ScriptFunction.h"
|
| +#include "bindings/core/v8/ScriptState.h"
|
| +#include "bindings/core/v8/ScriptValue.h"
|
| +#include "bindings/core/v8/V8BindingMacros.h"
|
| +#include "bindings/core/v8/V8IteratorResultValue.h"
|
| +#include "bindings/core/v8/V8RecursionScope.h"
|
| +#include "bindings/core/v8/V8Uint8Array.h"
|
| +#include "core/dom/DOMTypedArray.h"
|
| +#include "public/platform/Platform.h"
|
| +#include "public/platform/WebTaskRunner.h"
|
| +#include "public/platform/WebThread.h"
|
| +#include "public/platform/WebTraceLocation.h"
|
| +#include "wtf/Assertions.h"
|
| +#include "wtf/Functional.h"
|
| +#include "wtf/RefCounted.h"
|
| +#include "wtf/WeakPtr.h"
|
| +#include <algorithm>
|
| +#include <string.h>
|
| +#include <v8.h>
|
| +
|
| +namespace blink {
|
| +
|
| +using Result = WebDataConsumerHandle::Result;
|
| +using Flags = WebDataConsumerHandle::Flags;
|
| +
|
| +// This context is not yet thread-safe.
|
| +class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted<ReadingContext> {
|
| + WTF_MAKE_NONCOPYABLE(ReadingContext);
|
| +public:
|
| + class OnFulfilled final : public ScriptFunction {
|
| + public:
|
| + static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
|
| + {
|
| + return (new OnFulfilled(scriptState, context))->bindToV8Function();
|
| + }
|
| +
|
| + ScriptValue call(ScriptValue v) override
|
| + {
|
| + RefPtr<ReadingContext> readingContext(m_readingContext.get());
|
| + if (!readingContext)
|
| + return v;
|
| + bool done;
|
| + v8::Local<v8::Value> item = v.v8Value();
|
| + ASSERT(item->IsObject());
|
| + v8::Local<v8::Value> value = v8CallOrCrash(v8UnpackIteratorResult(v.scriptState(), item.As<v8::Object>(), &done));
|
| + if (done) {
|
| + readingContext->onReadDone();
|
| + return v;
|
| + }
|
| + if (!V8Uint8Array::hasInstance(value, v.isolate())) {
|
| + readingContext->onRejected();
|
| + return ScriptValue();
|
| + }
|
| + readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
|
| + return v;
|
| + }
|
| +
|
| + private:
|
| + OnFulfilled(ScriptState* scriptState, WeakPtr<ReadingContext> context)
|
| + : ScriptFunction(scriptState), m_readingContext(context) {}
|
| +
|
| + WeakPtr<ReadingContext> m_readingContext;
|
| + };
|
| +
|
| + class OnRejected final : public ScriptFunction {
|
| + public:
|
| + static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
|
| + {
|
| + return (new OnRejected(scriptState, context))->bindToV8Function();
|
| + }
|
| +
|
| + ScriptValue call(ScriptValue v) override
|
| + {
|
| + RefPtr<ReadingContext> readingContext(m_readingContext.get());
|
| + if (!readingContext)
|
| + return v;
|
| + readingContext->onRejected();
|
| + return v;
|
| + }
|
| +
|
| + private:
|
| + OnRejected(ScriptState* scriptState, WeakPtr<ReadingContext> context)
|
| + : ScriptFunction(scriptState), m_readingContext(context) {}
|
| +
|
| + WeakPtr<ReadingContext> m_readingContext;
|
| + };
|
| +
|
| + class ReaderImpl final : public FetchDataConsumerHandle::Reader {
|
| + public:
|
| + ReaderImpl(PassRefPtr<ReadingContext> context, Client* client)
|
| + : m_readingContext(context)
|
| + {
|
| + m_readingContext->attachReader(client);
|
| + }
|
| + ~ReaderImpl() override
|
| + {
|
| + m_readingContext->detachReader();
|
| + }
|
| +
|
| + Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override
|
| + {
|
| + *readSize = 0;
|
| + const void* src = nullptr;
|
| + size_t available;
|
| + Result r = beginRead(&src, flags, &available);
|
| + if (r != WebDataConsumerHandle::Ok)
|
| + return r;
|
| + *readSize = std::min(available, size);
|
| + memcpy(buffer, src, *readSize);
|
| + return endRead(*readSize);
|
| + }
|
| +
|
| + Result beginRead(const void** buffer, Flags, size_t* available) override
|
| + {
|
| + return m_readingContext->beginRead(buffer, available);
|
| + }
|
| +
|
| + Result endRead(size_t readSize) override
|
| + {
|
| + return m_readingContext->endRead(readSize);
|
| + }
|
| +
|
| + private:
|
| + RefPtr<ReadingContext> m_readingContext;
|
| + };
|
| +
|
| + static PassRefPtr<ReadingContext> create(ScriptState* scriptState, v8::Local<v8::Value> stream)
|
| + {
|
| + return adoptRef(new ReadingContext(scriptState, stream));
|
| + }
|
| +
|
| + void attachReader(WebDataConsumerHandle::Client* client)
|
| + {
|
| + m_client = client;
|
| + notifyLater();
|
| + }
|
| +
|
| + void detachReader()
|
| + {
|
| + m_client = nullptr;
|
| + }
|
| +
|
| + Result beginRead(const void** buffer, size_t* available)
|
| + {
|
| + *buffer = nullptr;
|
| + *available = 0;
|
| + if (m_hasError)
|
| + return WebDataConsumerHandle::UnexpectedError;
|
| + if (m_isDone)
|
| + return WebDataConsumerHandle::Done;
|
| +
|
| + if (m_pendingBuffer) {
|
| + ASSERT(m_pendingOffset < m_pendingBuffer->length());
|
| + *buffer = m_pendingBuffer->data() + m_pendingOffset;
|
| + *available = m_pendingBuffer->length() - m_pendingOffset;
|
| + return WebDataConsumerHandle::Ok;
|
| + }
|
| + ASSERT(!m_reader.isEmpty());
|
| + m_isInRecursion = true;
|
| + if (!m_isReading) {
|
| + m_isReading = true;
|
| + ScriptState::Scope scope(m_reader.scriptState());
|
| + V8RecursionScope recursionScope(m_reader.isolate());
|
| + ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then(
|
| + OnFulfilled::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()),
|
| + OnRejected::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()));
|
| + // Note: Microtasks may run here.
|
| + }
|
| + m_isInRecursion = false;
|
| + return WebDataConsumerHandle::ShouldWait;
|
| + }
|
| +
|
| + Result endRead(size_t readSize)
|
| + {
|
| + ASSERT(m_pendingBuffer);
|
| + ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length());
|
| + m_pendingOffset += readSize;
|
| + if (m_pendingOffset == m_pendingBuffer->length()) {
|
| + m_pendingBuffer = nullptr;
|
| + m_pendingOffset = 0;
|
| + }
|
| + return WebDataConsumerHandle::Ok;
|
| + }
|
| +
|
| + void onRead(DOMUint8Array* buffer)
|
| + {
|
| + ASSERT(m_isReading);
|
| + ASSERT(buffer);
|
| + ASSERT(!m_pendingBuffer);
|
| + ASSERT(!m_pendingOffset);
|
| + m_isReading = false;
|
| + m_pendingBuffer = buffer;
|
| + notify();
|
| + }
|
| +
|
| + void onReadDone()
|
| + {
|
| + ASSERT(m_isReading);
|
| + ASSERT(!m_pendingBuffer);
|
| + m_isReading = false;
|
| + m_isDone = true;
|
| + m_reader.clear();
|
| + notify();
|
| + }
|
| +
|
| + void onRejected()
|
| + {
|
| + ASSERT(m_isReading);
|
| + ASSERT(!m_pendingBuffer);
|
| + m_hasError = true;
|
| + m_isReading = false;
|
| + m_reader.clear();
|
| + notify();
|
| + }
|
| +
|
| + void notify()
|
| + {
|
| + if (!m_client)
|
| + return;
|
| + if (m_isInRecursion) {
|
| + notifyLater();
|
| + return;
|
| + }
|
| + m_client->didGetReadable();
|
| + }
|
| +
|
| + void notifyLater()
|
| + {
|
| + ASSERT(m_client);
|
| + Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this)));
|
| + }
|
| +
|
| +private:
|
| + ReadingContext(ScriptState* scriptState, v8::Local<v8::Value> stream)
|
| + : m_client(nullptr)
|
| + , m_weakPtrFactory(this)
|
| + , m_pendingOffset(0)
|
| + , m_isReading(false)
|
| + , m_isDone(false)
|
| + , m_hasError(false)
|
| + , m_isInRecursion(false)
|
| + {
|
| + if (!ReadableStreamOperations::isLocked(scriptState, stream)) {
|
| + // Here the stream implementation must not throw an exception.
|
| + NonThrowableExceptionState es;
|
| + m_reader = ReadableStreamOperations::getReader(scriptState, stream, es);
|
| + }
|
| + m_hasError = m_reader.isEmpty();
|
| + }
|
| +
|
| + // This ScriptValue is leaky because it stores a strong reference to a
|
| + // JavaScript object.
|
| + // TODO(yhirano): Fix it.
|
| + //
|
| + // Holding a ScriptValue here is safe in terms of cross-world wrapper
|
| + // leakage because we read only Uint8Array chunks from the reader.
|
| + ScriptValue m_reader;
|
| + WebDataConsumerHandle::Client* m_client;
|
| + RefPtr<DOMUint8Array> m_pendingBuffer;
|
| + WeakPtrFactory<ReadingContext> m_weakPtrFactory;
|
| + size_t m_pendingOffset;
|
| + bool m_isReading;
|
| + bool m_isDone;
|
| + bool m_hasError;
|
| + bool m_isInRecursion;
|
| +};
|
| +
|
| +ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream)
|
| + : m_readingContext(ReadingContext::create(scriptState, stream))
|
| +{
|
| +}
|
| +ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default;
|
| +
|
| +FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client)
|
| +{
|
| + return new ReadingContext::ReaderImpl(m_readingContext, client);
|
| +}
|
| +
|
| +} // namespace blink
|
| +
|
|
|