Index: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
new file mode 100644 |
index 0000000000000000000000000000000000000000..1348da9fe2e123097f48fb29ccf4310a51426232 |
--- /dev/null |
+++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp |
@@ -0,0 +1,267 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "config.h" |
+#include "modules/fetch/ReadableStreamDataConsumerHandle.h" |
+ |
+#include "bindings/core/v8/ExceptionState.h" |
+#include "bindings/core/v8/ReadableStreamOperations.h" |
+#include "bindings/core/v8/ScriptFunction.h" |
+#include "bindings/core/v8/ScriptState.h" |
+#include "bindings/core/v8/ScriptValue.h" |
+#include "bindings/core/v8/V8BindingMacros.h" |
+#include "bindings/core/v8/V8IteratorResultValue.h" |
+#include "bindings/core/v8/V8Uint8Array.h" |
+#include "core/dom/DOMTypedArray.h" |
+#include "public/platform/Platform.h" |
+#include "public/platform/WebTaskRunner.h" |
+#include "public/platform/WebThread.h" |
+#include "public/platform/WebTraceLocation.h" |
tyoshino (SeeGerritForStatus)
2015/12/09 08:54:26
add wtf/Assertions.h
yhirano
2015/12/09 10:54:09
Done.
|
+#include "wtf/Functional.h" |
+#include "wtf/RefCounted.h" |
+#include <algorithm> |
+#include <string.h> |
+#include <v8.h> |
+ |
+namespace blink { |
+ |
+using Result = WebDataConsumerHandle::Result; |
+using Flags = WebDataConsumerHandle::Flags; |
+ |
+// This context is not yet thread-safe. |
+class ReadableStreamDataConsumerHandle::Context final : public RefCounted<Context> { |
+ WTF_MAKE_NONCOPYABLE(Context); |
+public: |
+ class OnFulfilled final : public ScriptFunction { |
+ public: |
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<Context> context) |
+ { |
+ return (new OnFulfilled(scriptState, context))->bindToV8Function(); |
+ } |
+ |
+ ScriptValue call(ScriptValue v) override |
+ { |
+ bool done; |
+ v8::Local<v8::Value> value; |
+ v8::Local<v8::Value> item = v.v8Value(); |
+ if (!item->IsObject() || !v8Call(v8IterationItemUnpack(v.scriptState(), item.As<v8::Object>(), &done), value)) { |
+ m_context->onRejected(); |
+ return ScriptValue(); |
+ } |
+ if (done) { |
+ m_context->onReadDone(); |
+ return v; |
+ } |
+ if (!V8Uint8Array::hasInstance(value, v.isolate())) { |
+ m_context->onRejected(); |
+ return ScriptValue(); |
+ } |
+ m_context->onRead(V8Uint8Array::toImpl(value.As<v8::Object>())); |
+ return v; |
+ } |
+ |
+ private: |
+ OnFulfilled(ScriptState* scriptState, PassRefPtr<Context> context) |
+ : ScriptFunction(scriptState), m_context(context) {} |
+ |
+ RefPtr<Context> m_context; |
+ }; |
+ |
+ class OnRejected final : public ScriptFunction { |
+ public: |
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<Context> context) |
+ { |
+ return (new OnRejected(scriptState, context))->bindToV8Function(); |
+ } |
+ |
+ ScriptValue call(ScriptValue v) override |
+ { |
+ m_context->onRejected(); |
+ return v; |
+ } |
+ |
+ private: |
+ OnRejected(ScriptState* scriptState, PassRefPtr<Context> context) |
+ : ScriptFunction(scriptState), m_context(context) {} |
+ |
+ RefPtr<Context> m_context; |
+ }; |
+ |
+ class ReaderImpl final : public FetchDataConsumerHandle::Reader { |
+ public: |
+ ReaderImpl(PassRefPtr<Context> context, Client* client) |
+ : m_context(context) |
+ { |
+ m_context->attachReader(client); |
+ } |
+ ~ReaderImpl() override |
+ { |
+ m_context->detachReader(); |
+ } |
+ |
+ Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override |
+ { |
+ *readSize = 0; |
+ const void* src = nullptr; |
+ size_t available; |
+ Result r = beginRead(&src, flags, &available); |
+ if (r != WebDataConsumerHandle::Ok) |
+ return r; |
+ *readSize = std::min(available, size); |
+ memcpy(buffer, src, *readSize); |
+ return endRead(*readSize); |
+ } |
+ |
+ Result beginRead(const void** buffer, Flags, size_t* available) override |
+ { |
+ return m_context->beginRead(buffer, available); |
+ } |
+ |
+ Result endRead(size_t readSize) override |
+ { |
+ return m_context->endRead(readSize); |
+ } |
+ |
+ private: |
+ RefPtr<Context> m_context; |
+ }; |
+ |
+ static PassRefPtr<Context> create(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ { |
+ return adoptRef(new Context(scriptState, stream)); |
+ } |
+ |
+ void attachReader(WebDataConsumerHandle::Client* client) |
+ { |
+ m_client = client; |
+ notifyLater(); |
+ } |
+ |
+ void detachReader() |
+ { |
+ m_client = nullptr; |
+ } |
+ |
+ Result beginRead(const void** buffer, size_t* available) |
+ { |
+ *buffer = nullptr; |
+ *available = 0; |
+ if (m_hasError) |
+ return WebDataConsumerHandle::UnexpectedError; |
+ if (m_isDone) |
+ return WebDataConsumerHandle::Done; |
+ |
+ if (m_pendingBuffer) { |
+ ASSERT(m_pendingOffset < m_pendingBuffer->length()); |
+ *buffer = m_pendingBuffer->data() + m_pendingOffset; |
+ *available = m_pendingBuffer->length() - m_pendingOffset; |
+ return WebDataConsumerHandle::Ok; |
+ } |
+ ASSERT(!m_reader.isEmpty()); |
+ if (!m_isReading) { |
+ m_isReading = true; |
+ ScriptState::Scope scope(m_reader.scriptState()); |
+ ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then( |
+ OnFulfilled::createFunction(m_reader.scriptState(), this), |
+ OnRejected::createFunction(m_reader.scriptState(), this)); |
+ } |
+ return WebDataConsumerHandle::ShouldWait; |
+ } |
+ |
+ Result endRead(size_t readSize) |
+ { |
+ ASSERT(m_pendingBuffer); |
+ ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length()); |
+ m_pendingOffset += readSize; |
+ if (m_pendingOffset == m_pendingBuffer->length()) { |
+ m_pendingBuffer = nullptr; |
+ m_pendingOffset = 0; |
+ } |
+ return WebDataConsumerHandle::Ok; |
+ } |
+ |
+ void onRead(DOMUint8Array* buffer) |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(buffer); |
+ ASSERT(!m_pendingBuffer); |
+ ASSERT(!m_pendingOffset); |
+ m_isReading = false; |
+ m_pendingBuffer = buffer; |
+ notify(); |
+ } |
+ |
+ void onReadDone() |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(!m_pendingBuffer); |
+ m_isReading = false; |
+ m_isDone = true; |
+ m_reader.clear(); |
+ notify(); |
+ } |
+ |
+ void onRejected() |
+ { |
+ ASSERT(m_isReading); |
+ ASSERT(!m_pendingBuffer); |
+ m_hasError = true; |
+ m_isReading = false; |
+ m_reader.clear(); |
+ notify(); |
+ } |
+ |
+ void notify() |
+ { |
+ if (m_client) |
+ m_client->didGetReadable(); |
+ } |
+ |
+ void notifyLater() |
+ { |
+ ASSERT(m_client); |
+ Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&Context::notify, this)); |
+ } |
+ |
+private: |
+ Context(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ : m_client(nullptr) |
+ , m_pendingOffset(0) |
+ , m_isReading(false) |
+ , m_isDone(false) |
+ , m_hasError(false) |
+ { |
+ if (!ReadableStreamOperations::isLocked(scriptState, stream)) { |
+ TrackExceptionState es; |
+ m_reader = ReadableStreamOperations::getReader(scriptState, stream, es); |
+ } |
+ m_hasError = m_reader.isEmpty(); |
+ } |
+ |
+ // This ScriptValue is leaky because it stores a strong reference to a |
+ // JavaScript object. Fix it. |
tyoshino (SeeGerritForStatus)
2015/12/09 08:54:26
TODO
yhirano
2015/12/09 10:54:09
Done.
|
+ // Holding a ScriptValue here is safe in terms of cross-world wrapper |
+ // leackage because we read only Uint8Array chunks from the reader. |
tyoshino (SeeGerritForStatus)
2015/12/09 08:54:26
leackage -> leakage
yhirano
2015/12/09 10:54:09
Done.
|
+ ScriptValue m_reader; |
+ WebDataConsumerHandle::Client* m_client; |
+ RefPtr<DOMUint8Array> m_pendingBuffer; |
+ size_t m_pendingOffset; |
+ bool m_isReading; |
+ bool m_isDone; |
+ bool m_hasError; |
+}; |
+ |
+ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream) |
+ : m_context(Context::create(scriptState, stream)) |
+{ |
+} |
+ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default; |
+ |
+FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client) |
+{ |
+ return new Context::ReaderImpl(m_context, client); |
+} |
+ |
+} // namespace blink |
+ |