Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(67)

Side by Side Diff: Source/modules/fetch/Body.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Clean up done. ready. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/Body.h" 6 #include "modules/fetch/Body.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromiseResolver.h" 9 #include "bindings/core/v8/ScriptPromiseResolver.h"
10 #include "bindings/core/v8/ScriptState.h" 10 #include "bindings/core/v8/ScriptState.h"
11 #include "bindings/core/v8/V8ArrayBuffer.h" 11 #include "bindings/core/v8/V8ArrayBuffer.h"
12 #include "bindings/core/v8/V8ThrowException.h" 12 #include "bindings/core/v8/V8ThrowException.h"
13 #include "core/dom/DOMArrayBuffer.h" 13 #include "core/dom/DOMArrayBuffer.h"
14 #include "core/dom/DOMTypedArray.h" 14 #include "core/dom/DOMTypedArray.h"
15 #include "core/dom/ExceptionCode.h"
15 #include "core/fileapi/Blob.h" 16 #include "core/fileapi/Blob.h"
16 #include "core/fileapi/FileReaderLoader.h"
17 #include "core/fileapi/FileReaderLoaderClient.h"
18 #include "core/frame/UseCounter.h" 17 #include "core/frame/UseCounter.h"
19 #include "core/streams/ReadableByteStream.h" 18 #include "core/streams/ReadableByteStream.h"
20 #include "core/streams/ReadableByteStreamReader.h" 19 #include "core/streams/ReadableByteStreamReader.h"
21 #include "core/streams/UnderlyingSource.h" 20 #include "core/streams/UnderlyingSource.h"
22 #include "modules/fetch/BodyStreamBuffer.h" 21 #include "modules/fetch/BodyStreamBuffer.h"
22 #include "modules/fetch/DataConsumerHandleUtil.h"
23 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
23 24
24 namespace blink { 25 namespace blink {
25 26
26 class Body::BlobHandleReceiver final : public BodyStreamBuffer::BlobHandleCreato rClient {
27 public:
28 explicit BlobHandleReceiver(Body* body)
29 : m_body(body)
30 {
31 }
32 void didCreateBlobHandle(PassRefPtr<BlobDataHandle> handle) override
33 {
34 ASSERT(m_body);
35 m_body->readAsyncFromBlob(handle);
36 m_body = nullptr;
37 }
38 void didFail(DOMException* exception) override
39 {
40 ASSERT(m_body);
41 m_body->didBlobHandleReceiveError(exception);
42 m_body = nullptr;
43 }
44 DEFINE_INLINE_VIRTUAL_TRACE()
45 {
46 BodyStreamBuffer::BlobHandleCreatorClient::trace(visitor);
47 visitor->trace(m_body);
48 }
49 private:
50 Member<Body> m_body;
51 };
52
53 // This class is an ActiveDOMObject subclass only for holding the 27 // This class is an ActiveDOMObject subclass only for holding the
54 // ExecutionContext used in |pullSource|. 28 // ExecutionContext used in |pullSource|.
55 class Body::ReadableStreamSource : public BodyStreamBuffer::Observer, public Und erlyingSource, public FileReaderLoaderClient, public ActiveDOMObject { 29 class Body::ReadableStreamSource : public GarbageCollectedFinalized<Body::Readab leStreamSource>, public UnderlyingSource, public WebDataConsumerHandle::Client, public BodyStreamBuffer::DrainingStreamNotificationClient, public ActiveDOMObjec t {
56 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); 30 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
57 public: 31 public:
58 enum State { 32 enum State {
59 Initial,
60 Streaming, 33 Streaming,
61 Closed, 34 Closed,
62 Errored, 35 Errored,
63 }; 36 };
64 ReadableStreamSource(ExecutionContext* executionContext, PassRefPtr<BlobData Handle> handle)
65 : ActiveDOMObject(executionContext)
66 , m_blobDataHandle(handle ? handle : BlobDataHandle::create(BlobData::cr eate(), 0))
67 , m_state(Initial)
68 {
69 suspendIfNeeded();
70 }
71 37
72 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer) 38 ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* b uffer)
73 : ActiveDOMObject(executionContext) 39 : ActiveDOMObject(executionContext)
74 , m_bodyStreamBuffer(buffer) 40 , m_bodyStreamBuffer(buffer)
75 , m_state(Initial) 41 , m_state(Streaming)
42 , m_streamNeedsMore(false)
43 #if ENABLE(ASSERT)
44 , m_drained(false)
45 #endif
76 { 46 {
77 suspendIfNeeded(); 47 suspendIfNeeded();
78 } 48 if (m_bodyStreamBuffer)
79 49 obtainReader();
80 explicit ReadableStreamSource(ExecutionContext* executionContext)
81 : ActiveDOMObject(executionContext)
82 , m_blobDataHandle(BlobDataHandle::create(BlobData::create(), 0))
83 , m_state(Initial)
84 {
85 suspendIfNeeded();
86 } 50 }
87 51
88 ~ReadableStreamSource() override { } 52 ~ReadableStreamSource() override { }
89 53
90 State state() const { return m_state; } 54 State state() const { return m_state; }
91 55
92 void startStream(ReadableByteStream* stream) 56 void startStream(ReadableByteStream* stream)
93 { 57 {
94 m_stream = stream; 58 m_stream = stream;
95 stream->didSourceStart(); 59 stream->didSourceStart();
96 } 60 }
97 // Creates a new BodyStreamBuffer to drain the data. 61 // Creates a new BodyStreamBuffer to drain the data.
98 BodyStreamBuffer* createDrainingStream() 62 PassOwnPtr<DrainingBodyStreamBuffer> createDrainingStream()
99 { 63 {
100 ASSERT(m_state != Initial); 64 if (!m_bodyStreamBuffer)
65 return nullptr;
101 66
102 auto drainingStreamBuffer = new BodyStreamBuffer(new Canceller(this));
103 if (m_stream->stateInternal() == ReadableByteStream::Closed) { 67 if (m_stream->stateInternal() == ReadableByteStream::Closed) {
104 drainingStreamBuffer->close(); 68 return DrainingBodyStreamBuffer::create(BodyStreamBuffer::createEmpt y(), nullptr);
105 return drainingStreamBuffer;
106 } 69 }
107 if (m_stream->stateInternal() == ReadableByteStream::Errored) { 70 if (m_stream->stateInternal() == ReadableByteStream::Errored) {
108 drainingStreamBuffer->error(exception()); 71 return DrainingBodyStreamBuffer::create(BodyStreamBuffer::create(cre ateFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle( ))), nullptr);
109 return drainingStreamBuffer;
110 } 72 }
111 73
112 ASSERT(!m_drainingStreamBuffer); 74 ASSERT(!m_drained);
113 // Take back the data in |m_stream|. 75 #if ENABLE(ASSERT)
114 Deque<std::pair<RefPtr<DOMArrayBufferView>, size_t>> tmp_queue; 76 m_drained = true;
115 ASSERT(m_stream->stateInternal() == ReadableStream::Readable); 77 #endif
116 m_stream->readInternal(tmp_queue); 78 m_reader.clear();
117 while (!tmp_queue.isEmpty()) { 79 return DrainingBodyStreamBuffer::create(m_bodyStreamBuffer, this);
118 std::pair<RefPtr<DOMArrayBufferView>, size_t> data = tmp_queue.takeF irst(); 80 }
119 drainingStreamBuffer->write(data.first->buffer());
120 }
121 if (m_state == Closed)
122 drainingStreamBuffer->close();
123 81
124 m_drainingStreamBuffer = drainingStreamBuffer;
125 return m_drainingStreamBuffer;
126 }
127 DEFINE_INLINE_VIRTUAL_TRACE() 82 DEFINE_INLINE_VIRTUAL_TRACE()
128 { 83 {
129 visitor->trace(m_bodyStreamBuffer); 84 visitor->trace(m_bodyStreamBuffer);
130 visitor->trace(m_drainingStreamBuffer);
131 visitor->trace(m_stream); 85 visitor->trace(m_stream);
132 BodyStreamBuffer::Observer::trace(visitor);
133 UnderlyingSource::trace(visitor); 86 UnderlyingSource::trace(visitor);
87 DrainingStreamNotificationClient::trace(visitor);
134 ActiveDOMObject::trace(visitor); 88 ActiveDOMObject::trace(visitor);
135 } 89 }
136 90
137 void close() 91 void close()
138 { 92 {
139 if (m_state == Closed) { 93 if (m_state == Closed) {
140 // It is possible to call |close| from the source side (such 94 // It is possible to call |close| from the source side (such
141 // as blob loading finish) and from the consumer side (such as 95 // as blob loading finish) and from the consumer side (such as
142 // calling |cancel|). Thus we should ignore it here. 96 // calling |cancel|). Thus we should ignore it here.
143 return; 97 return;
144 } 98 }
145 m_state = Closed; 99 m_state = Closed;
146 if (m_drainingStreamBuffer) 100 m_reader.clear();
147 m_drainingStreamBuffer->close();
148 m_stream->close(); 101 m_stream->close();
149 } 102 }
150 void error() 103 void error()
151 { 104 {
152 m_state = Errored; 105 m_state = Errored;
153 if (m_drainingStreamBuffer) 106 m_reader.clear();
154 m_drainingStreamBuffer->error(exception()); 107 m_stream->error(DOMException::create(NetworkError, "network error"));
155 m_stream->error(exception()); 108 }
109
110 void obtainReader()
111 {
112 m_reader = m_bodyStreamBuffer->handle()->obtainReader(this);
156 } 113 }
157 114
158 private: 115 private:
159 class Canceller : public BodyStreamBuffer::Canceller { 116 void didFetchDataLoadFinishedFromDrainingStream()
160 public: 117 {
161 Canceller(ReadableStreamSource* source) : m_source(source) { } 118 ASSERT(m_bodyStreamBuffer);
162 void cancel() override 119
163 { 120 #if ENABLE(ASSERT)
164 m_source->cancel(); 121 m_drained = false;
122 #endif
123 obtainReader();
124 // We have to call didGetReadable() now to call close()/error() if
125 // necessary.
126 // didGetReadable() would be called asynchronously, but it is too late.
127 didGetReadable();
128 }
129
130 void didGetReadable() override
131 {
132 if (!m_streamNeedsMore) {
133 // Perform zero-length read to call close()/error() early.
134 size_t readSize;
135 WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, We bDataConsumerHandle::FlagNone, &readSize);
136 switch (result) {
137 case WebDataConsumerHandle::Ok:
138 case WebDataConsumerHandle::ShouldWait:
139 return;
140 case WebDataConsumerHandle::Done:
141 close();
142 return;
143 case WebDataConsumerHandle::Busy:
144 case WebDataConsumerHandle::ResourceExhausted:
145 case WebDataConsumerHandle::UnexpectedError:
146 error();
147 return;
148 }
165 } 149 }
166 150
167 DEFINE_INLINE_VIRTUAL_TRACE() 151 processData();
168 { 152 }
169 visitor->trace(m_source);
170 BodyStreamBuffer::Canceller::trace(visitor);
171 }
172
173 private:
174 Member<ReadableStreamSource> m_source;
175 };
176 153
177 // UnderlyingSource functions. 154 // UnderlyingSource functions.
178 void pullSource() override 155 void pullSource() override
179 { 156 {
180 // Note that one |pull| is called only when |read| is called on the 157 ASSERT(!m_streamNeedsMore);
181 // associated ReadableByteStreamReader because we create a stream with 158 m_streamNeedsMore = true;
182 // StrictStrategy. 159
183 if (m_state == Initial) { 160 ASSERT(!m_drained);
184 m_state = Streaming; 161
185 if (m_bodyStreamBuffer) { 162 processData();
186 m_bodyStreamBuffer->registerObserver(this); 163 }
187 onWrite(); 164
188 if (m_bodyStreamBuffer->hasError()) 165 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride
189 return onError(); 166 {
190 if (m_bodyStreamBuffer->isClosed()) 167 close();
191 return onClose(); 168 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
192 } else { 169 }
193 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsAr rayBuffer; 170
194 m_loader = adoptPtr(new FileReaderLoader(readType, this)); 171 // Reads data and writes the data to |m_stream|, as long as data are
195 m_loader->start(executionContext(), m_blobDataHandle); 172 // available and the stream has pending reads.
173 void processData()
174 {
175 ASSERT(m_reader);
176 while (m_streamNeedsMore) {
177 const void* buffer;
178 size_t available;
179 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
180 switch (result) {
181 case WebDataConsumerHandle::Ok:
182 m_streamNeedsMore = m_stream->enqueue(DOMUint8Array::create(stat ic_cast<const unsigned char*>(buffer), available));
183 m_reader->endRead(available);
184 break;
185
186 case WebDataConsumerHandle::Done:
187 close();
188 return;
189
190 case WebDataConsumerHandle::ShouldWait:
191 return;
192
193 case WebDataConsumerHandle::Busy:
194 case WebDataConsumerHandle::ResourceExhausted:
195 case WebDataConsumerHandle::UnexpectedError:
196 error();
197 return;
196 } 198 }
197 } 199 }
198 } 200 }
199 201
200 ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) ove rride 202 // Source of data.
201 { 203 Member<BodyStreamBuffer> m_bodyStreamBuffer;
202 cancel(); 204 OwnPtr<FetchDataConsumerHandle::Reader> m_reader;
203 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
204 }
205 205
206 // BodyStreamBuffer::Observer functions.
207 void onWrite() override
208 {
209 ASSERT(m_state == Streaming);
210 while (RefPtr<DOMArrayBuffer> buf = m_bodyStreamBuffer->read()) {
211 write(buf);
212 }
213 }
214 void onClose() override
215 {
216 ASSERT(m_state == Streaming);
217 close();
218 m_bodyStreamBuffer->unregisterObserver();
219 }
220 void onError() override
221 {
222 ASSERT(m_state == Streaming);
223 error();
224 m_bodyStreamBuffer->unregisterObserver();
225 }
226
227 // FileReaderLoaderClient functions.
228 void didStartLoading() override { }
229 void didReceiveData() override { }
230 void didFinishLoading() override
231 {
232 ASSERT(m_state == Streaming);
233 write(m_loader->arrayBufferResult());
234 close();
235 }
236 void didFail(FileError::ErrorCode) override
237 {
238 ASSERT(m_state == Streaming);
239 error();
240 }
241
242 void write(PassRefPtr<DOMArrayBuffer> buf)
243 {
244 if (m_drainingStreamBuffer) {
245 m_drainingStreamBuffer->write(buf);
246 } else {
247 auto size = buf->byteLength();
248 m_stream->enqueue(DOMUint8Array::create(buf, 0, size));
249 }
250 }
251 void cancel()
252 {
253 if (m_bodyStreamBuffer) {
254 m_bodyStreamBuffer->cancel();
255 // We should not close the stream here, because it is canceller's
256 // responsibility.
257 } else {
258 if (m_loader)
259 m_loader->cancel();
260 close();
261 }
262 }
263
264 DOMException* exception()
265 {
266 if (m_state != Errored)
267 return nullptr;
268 if (m_bodyStreamBuffer) {
269 ASSERT(m_bodyStreamBuffer->exception());
270 return m_bodyStreamBuffer->exception();
271 }
272 return DOMException::create(NetworkError, "network error");
273 }
274
275 // Set when the data container of the Body is a BodyStreamBuffer.
276 Member<BodyStreamBuffer> m_bodyStreamBuffer;
277 // Set when the data container of the Body is a BlobDataHandle.
278 RefPtr<BlobDataHandle> m_blobDataHandle;
279 // Used to read the data from BlobDataHandle.
280 OwnPtr<FileReaderLoader> m_loader;
281 // Created when createDrainingStream is called to drain the data.
282 Member<BodyStreamBuffer> m_drainingStreamBuffer;
283 Member<ReadableByteStream> m_stream; 206 Member<ReadableByteStream> m_stream;
284 State m_state; 207 State m_state;
208 bool m_streamNeedsMore;
209 #if ENABLE(ASSERT)
210 bool m_drained;
211 #endif
285 }; 212 };
286 213
287 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) 214 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type)
288 { 215 {
289 if (bodyUsed()) 216 if (bodyUsed())
290 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read")); 217 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read"));
291 218
292 // When the main thread sends a V8::TerminateExecution() signal to a worker 219 // When the main thread sends a V8::TerminateExecution() signal to a worker
293 // thread, any V8 API on the worker thread starts returning an empty 220 // thread, any V8 API on the worker thread starts returning an empty
294 // handle. This can happen in Body::readAsync. To avoid the situation, we 221 // handle. This can happen in Body::readAsync. To avoid the situation, we
295 // first check the ExecutionContext and return immediately if it's already 222 // first check the ExecutionContext and return immediately if it's already
296 // gone (which means that the V8::TerminateExecution() signal has been sent 223 // gone (which means that the V8::TerminateExecution() signal has been sent
297 // to this worker thread). 224 // to this worker thread).
298 ExecutionContext* executionContext = scriptState->executionContext(); 225 ExecutionContext* executionContext = scriptState->executionContext();
299 if (!executionContext) 226 if (!executionContext)
300 return ScriptPromise(); 227 return ScriptPromise();
301 228
302 lockBody(); 229 lockBody();
303 m_responseType = type; 230 m_responseType = type;
304 231
305 ASSERT(!m_resolver); 232 ASSERT(!m_resolver);
306 m_resolver = ScriptPromiseResolver::create(scriptState); 233 m_resolver = ScriptPromiseResolver::create(scriptState);
307 ScriptPromise promise = m_resolver->promise(); 234 ScriptPromise promise = m_resolver->promise();
308 235
309 if (m_stream->stateInternal() == ReadableStream::Closed) { 236 if (m_stream->stateInternal() == ReadableStream::Closed) {
310 // We resolve the resolver manually in order not to use member 237 resolveWithEmptyDataSynchronously();
311 // variables.
312 switch (m_responseType) {
313 case ResponseAsArrayBuffer:
314 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
315 break;
316 case ResponseAsBlob: {
317 OwnPtr<BlobData> blobData = BlobData::create();
318 blobData->setContentType(mimeType());
319 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.rel ease(), 0)));
320 break;
321 }
322 case ResponseAsText:
323 m_resolver->resolve(String());
324 break;
325 case ResponseAsFormData:
326 // TODO(yhirano): Implement this.
327 ASSERT_NOT_REACHED();
328 break;
329 case ResponseAsJSON: {
330 ScriptState::Scope scope(m_resolver->scriptState());
331 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->s criptState()->isolate(), "Unexpected end of input"));
332 break;
333 }
334 case ResponseUnknown:
335 ASSERT_NOT_REACHED();
336 break;
337 }
338 m_resolver.clear();
339 } else if (m_stream->stateInternal() == ReadableStream::Errored) { 238 } else if (m_stream->stateInternal() == ReadableStream::Errored) {
340 m_resolver->reject(m_stream->storedException()); 239 m_resolver->reject(m_stream->storedException());
341 m_resolver.clear(); 240 m_resolver.clear();
342 } else if (isBodyConsumed()) {
343 m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(mimeT ype(), new BlobHandleReceiver(this));
344 } else if (buffer()) {
345 buffer()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver( this));
346 } else { 241 } else {
347 readAsyncFromBlob(blobDataHandle()); 242 readAsyncFromDrainingBodyStreamBuffer(createDrainingStream(), mimeType() );
348 } 243 }
349 return promise; 244 return promise;
350 } 245 }
351 246
352 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle) 247 void Body::resolveWithEmptyDataSynchronously()
353 { 248 {
354 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsText; 249 // We resolve the resolver manually in order not to use member
355 RefPtr<BlobDataHandle> blobHandle = handle; 250 // variables.
356 if (!blobHandle)
357 blobHandle = BlobDataHandle::create(BlobData::create(), 0);
358 switch (m_responseType) { 251 switch (m_responseType) {
359 case ResponseAsArrayBuffer: 252 case ResponseAsArrayBuffer:
360 readType = FileReaderLoader::ReadAsArrayBuffer; 253 m_resolver->resolve(DOMArrayBuffer::create(nullptr, 0));
361 break; 254 break;
255 case ResponseAsBlob: {
256 OwnPtr<BlobData> blobData = BlobData::create();
257 blobData->setContentType(mimeType());
258 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release (), 0)));
259 break;
260 }
261 case ResponseAsText:
262 m_resolver->resolve(String());
263 break;
264 case ResponseAsFormData:
265 // TODO(yhirano): Implement this.
266 ASSERT_NOT_REACHED();
267 break;
268 case ResponseAsJSON: {
269 ScriptState::Scope scope(m_resolver->scriptState());
270 m_resolver->reject(V8ThrowException::createSyntaxError(m_resolver->scrip tState()->isolate(), "Unexpected end of input"));
271 break;
272 }
273 case ResponseUnknown:
274 ASSERT_NOT_REACHED();
275 break;
276 }
277 m_resolver.clear();
278 }
279
280 void Body::readAsyncFromDrainingBodyStreamBuffer(PassOwnPtr<DrainingBodyStreamBu ffer> buffer, const String& mimeType)
281 {
282 if (!buffer) {
283 resolveWithEmptyDataSynchronously();
284 m_streamSource->close();
285 return;
286 }
287
288 FetchDataLoader* fetchDataLoader = nullptr;
289
290 switch (m_responseType) {
291 case ResponseAsArrayBuffer:
292 fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer();
293 break;
294
295 case ResponseAsJSON:
296 case ResponseAsText:
297 fetchDataLoader = FetchDataLoader::createLoaderAsString();
298 break;
299
362 case ResponseAsBlob: 300 case ResponseAsBlob:
363 if (blobHandle->size() != kuint64max) { 301 fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType);
364 // If the size of |blobHandle| is set correctly, creates Blob from
365 // it.
366 if (blobHandle->type() != mimeType()) {
367 // A new BlobDataHandle is created to override the Blob's type.
368 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobHand le->uuid(), mimeType(), blobHandle->size())));
369 } else {
370 m_resolver->resolve(Blob::create(blobHandle));
371 }
372 m_stream->close();
373 m_resolver.clear();
374 return;
375 }
376 // If the size is not set, read as ArrayBuffer and create a new blob to
377 // get the size.
378 // FIXME: This workaround is not good for performance.
379 // When we will stop using Blob as a base system of Body to support
380 // stream, this problem should be solved.
381 readType = FileReaderLoader::ReadAsArrayBuffer;
382 break; 302 break;
303
383 case ResponseAsFormData: 304 case ResponseAsFormData:
384 // FIXME: Implement this. 305 // FIXME: Implement this.
385 ASSERT_NOT_REACHED(); 306 ASSERT_NOT_REACHED();
386 break; 307 return;
387 case ResponseAsJSON: 308
388 case ResponseAsText:
389 break;
390 default: 309 default:
391 ASSERT_NOT_REACHED(); 310 ASSERT_NOT_REACHED();
311 return;
392 } 312 }
393 313
394 m_loader = adoptPtr(new FileReaderLoader(readType, this)); 314 buffer->startLoading(fetchDataLoader, this);
395 m_loader->start(m_resolver->scriptState()->executionContext(), blobHandle);
396
397 return;
398 } 315 }
399 316
400 ScriptPromise Body::arrayBuffer(ScriptState* scriptState) 317 ScriptPromise Body::arrayBuffer(ScriptState* scriptState)
401 { 318 {
402 return readAsync(scriptState, ResponseAsArrayBuffer); 319 return readAsync(scriptState, ResponseAsArrayBuffer);
403 } 320 }
404 321
405 ScriptPromise Body::blob(ScriptState* scriptState) 322 ScriptPromise Body::blob(ScriptState* scriptState)
406 { 323 {
407 return readAsync(scriptState, ResponseAsBlob); 324 return readAsync(scriptState, ResponseAsBlob);
(...skipping 29 matching lines...) Expand all
437 { 354 {
438 ASSERT(!bodyUsed()); 355 ASSERT(!bodyUsed());
439 if (option == PassBody) 356 if (option == PassBody)
440 m_bodyUsed = true; 357 m_bodyUsed = true;
441 ASSERT(!m_stream->isLocked()); 358 ASSERT(!m_stream->isLocked());
442 TrackExceptionState exceptionState; 359 TrackExceptionState exceptionState;
443 m_stream->getBytesReader(executionContext(), exceptionState); 360 m_stream->getBytesReader(executionContext(), exceptionState);
444 ASSERT(!exceptionState.hadException()); 361 ASSERT(!exceptionState.hadException());
445 } 362 }
446 363
447 bool Body::isBodyConsumed() const 364 void Body::setBody(BodyStreamBuffer* buffer)
448 { 365 {
449 if (m_streamSource->state() != m_streamSource->Initial) { 366 m_streamSource = new ReadableStreamSource(executionContext(), buffer);
450 // Some data is pulled from the source.
451 return true;
452 }
453 if (m_stream->stateInternal() == ReadableStream::Closed) {
454 // Return true if the blob handle is originally not empty.
455 RefPtr<BlobDataHandle> handle = blobDataHandle();
456 return handle && handle->size();
457 }
458 if (m_stream->stateInternal() == ReadableStream::Errored) {
459 // The stream is errored. That means an effort to read data was made.
460 return true;
461 }
462 return false;
463 }
464
465 void Body::setBody(ReadableStreamSource* source)
466 {
467 m_streamSource = source;
468 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy); 367 m_stream = new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy);
469 m_streamSource->startStream(m_stream); 368 m_streamSource->startStream(m_stream);
470 } 369 }
471 370
472 BodyStreamBuffer* Body::createDrainingStream() 371 PassOwnPtr<DrainingBodyStreamBuffer> Body::createDrainingStream()
473 { 372 {
474 return m_streamSource->createDrainingStream(); 373 return m_streamSource->createDrainingStream();
475 } 374 }
476 375
477 void Body::stop() 376 void Body::stop()
478 { 377 {
479 // Canceling the load will call didFail which will remove the resolver.
480 if (m_loader)
481 m_loader->cancel();
482 } 378 }
483 379
484 bool Body::hasPendingActivity() const 380 bool Body::hasPendingActivity() const
485 { 381 {
486 if (executionContext()->activeDOMObjectsAreStopped()) 382 if (executionContext()->activeDOMObjectsAreStopped())
487 return false; 383 return false;
488 if (m_resolver) 384 if (m_resolver)
489 return true; 385 return true;
490 if (m_stream->isLocked()) 386 if (m_stream->isLocked())
491 return true; 387 return true;
492 return false; 388 return false;
493 } 389 }
494 390
495 Body::ReadableStreamSource* Body::createBodySource(PassRefPtr<BlobDataHandle> ha ndle)
496 {
497 return new ReadableStreamSource(executionContext(), handle);
498 }
499
500 Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer* buffer)
501 {
502 return new ReadableStreamSource(executionContext(), buffer);
503 }
504
505 DEFINE_TRACE(Body) 391 DEFINE_TRACE(Body)
506 { 392 {
507 visitor->trace(m_resolver); 393 visitor->trace(m_resolver);
508 visitor->trace(m_stream); 394 visitor->trace(m_stream);
509 visitor->trace(m_streamSource); 395 visitor->trace(m_streamSource);
510 ActiveDOMObject::trace(visitor); 396 ActiveDOMObject::trace(visitor);
397 FetchDataLoader::Client::trace(visitor);
511 } 398 }
512 399
513 Body::Body(ExecutionContext* context) 400 Body::Body(ExecutionContext* context)
514 : ActiveDOMObject(context) 401 : ActiveDOMObject(context)
515 , m_bodyUsed(false) 402 , m_bodyUsed(false)
516 , m_responseType(ResponseType::ResponseUnknown) 403 , m_responseType(ResponseType::ResponseUnknown)
517 , m_streamSource(new ReadableStreamSource(context)) 404 , m_streamSource(new ReadableStreamSource(context, nullptr))
518 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy)) 405 , m_stream(new ReadableByteStream(m_streamSource, new ReadableByteStream::St rictStrategy))
519 { 406 {
520 m_streamSource->startStream(m_stream); 407 m_streamSource->startStream(m_stream);
521 } 408 }
522 409
523 void Body::resolveJSON(const String& string) 410 void Body::resolveJSON(const String& string)
524 { 411 {
525 ASSERT(m_responseType == ResponseAsJSON); 412 ASSERT(m_responseType == ResponseAsJSON);
526 ScriptState::Scope scope(m_resolver->scriptState()); 413 ScriptState::Scope scope(m_resolver->scriptState());
527 v8::Isolate* isolate = m_resolver->scriptState()->isolate(); 414 v8::Isolate* isolate = m_resolver->scriptState()->isolate();
528 v8::Local<v8::String> inputString = v8String(isolate, string); 415 v8::Local<v8::String> inputString = v8String(isolate, string);
529 v8::TryCatch trycatch; 416 v8::TryCatch trycatch;
530 v8::Local<v8::Value> parsed; 417 v8::Local<v8::Value> parsed;
531 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch)) 418 if (v8Call(v8::JSON::Parse(isolate, inputString), parsed, trycatch))
532 m_resolver->resolve(parsed); 419 m_resolver->resolve(parsed);
533 else 420 else
534 m_resolver->reject(trycatch.Exception()); 421 m_resolver->reject(trycatch.Exception());
535 } 422 }
536 423
537 // FileReaderLoaderClient functions. 424 // FetchDataLoader::Client functions.
538 void Body::didStartLoading() { } 425 void Body::didFetchDataLoadFailed()
539 void Body::didReceiveData() { } 426 {
540 void Body::didFinishLoading() 427 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
428 return;
429
430 if (m_resolver) {
431 if (!m_resolver->executionContext() || m_resolver->executionContext()->a ctiveDOMObjectsAreStopped()) {
432 m_resolver.clear();
433 return;
434 }
435 ScriptState* state = m_resolver->scriptState();
436 ScriptState::Scope scope(state);
437 m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), " Failed to fetch"));
438 m_resolver.clear();
439 }
440 }
441
442 void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandl e)
443 {
444 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
445 return;
446
447 ASSERT(m_responseType == ResponseAsBlob);
448 m_resolver->resolve(Blob::create(blobDataHandle));
449 m_resolver.clear();
450 }
451
452 void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer)
453 {
454 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
455 return;
456
457 ASSERT(m_responseType == ResponseAsArrayBuffer);
458 m_resolver->resolve(arrayBuffer);
459 m_resolver.clear();
460 }
461
462 void Body::didFetchDataLoadedString(const String& str)
541 { 463 {
542 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) 464 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
543 return; 465 return;
544 466
545 switch (m_responseType) { 467 switch (m_responseType) {
546 case ResponseAsArrayBuffer:
547 m_resolver->resolve(m_loader->arrayBufferResult());
548 break;
549 case ResponseAsBlob: {
550 ASSERT(blobDataHandle()->size() == kuint64max);
551 OwnPtr<BlobData> blobData = BlobData::create();
552 RefPtr<DOMArrayBuffer> buffer = m_loader->arrayBufferResult();
553 blobData->appendBytes(buffer->data(), buffer->byteLength());
554 blobData->setContentType(mimeType());
555 const size_t length = blobData->length();
556 m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release (), length)));
557 break;
558 }
559 case ResponseAsFormData:
560 ASSERT_NOT_REACHED();
561 break;
562 case ResponseAsJSON: 468 case ResponseAsJSON:
563 resolveJSON(m_loader->stringResult()); 469 resolveJSON(str);
564 break; 470 break;
565 case ResponseAsText: 471 case ResponseAsText:
566 m_resolver->resolve(m_loader->stringResult()); 472 m_resolver->resolve(str);
567 break; 473 break;
568 default: 474 default:
569 ASSERT_NOT_REACHED(); 475 ASSERT_NOT_REACHED();
570 } 476 }
571 m_streamSource->close(); 477
572 m_resolver.clear(); 478 m_resolver.clear();
573 } 479 }
574 480
575 void Body::didFail(FileError::ErrorCode code)
576 {
577 if (!executionContext() || executionContext()->activeDOMObjectsAreStopped())
578 return;
579
580 m_streamSource->error();
581 if (m_resolver) {
582 // FIXME: We should reject the promise.
583 m_resolver->resolve("");
584 m_resolver.clear();
585 }
586 }
587
588 void Body::didBlobHandleReceiveError(DOMException* exception)
589 {
590 if (!m_resolver)
591 return;
592 m_streamSource->error();
593 m_resolver->reject(exception);
594 m_resolver.clear();
595 }
596
597 } // namespace blink 481 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698