Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(416)

Side by Side Diff: Source/modules/fetch/Body.cpp

Issue 1018243002: [Fetch] Support various operations after reading data partially. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@async-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/Body.h ('k') | Source/modules/fetch/Response.cpp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/Body.h" 6 #include "modules/fetch/Body.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromiseResolver.h" 9 #include "bindings/core/v8/ScriptPromiseResolver.h"
10 #include "bindings/core/v8/ScriptState.h" 10 #include "bindings/core/v8/ScriptState.h"
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); 52 USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
53 public: 53 public:
54 enum State { 54 enum State {
55 Initial, 55 Initial,
56 Streaming, 56 Streaming,
57 ReadingBlob, 57 ReadingBlob,
58 Closed, 58 Closed,
59 Errored, 59 Errored,
60 BodyUsed, 60 BodyUsed,
61 }; 61 };
62 ReadableStreamSource(Body* body) : m_body(body), m_state(Initial), m_queueCo unt(0) 62 ReadableStreamSource(Body* body) : m_body(body), m_state(Initial)
63 { 63 {
64 if (m_body->buffer()) { 64 if (m_body->buffer()) {
65 m_bodyStreamBuffer = m_body->buffer(); 65 m_bodyStreamBuffer = m_body->buffer();
66 } else { 66 } else {
67 m_blobDataHandle = m_body->blobDataHandle(); 67 m_blobDataHandle = m_body->blobDataHandle();
68 if (!m_blobDataHandle) 68 if (!m_blobDataHandle)
69 m_blobDataHandle = BlobDataHandle::create(BlobData::create(), 0) ; 69 m_blobDataHandle = BlobDataHandle::create(BlobData::create(), 0) ;
70 } 70 }
71 } 71 }
72 ~ReadableStreamSource() override { } 72 ~ReadableStreamSource() override { }
(...skipping 20 matching lines...) Expand all
93 return onClose(); 93 return onClose();
94 } else { 94 } else {
95 ASSERT(m_blobDataHandle); 95 ASSERT(m_blobDataHandle);
96 m_state = ReadingBlob; 96 m_state = ReadingBlob;
97 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayB uffer; 97 FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayB uffer;
98 m_loader = adoptPtr(new FileReaderLoader(readType, this)); 98 m_loader = adoptPtr(new FileReaderLoader(readType, this));
99 m_loader->start(m_body->executionContext(), m_blobDataHandle); 99 m_loader->start(m_body->executionContext(), m_blobDataHandle);
100 } 100 }
101 } 101 }
102 // Creates a new BodyStreamBuffer to drain the data. 102 // Creates a new BodyStreamBuffer to drain the data.
103 BodyStreamBuffer* createDrainingStream(bool* dataLost) 103 BodyStreamBuffer* createDrainingStream()
104 { 104 {
105 ASSERT(!m_drainingStreamBuffer); 105 ASSERT(!m_drainingStreamBuffer);
106 ASSERT(m_state != Initial); 106 ASSERT(m_state != Initial);
107 ASSERT(m_state != BodyUsed); 107 ASSERT(m_state != BodyUsed);
108 ASSERT(m_stream); 108 ASSERT(m_stream);
109 ASSERT(dataLost);
110 m_drainingStreamBuffer = new BodyStreamBuffer(); 109 m_drainingStreamBuffer = new BodyStreamBuffer();
111 if (m_state == Errored) { 110 if (m_state == Errored) {
112 m_drainingStreamBuffer->error(exception()); 111 m_drainingStreamBuffer->error(exception());
113 return m_drainingStreamBuffer; 112 return m_drainingStreamBuffer;
114 } 113 }
115 // Take back the data in |m_stream|. 114 // Take back the data in |m_stream|.
116 Deque<std::pair<RefPtr<DOMArrayBuffer>, size_t>> tmp_queue; 115 Deque<std::pair<RefPtr<DOMArrayBuffer>, size_t>> tmp_queue;
117 if (m_stream->stateInternal() == ReadableStream::Readable) 116 if (m_stream->stateInternal() == ReadableStream::Readable)
118 m_stream->readInternal(tmp_queue); 117 m_stream->readInternal(tmp_queue);
119 *dataLost = m_queueCount != tmp_queue.size();
120 while (!tmp_queue.isEmpty()) { 118 while (!tmp_queue.isEmpty()) {
121 std::pair<RefPtr<DOMArrayBuffer>, size_t> data = tmp_queue.takeFirst (); 119 std::pair<RefPtr<DOMArrayBuffer>, size_t> data = tmp_queue.takeFirst ();
122 m_drainingStreamBuffer->write(data.first); 120 m_drainingStreamBuffer->write(data.first);
123 } 121 }
124 if (m_state == Closed) 122 if (m_state == Closed)
125 m_drainingStreamBuffer->close(); 123 m_drainingStreamBuffer->close();
126 else 124 else
127 m_stream->close(); 125 m_stream->close();
128 return m_drainingStreamBuffer; 126 return m_drainingStreamBuffer;
129 } 127 }
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 close(); 174 close();
177 } 175 }
178 void didFail(FileError::ErrorCode) override 176 void didFail(FileError::ErrorCode) override
179 { 177 {
180 ASSERT(m_state == ReadingBlob); 178 ASSERT(m_state == ReadingBlob);
181 error(); 179 error();
182 } 180 }
183 181
184 void write(PassRefPtr<DOMArrayBuffer> buf) 182 void write(PassRefPtr<DOMArrayBuffer> buf)
185 { 183 {
186 if (m_drainingStreamBuffer) { 184 if (m_drainingStreamBuffer)
187 m_drainingStreamBuffer->write(buf); 185 m_drainingStreamBuffer->write(buf);
188 } else { 186 else
189 ++m_queueCount;
190 m_stream->enqueue(buf); 187 m_stream->enqueue(buf);
191 }
192 } 188 }
193 void close() 189 void close()
194 { 190 {
195 m_state = Closed; 191 m_state = Closed;
196 if (m_drainingStreamBuffer) 192 if (m_drainingStreamBuffer)
197 m_drainingStreamBuffer->close(); 193 m_drainingStreamBuffer->close();
198 else 194 else
199 m_stream->close(); 195 m_stream->close();
200 } 196 }
201 void error() 197 void error()
(...skipping 19 matching lines...) Expand all
221 // Set when the data container of the Body is a BodyStreamBuffer. 217 // Set when the data container of the Body is a BodyStreamBuffer.
222 Member<BodyStreamBuffer> m_bodyStreamBuffer; 218 Member<BodyStreamBuffer> m_bodyStreamBuffer;
223 // Set when the data container of the Body is a BlobDataHandle. 219 // Set when the data container of the Body is a BlobDataHandle.
224 RefPtr<BlobDataHandle> m_blobDataHandle; 220 RefPtr<BlobDataHandle> m_blobDataHandle;
225 // Used to read the data from BlobDataHandle. 221 // Used to read the data from BlobDataHandle.
226 OwnPtr<FileReaderLoader> m_loader; 222 OwnPtr<FileReaderLoader> m_loader;
227 // Created when createDrainingStream is called to drain the data. 223 // Created when createDrainingStream is called to drain the data.
228 Member<BodyStreamBuffer> m_drainingStreamBuffer; 224 Member<BodyStreamBuffer> m_drainingStreamBuffer;
229 Member<ReadableStreamImpl<ReadableStreamChunkTypeTraits<DOMArrayBuffer>>> m_ stream; 225 Member<ReadableStreamImpl<ReadableStreamChunkTypeTraits<DOMArrayBuffer>>> m_ stream;
230 State m_state; 226 State m_state;
231 // The count of the chunks which were enqueued to the ReadableStream.
232 size_t m_queueCount;
233 }; 227 };
234 228
235 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) 229 ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type)
236 { 230 {
237 if (bodyUsed()) 231 if (bodyUsed())
238 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read")); 232 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "Already read"));
239 233
240 // When the main thread sends a V8::TerminateExecution() signal to a worker 234 // When the main thread sends a V8::TerminateExecution() signal to a worker
241 // thread, any V8 API on the worker thread starts returning an empty 235 // thread, any V8 API on the worker thread starts returning an empty
242 // handle. This can happen in Body::readAsync. To avoid the situation, we 236 // handle. This can happen in Body::readAsync. To avoid the situation, we
243 // first check the ExecutionContext and return immediately if it's already 237 // first check the ExecutionContext and return immediately if it's already
244 // gone (which means that the V8::TerminateExecution() signal has been sent 238 // gone (which means that the V8::TerminateExecution() signal has been sent
245 // to this worker thread). 239 // to this worker thread).
246 ExecutionContext* executionContext = scriptState->executionContext(); 240 ExecutionContext* executionContext = scriptState->executionContext();
247 if (!executionContext) 241 if (!executionContext)
248 return ScriptPromise(); 242 return ScriptPromise();
249 243
250 setBodyUsed(); 244 setBodyUsed();
251 m_responseType = type; 245 m_responseType = type;
252 246
253 ASSERT(!m_resolver); 247 ASSERT(!m_resolver);
254 m_resolver = ScriptPromiseResolver::create(scriptState); 248 m_resolver = ScriptPromiseResolver::create(scriptState);
255 ScriptPromise promise = m_resolver->promise(); 249 ScriptPromise promise = m_resolver->promise();
256 250
257 if (m_stream) { 251 if (m_stream) {
258 ASSERT(m_streamSource); 252 ASSERT(m_streamSource);
259 bool dataLost; 253 m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(conte ntTypeForBuffer(), new BlobHandleReceiver(this));
260 m_streamSource->createDrainingStream(&dataLost)->readAllAndCreateBlobHan dle(contentTypeForBuffer(), new BlobHandleReceiver(this));
261 } else if (buffer()) { 254 } else if (buffer()) {
262 buffer()->readAllAndCreateBlobHandle(contentTypeForBuffer(), new BlobHan dleReceiver(this)); 255 buffer()->readAllAndCreateBlobHandle(contentTypeForBuffer(), new BlobHan dleReceiver(this));
263 } else { 256 } else {
264 readAsyncFromBlob(blobDataHandle()); 257 readAsyncFromBlob(blobDataHandle());
265 } 258 }
266 return promise; 259 return promise;
267 } 260 }
268 261
269 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle) 262 void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle)
270 { 263 {
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
362 ASSERT(!exceptionState.hadException()); 355 ASSERT(!exceptionState.hadException());
363 } 356 }
364 m_bodyUsed = true; 357 m_bodyUsed = true;
365 } 358 }
366 359
367 bool Body::streamAccessed() const 360 bool Body::streamAccessed() const
368 { 361 {
369 return m_stream; 362 return m_stream;
370 } 363 }
371 364
372 BodyStreamBuffer* Body::createDrainingStream(bool* dataLost) 365 BodyStreamBuffer* Body::createDrainingStream()
373 { 366 {
374 ASSERT(m_stream); 367 ASSERT(m_stream);
375 BodyStreamBuffer* newBuffer = m_streamSource->createDrainingStream(dataLost) ; 368 BodyStreamBuffer* newBuffer = m_streamSource->createDrainingStream();
376 m_stream = nullptr; 369 m_stream = nullptr;
377 m_streamSource = nullptr; 370 m_streamSource = nullptr;
378 return newBuffer; 371 return newBuffer;
379 } 372 }
380 373
381 void Body::stop() 374 void Body::stop()
382 { 375 {
383 // Canceling the load will call didFail which will remove the resolver. 376 // Canceling the load will call didFail which will remove the resolver.
384 if (m_loader) 377 if (m_loader)
385 m_loader->cancel(); 378 m_loader->cancel();
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
487 480
488 void Body::didBlobHandleReceiveError(PassRefPtrWillBeRawPtr<DOMException> except ion) 481 void Body::didBlobHandleReceiveError(PassRefPtrWillBeRawPtr<DOMException> except ion)
489 { 482 {
490 if (!m_resolver) 483 if (!m_resolver)
491 return; 484 return;
492 m_resolver->reject(exception); 485 m_resolver->reject(exception);
493 m_resolver.clear(); 486 m_resolver.clear();
494 } 487 }
495 488
496 } // namespace blink 489 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/Body.h ('k') | Source/modules/fetch/Response.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698