Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(263)

Side by Side Diff: Source/modules/fetch/Body.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/Body.h ('k') | Source/modules/fetch/BodyStreamBuffer.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/Body.h" 6 #include "modules/fetch/Body.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromiseResolver.h" 9 #include "bindings/core/v8/ScriptPromiseResolver.h"
10 #include "bindings/core/v8/ScriptState.h" 10 #include "bindings/core/v8/ScriptState.h"
11 #include "bindings/core/v8/V8ArrayBuffer.h" 11 #include "bindings/core/v8/V8ArrayBuffer.h"
12 #include "bindings/core/v8/V8ThrowException.h" 12 #include "bindings/core/v8/V8ThrowException.h"
13 #include "core/dom/DOMArrayBuffer.h" 13 #include "core/dom/DOMArrayBuffer.h"
14 #include "core/dom/DOMTypedArray.h" 14 #include "core/dom/DOMTypedArray.h"
15 #include "core/dom/ExceptionCode.h"
15 #include "core/fileapi/Blob.h" 16 #include "core/fileapi/Blob.h"
16 #include "core/fileapi/FileReaderLoader.h"
17 #include "core/fileapi/FileReaderLoaderClient.h"
18 #include "core/frame/UseCounter.h" 17 #include "core/frame/UseCounter.h"
19 #include "core/streams/ReadableByteStream.h" 18 #include "core/streams/ReadableByteStream.h"
20 #include "core/streams/ReadableByteStreamReader.h" 19 #include "core/streams/ReadableByteStreamReader.h"
21 #include "core/streams/UnderlyingSource.h" 20 #include "core/streams/UnderlyingSource.h"
22 #include "modules/fetch/BodyStreamBuffer.h" 21 #include "modules/fetch/BodyStreamBuffer.h"
23 #include "modules/fetch/DataConsumerHandleUtil.h" 22 #include "modules/fetch/DataConsumerHandleUtil.h"
24 #include "modules/fetch/FetchBlobDataConsumerHandle.h" 23 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
25 24
26 namespace blink { 25 namespace blink {
27 26
28 class Body::BlobHandleReceiver final : public BodyStreamBuffer::BlobHandleCreato rClient { 27 class Body::ReadableStreamSource : public GarbageCollectedFinalized<Body::Readab leStreamSource>, public UnderlyingSource, public WebDataConsumerHandle::Client, public BodyStreamBuffer::DrainingStreamNotificationClient {
29 public:
30 explicit BlobHandleReceiver(Body* body)
31 : m_body(body)
32 {
33 }
34 void didCreateBlobHandle(PassRefPtr<BlobDataHandle> handle) override
35 {
36 ASSERT(m_body);
37 m_body->readAsyncFromBlob(handle);
38 m_body = nullptr;
39 }
40 void didFail(DOMException* exception) override
41 {
42 ASSERT(m_body);
43 m_body->didBlobHandleReceiveError(exception);
44 m_body = nullptr;
45 }
46 DEFINE_INLINE_VIRTUAL_TRACE()
47 {
48 BodyStreamBuffer::BlobHandleCreatorClient::trace(visitor);
49 visitor->trace(m_body);
50 }
51 private:
52 Member<Body> m_body;
53 };
54
55 // This class is an ActiveDOMObject subclass only for holding the
56 // ExecutionContext used in |pullSource|.
57 class Body::ReadableStreamSource : public BodyStreamBuffer::Observer, public Und erlyingSource, public FileReaderLoaderClient, public ActiveDOMObject {
58 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); 28 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
59 public: 29 public:
60 enum State { 30 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer)
61 Initial, 31 : m_bodyStreamBuffer(buffer)
62 Streaming, 32 , m_streamNeedsMore(false)
63 Closed, 33 #if ENABLE(ASSERT)
64 Errored, 34 , m_drained(false)
65 }; 35 , m_isCloseCalled(false)
66 ReadableStreamSource(ExecutionContext* executionContext, PassRefPtr<BlobData Handle> handle) 36 , m_isErrorCalled(false)
67 : ActiveDOMObject(executionContext) 37 #endif
68 , m_blobDataHandle(handle ? handle : BlobDataHandle::create(BlobData::cr eate(), 0))
69 , m_state(Initial)
70 { 38 {
71 suspendIfNeeded(); 39 if (m_bodyStreamBuffer)
72 } 40 obtainReader();
73
74 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer)
75 : ActiveDOMObject(executionContext)
76 , m_bodyStreamBuffer(buffer)
77 , m_state(Initial)
78 {
79 suspendIfNeeded();
80 }
81
82 explicit ReadableStreamSource(ExecutionContext* executionContext)
83 : ActiveDOMObject(executionContext)
84 , m_blobDataHandle(BlobDataHandle::create(BlobData::create(), 0))
85 , m_state(Initial)
86 {
87 suspendIfNeeded();
88 } 41 }
89 42
90 ~ReadableStreamSource() override { } 43 ~ReadableStreamSource() override { }
91 44
92 State state() const { return m_state; }
93
94 void startStream(ReadableByteStream* stream) 45 void startStream(ReadableByteStream* stream)
95 { 46 {
96 m_stream = stream; 47 m_stream = stream;
97 stream->didSourceStart(); 48 stream->didSourceStart();
98 } 49 }
99 // Creates a new BodyStreamBuffer to drain the data. 50 // Creates a new BodyStreamBuffer to drain the data.
100 BodyStreamBuffer* createDrainingStream() 51 PassOwnPtr<DrainingBodyStreamBuffer> createDrainingStream()
101 { 52 {
102 ASSERT(m_state != Initial); 53 if (!m_bodyStreamBuffer)
54 return nullptr;
103 55
104 auto drainingStreamBuffer = new BodyStreamBuffer(new Canceller(this)); 56 #if ENABLE(ASSERT)
105 if (m_stream->stateInternal() == ReadableByteStream::Closed) { 57 ASSERT(!m_drained);
106 drainingStreamBuffer->close(); 58 m_drained = true;
107 return drainingStreamBuffer; 59 ASSERT(!(m_stream->stateInternal() == ReadableByteStream::Closed && !m_i sCloseCalled));
108 } 60 ASSERT(!(m_stream->stateInternal() == ReadableByteStream::Errored && !m_ isErrorCalled));
109 if (m_stream->stateInternal() == ReadableByteStream::Errored) { 61 #endif
110 drainingStreamBuffer->error(exception());
111 return drainingStreamBuffer;
112 }
113 62
114 ASSERT(!m_drainingStreamBuffer); 63 m_reader.clear();
115 // Take back the data in |m_stream|. 64 return DrainingBodyStreamBuffer::create(m_bodyStreamBuffer, this);
116 Deque<std::pair<RefPtr<DOMArrayBufferView>, size_t>> tmp_queue; 65 }
117 ASSERT(m_stream->stateInternal() == ReadableStream::Readable);
118 m_stream->readInternal(tmp_queue);
119 while (!tmp_queue.isEmpty()) {
120 std::pair<RefPtr<DOMArrayBufferView>, size_t> data = tmp_queue.takeF irst();
121 drainingStreamBuffer->write(data.first->buffer());
122 }
123 if (m_state == Closed)
124 drainingStreamBuffer->close();
125 66
126 m_drainingStreamBuffer = drainingStreamBuffer;
127 return m_drainingStreamBuffer;
128 }
129 DEFINE_INLINE_VIRTUAL_TRACE() 67 DEFINE_INLINE_VIRTUAL_TRACE()
130 { 68 {
131 visitor->trace(m_bodyStreamBuffer); 69 visitor->trace(m_bodyStreamBuffer);
132 visitor->trace(m_drainingStreamBuffer);
133 visitor->trace(m_stream); 70 visitor->trace(m_stream);
134 BodyStreamBuffer::Observer::trace(visitor);
135 UnderlyingSource::trace(visitor); 71 UnderlyingSource::trace(visitor);
136 ActiveDOMObject::trace(visitor); 72 DrainingStreamNotificationClient::trace(visitor);
137 } 73 }
138 74
139 void close() 75 void close()
140 { 76 {
141 if (m_state == Closed) { 77 m_reader.clear();
142 // It is possible to call |close| from the source side (such
143 // as blob loading finish) and from the consumer side (such as
144 // calling |cancel|). Thus we should ignore it here.
145 return;
146 }
147 m_state = Closed;
148 if (m_drainingStreamBuffer)
149 m_drainingStreamBuffer->close();
150 m_stream->close(); 78 m_stream->close();
79 m_bodyStreamBuffer = BodyStreamBuffer::createEmpty();
80 #if ENABLE(ASSERT)
81 m_isCloseCalled = true;
82 #endif
151 } 83 }
84
152 void error() 85 void error()
153 { 86 {
154 m_state = Errored; 87 m_reader.clear();
155 if (m_drainingStreamBuffer) 88 m_stream->error(DOMException::create(NetworkError, "network error"));
156 m_drainingStreamBuffer->error(exception()); 89 m_bodyStreamBuffer = BodyStreamBuffer::create(createFetchDataConsumerHan dleFromWebHandle(createUnexpectedErrorDataConsumerHandle()));
157 m_stream->error(exception()); 90 #if ENABLE(ASSERT)
91 m_isErrorCalled = true;
92 #endif
158 } 93 }
159 94
160 private: 95 private:
161 class Canceller : public BodyStreamBuffer::Canceller { 96 void obtainReader()
162 public: 97 {
163 Canceller(ReadableStreamSource* source) : m_source(source) { } 98 m_reader = m_bodyStreamBuffer->handle()->obtainReader(this);
164 void cancel() override 99 }
165 { 100
166 m_source->cancel(); 101 void didFetchDataLoadFinishedFromDrainingStream()
102 {
103 ASSERT(m_bodyStreamBuffer);
104 ASSERT(m_drained);
105
106 #if ENABLE(ASSERT)
107 m_drained = false;
108 #endif
109 obtainReader();
110 // We have to call didGetReadable() now to call close()/error() if
111 // necessary.
112 // didGetReadable() would be called asynchronously, but it is too late.
113 didGetReadable();
114 }
115
116 void didGetReadable() override
117 {
118 if (!m_streamNeedsMore) {
119 // Perform zero-length read to call close()/error() early.
120 size_t readSize;
121 WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, We bDataConsumerHandle::FlagNone, &readSize);
122 switch (result) {
123 case WebDataConsumerHandle::Ok:
124 case WebDataConsumerHandle::ShouldWait:
125 return;
126 case WebDataConsumerHandle::Done:
127 close();
128 return;
129 case WebDataConsumerHandle::Busy:
130 case WebDataConsumerHandle::ResourceExhausted:
131 case WebDataConsumerHandle::UnexpectedError:
132 error();
133 return;
134 }
167 } 135 }
168 136
169 DEFINE_INLINE_VIRTUAL_TRACE() 137 processData();
170 { 138 }
171 visitor->trace(m_source);
172 BodyStreamBuffer::Canceller::trace(visitor);
173 }
174
175 private:
176 Member<ReadableStreamSource> m_source;
177 };
178 139
179 // UnderlyingSource functions. 140 // UnderlyingSource functions.
180 void pullSource() override 141 void pullSource() override
181 { 142 {
182 // Note that one |pull| is called only when |read| is called on the 143 ASSERT(!m_streamNeedsMore);
183 // associated ReadableByteStreamReader because we create a stream with 144 m_streamNeedsMore = true;
184 // StrictStrategy. 145
185 if (m_state == Initial) { 146 ASSERT(!m_drained);
186 m_state = Streaming; 147
187 if (m_bodyStreamBuffer) { 148 processData();
188 m_bodyStreamBuffer->registerObserver(this); 149 }
189 onWrite(); 150
190 if (m_bodyStreamBuffer->hasError()) 151 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride
191 return onError(); 152 {
192 if (m_bodyStreamBuffer->isClosed()) 153 close();
193 return onClose(); 154 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
194 } else { 155 }
195 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsAr rayBuffer; 156
196 m_loader = FileReaderLoader::create(readType, this); 157 // Reads data and writes the data to |m_stream|, as long as data are
197 m_loader->start(executionContext(), m_blobDataHandle); 158 // available and the stream has pending reads.
159 void processData()
160 {
161 ASSERT(m_reader);
162 while (m_streamNeedsMore) {
163 const void* buffer;
164 size_t available;
165 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
166 switch (result) {
167 case WebDataConsumerHandle::Ok:
168 m_streamNeedsMore = m_stream->enqueue(DOMUint8Array::create(stat ic_cast<const unsigned char*>(buffer), available));
169 m_reader->endRead(available);
170 break;
171
172 case WebDataConsumerHandle::Done:
173 close();
174 return;
175
176 case WebDataConsumerHandle::ShouldWait:
177 return;
178
179 case WebDataConsumerHandle::Busy:
180 case WebDataConsumerHandle::ResourceExhausted:
181 case WebDataConsumerHandle::UnexpectedError:
182 error();
183 return;
198 } 184 }
199 } 185 }
200 } 186 }
201 187
202 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride 188 // Source of data.
203 { 189 Member<BodyStreamBuffer> m_bodyStreamBuffer;
204 cancel(); 190 OwnPtr<FetchDataConsumerHandle::Reader> m_reader;
205 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
206 }
207 191
208 // BodyStreamBuffer::Observer functions.
209 void onWrite() override
210 {
211 ASSERT(m_state == Streaming);
212 while (RefPtr<DOMArrayBuffer> buf = m_bodyStreamBuffer->read()) {
213 write(buf);
214 }
215 }
216 void onClose() override
217 {
218 ASSERT(m_state == Streaming);
219 close();
220 m_bodyStreamBuffer->unregisterObserver();
221 }
222 void onError() override
223 {
224 ASSERT(m_state == Streaming);
225 error();
226 m_bodyStreamBuffer->unregisterObserver();
227 }
228
229 // FileReaderLoaderClient functions.
230 void didStartLoading() override { }
231 void didReceiveData() override { }
232 void didFinishLoading() override
233 {
234 ASSERT(m_state == Streaming);
235 write(m_loader->arrayBufferResult());
236 close();
237 }
238 void didFail(FileError::ErrorCode) override
239 {
240 ASSERT(m_state == Streaming);
241 error();
242 }
243
244 void write(PassRefPtr<DOMArrayBuffer> buf)
245 {
246 if (m_drainingStreamBuffer) {
247 m_drainingStreamBuffer->write(buf);
248 } else {
249 auto size = buf->byteLength();
250 m_stream->enqueue(DOMUint8Array::create(buf, 0, size));
251 }
252 }
253 void cancel()
254 {
255 if (m_bodyStreamBuffer) {
256 m_bodyStreamBuffer->cancel();
257 // We should not close the stream here, because it is canceller's
258 // responsibility.
259 } else {
260 if (m_loader)
261 m_loader->cancel();
262 close();
263 }
264 }
265
266 DOMException* exception()
267 {
268 if (m_state != Errored)
269 return nullptr;
270 if (m_bodyStreamBuffer) {
271 ASSERT(m_bodyStreamBuffer->exception());
272 return m_bodyStreamBuffer->exception();
273 }
274 return DOMException::create(NetworkError, "network error");
275 }
276
277 // Set when the data container of the Body is a BodyStreamBuffer.
278 Member<BodyStreamBuffer> m_bodyStreamBuffer;
279 // Set when the data container of the Body is a BlobDataHandle.
280 RefPtr<BlobDataHandle> m_blobDataHandle;
281 // Used to read the data from BlobDataHandle.
282 OwnPtr<FileReaderLoader> m_loader;
283 // Created when createDrainingStream is called to drain the data.
284 Member<BodyStreamBuffer> m_drainingStreamBuffer;
285 Member<ReadableByteStream> m_stream; 192 Member<ReadableByteStream> m_stream;
286 State m_state; 193 bool m_streamNeedsMore;
194 #if ENABLE(ASSERT)
195 bool m_drained;
196 bool m_isCloseCalled;
197 bool m_isErrorCalled;
198 #endif
287 }; 199 };
288 200
289 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) 201 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type)
290 { 202 {
291 if (bodyUsed()) 203 if (bodyUsed())
292 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read")); 204 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read"));
293 205
294 // When the main thread sends a V8::TerminateExecution() signal to a worker 206 // When the main thread sends a V8::TerminateExecution() signal to a worker
295 // thread, any V8 API on the worker thread starts returning an empty 207 // thread, any V8 API on the worker thread starts returning an empty
296 // handle. This can happen in Body::readAsync. To avoid the situation, we 208 // handle. This can happen in Body::readAsync. To avoid the situation, we
297 // first check the ExecutionContext and return immediately if it's already 209 // first check the ExecutionContext and return immediately if it's already
298 // gone (which means that the V8::TerminateExecution() signal has been sent 210 // gone (which means that the V8::TerminateExecution() signal has been sent
299 // to this worker thread). 211 // to this worker thread).
300 ExecutionContext* executionContext = scriptState->executionContext(); 212 ExecutionContext* executionContext = scriptState->executionContext();
301 if (!executionContext) 213 if (!executionContext)
302 return ScriptPromise(); 214 return ScriptPromise();
303 215
304 lockBody(); 216 lockBody();
305 m_responseType = type; 217 m_responseType = type;
306 218
307 ASSERT(!m_resolver); 219 ASSERT(!m_resolver);
308 m_resolver = ScriptPromiseResolver::create(scriptState); 220 m_resolver = ScriptPromiseResolver::create(scriptState);
309 ScriptPromise promise = m_resolver->promise(); 221 ScriptPromise promise = m_resolver->promise();
310 222
311 if (m_stream->stateInternal() == ReadableStream::Closed) { 223 if (m_stream->stateInternal() == ReadableStream::Closed) {
312 // We resolve the resolver manually in order not to use member 224 resolveWithEmptyDataSynchronously();
313 // variables.
314 switch (m_responseType) {
315 case ResponseAsArrayBuffer:
316 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
317 break;
318 case ResponseAsBlob: {
319 OwnPtr<BlobData> blobData = BlobData::create();
320 blobData->setContentType(mimeType());
321 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.rel ease(), 0)));
322 break;
323 }
324 case ResponseAsText:
325 m_resolver->resolve(String());
326 break;
327 case ResponseAsFormData:
328 // TODO(yhirano): Implement this.
329 ASSERT_NOT_REACHED();
330 break;
331 case ResponseAsJSON: {
332 ScriptState::Scope scope(m_resolver->scriptState());
333 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->s criptState()->isolate(), "Unexpected end of input"));
334 break;
335 }
336 case ResponseUnknown:
337 ASSERT_NOT_REACHED();
338 break;
339 }
340 m_resolver.clear();
341 } else if (m_stream->stateInternal() == ReadableStream::Errored) { 225 } else if (m_stream->stateInternal() == ReadableStream::Errored) {
342 m_resolver->reject(m_stream->storedException()); 226 m_resolver->reject(m_stream->storedException());
343 m_resolver.clear(); 227 m_resolver.clear();
344 } else if (isBodyConsumed()) {
345 m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(mimeT ype(), new BlobHandleReceiver(this));
346 } else if (buffer()) {
347 buffer()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver( this));
348 } else { 228 } else {
349 readAsyncFromBlob(blobDataHandle()); 229 readAsyncFromDrainingBodyStreamBuffer(createDrainingStream(), mimeType() );
350 } 230 }
351 return promise; 231 return promise;
352 } 232 }
353 233
354 void Body::readAsyncFromFetchDataConsumerHandle(FetchDataConsumerHandle* handle, const String& mimeType) 234 void Body::resolveWithEmptyDataSynchronously()
355 { 235 {
356 ASSERT(!m_fetchDataLoader); 236 // We resolve the resolver manually in order not to use member
237 // variables.
238 switch (m_responseType) {
239 case ResponseAsArrayBuffer:
240 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
241 break;
242 case ResponseAsBlob: {
243 OwnPtr<BlobData> blobData = BlobData::create();
244 blobData->setContentType(mimeType());
245 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release (), 0)));
246 break;
247 }
248 case ResponseAsText:
249 m_resolver->resolve(String());
250 break;
251 case ResponseAsFormData:
252 // TODO(yhirano): Implement this.
253 ASSERT_NOT_REACHED();
254 break;
255 case ResponseAsJSON: {
256 ScriptState::Scope scope(m_resolver->scriptState());
257 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->scrip tState()->isolate(), "Unexpected end of input"));
258 break;
259 }
260 case ResponseUnknown:
261 ASSERT_NOT_REACHED();
262 break;
263 }
264 m_resolver.clear();
265 }
266
267 void Body::readAsyncFromDrainingBodyStreamBuffer(PassOwnPtr<DrainingBodyStreamBu ffer> buffer, const String& mimeType)
268 {
269 if (!buffer) {
270 resolveWithEmptyDataSynchronously();
271 m_streamSource->close();
272 return;
273 }
274
275 FetchDataLoader* fetchDataLoader = nullptr;
357 276
358 switch (m_responseType) { 277 switch (m_responseType) {
359 case ResponseAsArrayBuffer: 278 case ResponseAsArrayBuffer:
360 m_fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer(); 279 fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer();
361 break; 280 break;
362 281
363 case ResponseAsJSON: 282 case ResponseAsJSON:
364 case ResponseAsText: 283 case ResponseAsText:
365 m_fetchDataLoader = FetchDataLoader::createLoaderAsString(); 284 fetchDataLoader = FetchDataLoader::createLoaderAsString();
366 break; 285 break;
367 286
368 case ResponseAsBlob: 287 case ResponseAsBlob:
369 m_fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType); 288 fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType);
370 break; 289 break;
371 290
372 case ResponseAsFormData: 291 case ResponseAsFormData:
373 // FIXME: Implement this. 292 // FIXME: Implement this.
374 ASSERT_NOT_REACHED(); 293 ASSERT_NOT_REACHED();
375 return; 294 return;
376 295
377 default: 296 default:
378 ASSERT_NOT_REACHED(); 297 ASSERT_NOT_REACHED();
379 return; 298 return;
380 } 299 }
381 300
382 m_fetchDataLoader->start(handle, this); 301 buffer->startLoading(fetchDataLoader, this);
383 }
384
385 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle)
386 {
387 readAsyncFromFetchDataConsumerHandle(FetchBlobDataConsumerHandle::create(exe cutionContext(), handle).get(), mimeType());
388 } 302 }
389 303
390 ScriptPromise Body::arrayBuffer(ScriptState* scriptState) 304 ScriptPromise Body::arrayBuffer(ScriptState* scriptState)
391 { 305 {
392 return readAsync(scriptState, ResponseAsArrayBuffer); 306 return readAsync(scriptState, ResponseAsArrayBuffer);
393 } 307 }
394 308
395 ScriptPromise Body::blob(ScriptState* scriptState) 309 ScriptPromise Body::blob(ScriptState* scriptState)
396 { 310 {
397 return readAsync(scriptState, ResponseAsBlob); 311 return readAsync(scriptState, ResponseAsBlob);
(...skipping 29 matching lines...) Expand all
427 { 341 {
428 ASSERT(!bodyUsed()); 342 ASSERT(!bodyUsed());
429 if (option == PassBody) 343 if (option == PassBody)
430 m_bodyUsed = true; 344 m_bodyUsed = true;
431 ASSERT(!m_stream->isLocked()); 345 ASSERT(!m_stream->isLocked());
432 TrackExceptionState exceptionState; 346 TrackExceptionState exceptionState;
433 m_stream->getBytesReader(executionContext(), exceptionState); 347 m_stream->getBytesReader(executionContext(), exceptionState);
434 ASSERT(!exceptionState.hadException()); 348 ASSERT(!exceptionState.hadException());
435 } 349 }
436 350
437 bool Body::isBodyConsumed() const 351 void Body::setBody(BodyStreamBuffer* buffer)
438 { 352 {
439 if (m_streamSource->state() != m_streamSource->Initial) { 353 m_streamSource = new ReadableStreamSource(executionContext(), buffer);
440 // Some data is pulled from the source.
441 return true;
442 }
443 if (m_stream->stateInternal() == ReadableStream::Closed) {
444 // Return true if the blob handle is originally not empty.
445 RefPtr<BlobDataHandle> handle = blobDataHandle();
446 return handle && handle->size();
447 }
448 if (m_stream->stateInternal() == ReadableStream::Errored) {
449 // The stream is errored. That means an effort to read data was made.
450 return true;
451 }
452 return false;
453 }
454
455 void Body::setBody(ReadableStreamSource* source)
456 {
457 m_streamSource = source;
458 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy); 354 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy);
459 m_streamSource->startStream(m_stream); 355 m_streamSource->startStream(m_stream);
460 } 356 }
461 357
462 BodyStreamBuffer* Body::createDrainingStream() 358 PassOwnPtr<DrainingBodyStreamBuffer> Body::createDrainingStream()
463 { 359 {
464 return m_streamSource->createDrainingStream(); 360 return m_streamSource->createDrainingStream();
465 } 361 }
466 362
467 void Body::stop()
468 {
469 if (m_fetchDataLoader) {
470 m_fetchDataLoader->cancel();
471 m_fetchDataLoader.clear();
472 }
473 }
474
475 bool Body::hasPendingActivity() const 363 bool Body::hasPendingActivity() const
476 { 364 {
477 if (executionContext()->activeDOMObjectsAreStopped()) 365 if (executionContext()->activeDOMObjectsAreStopped())
478 return false; 366 return false;
479 if (m_resolver) 367 if (m_resolver)
480 return true; 368 return true;
481 if (m_stream->isLocked()) 369 if (m_stream->isLocked())
482 return true; 370 return true;
483 return false; 371 return false;
484 } 372 }
485 373
486 Body::ReadableStreamSource* Body::createBodySource(PassRefPtr<BlobDataHandle> ha ndle)
487 {
488 return new ReadableStreamSource(executionContext(), handle);
489 }
490
491 Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer* buffer)
492 {
493 return new ReadableStreamSource(executionContext(), buffer);
494 }
495
496 DEFINE_TRACE(Body) 374 DEFINE_TRACE(Body)
497 { 375 {
498 visitor->trace(m_fetchDataLoader);
499 visitor->trace(m_resolver); 376 visitor->trace(m_resolver);
500 visitor->trace(m_stream); 377 visitor->trace(m_stream);
501 visitor->trace(m_streamSource); 378 visitor->trace(m_streamSource);
502 ActiveDOMObject::trace(visitor); 379 ActiveDOMObject::trace(visitor);
503 FetchDataLoader::Client::trace(visitor); 380 FetchDataLoader::Client::trace(visitor);
504 } 381 }
505 382
506 Body::Body(ExecutionContext* context) 383 Body::Body(ExecutionContext* context)
507 : ActiveDOMObject(context) 384 : ActiveDOMObject(context)
508 , m_bodyUsed(false) 385 , m_bodyUsed(false)
509 , m_responseType(ResponseType::ResponseUnknown) 386 , m_responseType(ResponseType::ResponseUnknown)
510 , m_streamSource(new ReadableStreamSource(context)) 387 , m_streamSource(new ReadableStreamSource(context, nullptr))
511 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy)) 388 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy))
512 { 389 {
513 m_streamSource->startStream(m_stream); 390 m_streamSource->startStream(m_stream);
514 } 391 }
515 392
516 void Body::resolveJSON(const String& string) 393 void Body::resolveJSON(const String& string)
517 { 394 {
518 ASSERT(m_responseType == ResponseAsJSON); 395 ASSERT(m_responseType == ResponseAsJSON);
519 ScriptState::Scope scope(m_resolver->scriptState()); 396 ScriptState::Scope scope(m_resolver->scriptState());
520 v8::Isolate* isolate = m_resolver->scriptState()->isolate(); 397 v8::Isolate* isolate = m_resolver->scriptState()->isolate();
521 v8::Local<v8::String> inputString = v8String(isolate, string); 398 v8::Local<v8::String> inputString = v8String(isolate, string);
522 v8::TryCatch trycatch; 399 v8::TryCatch trycatch;
523 v8::Local<v8::Value> parsed; 400 v8::Local<v8::Value> parsed;
524 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch)) 401 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch))
525 m_resolver->resolve(parsed); 402 m_resolver->resolve(parsed);
526 else 403 else
527 m_resolver->reject(trycatch.Exception()); 404 m_resolver->reject(trycatch.Exception());
528 } 405 }
529 406
530 // FetchDataLoader::Client functions. 407 // FetchDataLoader::Client functions.
531 void Body::didFetchDataLoadFailed() 408 void Body::didFetchDataLoadFailed()
532 { 409 {
533 ASSERT(m_fetchDataLoader);
534 m_fetchDataLoader.clear();
535
536 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 410 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
537 return; 411 return;
538 412
539 m_streamSource->error();
540 if (m_resolver) { 413 if (m_resolver) {
541 if (!m_resolver->executionContext() || m_resolver->executionContext()->a ctiveDOMObjectsAreStopped()) { 414 if (!m_resolver->executionContext() || m_resolver->executionContext()->a ctiveDOMObjectsAreStopped()) {
542 m_resolver.clear(); 415 m_resolver.clear();
543 return; 416 return;
544 } 417 }
545 ScriptState* state = m_resolver->scriptState(); 418 ScriptState* state = m_resolver->scriptState();
546 ScriptState::Scope scope(state); 419 ScriptState::Scope scope(state);
547 m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), " Failed to fetch")); 420 m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), " Failed to fetch"));
548 m_resolver.clear(); 421 m_resolver.clear();
549 } 422 }
550 } 423 }
551 424
552 void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandl e) 425 void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandl e)
553 { 426 {
554 ASSERT(m_fetchDataLoader);
555 m_fetchDataLoader.clear();
556
557 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 427 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
558 return; 428 return;
559 429
560 ASSERT(m_responseType == ResponseAsBlob); 430 ASSERT(m_responseType == ResponseAsBlob);
561 m_resolver->resolve(Blob::create(blobDataHandle)); 431 m_resolver->resolve(Blob::create(blobDataHandle));
562 m_streamSource->close();
563 m_resolver.clear(); 432 m_resolver.clear();
564 } 433 }
565 434
566 void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) 435 void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer)
567 { 436 {
568 ASSERT(m_fetchDataLoader);
569 m_fetchDataLoader.clear();
570
571 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 437 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
572 return; 438 return;
573 439
574 ASSERT(m_responseType == ResponseAsArrayBuffer); 440 ASSERT(m_responseType == ResponseAsArrayBuffer);
575 m_resolver->resolve(arrayBuffer); 441 m_resolver->resolve(arrayBuffer);
576 m_streamSource->close();
577 m_resolver.clear(); 442 m_resolver.clear();
578 } 443 }
579 444
580 void Body::didFetchDataLoadedString(const String& str) 445 void Body::didFetchDataLoadedString(const String& str)
581 { 446 {
582 ASSERT(m_fetchDataLoader);
583 m_fetchDataLoader.clear();
584
585 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 447 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
586 return; 448 return;
587 449
588 switch (m_responseType) { 450 switch (m_responseType) {
589 case ResponseAsJSON: 451 case ResponseAsJSON:
590 resolveJSON(str); 452 resolveJSON(str);
591 break; 453 break;
592 case ResponseAsText: 454 case ResponseAsText:
593 m_resolver->resolve(str); 455 m_resolver->resolve(str);
594 break; 456 break;
595 default: 457 default:
596 ASSERT_NOT_REACHED(); 458 ASSERT_NOT_REACHED();
597 } 459 }
598 460
599 m_streamSource->close();
600 m_resolver.clear(); 461 m_resolver.clear();
601 } 462 }
602 463
603 void Body::didBlobHandleReceiveError(DOMException* exception)
604 {
605 if (!m_resolver)
606 return;
607 m_streamSource->error();
608 m_resolver->reject(exception);
609 m_resolver.clear();
610 }
611
612 } // namespace blink 464 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/Body.h ('k') | Source/modules/fetch/BodyStreamBuffer.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698