Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/ReadableStreamDataConsumerHandle.cpp

Issue 2365853002: Implement ReadableStreamBytesConsumer (Closed)
Patch Set: fix Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "modules/fetch/ReadableStreamDataConsumerHandle.h"
6
7 #include "bindings/core/v8/ExceptionState.h"
8 #include "bindings/core/v8/ScopedPersistent.h"
9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptState.h"
11 #include "bindings/core/v8/ScriptValue.h"
12 #include "bindings/core/v8/V8BindingMacros.h"
13 #include "bindings/core/v8/V8IteratorResultValue.h"
14 #include "bindings/core/v8/V8Uint8Array.h"
15 #include "core/dom/DOMTypedArray.h"
16 #include "core/streams/ReadableStreamOperations.h"
17 #include "public/platform/Platform.h"
18 #include "public/platform/WebTaskRunner.h"
19 #include "public/platform/WebThread.h"
20 #include "public/platform/WebTraceLocation.h"
21 #include "wtf/Assertions.h"
22 #include "wtf/Functional.h"
23 #include "wtf/RefCounted.h"
24 #include <algorithm>
25 #include <string.h>
26 #include <v8.h>
27
28 namespace blink {
29
30 using Result = WebDataConsumerHandle::Result;
31 using Flags = WebDataConsumerHandle::Flags;
32
33 // This context is not yet thread-safe.
34 class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted <ReadingContext> {
35 WTF_MAKE_NONCOPYABLE(ReadingContext);
36 public:
37 class OnFulfilled final : public ScriptFunction {
38 public:
39 static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<ReadingContext> context)
40 {
41 return (new OnFulfilled(scriptState, std::move(context)))->bindToV8F unction();
42 }
43
44 ScriptValue call(ScriptValue v) override
45 {
46 RefPtr<ReadingContext> readingContext(m_readingContext);
47 if (!readingContext)
48 return v;
49 bool done;
50 v8::Local<v8::Value> item = v.v8Value();
51 ASSERT(item->IsObject());
52 v8::Local<v8::Value> value = v8UnpackIteratorResult(v.getScriptState (), item.As<v8::Object>(), &done).ToLocalChecked();
53 if (done) {
54 readingContext->onReadDone();
55 return v;
56 }
57 if (!value->IsUint8Array()) {
58 readingContext->onRejected();
59 return ScriptValue();
60 }
61 readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>())) ;
62 return v;
63 }
64
65 private:
66 OnFulfilled(ScriptState* scriptState, PassRefPtr<ReadingContext> context )
67 : ScriptFunction(scriptState), m_readingContext(context) {}
68
69 RefPtr<ReadingContext> m_readingContext;
70 };
71
72 class OnRejected final : public ScriptFunction {
73 public:
74 static v8::Local<v8::Function> createFunction(ScriptState* scriptState, PassRefPtr<ReadingContext> context)
75 {
76 return (new OnRejected(scriptState, std::move(context)))->bindToV8Fu nction();
77 }
78
79 ScriptValue call(ScriptValue v) override
80 {
81 RefPtr<ReadingContext> readingContext(m_readingContext);
82 if (!readingContext)
83 return v;
84 readingContext->onRejected();
85 return v;
86 }
87
88 private:
89 OnRejected(ScriptState* scriptState, PassRefPtr<ReadingContext> context)
90 : ScriptFunction(scriptState), m_readingContext(context) {}
91
92 RefPtr<ReadingContext> m_readingContext;
93 };
94
95 class ReaderImpl final : public FetchDataConsumerHandle::Reader {
96 public:
97 ReaderImpl(PassRefPtr<ReadingContext> context, Client* client)
98 : m_readingContext(context)
99 {
100 m_readingContext->attachReader(client);
101 }
102 ~ReaderImpl() override
103 {
104 m_readingContext->detachReader();
105 }
106
107 Result beginRead(const void** buffer, Flags, size_t* available) override
108 {
109 return m_readingContext->beginRead(buffer, available);
110 }
111
112 Result endRead(size_t readSize) override
113 {
114 return m_readingContext->endRead(readSize);
115 }
116
117 private:
118 RefPtr<ReadingContext> m_readingContext;
119 };
120
121 static PassRefPtr<ReadingContext> create(ScriptState* scriptState, ScriptVal ue streamReader)
122 {
123 return adoptRef(new ReadingContext(scriptState, streamReader));
124 }
125
126 void attachReader(WebDataConsumerHandle::Client* client)
127 {
128 m_client = client;
129 notifyLater();
130 }
131
132 void detachReader()
133 {
134 m_client = nullptr;
135 }
136
137 Result beginRead(const void** buffer, size_t* available)
138 {
139 *buffer = nullptr;
140 *available = 0;
141 if (m_hasError)
142 return WebDataConsumerHandle::UnexpectedError;
143 if (m_isDone)
144 return WebDataConsumerHandle::Done;
145
146 if (m_pendingBuffer) {
147 ASSERT(m_pendingOffset < m_pendingBuffer->length());
148 *buffer = m_pendingBuffer->data() + m_pendingOffset;
149 *available = m_pendingBuffer->length() - m_pendingOffset;
150 return WebDataConsumerHandle::Ok;
151 }
152 if (!m_isReading) {
153 m_isReading = true;
154 ScriptState::Scope scope(m_scriptState.get());
155 ScriptValue reader(m_scriptState.get(), m_reader.newLocal(m_scriptSt ate->isolate()));
156 if (reader.isEmpty()) {
157 // The reader was collected.
158 m_hasError = true;
159 m_isReading = false;
160 return WebDataConsumerHandle::UnexpectedError;
161 }
162 ReadableStreamOperations::defaultReaderRead(
163 m_scriptState.get(), reader).then(
164 OnFulfilled::createFunction(m_scriptState.get(), this),
165 OnRejected::createFunction(m_scriptState.get(), this));
166 }
167 return WebDataConsumerHandle::ShouldWait;
168 }
169
170 Result endRead(size_t readSize)
171 {
172 ASSERT(m_pendingBuffer);
173 ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length());
174 m_pendingOffset += readSize;
175 if (m_pendingOffset == m_pendingBuffer->length()) {
176 m_pendingBuffer = nullptr;
177 m_pendingOffset = 0;
178 }
179 return WebDataConsumerHandle::Ok;
180 }
181
182 void onRead(DOMUint8Array* buffer)
183 {
184 ASSERT(m_isReading);
185 ASSERT(buffer);
186 ASSERT(!m_pendingBuffer);
187 ASSERT(!m_pendingOffset);
188 m_isReading = false;
189 if (buffer->length() > 0)
190 m_pendingBuffer = buffer;
191 notify();
192 }
193
194 void onReadDone()
195 {
196 ASSERT(m_isReading);
197 ASSERT(!m_pendingBuffer);
198 m_isReading = false;
199 m_isDone = true;
200 m_reader.clear();
201 notify();
202 }
203
204 void onRejected()
205 {
206 ASSERT(m_isReading);
207 ASSERT(!m_pendingBuffer);
208 m_hasError = true;
209 m_isReading = false;
210 m_reader.clear();
211 notify();
212 }
213
214 void notify()
215 {
216 if (!m_client)
217 return;
218 m_client->didGetReadable();
219 }
220
221 void notifyLater()
222 {
223 ASSERT(m_client);
224 Platform::current()->currentThread()->getWebTaskRunner()->postTask(BLINK _FROM_HERE, WTF::bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this)) );
225 }
226
227 private:
228 ReadingContext(ScriptState* scriptState, ScriptValue streamReader)
229 : m_reader(scriptState->isolate(), streamReader.v8Value())
230 , m_scriptState(scriptState)
231 , m_client(nullptr)
232 , m_pendingOffset(0)
233 , m_isReading(false)
234 , m_isDone(false)
235 , m_hasError(false)
236 {
237 m_reader.setWeak(this, &ReadingContext::onCollected);
238 }
239
240 void onCollected()
241 {
242 m_reader.clear();
243 if (m_isDone || m_hasError)
244 return;
245 m_hasError = true;
246 if (m_client)
247 notifyLater();
248 }
249
250 static void onCollected(const v8::WeakCallbackInfo<ReadableStreamDataConsume rHandle::ReadingContext>& data)
251 {
252 data.GetParameter()->onCollected();
253 }
254
255 // |m_reader| is a weak persistent. It should be kept alive by someone
256 // outside of ReadableStreamDataConsumerHandle.
257 // Holding a ScopedPersistent here is safe in terms of cross-world wrapper
258 // leakage because we read only Uint8Array chunks from the reader.
259 ScopedPersistent<v8::Value> m_reader;
260 RefPtr<ScriptState> m_scriptState;
261 WebDataConsumerHandle::Client* m_client;
262 Persistent<DOMUint8Array> m_pendingBuffer;
263 size_t m_pendingOffset;
264 bool m_isReading;
265 bool m_isDone;
266 bool m_hasError;
267 };
268
269 ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, ScriptValue streamReader)
270 : m_readingContext(ReadingContext::create(scriptState, streamReader))
271 {
272 }
273 ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default;
274
275 std::unique_ptr<FetchDataConsumerHandle::Reader> ReadableStreamDataConsumerHandl e::obtainFetchDataReader(Client* client)
276 {
277 return WTF::wrapUnique(new ReadingContext::ReaderImpl(m_readingContext, clie nt));
278 }
279
280 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698