Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp

Issue 2365853002: Implement ReadableStreamBytesConsumer (Closed)
Patch Set: git cl format Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "modules/fetch/ReadableStreamDataConsumerHandle.h"
6
7 #include "bindings/core/v8/ExceptionState.h"
8 #include "bindings/core/v8/ScopedPersistent.h"
9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptState.h"
11 #include "bindings/core/v8/ScriptValue.h"
12 #include "bindings/core/v8/V8BindingMacros.h"
13 #include "bindings/core/v8/V8IteratorResultValue.h"
14 #include "bindings/core/v8/V8Uint8Array.h"
15 #include "core/dom/DOMTypedArray.h"
16 #include "core/streams/ReadableStreamOperations.h"
17 #include "public/platform/Platform.h"
18 #include "public/platform/WebTaskRunner.h"
19 #include "public/platform/WebThread.h"
20 #include "public/platform/WebTraceLocation.h"
21 #include "wtf/Assertions.h"
22 #include "wtf/Functional.h"
23 #include "wtf/RefCounted.h"
24 #include <algorithm>
25 #include <string.h>
26 #include <v8.h>
27
28 namespace blink {
29
30 using Result = WebDataConsumerHandle::Result;
31 using Flags = WebDataConsumerHandle::Flags;
32
33 // This context is not yet thread-safe.
34 class ReadableStreamDataConsumerHandle::ReadingContext final
35 : public RefCounted<ReadingContext> {
36 WTF_MAKE_NONCOPYABLE(ReadingContext);
37
38 public:
39 class OnFulfilled final : public ScriptFunction {
40 public:
41 static v8::Local<v8::Function> createFunction(
42 ScriptState* scriptState,
43 PassRefPtr<ReadingContext> context) {
44 return (new OnFulfilled(scriptState, std::move(context)))
45 ->bindToV8Function();
46 }
47
48 ScriptValue call(ScriptValue v) override {
49 RefPtr<ReadingContext> readingContext(m_readingContext);
50 if (!readingContext)
51 return v;
52 bool done;
53 v8::Local<v8::Value> item = v.v8Value();
54 ASSERT(item->IsObject());
55 v8::Local<v8::Value> value =
56 v8UnpackIteratorResult(v.getScriptState(), item.As<v8::Object>(),
57 &done)
58 .ToLocalChecked();
59 if (done) {
60 readingContext->onReadDone();
61 return v;
62 }
63 if (!value->IsUint8Array()) {
64 readingContext->onRejected();
65 return ScriptValue();
66 }
67 readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
68 return v;
69 }
70
71 private:
72 OnFulfilled(ScriptState* scriptState, PassRefPtr<ReadingContext> context)
73 : ScriptFunction(scriptState), m_readingContext(context) {}
74
75 RefPtr<ReadingContext> m_readingContext;
76 };
77
78 class OnRejected final : public ScriptFunction {
79 public:
80 static v8::Local<v8::Function> createFunction(
81 ScriptState* scriptState,
82 PassRefPtr<ReadingContext> context) {
83 return (new OnRejected(scriptState, std::move(context)))
84 ->bindToV8Function();
85 }
86
87 ScriptValue call(ScriptValue v) override {
88 RefPtr<ReadingContext> readingContext(m_readingContext);
89 if (!readingContext)
90 return v;
91 readingContext->onRejected();
92 return v;
93 }
94
95 private:
96 OnRejected(ScriptState* scriptState, PassRefPtr<ReadingContext> context)
97 : ScriptFunction(scriptState), m_readingContext(context) {}
98
99 RefPtr<ReadingContext> m_readingContext;
100 };
101
102 class ReaderImpl final : public FetchDataConsumerHandle::Reader {
103 public:
104 ReaderImpl(PassRefPtr<ReadingContext> context, Client* client)
105 : m_readingContext(context) {
106 m_readingContext->attachReader(client);
107 }
108 ~ReaderImpl() override { m_readingContext->detachReader(); }
109
110 Result beginRead(const void** buffer, Flags, size_t* available) override {
111 return m_readingContext->beginRead(buffer, available);
112 }
113
114 Result endRead(size_t readSize) override {
115 return m_readingContext->endRead(readSize);
116 }
117
118 private:
119 RefPtr<ReadingContext> m_readingContext;
120 };
121
122 static PassRefPtr<ReadingContext> create(ScriptState* scriptState,
123 ScriptValue streamReader) {
124 return adoptRef(new ReadingContext(scriptState, streamReader));
125 }
126
127 void attachReader(WebDataConsumerHandle::Client* client) {
128 m_client = client;
129 notifyLater();
130 }
131
132 void detachReader() { m_client = nullptr; }
133
134 Result beginRead(const void** buffer, size_t* available) {
135 *buffer = nullptr;
136 *available = 0;
137 if (m_hasError)
138 return WebDataConsumerHandle::UnexpectedError;
139 if (m_isDone)
140 return WebDataConsumerHandle::Done;
141
142 if (m_pendingBuffer) {
143 ASSERT(m_pendingOffset < m_pendingBuffer->length());
144 *buffer = m_pendingBuffer->data() + m_pendingOffset;
145 *available = m_pendingBuffer->length() - m_pendingOffset;
146 return WebDataConsumerHandle::Ok;
147 }
148 if (!m_isReading) {
149 m_isReading = true;
150 ScriptState::Scope scope(m_scriptState.get());
151 ScriptValue reader(m_scriptState.get(),
152 m_reader.newLocal(m_scriptState->isolate()));
153 if (reader.isEmpty()) {
154 // The reader was collected.
155 m_hasError = true;
156 m_isReading = false;
157 return WebDataConsumerHandle::UnexpectedError;
158 }
159 ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader)
160 .then(OnFulfilled::createFunction(m_scriptState.get(), this),
161 OnRejected::createFunction(m_scriptState.get(), this));
162 }
163 return WebDataConsumerHandle::ShouldWait;
164 }
165
166 Result endRead(size_t readSize) {
167 ASSERT(m_pendingBuffer);
168 ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length());
169 m_pendingOffset += readSize;
170 if (m_pendingOffset == m_pendingBuffer->length()) {
171 m_pendingBuffer = nullptr;
172 m_pendingOffset = 0;
173 }
174 return WebDataConsumerHandle::Ok;
175 }
176
177 void onRead(DOMUint8Array* buffer) {
178 ASSERT(m_isReading);
179 ASSERT(buffer);
180 ASSERT(!m_pendingBuffer);
181 ASSERT(!m_pendingOffset);
182 m_isReading = false;
183 if (buffer->length() > 0)
184 m_pendingBuffer = buffer;
185 notify();
186 }
187
188 void onReadDone() {
189 ASSERT(m_isReading);
190 ASSERT(!m_pendingBuffer);
191 m_isReading = false;
192 m_isDone = true;
193 m_reader.clear();
194 notify();
195 }
196
197 void onRejected() {
198 ASSERT(m_isReading);
199 ASSERT(!m_pendingBuffer);
200 m_hasError = true;
201 m_isReading = false;
202 m_reader.clear();
203 notify();
204 }
205
206 void notify() {
207 if (!m_client)
208 return;
209 m_client->didGetReadable();
210 }
211
212 void notifyLater() {
213 ASSERT(m_client);
214 Platform::current()->currentThread()->getWebTaskRunner()->postTask(
215 BLINK_FROM_HERE,
216 WTF::bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this)));
217 }
218
219 private:
220 ReadingContext(ScriptState* scriptState, ScriptValue streamReader)
221 : m_reader(scriptState->isolate(), streamReader.v8Value()),
222 m_scriptState(scriptState),
223 m_client(nullptr),
224 m_pendingOffset(0),
225 m_isReading(false),
226 m_isDone(false),
227 m_hasError(false) {
228 m_reader.setWeak(this, &ReadingContext::onCollected);
229 }
230
231 void onCollected() {
232 m_reader.clear();
233 if (m_isDone || m_hasError)
234 return;
235 m_hasError = true;
236 if (m_client)
237 notifyLater();
238 }
239
240 static void onCollected(
241 const v8::WeakCallbackInfo<
242 ReadableStreamDataConsumerHandle::ReadingContext>& data) {
243 data.GetParameter()->onCollected();
244 }
245
246 // |m_reader| is a weak persistent. It should be kept alive by someone
247 // outside of ReadableStreamDataConsumerHandle.
248 // Holding a ScopedPersistent here is safe in terms of cross-world wrapper
249 // leakage because we read only Uint8Array chunks from the reader.
250 ScopedPersistent<v8::Value> m_reader;
251 RefPtr<ScriptState> m_scriptState;
252 WebDataConsumerHandle::Client* m_client;
253 Persistent<DOMUint8Array> m_pendingBuffer;
254 size_t m_pendingOffset;
255 bool m_isReading;
256 bool m_isDone;
257 bool m_hasError;
258 };
259
260 ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(
261 ScriptState* scriptState,
262 ScriptValue streamReader)
263 : m_readingContext(ReadingContext::create(scriptState, streamReader)) {}
264 ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default;
265
266 std::unique_ptr<FetchDataConsumerHandle::Reader>
267 ReadableStreamDataConsumerHandle::obtainFetchDataReader(Client* client) {
268 return WTF::wrapUnique(
269 new ReadingContext::ReaderImpl(m_readingContext, client));
270 }
271
272 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698