Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(570)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp

Issue 2277143002: Use BytesConsumer in BodyStreamBuffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@bytes-consumer-tee
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/BodyStreamBuffer.h" 5 #include "modules/fetch/BodyStreamBuffer.h"
6 6
7 #include "bindings/core/v8/ScriptState.h" 7 #include "bindings/core/v8/ScriptState.h"
8 #include "bindings/core/v8/V8HiddenValue.h" 8 #include "bindings/core/v8/V8HiddenValue.h"
9 #include "core/dom/DOMArrayBuffer.h" 9 #include "core/dom/DOMArrayBuffer.h"
10 #include "core/dom/DOMTypedArray.h" 10 #include "core/dom/DOMTypedArray.h"
11 #include "core/dom/ExceptionCode.h" 11 #include "core/dom/ExceptionCode.h"
12 #include "core/streams/ReadableStreamController.h" 12 #include "core/streams/ReadableStreamController.h"
13 #include "core/streams/ReadableStreamOperations.h" 13 #include "core/streams/ReadableStreamOperations.h"
14 #include "modules/fetch/Body.h" 14 #include "modules/fetch/Body.h"
15 #include "modules/fetch/BytesConsumerForDataConsumerHandle.h" 15 #include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
16 #include "modules/fetch/DataConsumerHandleUtil.h" 16 #include "modules/fetch/DataConsumerHandleUtil.h"
17 #include "modules/fetch/DataConsumerTee.h"
18 #include "modules/fetch/ReadableStreamDataConsumerHandle.h" 17 #include "modules/fetch/ReadableStreamDataConsumerHandle.h"
19 #include "platform/blob/BlobData.h" 18 #include "platform/blob/BlobData.h"
20 #include "platform/network/EncodedFormData.h" 19 #include "platform/network/EncodedFormData.h"
21 #include <memory> 20 #include <memory>
22 21
23 namespace blink { 22 namespace blink {
24 23
25 class BodyStreamBuffer::LoaderClient final : public GarbageCollectedFinalized<Lo aderClient>, public ActiveDOMObject, public FetchDataLoader::Client { 24 class BodyStreamBuffer::LoaderClient final : public GarbageCollectedFinalized<Lo aderClient>, public ActiveDOMObject, public FetchDataLoader::Client {
26 WTF_MAKE_NONCOPYABLE(LoaderClient); 25 WTF_MAKE_NONCOPYABLE(LoaderClient);
27 USING_GARBAGE_COLLECTED_MIXIN(LoaderClient); 26 USING_GARBAGE_COLLECTED_MIXIN(LoaderClient);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 void stop() override 75 void stop() override
77 { 76 {
78 m_buffer->stopLoading(); 77 m_buffer->stopLoading();
79 } 78 }
80 79
81 Member<BodyStreamBuffer> m_buffer; 80 Member<BodyStreamBuffer> m_buffer;
82 Member<FetchDataLoader::Client> m_client; 81 Member<FetchDataLoader::Client> m_client;
83 }; 82 };
84 83
85 BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet chDataConsumerHandle> handle) 84 BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet chDataConsumerHandle> handle)
85 : BodyStreamBuffer(scriptState, new BytesConsumerForDataConsumerHandle(scrip tState->getExecutionContext(), std::move(handle)))
86 {
87 }
88
89 BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, BytesConsumer* cons umer)
86 : UnderlyingSourceBase(scriptState) 90 : UnderlyingSourceBase(scriptState)
87 , m_scriptState(scriptState) 91 , m_scriptState(scriptState)
88 , m_handle(std::move(handle)) 92 , m_consumer(consumer)
89 , m_reader(m_handle->obtainFetchDataReader(this))
90 , m_madeFromReadableStream(false) 93 , m_madeFromReadableStream(false)
91 { 94 {
92 v8::Local<v8::Value> bodyValue = toV8(this, scriptState); 95 v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
93 DCHECK(!bodyValue.IsEmpty()); 96 DCHECK(!bodyValue.IsEmpty());
94 DCHECK(bodyValue->IsObject()); 97 DCHECK(bodyValue->IsObject());
95 v8::Local<v8::Object> body = bodyValue.As<v8::Object>(); 98 v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
96 99
97 ScriptValue readableStream = ReadableStreamOperations::createReadableStream( 100 ScriptValue readableStream = ReadableStreamOperations::createReadableStream(
98 scriptState, this, ReadableStreamOperations::createCountQueuingStrategy( scriptState, 0)); 101 scriptState, this, ReadableStreamOperations::createCountQueuingStrategy( scriptState, 0));
99 DCHECK(!readableStream.isEmpty()); 102 DCHECK(!readableStream.isEmpty());
100 V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBody Stream(scriptState->isolate()), readableStream.v8Value()); 103 V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBody Stream(scriptState->isolate()), readableStream.v8Value());
104 m_consumer->setClient(this);
101 } 105 }
102 106
103 BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream) 107 BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream)
104 : UnderlyingSourceBase(scriptState) 108 : UnderlyingSourceBase(scriptState)
105 , m_scriptState(scriptState) 109 , m_scriptState(scriptState)
106 , m_madeFromReadableStream(true) 110 , m_madeFromReadableStream(true)
107 { 111 {
108 DCHECK(ReadableStreamOperations::isReadableStream(scriptState, stream)); 112 DCHECK(ReadableStreamOperations::isReadableStream(scriptState, stream));
109 v8::Local<v8::Value> bodyValue = toV8(this, scriptState); 113 v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
110 DCHECK(!bodyValue.IsEmpty()); 114 DCHECK(!bodyValue.IsEmpty());
111 DCHECK(bodyValue->IsObject()); 115 DCHECK(bodyValue->IsObject());
112 v8::Local<v8::Object> body = bodyValue.As<v8::Object>(); 116 v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
113 117
114 V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBody Stream(scriptState->isolate()), stream.v8Value()); 118 V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBody Stream(scriptState->isolate()), stream.v8Value());
115 } 119 }
116 120
117 ScriptValue BodyStreamBuffer::stream() 121 ScriptValue BodyStreamBuffer::stream()
118 { 122 {
119 ScriptState::Scope scope(m_scriptState.get()); 123 ScriptState::Scope scope(m_scriptState.get());
120 v8::Local<v8::Value> bodyValue = toV8(this, m_scriptState.get()); 124 v8::Local<v8::Value> bodyValue = toV8(this, m_scriptState.get());
121 DCHECK(!bodyValue.IsEmpty()); 125 DCHECK(!bodyValue.IsEmpty());
122 DCHECK(bodyValue->IsObject()); 126 DCHECK(bodyValue->IsObject());
123 v8::Local<v8::Object> body = bodyValue.As<v8::Object>(); 127 v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
124 return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scri ptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate()) )); 128 return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scri ptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate()) ));
125 } 129 }
126 130
127 PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataCons umerHandle::Reader::BlobSizePolicy policy) 131 PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(BytesConsumer ::BlobSizePolicy policy)
128 { 132 {
129 ASSERT(!isStreamLocked()); 133 ASSERT(!isStreamLocked());
130 ASSERT(!isStreamDisturbed()); 134 ASSERT(!isStreamDisturbed());
131 if (isStreamClosed() || isStreamErrored()) 135 if (isStreamClosed() || isStreamErrored())
132 return nullptr; 136 return nullptr;
133 137
134 if (m_madeFromReadableStream) 138 if (m_madeFromReadableStream)
135 return nullptr; 139 return nullptr;
136 140
137 RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(poli cy); 141 RefPtr<BlobDataHandle> blobDataHandle = m_consumer->drainAsBlobDataHandle(po licy);
138 if (blobDataHandle) { 142 if (blobDataHandle) {
139 closeAndLockAndDisturb(); 143 closeAndLockAndDisturb();
140 return blobDataHandle.release(); 144 return blobDataHandle.release();
141 } 145 }
142 return nullptr; 146 return nullptr;
143 } 147 }
144 148
145 PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData() 149 PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData()
146 { 150 {
147 ASSERT(!isStreamLocked()); 151 ASSERT(!isStreamLocked());
148 ASSERT(!isStreamDisturbed()); 152 ASSERT(!isStreamDisturbed());
149 if (isStreamClosed() || isStreamErrored()) 153 if (isStreamClosed() || isStreamErrored())
150 return nullptr; 154 return nullptr;
151 155
152 if (m_madeFromReadableStream) 156 if (m_madeFromReadableStream)
153 return nullptr; 157 return nullptr;
154 158
155 RefPtr<EncodedFormData> formData = m_reader->drainAsFormData(); 159 RefPtr<EncodedFormData> formData = m_consumer->drainAsFormData();
156 if (formData) { 160 if (formData) {
157 closeAndLockAndDisturb(); 161 closeAndLockAndDisturb();
158 return formData.release(); 162 return formData.release();
159 } 163 }
160 return nullptr; 164 return nullptr;
161 } 165 }
162 166
163 void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl ient* client) 167 void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl ient* client)
164 { 168 {
165 ASSERT(!m_loader); 169 ASSERT(!m_loader);
166 ASSERT(m_scriptState->contextIsValid()); 170 ASSERT(m_scriptState->contextIsValid());
167 std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
168 m_loader = loader; 171 m_loader = loader;
169 loader->start(new BytesConsumerForDataConsumerHandle(getExecutionContext(), std::move(handle)), new LoaderClient(m_scriptState->getExecutionContext(), this, client)); 172 loader->start(releaseHandle(), new LoaderClient(m_scriptState->getExecutionC ontext(), this, client));
170 } 173 }
171 174
172 void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch 2) 175 void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch 2)
173 { 176 {
174 DCHECK(!isStreamLocked()); 177 DCHECK(!isStreamLocked());
175 DCHECK(!isStreamDisturbed()); 178 DCHECK(!isStreamDisturbed());
176 *branch1 = nullptr; 179 *branch1 = nullptr;
177 *branch2 = nullptr; 180 *branch2 = nullptr;
178 181
179 if (m_madeFromReadableStream) { 182 if (m_madeFromReadableStream) {
180 ScriptValue stream1, stream2; 183 ScriptValue stream1, stream2;
181 ReadableStreamOperations::tee(m_scriptState.get(), stream(), &stream1, & stream2); 184 ReadableStreamOperations::tee(m_scriptState.get(), stream(), &stream1, & stream2);
182 *branch1 = new BodyStreamBuffer(m_scriptState.get(), stream1); 185 *branch1 = new BodyStreamBuffer(m_scriptState.get(), stream1);
183 *branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2); 186 *branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2);
184 return; 187 return;
185 } 188 }
186 std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle(); 189 BytesConsumer* dest1 = nullptr;
187 std::unique_ptr<FetchDataConsumerHandle> handle1, handle2; 190 BytesConsumer* dest2 = nullptr;
188 DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(hand le), &handle1, &handle2); 191 BytesConsumer::tee(m_scriptState->getExecutionContext(), releaseHandle(), &d est1, &dest2);
189 *branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1)); 192 *branch1 = new BodyStreamBuffer(m_scriptState.get(), dest1);
190 *branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2)); 193 *branch2 = new BodyStreamBuffer(m_scriptState.get(), dest2);
191 } 194 }
192 195
193 ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState) 196 ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState)
194 { 197 {
195 ASSERT(scriptState == m_scriptState.get()); 198 ASSERT(scriptState == m_scriptState.get());
196 if (m_streamNeedsMore) 199 if (m_streamNeedsMore)
197 return ScriptPromise::castUndefined(scriptState); 200 return ScriptPromise::castUndefined(scriptState);
198 m_streamNeedsMore = true; 201 m_streamNeedsMore = true;
199 processData(); 202 processData();
200 return ScriptPromise::castUndefined(scriptState); 203 return ScriptPromise::castUndefined(scriptState);
201 } 204 }
202 205
203 ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea son) 206 ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea son)
204 { 207 {
205 ASSERT(scriptState == m_scriptState.get()); 208 ASSERT(scriptState == m_scriptState.get());
206 close(); 209 close();
207 return ScriptPromise::castUndefined(scriptState); 210 return ScriptPromise::castUndefined(scriptState);
208 } 211 }
209 212
210 void BodyStreamBuffer::didGetReadable() 213 void BodyStreamBuffer::onStateChange()
211 { 214 {
212 if (!m_reader || !getExecutionContext() || getExecutionContext()->activeDOMO bjectsAreStopped()) 215 if (!m_consumer || !getExecutionContext() || getExecutionContext()->activeDO MObjectsAreStopped())
213 return; 216 return;
214 217
215 if (!m_streamNeedsMore) { 218 switch (m_consumer->getPublicState()) {
216 // Perform zero-length read to call close()/error() early. 219 case BytesConsumer::PublicState::ReadableOrWaiting:
217 size_t readSize; 220 break;
218 WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDat aConsumerHandle::FlagNone, &readSize); 221 case BytesConsumer::PublicState::Closed:
219 switch (result) { 222 close();
220 case WebDataConsumerHandle::Ok: 223 return;
221 case WebDataConsumerHandle::ShouldWait: 224 case BytesConsumer::PublicState::Errored:
222 return; 225 error();
223 case WebDataConsumerHandle::Done:
224 close();
225 return;
226 case WebDataConsumerHandle::Busy:
227 case WebDataConsumerHandle::ResourceExhausted:
228 case WebDataConsumerHandle::UnexpectedError:
229 error();
230 return;
231 }
232 return; 226 return;
233 } 227 }
234 processData(); 228 processData();
235 } 229 }
236 230
237 bool BodyStreamBuffer::hasPendingActivity() const 231 bool BodyStreamBuffer::hasPendingActivity() const
238 { 232 {
239 if (m_loader) 233 if (m_loader)
240 return true; 234 return true;
241 return UnderlyingSourceBase::hasPendingActivity(); 235 return UnderlyingSourceBase::hasPendingActivity();
242 } 236 }
243 237
244 void BodyStreamBuffer::stop() 238 void BodyStreamBuffer::stop()
245 { 239 {
246 m_reader = nullptr; 240 if (m_consumer)
hiroshige 2016/09/08 05:36:38 optional: splitting these three lines into a separ
yhirano 2016/09/08 05:50:39 Done.
247 m_handle = nullptr; 241 m_consumer->cancel();
242 m_consumer = nullptr;
248 UnderlyingSourceBase::stop(); 243 UnderlyingSourceBase::stop();
249 } 244 }
250 245
251 bool BodyStreamBuffer::isStreamReadable() 246 bool BodyStreamBuffer::isStreamReadable()
252 { 247 {
253 ScriptState::Scope scope(m_scriptState.get()); 248 ScriptState::Scope scope(m_scriptState.get());
254 return ReadableStreamOperations::isReadable(m_scriptState.get(), stream()); 249 return ReadableStreamOperations::isReadable(m_scriptState.get(), stream());
255 } 250 }
256 251
257 bool BodyStreamBuffer::isStreamClosed() 252 bool BodyStreamBuffer::isStreamClosed()
(...skipping 30 matching lines...) Expand all
288 283
289 ScriptState::Scope scope(m_scriptState.get()); 284 ScriptState::Scope scope(m_scriptState.get());
290 NonThrowableExceptionState exceptionState; 285 NonThrowableExceptionState exceptionState;
291 ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get() , stream(), exceptionState); 286 ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get() , stream(), exceptionState);
292 ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader); 287 ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader);
293 } 288 }
294 289
295 void BodyStreamBuffer::close() 290 void BodyStreamBuffer::close()
296 { 291 {
297 controller()->close(); 292 controller()->close();
298 m_reader = nullptr; 293 if (m_consumer) {
299 m_handle = nullptr; 294 m_consumer->cancel();
295 m_consumer = nullptr;
296 }
300 } 297 }
301 298
302 void BodyStreamBuffer::error() 299 void BodyStreamBuffer::error()
303 { 300 {
304 controller()->error(DOMException::create(NetworkError, "network error")); 301 controller()->error(DOMException::create(NetworkError, "network error"));
305 m_reader = nullptr; 302 if (m_consumer) {
306 m_handle = nullptr; 303 m_consumer->cancel();
304 m_consumer = nullptr;
305 }
307 } 306 }
308 307
309 void BodyStreamBuffer::processData() 308 void BodyStreamBuffer::processData()
310 { 309 {
311 ASSERT(m_reader); 310 DCHECK(m_consumer);
312 while (m_streamNeedsMore) { 311 while (m_streamNeedsMore) {
313 const void* buffer; 312 const char* buffer = nullptr;
314 size_t available; 313 size_t available = 0;
315 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebD ataConsumerHandle::FlagNone, &available); 314
316 switch (result) { 315 switch (m_consumer->beginRead(&buffer, &available)) {
317 case WebDataConsumerHandle::Ok: { 316 case BytesConsumer::Result::Ok: {
318 DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsig ned char*>(buffer), available); 317 DOMUint8Array* array = DOMUint8Array::create(reinterpret_cast<const unsigned char*>(buffer), available);
319 m_reader->endRead(available); 318 if (m_consumer->endRead(available) != BytesConsumer::Result::Ok) {
319 error();
320 return;
321 }
320 // Clear m_streamNeedsMore in order to detect a pull call. 322 // Clear m_streamNeedsMore in order to detect a pull call.
321 m_streamNeedsMore = false; 323 m_streamNeedsMore = false;
322 controller()->enqueue(array); 324 controller()->enqueue(array);
323 // If m_streamNeedsMore is true, it means that pull is called and 325 // If m_streamNeedsMore is true, it means that pull is called and
324 // the stream needs more data even if the desired size is not 326 // the stream needs more data even if the desired size is not
325 // positive. 327 // positive.
326 if (!m_streamNeedsMore) 328 if (!m_streamNeedsMore)
327 m_streamNeedsMore = controller()->desiredSize() > 0; 329 m_streamNeedsMore = controller()->desiredSize() > 0;
328 break; 330 break;
329 } 331 }
330 case WebDataConsumerHandle::Done: 332 case BytesConsumer::Result::ShouldWait:
333 return;
334 case BytesConsumer::Result::Done:
331 close(); 335 close();
332 return; 336 return;
333 337 case BytesConsumer::Result::Error:
334 case WebDataConsumerHandle::ShouldWait:
335 return;
336
337 case WebDataConsumerHandle::Busy:
338 case WebDataConsumerHandle::ResourceExhausted:
339 case WebDataConsumerHandle::UnexpectedError:
340 error(); 338 error();
341 return; 339 return;
342 } 340 }
343 } 341 }
344 } 342 }
345 343
346 void BodyStreamBuffer::endLoading() 344 void BodyStreamBuffer::endLoading()
347 { 345 {
348 ASSERT(m_loader); 346 ASSERT(m_loader);
349 m_loader = nullptr; 347 m_loader = nullptr;
350 } 348 }
351 349
352 void BodyStreamBuffer::stopLoading() 350 void BodyStreamBuffer::stopLoading()
353 { 351 {
354 if (!m_loader) 352 if (!m_loader)
355 return; 353 return;
356 m_loader->cancel(); 354 m_loader->cancel();
357 m_loader = nullptr; 355 m_loader = nullptr;
358 } 356 }
359 357
360 std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() 358 BytesConsumer* BodyStreamBuffer::releaseHandle()
361 { 359 {
362 DCHECK(!isStreamLocked()); 360 DCHECK(!isStreamLocked());
363 DCHECK(!isStreamDisturbed()); 361 DCHECK(!isStreamDisturbed());
364 362
365 if (m_madeFromReadableStream) { 363 if (m_madeFromReadableStream) {
366 ScriptState::Scope scope(m_scriptState.get()); 364 ScriptState::Scope scope(m_scriptState.get());
367 // We need to have |reader| alive by some means (as written in 365 // We need to have |reader| alive by some means (as written in
368 // ReadableStreamDataConsumerHandle). Based on the following facts 366 // ReadableStreamDataConsumerHandle). Based on the following facts
369 // - This function is used only from tee and startLoading. 367 // - This function is used only from tee and startLoading.
370 // - This branch cannot be taken when called from tee. 368 // - This branch cannot be taken when called from tee.
371 // - startLoading makes hasPendingActivity return true while loading. 369 // - startLoading makes hasPendingActivity return true while loading.
372 // , we don't need to keep the reader explicitly. 370 // , we don't need to keep the reader explicitly.
373 NonThrowableExceptionState exceptionState; 371 NonThrowableExceptionState exceptionState;
374 ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.g et(), stream(), exceptionState); 372 ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.g et(), stream(), exceptionState);
375 return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), rea der); 373 return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutio nContext(), ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader ));
376 } 374 }
377 // We need to call these before calling closeAndLockAndDisturb. 375 // We need to call these before calling closeAndLockAndDisturb.
378 const bool isClosed = isStreamClosed(); 376 const bool isClosed = isStreamClosed();
379 const bool isErrored = isStreamErrored(); 377 const bool isErrored = isStreamErrored();
380 std::unique_ptr<FetchDataConsumerHandle> handle = std::move(m_handle); 378 BytesConsumer* consumer = m_consumer.release();
381 379
382 closeAndLockAndDisturb(); 380 closeAndLockAndDisturb();
383 381
384 if (isClosed) { 382 if (isClosed) {
385 // Note that the stream cannot be "draining", because it doesn't have 383 // Note that the stream cannot be "draining", because it doesn't have
386 // the internal buffer. 384 // the internal buffer.
387 return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumer Handle()); 385 return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutio nContext(), createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHan dle()));
388 } 386 }
389 if (isErrored) 387 if (isErrored)
390 return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorD ataConsumerHandle()); 388 return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutio nContext(), createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorData ConsumerHandle()));
391 389
392 DCHECK(handle); 390 DCHECK(consumer);
393 return handle; 391 consumer->clearClient();
392 return consumer;
394 } 393 }
395 394
396 } // namespace blink 395 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698