Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(778)

Unified Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp

Issue 2365853002: Implement ReadableStreamBytesConsumer (Closed)
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..18bc282d09b8a9a559366255939cbcc99537fe69
--- /dev/null
+++ b/third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp
@@ -0,0 +1,229 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "modules/fetch/ReadableStreamBytesConsumer.h"
+
+#include "bindings/core/v8/ScopedPersistent.h"
+#include "bindings/core/v8/ScriptFunction.h"
+#include "bindings/core/v8/ScriptState.h"
+#include "bindings/core/v8/ScriptValue.h"
+#include "bindings/core/v8/V8BindingMacros.h"
+#include "bindings/core/v8/V8IteratorResultValue.h"
+#include "bindings/core/v8/V8Uint8Array.h"
+#include "core/streams/ReadableStreamOperations.h"
+#include "wtf/Assertions.h"
+#include "wtf/text/WTFString.h"
+#include <algorithm>
+#include <string.h>
+#include <v8.h>
+
+namespace blink {
+
+class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction {
+public:
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ {
+ return (new OnFulfilled(scriptState, consumer))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override
+ {
+ bool done;
+ v8::Local<v8::Value> item = v.v8Value();
+ DCHECK(item->IsObject());
+ v8::Local<v8::Value> value = v8UnpackIteratorResult(v.getScriptState(), item.As<v8::Object>(), &done).ToLocalChecked();
+ if (done) {
+ m_consumer->onReadDone();
+ return v;
+ }
+ if (!value->IsUint8Array()) {
+ m_consumer->onRejected();
+ return ScriptValue();
+ }
+ m_consumer->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
+ return v;
+ }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_consumer);
+ ScriptFunction::trace(visitor);
+ }
+
+private:
+ OnFulfilled(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ : ScriptFunction(scriptState), m_consumer(consumer) {}
+
+ Member<ReadableStreamBytesConsumer> m_consumer;
+};
+
+class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction {
+public:
+ static v8::Local<v8::Function> createFunction(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ {
+ return (new OnRejected(scriptState, consumer))->bindToV8Function();
+ }
+
+ ScriptValue call(ScriptValue v) override
+ {
+ m_consumer->onRejected();
+ return v;
+ }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_consumer);
+ ScriptFunction::trace(visitor);
+ }
+
+private:
+ OnRejected(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
+ : ScriptFunction(scriptState), m_consumer(consumer) {}
+
+ Member<ReadableStreamBytesConsumer> m_consumer;
+};
+
+ReadableStreamBytesConsumer::ReadableStreamBytesConsumer(ScriptState* scriptState, ScriptValue streamReader)
+ : m_reader(scriptState->isolate(), streamReader.v8Value())
+ , m_scriptState(scriptState)
+{
+ m_reader.setPhantom();
+}
+
+ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer()
+{
+}
+
+BytesConsumer::Result ReadableStreamBytesConsumer::beginRead(const char** buffer, size_t* available)
+{
+ *buffer = nullptr;
+ *available = 0;
+ if (m_state == PublicState::Errored)
+ return Result::Error;
+ if (m_state == PublicState::Closed)
+ return Result::Done;
+
+ if (m_pendingBuffer) {
+ DCHECK_LE(m_pendingOffset, m_pendingBuffer->length());
hiroshige 2016/09/30 06:35:14 This CL seems to allow 0-byte |m_pendingBuffer| wh
yhirano 2016/09/30 09:01:02 Because I think it's simpler.
hiroshige 2016/09/30 11:46:24 Acknowledged.
+ *buffer = reinterpret_cast<const char*>(m_pendingBuffer->data()) + m_pendingOffset;
+ *available = m_pendingBuffer->length() - m_pendingOffset;
+ return Result::Ok;
+ }
+ if (!m_isReading) {
+ m_isReading = true;
+ ScriptState::Scope scope(m_scriptState.get());
+ ScriptValue reader(m_scriptState.get(), m_reader.newLocal(m_scriptState->isolate()));
+ // The owner must retain the reader.
+ DCHECK(!reader.isEmpty());
+ ReadableStreamOperations::defaultReaderRead(
+ m_scriptState.get(), reader).then(
+ OnFulfilled::createFunction(m_scriptState.get(), this),
+ OnRejected::createFunction(m_scriptState.get(), this));
+ }
+ return Result::ShouldWait;
+}
+
+BytesConsumer::Result ReadableStreamBytesConsumer::endRead(size_t readSize)
+{
+ DCHECK(m_pendingBuffer);
+ DCHECK_LE(m_pendingOffset + readSize, m_pendingBuffer->length());
+ m_pendingOffset += readSize;
+ if (m_pendingOffset == m_pendingBuffer->length()) {
hiroshige 2016/09/30 06:35:14 Using '>=' instead of '==' is more robust for buff
yhirano 2016/09/30 09:01:02 Done.
+ m_pendingBuffer = nullptr;
+ m_pendingOffset = 0;
+ }
+ return Result::Ok;
+}
+
+void ReadableStreamBytesConsumer::setClient(Client* client)
+{
+ DCHECK(!m_client);
+ DCHECK(client);
+ m_client = client;
+}
+
+void ReadableStreamBytesConsumer::clearClient()
+{
+ m_client = nullptr;
+}
+
+void ReadableStreamBytesConsumer::cancel()
+{
+ if (m_state == PublicState::Closed || m_state == PublicState::Errored)
+ return;
+ m_state = PublicState::Closed;
+ clearClient();
+ m_reader.clear();
+}
+
+BytesConsumer::PublicState ReadableStreamBytesConsumer::getPublicState() const
+{
+ return m_state;
+}
+
+BytesConsumer::Error ReadableStreamBytesConsumer::getError() const
+{
+ return Error("Failed to read from a ReadableStream.");
+}
+
+DEFINE_TRACE(ReadableStreamBytesConsumer)
+{
+ visitor->trace(m_client);
+ visitor->trace(m_pendingBuffer);
+ BytesConsumer::trace(visitor);
+}
+
+void ReadableStreamBytesConsumer::dispose()
+{
+ m_reader.clear();
+}
+
+void ReadableStreamBytesConsumer::onRead(DOMUint8Array* buffer)
+{
+ DCHECK(m_isReading);
+ DCHECK(buffer);
+ DCHECK(!m_pendingBuffer);
+ DCHECK(!m_pendingOffset);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_pendingBuffer = buffer;
+ if (m_client)
+ m_client->onStateChange();
+}
+
+void ReadableStreamBytesConsumer::onReadDone()
+{
+ DCHECK(m_isReading);
+ DCHECK(!m_pendingBuffer);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_state = PublicState::Closed;
+ m_reader.clear();
+ Client* client = m_client;
+ clearClient();
+ if (client)
+ client->onStateChange();
+}
+
+void ReadableStreamBytesConsumer::onRejected()
+{
+ DCHECK(m_isReading);
+ DCHECK(!m_pendingBuffer);
+ m_isReading = false;
+ if (m_state == PublicState::Closed)
+ return;
+ DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
+ m_state = PublicState::Errored;
+ m_reader.clear();
+ Client* client = m_client;
+ clearClient();
+ if (client)
+ client->onStateChange();
+}
+
+} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698