Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(77)

Side by Side Diff: Source/modules/fetch/Body.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Reflect comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/Body.h" 6 #include "modules/fetch/Body.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromiseResolver.h" 9 #include "bindings/core/v8/ScriptPromiseResolver.h"
10 #include "bindings/core/v8/ScriptState.h" 10 #include "bindings/core/v8/ScriptState.h"
11 #include "bindings/core/v8/V8ArrayBuffer.h" 11 #include "bindings/core/v8/V8ArrayBuffer.h"
12 #include "bindings/core/v8/V8ThrowException.h" 12 #include "bindings/core/v8/V8ThrowException.h"
13 #include "core/dom/DOMArrayBuffer.h" 13 #include "core/dom/DOMArrayBuffer.h"
14 #include "core/dom/DOMTypedArray.h" 14 #include "core/dom/DOMTypedArray.h"
15 #include "core/dom/ExceptionCode.h"
15 #include "core/fileapi/Blob.h" 16 #include "core/fileapi/Blob.h"
16 #include "core/fileapi/FileReaderLoader.h"
17 #include "core/fileapi/FileReaderLoaderClient.h"
18 #include "core/frame/UseCounter.h" 17 #include "core/frame/UseCounter.h"
19 #include "core/streams/ReadableByteStream.h" 18 #include "core/streams/ReadableByteStream.h"
20 #include "core/streams/ReadableByteStreamReader.h" 19 #include "core/streams/ReadableByteStreamReader.h"
21 #include "core/streams/UnderlyingSource.h" 20 #include "core/streams/UnderlyingSource.h"
22 #include "modules/fetch/BodyStreamBuffer.h" 21 #include "modules/fetch/BodyStreamBuffer.h"
23 #include "modules/fetch/DataConsumerHandleUtil.h" 22 #include "modules/fetch/DataConsumerHandleUtil.h"
24 #include "modules/fetch/FetchBlobDataConsumerHandle.h" 23 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
25 24
26 namespace blink { 25 namespace blink {
27 26
28 class Body::BlobHandleReceiver final : public BodyStreamBuffer::BlobHandleCreato rClient { 27 class Body::ReadableStreamSource : public GarbageCollectedFinalized<Body::Readab leStreamSource>, public UnderlyingSource, public WebDataConsumerHandle::Client, public BodyStreamBuffer::DrainingStreamNotificationClient {
29 public:
30 explicit BlobHandleReceiver(Body* body)
31 : m_body(body)
32 {
33 }
34 void didCreateBlobHandle(PassRefPtr<BlobDataHandle> handle) override
35 {
36 ASSERT(m_body);
37 m_body->readAsyncFromBlob(handle);
38 m_body = nullptr;
39 }
40 void didFail(DOMException* exception) override
41 {
42 ASSERT(m_body);
43 m_body->didBlobHandleReceiveError(exception);
44 m_body = nullptr;
45 }
46 DEFINE_INLINE_VIRTUAL_TRACE()
47 {
48 BodyStreamBuffer::BlobHandleCreatorClient::trace(visitor);
49 visitor->trace(m_body);
50 }
51 private:
52 Member<Body> m_body;
53 };
54
55 // This class is an ActiveDOMObject subclass only for holding the
56 // ExecutionContext used in |pullSource|.
57 class Body::ReadableStreamSource : public BodyStreamBuffer::Observer, public Und erlyingSource, public FileReaderLoaderClient, public ActiveDOMObject {
58 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); 28 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
59 public: 29 public:
60 enum State { 30 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer)
61 Initial, 31 : m_bodyStreamBuffer(buffer)
62 Streaming, 32 , m_streamNeedsMore(false)
63 Closed, 33 #if ENABLE(ASSERT)
64 Errored, 34 , m_drained(false)
65 }; 35 #endif
66 ReadableStreamSource(ExecutionContext* executionContext, PassRefPtr<BlobData Handle> handle)
67 : ActiveDOMObject(executionContext)
68 , m_blobDataHandle(handle ? handle : BlobDataHandle::create(BlobData::cr eate(), 0))
69 , m_state(Initial)
70 { 36 {
71 suspendIfNeeded(); 37 if (m_bodyStreamBuffer)
72 } 38 obtainReader();
73
74 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer)
75 : ActiveDOMObject(executionContext)
76 , m_bodyStreamBuffer(buffer)
77 , m_state(Initial)
78 {
79 suspendIfNeeded();
80 }
81
82 explicit ReadableStreamSource(ExecutionContext* executionContext)
83 : ActiveDOMObject(executionContext)
84 , m_blobDataHandle(BlobDataHandle::create(BlobData::create(), 0))
85 , m_state(Initial)
86 {
87 suspendIfNeeded();
88 } 39 }
89 40
90 ~ReadableStreamSource() override { } 41 ~ReadableStreamSource() override { }
91 42
92 State state() const { return m_state; }
93
94 void startStream(ReadableByteStream* stream) 43 void startStream(ReadableByteStream* stream)
95 { 44 {
96 m_stream = stream; 45 m_stream = stream;
97 stream->didSourceStart(); 46 stream->didSourceStart();
98 } 47 }
99 // Creates a new BodyStreamBuffer to drain the data. 48 // Creates a new BodyStreamBuffer to drain the data.
100 BodyStreamBuffer* createDrainingStream() 49 PassOwnPtr<DrainingBodyStreamBuffer> createDrainingStream()
101 { 50 {
102 ASSERT(m_state != Initial); 51 if (!m_bodyStreamBuffer)
52 return nullptr;
103 53
104 auto drainingStreamBuffer = new BodyStreamBuffer(new Canceller(this)); 54 ASSERT(!m_drained);
105 if (m_stream->stateInternal() == ReadableByteStream::Closed) { 55 #if ENABLE(ASSERT)
106 drainingStreamBuffer->close(); 56 m_drained = true;
107 return drainingStreamBuffer; 57 #endif
108 }
109 if (m_stream->stateInternal() == ReadableByteStream::Errored) {
110 drainingStreamBuffer->error(exception());
111 return drainingStreamBuffer;
112 }
113 58
114 ASSERT(!m_drainingStreamBuffer); 59 if (m_stream->stateInternal() == ReadableByteStream::Closed)
yhirano 2015/07/06 03:25:38 [optional] It would be good to move this block to
hiroshige 2015/07/06 05:47:07 Can we assume |m_stream|'s state changes to Closed
yhirano 2015/07/06 06:14:00 You're right. Though currently all close / error c
hiroshige 2015/07/07 03:50:57 I moved the blocks and added asserts here.
115 // Take back the data in |m_stream|. 60 m_bodyStreamBuffer = BodyStreamBuffer::createEmpty();
116 Deque<std::pair<RefPtr<DOMArrayBufferView>, size_t>> tmp_queue; 61 if (m_stream->stateInternal() == ReadableByteStream::Errored)
yhirano 2015/07/06 03:25:38 [optional] It would be good to move this block to
hiroshige 2015/07/07 03:50:57 Done.
117 ASSERT(m_stream->stateInternal() == ReadableStream::Readable); 62 m_bodyStreamBuffer = BodyStreamBuffer::create(createFetchDataConsume rHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle()));
118 m_stream->readInternal(tmp_queue);
119 while (!tmp_queue.isEmpty()) {
120 std::pair<RefPtr<DOMArrayBufferView>, size_t> data = tmp_queue.takeF irst();
121 drainingStreamBuffer->write(data.first->buffer());
122 }
123 if (m_state == Closed)
124 drainingStreamBuffer->close();
125 63
126 m_drainingStreamBuffer = drainingStreamBuffer; 64 m_reader.clear();
127 return m_drainingStreamBuffer; 65 return DrainingBodyStreamBuffer::create(m_bodyStreamBuffer, this);
128 } 66 }
67
129 DEFINE_INLINE_VIRTUAL_TRACE() 68 DEFINE_INLINE_VIRTUAL_TRACE()
130 { 69 {
131 visitor->trace(m_bodyStreamBuffer); 70 visitor->trace(m_bodyStreamBuffer);
132 visitor->trace(m_drainingStreamBuffer);
133 visitor->trace(m_stream); 71 visitor->trace(m_stream);
134 BodyStreamBuffer::Observer::trace(visitor);
135 UnderlyingSource::trace(visitor); 72 UnderlyingSource::trace(visitor);
136 ActiveDOMObject::trace(visitor); 73 DrainingStreamNotificationClient::trace(visitor);
137 } 74 }
138 75
139 void close() 76 void close()
140 { 77 {
141 if (m_state == Closed) { 78 m_reader.clear();
142 // It is possible to call |close| from the source side (such
143 // as blob loading finish) and from the consumer side (such as
144 // calling |cancel|). Thus we should ignore it here.
145 return;
146 }
147 m_state = Closed;
148 if (m_drainingStreamBuffer)
149 m_drainingStreamBuffer->close();
150 m_stream->close(); 79 m_stream->close();
151 } 80 }
152 void error() 81 void error()
153 { 82 {
154 m_state = Errored; 83 m_reader.clear();
155 if (m_drainingStreamBuffer) 84 m_stream->error(DOMException::create(NetworkError, "network error"));
156 m_drainingStreamBuffer->error(exception()); 85 }
157 m_stream->error(exception()); 86
87 void obtainReader()
yhirano 2015/07/06 03:25:38 This function can be private.
hiroshige 2015/07/07 03:50:57 Done.
88 {
89 m_reader = m_bodyStreamBuffer->handle()->obtainReader(this);
158 } 90 }
159 91
160 private: 92 private:
161 class Canceller : public BodyStreamBuffer::Canceller { 93 void didFetchDataLoadFinishedFromDrainingStream()
162 public: 94 {
163 Canceller(ReadableStreamSource* source) : m_source(source) { } 95 ASSERT(m_bodyStreamBuffer);
164 void cancel() override 96 ASSERT(m_drained);
165 { 97
166 m_source->cancel(); 98 #if ENABLE(ASSERT)
99 m_drained = false;
100 #endif
101 obtainReader();
102 // We have to call didGetReadable() now to call close()/error() if
103 // necessary.
104 // didGetReadable() would be called asynchronously, but it is too late.
105 didGetReadable();
106 }
107
108 void didGetReadable() override
109 {
110 if (!m_streamNeedsMore) {
111 // Perform zero-length read to call close()/error() early.
112 size_t readSize;
113 WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, We bDataConsumerHandle::FlagNone, &readSize);
114 switch (result) {
115 case WebDataConsumerHandle::Ok:
116 case WebDataConsumerHandle::ShouldWait:
117 return;
118 case WebDataConsumerHandle::Done:
119 close();
120 return;
121 case WebDataConsumerHandle::Busy:
122 case WebDataConsumerHandle::ResourceExhausted:
123 case WebDataConsumerHandle::UnexpectedError:
124 error();
125 return;
126 }
167 } 127 }
168 128
169 DEFINE_INLINE_VIRTUAL_TRACE() 129 processData();
170 { 130 }
171 visitor->trace(m_source);
172 BodyStreamBuffer::Canceller::trace(visitor);
173 }
174
175 private:
176 Member<ReadableStreamSource> m_source;
177 };
178 131
179 // UnderlyingSource functions. 132 // UnderlyingSource functions.
180 void pullSource() override 133 void pullSource() override
181 { 134 {
182 // Note that one |pull| is called only when |read| is called on the 135 ASSERT(!m_streamNeedsMore);
183 // associated ReadableByteStreamReader because we create a stream with 136 m_streamNeedsMore = true;
184 // StrictStrategy. 137
185 if (m_state == Initial) { 138 ASSERT(!m_drained);
186 m_state = Streaming; 139
187 if (m_bodyStreamBuffer) { 140 processData();
188 m_bodyStreamBuffer->registerObserver(this); 141 }
189 onWrite(); 142
190 if (m_bodyStreamBuffer->hasError()) 143 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride
191 return onError(); 144 {
192 if (m_bodyStreamBuffer->isClosed()) 145 close();
193 return onClose(); 146 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
194 } else { 147 }
195 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsAr rayBuffer; 148
196 m_loader = FileReaderLoader::create(readType, this); 149 // Reads data and writes the data to |m_stream|, as long as data are
197 m_loader->start(executionContext(), m_blobDataHandle); 150 // available and the stream has pending reads.
151 void processData()
152 {
153 ASSERT(m_reader);
154 while (m_streamNeedsMore) {
155 const void* buffer;
156 size_t available;
157 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
158 switch (result) {
159 case WebDataConsumerHandle::Ok:
160 m_streamNeedsMore = m_stream->enqueue(DOMUint8Array::create(stat ic_cast<const unsigned char*>(buffer), available));
161 m_reader->endRead(available);
162 break;
163
164 case WebDataConsumerHandle::Done:
165 close();
166 return;
167
168 case WebDataConsumerHandle::ShouldWait:
169 return;
170
171 case WebDataConsumerHandle::Busy:
172 case WebDataConsumerHandle::ResourceExhausted:
173 case WebDataConsumerHandle::UnexpectedError:
174 error();
175 return;
198 } 176 }
199 } 177 }
200 } 178 }
201 179
202 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride 180 // Source of data.
203 { 181 Member<BodyStreamBuffer> m_bodyStreamBuffer;
204 cancel(); 182 OwnPtr<FetchDataConsumerHandle::Reader> m_reader;
205 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
206 }
207 183
208 // BodyStreamBuffer::Observer functions.
209 void onWrite() override
210 {
211 ASSERT(m_state == Streaming);
212 while (RefPtr<DOMArrayBuffer> buf = m_bodyStreamBuffer->read()) {
213 write(buf);
214 }
215 }
216 void onClose() override
217 {
218 ASSERT(m_state == Streaming);
219 close();
220 m_bodyStreamBuffer->unregisterObserver();
221 }
222 void onError() override
223 {
224 ASSERT(m_state == Streaming);
225 error();
226 m_bodyStreamBuffer->unregisterObserver();
227 }
228
229 // FileReaderLoaderClient functions.
230 void didStartLoading() override { }
231 void didReceiveData() override { }
232 void didFinishLoading() override
233 {
234 ASSERT(m_state == Streaming);
235 write(m_loader->arrayBufferResult());
236 close();
237 }
238 void didFail(FileError::ErrorCode) override
239 {
240 ASSERT(m_state == Streaming);
241 error();
242 }
243
244 void write(PassRefPtr<DOMArrayBuffer> buf)
245 {
246 if (m_drainingStreamBuffer) {
247 m_drainingStreamBuffer->write(buf);
248 } else {
249 auto size = buf->byteLength();
250 m_stream->enqueue(DOMUint8Array::create(buf, 0, size));
251 }
252 }
253 void cancel()
254 {
255 if (m_bodyStreamBuffer) {
256 m_bodyStreamBuffer->cancel();
257 // We should not close the stream here, because it is canceller's
258 // responsibility.
259 } else {
260 if (m_loader)
261 m_loader->cancel();
262 close();
263 }
264 }
265
266 DOMException* exception()
267 {
268 if (m_state != Errored)
269 return nullptr;
270 if (m_bodyStreamBuffer) {
271 ASSERT(m_bodyStreamBuffer->exception());
272 return m_bodyStreamBuffer->exception();
273 }
274 return DOMException::create(NetworkError, "network error");
275 }
276
277 // Set when the data container of the Body is a BodyStreamBuffer.
278 Member<BodyStreamBuffer> m_bodyStreamBuffer;
279 // Set when the data container of the Body is a BlobDataHandle.
280 RefPtr<BlobDataHandle> m_blobDataHandle;
281 // Used to read the data from BlobDataHandle.
282 OwnPtr<FileReaderLoader> m_loader;
283 // Created when createDrainingStream is called to drain the data.
284 Member<BodyStreamBuffer> m_drainingStreamBuffer;
285 Member<ReadableByteStream> m_stream; 184 Member<ReadableByteStream> m_stream;
286 State m_state; 185 bool m_streamNeedsMore;
186 #if ENABLE(ASSERT)
187 bool m_drained;
188 #endif
287 }; 189 };
288 190
289 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) 191 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type)
290 { 192 {
291 if (bodyUsed()) 193 if (bodyUsed())
292 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read")); 194 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read"));
293 195
294 // When the main thread sends a V8::TerminateExecution() signal to a worker 196 // When the main thread sends a V8::TerminateExecution() signal to a worker
295 // thread, any V8 API on the worker thread starts returning an empty 197 // thread, any V8 API on the worker thread starts returning an empty
296 // handle. This can happen in Body::readAsync. To avoid the situation, we 198 // handle. This can happen in Body::readAsync. To avoid the situation, we
297 // first check the ExecutionContext and return immediately if it's already 199 // first check the ExecutionContext and return immediately if it's already
298 // gone (which means that the V8::TerminateExecution() signal has been sent 200 // gone (which means that the V8::TerminateExecution() signal has been sent
299 // to this worker thread). 201 // to this worker thread).
300 ExecutionContext* executionContext = scriptState->executionContext(); 202 ExecutionContext* executionContext = scriptState->executionContext();
301 if (!executionContext) 203 if (!executionContext)
302 return ScriptPromise(); 204 return ScriptPromise();
303 205
304 lockBody(); 206 lockBody();
305 m_responseType = type; 207 m_responseType = type;
306 208
307 ASSERT(!m_resolver); 209 ASSERT(!m_resolver);
308 m_resolver = ScriptPromiseResolver::create(scriptState); 210 m_resolver = ScriptPromiseResolver::create(scriptState);
309 ScriptPromise promise = m_resolver->promise(); 211 ScriptPromise promise = m_resolver->promise();
310 212
311 if (m_stream->stateInternal() == ReadableStream::Closed) { 213 if (m_stream->stateInternal() == ReadableStream::Closed) {
312 // We resolve the resolver manually in order not to use member 214 resolveWithEmptyDataSynchronously();
313 // variables.
314 switch (m_responseType) {
315 case ResponseAsArrayBuffer:
316 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
317 break;
318 case ResponseAsBlob: {
319 OwnPtr<BlobData> blobData = BlobData::create();
320 blobData->setContentType(mimeType());
321 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.rel ease(), 0)));
322 break;
323 }
324 case ResponseAsText:
325 m_resolver->resolve(String());
326 break;
327 case ResponseAsFormData:
328 // TODO(yhirano): Implement this.
329 ASSERT_NOT_REACHED();
330 break;
331 case ResponseAsJSON: {
332 ScriptState::Scope scope(m_resolver->scriptState());
333 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->s criptState()->isolate(), "Unexpected end of input"));
334 break;
335 }
336 case ResponseUnknown:
337 ASSERT_NOT_REACHED();
338 break;
339 }
340 m_resolver.clear();
341 } else if (m_stream->stateInternal() == ReadableStream::Errored) { 215 } else if (m_stream->stateInternal() == ReadableStream::Errored) {
342 m_resolver->reject(m_stream->storedException()); 216 m_resolver->reject(m_stream->storedException());
343 m_resolver.clear(); 217 m_resolver.clear();
344 } else if (isBodyConsumed()) {
345 m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(mimeT ype(), new BlobHandleReceiver(this));
346 } else if (buffer()) {
347 buffer()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver( this));
348 } else { 218 } else {
349 readAsyncFromBlob(blobDataHandle()); 219 readAsyncFromDrainingBodyStreamBuffer(createDrainingStream(), mimeType() );
350 } 220 }
351 return promise; 221 return promise;
352 } 222 }
353 223
354 void Body::readAsyncFromFetchDataConsumerHandle(FetchDataConsumerHandle* handle, const String& mimeType) 224 void Body::resolveWithEmptyDataSynchronously()
355 { 225 {
356 ASSERT(!m_fetchDataLoader); 226 // We resolve the resolver manually in order not to use member
227 // variables.
228 switch (m_responseType) {
229 case ResponseAsArrayBuffer:
230 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
231 break;
232 case ResponseAsBlob: {
233 OwnPtr<BlobData> blobData = BlobData::create();
234 blobData->setContentType(mimeType());
235 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release (), 0)));
236 break;
237 }
238 case ResponseAsText:
239 m_resolver->resolve(String());
240 break;
241 case ResponseAsFormData:
242 // TODO(yhirano): Implement this.
243 ASSERT_NOT_REACHED();
244 break;
245 case ResponseAsJSON: {
246 ScriptState::Scope scope(m_resolver->scriptState());
247 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->scrip tState()->isolate(), "Unexpected end of input"));
248 break;
249 }
250 case ResponseUnknown:
251 ASSERT_NOT_REACHED();
252 break;
253 }
254 m_resolver.clear();
255 }
256
257 void Body::readAsyncFromDrainingBodyStreamBuffer(PassOwnPtr<DrainingBodyStreamBu ffer> buffer, const String& mimeType)
258 {
259 if (!buffer) {
260 resolveWithEmptyDataSynchronously();
261 m_streamSource->close();
262 return;
263 }
264
265 FetchDataLoader* fetchDataLoader = nullptr;
357 266
358 switch (m_responseType) { 267 switch (m_responseType) {
359 case ResponseAsArrayBuffer: 268 case ResponseAsArrayBuffer:
360 m_fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer(); 269 fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer();
361 break; 270 break;
362 271
363 case ResponseAsJSON: 272 case ResponseAsJSON:
364 case ResponseAsText: 273 case ResponseAsText:
365 m_fetchDataLoader = FetchDataLoader::createLoaderAsString(); 274 fetchDataLoader = FetchDataLoader::createLoaderAsString();
366 break; 275 break;
367 276
368 case ResponseAsBlob: 277 case ResponseAsBlob:
369 m_fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType); 278 fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType);
370 break; 279 break;
371 280
372 case ResponseAsFormData: 281 case ResponseAsFormData:
373 // FIXME: Implement this. 282 // FIXME: Implement this.
374 ASSERT_NOT_REACHED(); 283 ASSERT_NOT_REACHED();
375 return; 284 return;
376 285
377 default: 286 default:
378 ASSERT_NOT_REACHED(); 287 ASSERT_NOT_REACHED();
379 return; 288 return;
380 } 289 }
381 290
382 m_fetchDataLoader->start(handle, this); 291 buffer->startLoading(fetchDataLoader, this);
383 }
384
385 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle)
386 {
387 readAsyncFromFetchDataConsumerHandle(FetchBlobDataConsumerHandle::create(exe cutionContext(), handle).get(), mimeType());
388 } 292 }
389 293
390 ScriptPromise Body::arrayBuffer(ScriptState* scriptState) 294 ScriptPromise Body::arrayBuffer(ScriptState* scriptState)
391 { 295 {
392 return readAsync(scriptState, ResponseAsArrayBuffer); 296 return readAsync(scriptState, ResponseAsArrayBuffer);
393 } 297 }
394 298
395 ScriptPromise Body::blob(ScriptState* scriptState) 299 ScriptPromise Body::blob(ScriptState* scriptState)
396 { 300 {
397 return readAsync(scriptState, ResponseAsBlob); 301 return readAsync(scriptState, ResponseAsBlob);
(...skipping 29 matching lines...) Expand all
427 { 331 {
428 ASSERT(!bodyUsed()); 332 ASSERT(!bodyUsed());
429 if (option == PassBody) 333 if (option == PassBody)
430 m_bodyUsed = true; 334 m_bodyUsed = true;
431 ASSERT(!m_stream->isLocked()); 335 ASSERT(!m_stream->isLocked());
432 TrackExceptionState exceptionState; 336 TrackExceptionState exceptionState;
433 m_stream->getBytesReader(executionContext(), exceptionState); 337 m_stream->getBytesReader(executionContext(), exceptionState);
434 ASSERT(!exceptionState.hadException()); 338 ASSERT(!exceptionState.hadException());
435 } 339 }
436 340
437 bool Body::isBodyConsumed() const 341 void Body::setBody(BodyStreamBuffer* buffer)
438 { 342 {
439 if (m_streamSource->state() != m_streamSource->Initial) { 343 m_streamSource = new ReadableStreamSource(executionContext(), buffer);
440 // Some data is pulled from the source.
441 return true;
442 }
443 if (m_stream->stateInternal() == ReadableStream::Closed) {
444 // Return true if the blob handle is originally not empty.
445 RefPtr<BlobDataHandle> handle = blobDataHandle();
446 return handle && handle->size();
447 }
448 if (m_stream->stateInternal() == ReadableStream::Errored) {
449 // The stream is errored. That means an effort to read data was made.
450 return true;
451 }
452 return false;
453 }
454
455 void Body::setBody(ReadableStreamSource* source)
456 {
457 m_streamSource = source;
458 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy); 344 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy);
459 m_streamSource->startStream(m_stream); 345 m_streamSource->startStream(m_stream);
460 } 346 }
461 347
462 BodyStreamBuffer* Body::createDrainingStream() 348 PassOwnPtr<DrainingBodyStreamBuffer> Body::createDrainingStream()
463 { 349 {
464 return m_streamSource->createDrainingStream(); 350 return m_streamSource->createDrainingStream();
465 } 351 }
466 352
467 void Body::stop()
468 {
469 if (m_fetchDataLoader) {
470 m_fetchDataLoader->cancel();
471 m_fetchDataLoader.clear();
472 }
473 }
474
475 bool Body::hasPendingActivity() const 353 bool Body::hasPendingActivity() const
476 { 354 {
477 if (executionContext()->activeDOMObjectsAreStopped()) 355 if (executionContext()->activeDOMObjectsAreStopped())
478 return false; 356 return false;
479 if (m_resolver) 357 if (m_resolver)
480 return true; 358 return true;
481 if (m_stream->isLocked()) 359 if (m_stream->isLocked())
482 return true; 360 return true;
483 return false; 361 return false;
484 } 362 }
485 363
486 Body::ReadableStreamSource* Body::createBodySource(PassRefPtr<BlobDataHandle> ha ndle)
487 {
488 return new ReadableStreamSource(executionContext(), handle);
489 }
490
491 Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer* buffer)
492 {
493 return new ReadableStreamSource(executionContext(), buffer);
494 }
495
496 DEFINE_TRACE(Body) 364 DEFINE_TRACE(Body)
497 { 365 {
498 visitor->trace(m_fetchDataLoader);
499 visitor->trace(m_resolver); 366 visitor->trace(m_resolver);
500 visitor->trace(m_stream); 367 visitor->trace(m_stream);
501 visitor->trace(m_streamSource); 368 visitor->trace(m_streamSource);
502 ActiveDOMObject::trace(visitor); 369 ActiveDOMObject::trace(visitor);
503 FetchDataLoader::Client::trace(visitor); 370 FetchDataLoader::Client::trace(visitor);
504 } 371 }
505 372
506 Body::Body(ExecutionContext* context) 373 Body::Body(ExecutionContext* context)
507 : ActiveDOMObject(context) 374 : ActiveDOMObject(context)
508 , m_bodyUsed(false) 375 , m_bodyUsed(false)
509 , m_responseType(ResponseType::ResponseUnknown) 376 , m_responseType(ResponseType::ResponseUnknown)
510 , m_streamSource(new ReadableStreamSource(context)) 377 , m_streamSource(new ReadableStreamSource(context, nullptr))
511 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy)) 378 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy))
512 { 379 {
513 m_streamSource->startStream(m_stream); 380 m_streamSource->startStream(m_stream);
514 } 381 }
515 382
516 void Body::resolveJSON(const String& string) 383 void Body::resolveJSON(const String& string)
517 { 384 {
518 ASSERT(m_responseType == ResponseAsJSON); 385 ASSERT(m_responseType == ResponseAsJSON);
519 ScriptState::Scope scope(m_resolver->scriptState()); 386 ScriptState::Scope scope(m_resolver->scriptState());
520 v8::Isolate* isolate = m_resolver->scriptState()->isolate(); 387 v8::Isolate* isolate = m_resolver->scriptState()->isolate();
521 v8::Local<v8::String> inputString = v8String(isolate, string); 388 v8::Local<v8::String> inputString = v8String(isolate, string);
522 v8::TryCatch trycatch; 389 v8::TryCatch trycatch;
523 v8::Local<v8::Value> parsed; 390 v8::Local<v8::Value> parsed;
524 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch)) 391 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch))
525 m_resolver->resolve(parsed); 392 m_resolver->resolve(parsed);
526 else 393 else
527 m_resolver->reject(trycatch.Exception()); 394 m_resolver->reject(trycatch.Exception());
528 } 395 }
529 396
530 // FetchDataLoader::Client functions. 397 // FetchDataLoader::Client functions.
531 void Body::didFetchDataLoadFailed() 398 void Body::didFetchDataLoadFailed()
532 { 399 {
533 ASSERT(m_fetchDataLoader);
534 m_fetchDataLoader.clear();
535
536 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 400 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
537 return; 401 return;
538 402
539 m_streamSource->error();
540 if (m_resolver) { 403 if (m_resolver) {
541 if (!m_resolver->executionContext() || m_resolver->executionContext()->a ctiveDOMObjectsAreStopped()) { 404 if (!m_resolver->executionContext() || m_resolver->executionContext()->a ctiveDOMObjectsAreStopped()) {
542 m_resolver.clear(); 405 m_resolver.clear();
543 return; 406 return;
544 } 407 }
545 ScriptState* state = m_resolver->scriptState(); 408 ScriptState* state = m_resolver->scriptState();
546 ScriptState::Scope scope(state); 409 ScriptState::Scope scope(state);
547 m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), " Failed to fetch")); 410 m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), " Failed to fetch"));
548 m_resolver.clear(); 411 m_resolver.clear();
549 } 412 }
550 } 413 }
551 414
552 void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandl e) 415 void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandl e)
553 { 416 {
554 ASSERT(m_fetchDataLoader);
555 m_fetchDataLoader.clear();
556
557 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 417 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
558 return; 418 return;
559 419
560 ASSERT(m_responseType == ResponseAsBlob); 420 ASSERT(m_responseType == ResponseAsBlob);
561 m_resolver->resolve(Blob::create(blobDataHandle)); 421 m_resolver->resolve(Blob::create(blobDataHandle));
562 m_streamSource->close();
563 m_resolver.clear(); 422 m_resolver.clear();
564 } 423 }
565 424
566 void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) 425 void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer)
567 { 426 {
568 ASSERT(m_fetchDataLoader);
569 m_fetchDataLoader.clear();
570
571 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 427 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
572 return; 428 return;
573 429
574 ASSERT(m_responseType == ResponseAsArrayBuffer); 430 ASSERT(m_responseType == ResponseAsArrayBuffer);
575 m_resolver->resolve(arrayBuffer); 431 m_resolver->resolve(arrayBuffer);
576 m_streamSource->close();
577 m_resolver.clear(); 432 m_resolver.clear();
578 } 433 }
579 434
580 void Body::didFetchDataLoadedString(const String& str) 435 void Body::didFetchDataLoadedString(const String& str)
581 { 436 {
582 ASSERT(m_fetchDataLoader);
583 m_fetchDataLoader.clear();
584
585 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 437 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
586 return; 438 return;
587 439
588 switch (m_responseType) { 440 switch (m_responseType) {
589 case ResponseAsJSON: 441 case ResponseAsJSON:
590 resolveJSON(str); 442 resolveJSON(str);
591 break; 443 break;
592 case ResponseAsText: 444 case ResponseAsText:
593 m_resolver->resolve(str); 445 m_resolver->resolve(str);
594 break; 446 break;
595 default: 447 default:
596 ASSERT_NOT_REACHED(); 448 ASSERT_NOT_REACHED();
597 } 449 }
598 450
599 m_streamSource->close();
600 m_resolver.clear(); 451 m_resolver.clear();
601 } 452 }
602 453
603 void Body::didBlobHandleReceiveError(DOMException* exception)
604 {
605 if (!m_resolver)
606 return;
607 m_streamSource->error();
608 m_resolver->reject(exception);
609 m_resolver.clear();
610 }
611
612 } // namespace blink 454 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698