Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(716)

Unified Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp

Issue 2365853002: Implement ReadableStreamBytesConsumer (Closed)
Patch Set: git cl format Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..27f03eeacd67d854f53f45331cda9efc84db7bf3
--- /dev/null
+++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
@@ -0,0 +1,219 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "modules/fetch/ReadableStreamBytesConsumer.h"
+
+#include "bindings/core/v8/ScopedPersistent.h"
+#include "bindings/core/v8/ScriptFunction.h"
+#include "bindings/core/v8/ScriptState.h"
+#include "bindings/core/v8/ScriptValue.h"
+#include "bindings/core/v8/V8BindingMacros.h"
+#include "bindings/core/v8/V8IteratorResultValue.h"
+#include "bindings/core/v8/V8Uint8Array.h"
+#include "core/streams/ReadableStreamOperations.h"
+#include "wtf/Assertions.h"
+#include "wtf/text/WTFString.h"
+#include <algorithm>
+#include <string.h>
+#include <v8.h>
+
+namespace blink {
+
+class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction {
+ public:
+ static v8::Local<v8::Function> createFunction(
+ ScriptState* scriptState,
+ ReadableStreamBytesConsumer* consumer) {
+ return (new OnFulfilled(scriptState, consumer))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override {
+ bool done;
+ v8::Local<v8::Value> item = v.v8Value();
+ DCHECK(item->IsObject());
+ v8::Local<v8::Value> value =
+ v8UnpackIteratorResult(v.getScriptState(), item.As<v8::Object>(), &done)
+ .ToLocalChecked();
+ if (done) {
+ m_consumer->onReadDone();
+ return v;
+ }
+ if (!value->IsUint8Array()) {
+ m_consumer->onRejected();
+ return ScriptValue();
+ }
+ m_consumer->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
+ return v;
+ }
+
+ DEFINE_INLINE_TRACE() {
+ visitor->trace(m_consumer);
+ ScriptFunction::trace(visitor);
+ }
+
+ private:
+ OnFulfilled(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ : ScriptFunction(scriptState), m_consumer(consumer) {}
+
+ Member<ReadableStreamBytesConsumer> m_consumer;
+};
+
+class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction {
+ public:
+ static v8::Local<v8::Function> createFunction(
+ ScriptState* scriptState,
+ ReadableStreamBytesConsumer* consumer) {
+ return (new OnRejected(scriptState, consumer))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override {
+ m_consumer->onRejected();
+ return v;
+ }
+
+ DEFINE_INLINE_TRACE() {
+ visitor->trace(m_consumer);
+ ScriptFunction::trace(visitor);
+ }
+
+ private:
+ OnRejected(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ : ScriptFunction(scriptState), m_consumer(consumer) {}
+
+ Member<ReadableStreamBytesConsumer> m_consumer;
+};
+
+ReadableStreamBytesConsumer::ReadableStreamBytesConsumer(
+ ScriptState* scriptState,
+ ScriptValue streamReader)
+ : m_reader(scriptState->isolate(), streamReader.v8Value()),
+ m_scriptState(scriptState) {
+ m_reader.setPhantom();
+}
+
+ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer() {}
+
+BytesConsumer::Result ReadableStreamBytesConsumer::beginRead(
+ const char** buffer,
+ size_t* available) {
+ *buffer = nullptr;
+ *available = 0;
+ if (m_state == PublicState::Errored)
+ return Result::Error;
+ if (m_state == PublicState::Closed)
+ return Result::Done;
+
+ if (m_pendingBuffer) {
+ DCHECK_LE(m_pendingOffset, m_pendingBuffer->length());
+ *buffer = reinterpret_cast<const char*>(m_pendingBuffer->data()) +
+ m_pendingOffset;
+ *available = m_pendingBuffer->length() - m_pendingOffset;
+ return Result::Ok;
+ }
+ if (!m_isReading) {
+ m_isReading = true;
+ ScriptState::Scope scope(m_scriptState.get());
+ ScriptValue reader(m_scriptState.get(),
+ m_reader.newLocal(m_scriptState->isolate()));
+ // The owner must retain the reader.
+ DCHECK(!reader.isEmpty());
+ ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader)
+ .then(OnFulfilled::createFunction(m_scriptState.get(), this),
+ OnRejected::createFunction(m_scriptState.get(), this));
+ }
+ return Result::ShouldWait;
+}
+
+BytesConsumer::Result ReadableStreamBytesConsumer::endRead(size_t readSize) {
+ DCHECK(m_pendingBuffer);
+ DCHECK_LE(m_pendingOffset + readSize, m_pendingBuffer->length());
+ m_pendingOffset += readSize;
+ if (m_pendingOffset >= m_pendingBuffer->length()) {
+ m_pendingBuffer = nullptr;
+ m_pendingOffset = 0;
+ }
+ return Result::Ok;
+}
+
+void ReadableStreamBytesConsumer::setClient(Client* client) {
+ DCHECK(!m_client);
+ DCHECK(client);
+ m_client = client;
+}
+
+void ReadableStreamBytesConsumer::clearClient() {
+ m_client = nullptr;
+}
+
+void ReadableStreamBytesConsumer::cancel() {
+ if (m_state == PublicState::Closed || m_state == PublicState::Errored)
+ return;
+ m_state = PublicState::Closed;
+ clearClient();
+ m_reader.clear();
+}
+
+BytesConsumer::PublicState ReadableStreamBytesConsumer::getPublicState() const {
+ return m_state;
+}
+
+BytesConsumer::Error ReadableStreamBytesConsumer::getError() const {
+ return Error("Failed to read from a ReadableStream.");
+}
+
+DEFINE_TRACE(ReadableStreamBytesConsumer) {
+ visitor->trace(m_client);
+ visitor->trace(m_pendingBuffer);
+ BytesConsumer::trace(visitor);
+}
+
+void ReadableStreamBytesConsumer::dispose() {
+ m_reader.clear();
+}
+
+void ReadableStreamBytesConsumer::onRead(DOMUint8Array* buffer) {
+ DCHECK(m_isReading);
+ DCHECK(buffer);
+ DCHECK(!m_pendingBuffer);
+ DCHECK(!m_pendingOffset);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_pendingBuffer = buffer;
+ if (m_client)
+ m_client->onStateChange();
+}
+
+void ReadableStreamBytesConsumer::onReadDone() {
+ DCHECK(m_isReading);
+ DCHECK(!m_pendingBuffer);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_state = PublicState::Closed;
+ m_reader.clear();
+ Client* client = m_client;
+ clearClient();
+ if (client)
+ client->onStateChange();
+}
+
+void ReadableStreamBytesConsumer::onRejected() {
+ DCHECK(m_isReading);
+ DCHECK(!m_pendingBuffer);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_state = PublicState::Errored;
+ m_reader.clear();
+ Client* client = m_client;
+ clearClient();
+ if (client)
+ client->onStateChange();
+}
+
+} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698