Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(513)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamBytesConsumer.cpp

Issue 2365853002: Implement ReadableStreamBytesConsumer (Closed)
Patch Set: git cl format Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "modules/fetch/ReadableStreamBytesConsumer.h"
6
7 #include "bindings/core/v8/ScopedPersistent.h"
8 #include "bindings/core/v8/ScriptFunction.h"
9 #include "bindings/core/v8/ScriptState.h"
10 #include "bindings/core/v8/ScriptValue.h"
11 #include "bindings/core/v8/V8BindingMacros.h"
12 #include "bindings/core/v8/V8IteratorResultValue.h"
13 #include "bindings/core/v8/V8Uint8Array.h"
14 #include "core/streams/ReadableStreamOperations.h"
15 #include "wtf/Assertions.h"
16 #include "wtf/text/WTFString.h"
17 #include <algorithm>
18 #include <string.h>
19 #include <v8.h>
20
21 namespace blink {
22
23 class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction {
24 public:
25 static v8::Local<v8::Function> createFunction(
26 ScriptState* scriptState,
27 ReadableStreamBytesConsumer* consumer) {
28 return (new OnFulfilled(scriptState, consumer))->bindToV8Function();
29 }
30
31 ScriptValue call(ScriptValue v) override {
32 bool done;
33 v8::Local<v8::Value> item = v.v8Value();
34 DCHECK(item->IsObject());
35 v8::Local<v8::Value> value =
36 v8UnpackIteratorResult(v.getScriptState(), item.As<v8::Object>(), &done)
37 .ToLocalChecked();
38 if (done) {
39 m_consumer->onReadDone();
40 return v;
41 }
42 if (!value->IsUint8Array()) {
43 m_consumer->onRejected();
44 return ScriptValue();
45 }
46 m_consumer->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
47 return v;
48 }
49
50 DEFINE_INLINE_TRACE() {
51 visitor->trace(m_consumer);
52 ScriptFunction::trace(visitor);
53 }
54
55 private:
56 OnFulfilled(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
57 : ScriptFunction(scriptState), m_consumer(consumer) {}
58
59 Member<ReadableStreamBytesConsumer> m_consumer;
60 };
61
62 class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction {
63 public:
64 static v8::Local<v8::Function> createFunction(
65 ScriptState* scriptState,
66 ReadableStreamBytesConsumer* consumer) {
67 return (new OnRejected(scriptState, consumer))->bindToV8Function();
68 }
69
70 ScriptValue call(ScriptValue v) override {
71 m_consumer->onRejected();
72 return v;
73 }
74
75 DEFINE_INLINE_TRACE() {
76 visitor->trace(m_consumer);
77 ScriptFunction::trace(visitor);
78 }
79
80 private:
81 OnRejected(ScriptState* scriptState, ReadableStreamBytesConsumer* consumer)
82 : ScriptFunction(scriptState), m_consumer(consumer) {}
83
84 Member<ReadableStreamBytesConsumer> m_consumer;
85 };
86
87 ReadableStreamBytesConsumer::ReadableStreamBytesConsumer(
88 ScriptState* scriptState,
89 ScriptValue streamReader)
90 : m_reader(scriptState->isolate(), streamReader.v8Value()),
91 m_scriptState(scriptState) {
92 m_reader.setPhantom();
93 }
94
95 ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer() {}
96
97 BytesConsumer::Result ReadableStreamBytesConsumer::beginRead(
98 const char** buffer,
99 size_t* available) {
100 *buffer = nullptr;
101 *available = 0;
102 if (m_state == PublicState::Errored)
103 return Result::Error;
104 if (m_state == PublicState::Closed)
105 return Result::Done;
106
107 if (m_pendingBuffer) {
108 DCHECK_LE(m_pendingOffset, m_pendingBuffer->length());
109 *buffer = reinterpret_cast<const char*>(m_pendingBuffer->data()) +
110 m_pendingOffset;
111 *available = m_pendingBuffer->length() - m_pendingOffset;
112 return Result::Ok;
113 }
114 if (!m_isReading) {
115 m_isReading = true;
116 ScriptState::Scope scope(m_scriptState.get());
117 ScriptValue reader(m_scriptState.get(),
118 m_reader.newLocal(m_scriptState->isolate()));
119 // The owner must retain the reader.
120 DCHECK(!reader.isEmpty());
121 ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader)
122 .then(OnFulfilled::createFunction(m_scriptState.get(), this),
123 OnRejected::createFunction(m_scriptState.get(), this));
124 }
125 return Result::ShouldWait;
126 }
127
128 BytesConsumer::Result ReadableStreamBytesConsumer::endRead(size_t readSize) {
129 DCHECK(m_pendingBuffer);
130 DCHECK_LE(m_pendingOffset + readSize, m_pendingBuffer->length());
131 m_pendingOffset += readSize;
132 if (m_pendingOffset >= m_pendingBuffer->length()) {
133 m_pendingBuffer = nullptr;
134 m_pendingOffset = 0;
135 }
136 return Result::Ok;
137 }
138
139 void ReadableStreamBytesConsumer::setClient(Client* client) {
140 DCHECK(!m_client);
141 DCHECK(client);
142 m_client = client;
143 }
144
145 void ReadableStreamBytesConsumer::clearClient() {
146 m_client = nullptr;
147 }
148
149 void ReadableStreamBytesConsumer::cancel() {
150 if (m_state == PublicState::Closed || m_state == PublicState::Errored)
151 return;
152 m_state = PublicState::Closed;
153 clearClient();
154 m_reader.clear();
155 }
156
157 BytesConsumer::PublicState ReadableStreamBytesConsumer::getPublicState() const {
158 return m_state;
159 }
160
161 BytesConsumer::Error ReadableStreamBytesConsumer::getError() const {
162 return Error("Failed to read from a ReadableStream.");
163 }
164
165 DEFINE_TRACE(ReadableStreamBytesConsumer) {
166 visitor->trace(m_client);
167 visitor->trace(m_pendingBuffer);
168 BytesConsumer::trace(visitor);
169 }
170
171 void ReadableStreamBytesConsumer::dispose() {
172 m_reader.clear();
173 }
174
175 void ReadableStreamBytesConsumer::onRead(DOMUint8Array* buffer) {
176 DCHECK(m_isReading);
177 DCHECK(buffer);
178 DCHECK(!m_pendingBuffer);
179 DCHECK(!m_pendingOffset);
180 m_isReading = false;
181 if (m_state == PublicState::Closed)
182 return;
183 DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
184 m_pendingBuffer = buffer;
185 if (m_client)
186 m_client->onStateChange();
187 }
188
189 void ReadableStreamBytesConsumer::onReadDone() {
190 DCHECK(m_isReading);
191 DCHECK(!m_pendingBuffer);
192 m_isReading = false;
193 if (m_state == PublicState::Closed)
194 return;
195 DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
196 m_state = PublicState::Closed;
197 m_reader.clear();
198 Client* client = m_client;
199 clearClient();
200 if (client)
201 client->onStateChange();
202 }
203
204 void ReadableStreamBytesConsumer::onRejected() {
205 DCHECK(m_isReading);
206 DCHECK(!m_pendingBuffer);
207 m_isReading = false;
208 if (m_state == PublicState::Closed)
209 return;
210 DCHECK_EQ(m_state, PublicState::ReadableOrWaiting);
211 m_state = PublicState::Errored;
212 m_reader.clear();
213 Client* client = m_client;
214 clearClient();
215 if (client)
216 client->onStateChange();
217 }
218
219 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698