Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(129)

Side by Side Diff: Source/modules/fetch/Body.cpp

Issue 1018243002: [Fetch] Support various operations after reading data partially. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@async-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/Body.h" 6 #include "modules/fetch/Body.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromiseResolver.h" 9 #include "bindings/core/v8/ScriptPromiseResolver.h"
10 #include "bindings/core/v8/ScriptState.h" 10 #include "bindings/core/v8/ScriptState.h"
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); 52 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
53 public: 53 public:
54 enum State { 54 enum State {
55 Initial, 55 Initial,
56 Streaming, 56 Streaming,
57 ReadingBlob, 57 ReadingBlob,
58 Closed, 58 Closed,
59 Errored, 59 Errored,
60 BodyUsed, 60 BodyUsed,
61 }; 61 };
62 ReadableStreamSource(Body* body) : m_body(body), m_state(Initial), m_queueCo unt(0) 62 ReadableStreamSource(Body* body) : m_body(body), m_state(Initial)
63 { 63 {
64 if (m_body->buffer()) { 64 if (m_body->buffer()) {
65 m_bodyStreamBuffer = m_body->buffer(); 65 m_bodyStreamBuffer = m_body->buffer();
66 } else { 66 } else {
67 m_blobDataHandle = m_body->blobDataHandle(); 67 m_blobDataHandle = m_body->blobDataHandle();
68 if (!m_blobDataHandle) 68 if (!m_blobDataHandle)
69 m_blobDataHandle = BlobDataHandle::create(BlobData::create(), 0) ; 69 m_blobDataHandle = BlobDataHandle::create(BlobData::create(), 0) ;
70 } 70 }
71 } 71 }
72 ~ReadableStreamSource() override { } 72 ~ReadableStreamSource() override { }
(...skipping 20 matching lines...) Expand all
93 return onClose(); 93 return onClose();
94 } else { 94 } else {
95 ASSERT(m_blobDataHandle); 95 ASSERT(m_blobDataHandle);
96 m_state = ReadingBlob; 96 m_state = ReadingBlob;
97 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayB uffer; 97 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayB uffer;
98 m_loader = adoptPtr(new FileReaderLoader(readType, this)); 98 m_loader = adoptPtr(new FileReaderLoader(readType, this));
99 m_loader->start(m_body->executionContext(), m_blobDataHandle); 99 m_loader->start(m_body->executionContext(), m_blobDataHandle);
100 } 100 }
101 } 101 }
102 // Creates a new BodyStreamBuffer to drain the data. 102 // Creates a new BodyStreamBuffer to drain the data.
103 BodyStreamBuffer* createDrainingStream(bool* dataLost) 103 BodyStreamBuffer* createDrainingStream()
104 { 104 {
105 ASSERT(!m_drainingStreamBuffer); 105 ASSERT(!m_drainingStreamBuffer);
106 ASSERT(m_state != Initial); 106 ASSERT(m_state != Initial);
107 ASSERT(m_state != BodyUsed); 107 ASSERT(m_state != BodyUsed);
108 ASSERT(m_stream); 108 ASSERT(m_stream);
109 ASSERT(dataLost);
110 m_drainingStreamBuffer = new BodyStreamBuffer(); 109 m_drainingStreamBuffer = new BodyStreamBuffer();
111 if (m_state == Errored) { 110 if (m_state == Errored) {
112 m_drainingStreamBuffer->error(exception()); 111 m_drainingStreamBuffer->error(exception());
113 return m_drainingStreamBuffer; 112 return m_drainingStreamBuffer;
114 } 113 }
115 // Take back the data in |m_stream|. 114 // Take back the data in |m_stream|.
116 Deque<std::pair<RefPtr<DOMArrayBuffer>, size_t>> tmp_queue; 115 Deque<std::pair<RefPtr<DOMArrayBuffer>, size_t>> tmp_queue;
117 if (m_stream->stateInternal() == ReadableStream::Readable) 116 if (m_stream->stateInternal() == ReadableStream::Readable)
118 m_stream->readInternal(tmp_queue); 117 m_stream->readInternal(tmp_queue);
119 *dataLost = m_queueCount != tmp_queue.size();
120 while (!tmp_queue.isEmpty()) { 118 while (!tmp_queue.isEmpty()) {
121 std::pair<RefPtr<DOMArrayBuffer>, size_t> data = tmp_queue.takeFirst (); 119 std::pair<RefPtr<DOMArrayBuffer>, size_t> data = tmp_queue.takeFirst ();
122 m_drainingStreamBuffer->write(data.first); 120 m_drainingStreamBuffer->write(data.first);
123 } 121 }
124 if (m_state == Closed) 122 if (m_state == Closed)
125 m_drainingStreamBuffer->close(); 123 m_drainingStreamBuffer->close();
126 else 124 else
127 m_stream->close(); 125 m_stream->close();
128 return m_drainingStreamBuffer; 126 return m_drainingStreamBuffer;
129 } 127 }
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 close(); 174 close();
177 } 175 }
178 void didFail(FileError::ErrorCode) override 176 void didFail(FileError::ErrorCode) override
179 { 177 {
180 ASSERT(m_state == ReadingBlob); 178 ASSERT(m_state == ReadingBlob);
181 error(); 179 error();
182 } 180 }
183 181
184 void write(PassRefPtr<DOMArrayBuffer> buf) 182 void write(PassRefPtr<DOMArrayBuffer> buf)
185 { 183 {
186 if (m_drainingStreamBuffer) { 184 if (m_drainingStreamBuffer) {
horo 2015/03/20 02:28:54 nit: we can remove brackets.
yhirano 2015/03/20 03:24:49 Done.
187 m_drainingStreamBuffer->write(buf); 185 m_drainingStreamBuffer->write(buf);
188 } else { 186 } else {
189 ++m_queueCount;
190 m_stream->enqueue(buf); 187 m_stream->enqueue(buf);
191 } 188 }
192 } 189 }
193 void close() 190 void close()
194 { 191 {
195 m_state = Closed; 192 m_state = Closed;
196 if (m_drainingStreamBuffer) 193 if (m_drainingStreamBuffer)
197 m_drainingStreamBuffer->close(); 194 m_drainingStreamBuffer->close();
198 else 195 else
199 m_stream->close(); 196 m_stream->close();
(...skipping 21 matching lines...) Expand all
221 // Set when the data container of the Body is a BodyStreamBuffer. 218 // Set when the data container of the Body is a BodyStreamBuffer.
222 Member<BodyStreamBuffer> m_bodyStreamBuffer; 219 Member<BodyStreamBuffer> m_bodyStreamBuffer;
223 // Set when the data container of the Body is a BlobDataHandle. 220 // Set when the data container of the Body is a BlobDataHandle.
224 RefPtr<BlobDataHandle> m_blobDataHandle; 221 RefPtr<BlobDataHandle> m_blobDataHandle;
225 // Used to read the data from BlobDataHandle. 222 // Used to read the data from BlobDataHandle.
226 OwnPtr<FileReaderLoader> m_loader; 223 OwnPtr<FileReaderLoader> m_loader;
227 // Created when createDrainingStream is called to drain the data. 224 // Created when createDrainingStream is called to drain the data.
228 Member<BodyStreamBuffer> m_drainingStreamBuffer; 225 Member<BodyStreamBuffer> m_drainingStreamBuffer;
229 Member<ReadableStreamImpl<ReadableStreamChunkTypeTraits<DOMArrayBuffer>>> m_ stream; 226 Member<ReadableStreamImpl<ReadableStreamChunkTypeTraits<DOMArrayBuffer>>> m_ stream;
230 State m_state; 227 State m_state;
231 // The count of the chunks which were enqueued to the ReadableStream.
232 size_t m_queueCount;
233 }; 228 };
234 229
235 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) 230 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type)
236 { 231 {
237 if (bodyUsed()) 232 if (bodyUsed())
238 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read")); 233 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read"));
239 234
240 // When the main thread sends a V8::TerminateExecution() signal to a worker 235 // When the main thread sends a V8::TerminateExecution() signal to a worker
241 // thread, any V8 API on the worker thread starts returning an empty 236 // thread, any V8 API on the worker thread starts returning an empty
242 // handle. This can happen in Body::readAsync. To avoid the situation, we 237 // handle. This can happen in Body::readAsync. To avoid the situation, we
243 // first check the ExecutionContext and return immediately if it's already 238 // first check the ExecutionContext and return immediately if it's already
244 // gone (which means that the V8::TerminateExecution() signal has been sent 239 // gone (which means that the V8::TerminateExecution() signal has been sent
245 // to this worker thread). 240 // to this worker thread).
246 ExecutionContext* executionContext = scriptState->executionContext(); 241 ExecutionContext* executionContext = scriptState->executionContext();
247 if (!executionContext) 242 if (!executionContext)
248 return ScriptPromise(); 243 return ScriptPromise();
249 244
250 setBodyUsed(); 245 setBodyUsed();
251 m_responseType = type; 246 m_responseType = type;
252 247
253 ASSERT(!m_resolver); 248 ASSERT(!m_resolver);
254 m_resolver = ScriptPromiseResolver::create(scriptState); 249 m_resolver = ScriptPromiseResolver::create(scriptState);
255 ScriptPromise promise = m_resolver->promise(); 250 ScriptPromise promise = m_resolver->promise();
256 251
257 if (m_stream) { 252 if (m_stream) {
258 ASSERT(m_streamSource); 253 ASSERT(m_streamSource);
259 bool dataLost; 254 m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(conte ntTypeForBuffer(), new BlobHandleReceiver(this));
260 m_streamSource->createDrainingStream(&dataLost)->readAllAndCreateBlobHan dle(contentTypeForBuffer(), new BlobHandleReceiver(this));
261 } else if (buffer()) { 255 } else if (buffer()) {
262 buffer()->readAllAndCreateBlobHandle(contentTypeForBuffer(), new BlobHan dleReceiver(this)); 256 buffer()->readAllAndCreateBlobHandle(contentTypeForBuffer(), new BlobHan dleReceiver(this));
263 } else { 257 } else {
264 readAsyncFromBlob(blobDataHandle()); 258 readAsyncFromBlob(blobDataHandle());
265 } 259 }
266 return promise; 260 return promise;
267 } 261 }
268 262
269 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle) 263 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle)
270 { 264 {
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
362 ASSERT(!exceptionState.hadException()); 356 ASSERT(!exceptionState.hadException());
363 } 357 }
364 m_bodyUsed = true; 358 m_bodyUsed = true;
365 } 359 }
366 360
367 bool Body::streamAccessed() const 361 bool Body::streamAccessed() const
368 { 362 {
369 return m_stream; 363 return m_stream;
370 } 364 }
371 365
372 BodyStreamBuffer* Body::createDrainingStream(bool* dataLost) 366 BodyStreamBuffer* Body::createDrainingStream()
373 { 367 {
374 ASSERT(m_stream); 368 ASSERT(m_stream);
375 BodyStreamBuffer* newBuffer = m_streamSource->createDrainingStream(dataLost) ; 369 BodyStreamBuffer* newBuffer = m_streamSource->createDrainingStream();
376 m_stream = nullptr; 370 m_stream = nullptr;
377 m_streamSource = nullptr; 371 m_streamSource = nullptr;
378 return newBuffer; 372 return newBuffer;
379 } 373 }
380 374
381 void Body::stop() 375 void Body::stop()
382 { 376 {
383 // Canceling the load will call didFail which will remove the resolver. 377 // Canceling the load will call didFail which will remove the resolver.
384 if (m_loader) 378 if (m_loader)
385 m_loader->cancel(); 379 m_loader->cancel();
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
487 481
488 void Body::didBlobHandleReceiveError(PassRefPtrWillBeRawPtr<DOMException> except ion) 482 void Body::didBlobHandleReceiveError(PassRefPtrWillBeRawPtr<DOMException> except ion)
489 { 483 {
490 if (!m_resolver) 484 if (!m_resolver)
491 return; 485 return;
492 m_resolver->reject(exception); 486 m_resolver->reject(exception);
493 m_resolver.clear(); 487 m_resolver.clear();
494 } 488 }
495 489
496 } // namespace blink 490 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698