Chromium Code Reviews| Index: Source/modules/fetch/Body.cpp |
| diff --git a/Source/modules/fetch/Body.cpp b/Source/modules/fetch/Body.cpp |
| index 0be313e412339d2df44a9ff3303435e9b7786011..783e65df81458d0dbceb882e0fb17dc2a6926fee 100644 |
| --- a/Source/modules/fetch/Body.cpp |
| +++ b/Source/modules/fetch/Body.cpp |
| @@ -12,47 +12,21 @@ |
| #include "bindings/core/v8/V8ThrowException.h" |
| #include "core/dom/DOMArrayBuffer.h" |
| #include "core/dom/DOMTypedArray.h" |
| +#include "core/dom/ExceptionCode.h" |
| #include "core/fileapi/Blob.h" |
| -#include "core/fileapi/FileReaderLoader.h" |
| -#include "core/fileapi/FileReaderLoaderClient.h" |
| #include "core/frame/UseCounter.h" |
| #include "core/streams/ReadableByteStream.h" |
| #include "core/streams/ReadableByteStreamReader.h" |
| #include "core/streams/UnderlyingSource.h" |
| #include "modules/fetch/BodyStreamBuffer.h" |
| +#include "modules/fetch/DataConsumerHandleUtil.h" |
| +#include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| namespace blink { |
| -class Body::BlobHandleReceiver final : public BodyStreamBuffer::BlobHandleCreatorClient { |
| -public: |
| - explicit BlobHandleReceiver(Body* body) |
| - : m_body(body) |
| - { |
| - } |
| - void didCreateBlobHandle(PassRefPtr<BlobDataHandle> handle) override |
| - { |
| - ASSERT(m_body); |
| - m_body->readAsyncFromBlob(handle); |
| - m_body = nullptr; |
| - } |
| - void didFail(DOMException* exception) override |
| - { |
| - ASSERT(m_body); |
| - m_body->didBlobHandleReceiveError(exception); |
| - m_body = nullptr; |
| - } |
| - DEFINE_INLINE_VIRTUAL_TRACE() |
| - { |
| - BodyStreamBuffer::BlobHandleCreatorClient::trace(visitor); |
| - visitor->trace(m_body); |
| - } |
| -private: |
| - Member<Body> m_body; |
| -}; |
| - |
| // This class is an ActiveDOMObject subclass only for holding the |
| // ExecutionContext used in |pullSource|. |
| -class Body::ReadableStreamSource : public BodyStreamBuffer::Observer, public UnderlyingSource, public FileReaderLoaderClient, public ActiveDOMObject { |
| +class Body::ReadableStreamSource : public GarbageCollectedFinalized<Body::ReadableStreamSource>, public UnderlyingSource, public WebDataConsumerHandle::Client, public ActiveDOMObject { |
| USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); |
| public: |
| enum State { |
| @@ -61,26 +35,21 @@ public: |
| Closed, |
| Errored, |
| }; |
| - ReadableStreamSource(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> handle) |
| - : ActiveDOMObject(executionContext) |
| - , m_blobDataHandle(handle ? handle : BlobDataHandle::create(BlobData::create(), 0)) |
| - , m_state(Initial) |
| - { |
| - suspendIfNeeded(); |
| - } |
| - ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* buffer) |
| + ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer2* buffer) |
| : ActiveDOMObject(executionContext) |
| - , m_bodyStreamBuffer(buffer) |
| + , m_bodyStreamBuffer(buffer ? buffer : BodyStreamBuffer2::createEmpty()) |
| , m_state(Initial) |
| + , m_streamNeedsMore(false) |
| { |
| suspendIfNeeded(); |
| } |
| explicit ReadableStreamSource(ExecutionContext* executionContext) |
| : ActiveDOMObject(executionContext) |
| - , m_blobDataHandle(BlobDataHandle::create(BlobData::create(), 0)) |
| + , m_bodyStreamBuffer(BodyStreamBuffer2::createEmpty()) |
| , m_state(Initial) |
| + , m_streamNeedsMore(false) |
| { |
| suspendIfNeeded(); |
| } |
| @@ -95,41 +64,30 @@ public: |
| stream->didSourceStart(); |
| } |
| // Creates a new BodyStreamBuffer to drain the data. |
| - BodyStreamBuffer* createDrainingStream() |
| + BodyStreamBuffer2* createDrainingStream() |
|
yhirano
2015/06/25 10:45:31
If close / error is properly handled, I think this
hiroshige
2015/06/25 11:27:28
Yes, currently, but as we discussed offline, we ha
|
| { |
| ASSERT(m_state != Initial); |
| - auto drainingStreamBuffer = new BodyStreamBuffer(new Canceller(this)); |
| if (m_stream->stateInternal() == ReadableByteStream::Closed) { |
| - drainingStreamBuffer->close(); |
| - return drainingStreamBuffer; |
| + return BodyStreamBuffer2::createEmpty(); |
| } |
| if (m_stream->stateInternal() == ReadableByteStream::Errored) { |
| - drainingStreamBuffer->error(exception()); |
| - return drainingStreamBuffer; |
| + return BodyStreamBuffer2::create(createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle())); |
| } |
| - ASSERT(!m_drainingStreamBuffer); |
| - // Take back the data in |m_stream|. |
| - Deque<std::pair<RefPtr<DOMArrayBufferView>, size_t>> tmp_queue; |
| - ASSERT(m_stream->stateInternal() == ReadableStream::Readable); |
| - m_stream->readInternal(tmp_queue); |
| - while (!tmp_queue.isEmpty()) { |
| - std::pair<RefPtr<DOMArrayBufferView>, size_t> data = tmp_queue.takeFirst(); |
| - drainingStreamBuffer->write(data.first->buffer()); |
| - } |
| + ASSERT(!m_drained); |
| + m_drained = true; |
| + m_reader.clear(); |
| if (m_state == Closed) |
|
yhirano
2015/06/25 10:45:31
This block can be deleted.
hiroshige
2015/06/25 11:27:28
Done.
|
| - drainingStreamBuffer->close(); |
| + return BodyStreamBuffer2::createEmpty(); |
| - m_drainingStreamBuffer = drainingStreamBuffer; |
| - return m_drainingStreamBuffer; |
| + return m_bodyStreamBuffer; |
| } |
| + |
| DEFINE_INLINE_VIRTUAL_TRACE() |
| { |
| visitor->trace(m_bodyStreamBuffer); |
| - visitor->trace(m_drainingStreamBuffer); |
| visitor->trace(m_stream); |
| - BodyStreamBuffer::Observer::trace(visitor); |
| UnderlyingSource::trace(visitor); |
| ActiveDOMObject::trace(visitor); |
| } |
| @@ -143,145 +101,87 @@ public: |
| return; |
| } |
| m_state = Closed; |
| - if (m_drainingStreamBuffer) |
| - m_drainingStreamBuffer->close(); |
| + m_reader.clear(); |
| m_stream->close(); |
| } |
| void error() |
| { |
| m_state = Errored; |
| - if (m_drainingStreamBuffer) |
| - m_drainingStreamBuffer->error(exception()); |
| - m_stream->error(exception()); |
| + m_reader.clear(); |
| + m_stream->error(DOMException::create(NetworkError, "network error")); |
| } |
| private: |
| - class Canceller : public BodyStreamBuffer::Canceller { |
| - public: |
| - Canceller(ReadableStreamSource* source) : m_source(source) { } |
| - void cancel() override |
| - { |
| - m_source->cancel(); |
| - } |
| - |
| - DEFINE_INLINE_VIRTUAL_TRACE() |
| - { |
| - visitor->trace(m_source); |
| - BodyStreamBuffer::Canceller::trace(visitor); |
| - } |
| - |
| - private: |
| - Member<ReadableStreamSource> m_source; |
| - }; |
| + void didGetReadable() override |
| + { |
| + processData(); |
| + } |
| // UnderlyingSource functions. |
| void pullSource() override |
| { |
| - // Note that one |pull| is called only when |read| is called on the |
| - // associated ReadableByteStreamReader because we create a stream with |
| - // StrictStrategy. |
| + m_streamNeedsMore = true; |
|
yhirano
2015/06/25 10:45:31
+ASSERT(!m_streamNeedsMore);
hiroshige
2015/06/25 11:27:28
Done.
|
| + |
| if (m_state == Initial) { |
| m_state = Streaming; |
| - if (m_bodyStreamBuffer) { |
| - m_bodyStreamBuffer->registerObserver(this); |
| - onWrite(); |
| - if (m_bodyStreamBuffer->hasError()) |
| - return onError(); |
| - if (m_bodyStreamBuffer->isClosed()) |
| - return onClose(); |
| - } else { |
| - FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayBuffer; |
| - m_loader = adoptPtr(new FileReaderLoader(readType, this)); |
| - m_loader->start(executionContext(), m_blobDataHandle); |
| - } |
| + ASSERT(!m_drained); |
| + ASSERT(!m_reader); |
| + ASSERT(m_bodyStreamBuffer); |
| + ASSERT(m_bodyStreamBuffer->handle()); |
| + m_reader = m_bodyStreamBuffer->handle()->obtainReader(this); |
| } |
| - } |
| - ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) override |
| - { |
| - cancel(); |
| - return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
| - } |
| + if (m_drained) |
|
yhirano
2015/06/25 10:45:31
I believe m_drained will never be set here because
hiroshige
2015/06/25 11:27:28
Done.
|
| + return; |
| - // BodyStreamBuffer::Observer functions. |
| - void onWrite() override |
| - { |
| - ASSERT(m_state == Streaming); |
| - while (RefPtr<DOMArrayBuffer> buf = m_bodyStreamBuffer->read()) { |
| - write(buf); |
| - } |
| - } |
| - void onClose() override |
| - { |
| - ASSERT(m_state == Streaming); |
| - close(); |
| - m_bodyStreamBuffer->unregisterObserver(); |
| - } |
| - void onError() override |
| - { |
| - ASSERT(m_state == Streaming); |
| - error(); |
| - m_bodyStreamBuffer->unregisterObserver(); |
| + processData(); |
| } |
| - // FileReaderLoaderClient functions. |
| - void didStartLoading() override { } |
| - void didReceiveData() override { } |
| - void didFinishLoading() override |
| + ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) override |
| { |
| - ASSERT(m_state == Streaming); |
| - write(m_loader->arrayBufferResult()); |
| close(); |
| - } |
| - void didFail(FileError::ErrorCode) override |
| - { |
| - ASSERT(m_state == Streaming); |
| - error(); |
| + return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
| } |
| - void write(PassRefPtr<DOMArrayBuffer> buf) |
| - { |
| - if (m_drainingStreamBuffer) { |
| - m_drainingStreamBuffer->write(buf); |
| - } else { |
| - auto size = buf->byteLength(); |
| - m_stream->enqueue(DOMUint8Array::create(buf, 0, size)); |
| - } |
| - } |
| - void cancel() |
| + // Reads data and writes the data to |m_stream|, as long as data are |
| + // available and the stream has pending reads. |
| + void processData() |
| { |
| - if (m_bodyStreamBuffer) { |
| - m_bodyStreamBuffer->cancel(); |
| - // We should not close the stream here, because it is canceller's |
| - // responsibility. |
| - } else { |
| - if (m_loader) |
| - m_loader->cancel(); |
| - close(); |
| + ASSERT(m_reader); |
| + while (m_streamNeedsMore) { |
| + const void* buffer; |
| + size_t available; |
| + WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available); |
| + switch (result) { |
| + case WebDataConsumerHandle::Ok: |
| + m_streamNeedsMore = m_stream->enqueue(DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available)); |
| + m_reader->endRead(available); |
| + break; |
| + |
| + case WebDataConsumerHandle::Done: |
| + close(); |
| + return; |
| + |
| + case WebDataConsumerHandle::ShouldWait: |
| + return; |
| + |
| + case WebDataConsumerHandle::Busy: |
| + case WebDataConsumerHandle::ResourceExhausted: |
| + case WebDataConsumerHandle::UnexpectedError: |
| + error(); |
| + return; |
| + } |
| } |
| } |
| - DOMException* exception() |
| - { |
| - if (m_state != Errored) |
| - return nullptr; |
| - if (m_bodyStreamBuffer) { |
| - ASSERT(m_bodyStreamBuffer->exception()); |
| - return m_bodyStreamBuffer->exception(); |
| - } |
| - return DOMException::create(NetworkError, "network error"); |
| - } |
| + // Source of data. |
| + Member<BodyStreamBuffer2> m_bodyStreamBuffer; |
| + OwnPtr<FetchDataConsumerHandle::Reader> m_reader; |
| - // Set when the data container of the Body is a BodyStreamBuffer. |
| - Member<BodyStreamBuffer> m_bodyStreamBuffer; |
| - // Set when the data container of the Body is a BlobDataHandle. |
| - RefPtr<BlobDataHandle> m_blobDataHandle; |
| - // Used to read the data from BlobDataHandle. |
| - OwnPtr<FileReaderLoader> m_loader; |
| - // Created when createDrainingStream is called to drain the data. |
| - Member<BodyStreamBuffer> m_drainingStreamBuffer; |
| Member<ReadableByteStream> m_stream; |
| State m_state; |
| + bool m_drained; |
| + bool m_streamNeedsMore; |
| }; |
| ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) |
| @@ -340,63 +240,49 @@ ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) |
| m_resolver->reject(m_stream->storedException()); |
| m_resolver.clear(); |
| } else if (isBodyConsumed()) { |
| - m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver(this)); |
| - } else if (buffer()) { |
| - buffer()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver(this)); |
| + readAsyncFromFetchDataConsumerHandle(m_streamSource->createDrainingStream(), mimeType()); |
| } else { |
| - readAsyncFromBlob(blobDataHandle()); |
| + readAsyncFromFetchDataConsumerHandle(buffer(), mimeType()); |
| } |
| return promise; |
| } |
| -void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle) |
| +void Body::readAsyncFromFetchDataConsumerHandle(BodyStreamBuffer2* buffer, const String& mimeType) |
| { |
| - FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsText; |
| - RefPtr<BlobDataHandle> blobHandle = handle; |
| - if (!blobHandle) |
| - blobHandle = BlobDataHandle::create(BlobData::create(), 0); |
| + ASSERT(!m_fetchDataLoader); |
| + |
| switch (m_responseType) { |
| case ResponseAsArrayBuffer: |
| - readType = FileReaderLoader::ReadAsArrayBuffer; |
| + m_fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer(); |
| + break; |
| + |
| + case ResponseAsJSON: |
| + case ResponseAsText: |
| + m_fetchDataLoader = FetchDataLoader::createLoaderAsString(); |
| break; |
| + |
| case ResponseAsBlob: |
| - if (blobHandle->size() != kuint64max) { |
| - // If the size of |blobHandle| is set correctly, creates Blob from |
| - // it. |
| - if (blobHandle->type() != mimeType()) { |
| - // A new BlobDataHandle is created to override the Blob's type. |
| - m_resolver->resolve(Blob::create(BlobDataHandle::create(blobHandle->uuid(), mimeType(), blobHandle->size()))); |
| - } else { |
| - m_resolver->resolve(Blob::create(blobHandle)); |
| - } |
| - m_stream->close(); |
| - m_resolver.clear(); |
| - return; |
| - } |
| - // If the size is not set, read as ArrayBuffer and create a new blob to |
| - // get the size. |
| - // FIXME: This workaround is not good for performance. |
| - // When we will stop using Blob as a base system of Body to support |
| - // stream, this problem should be solved. |
| - readType = FileReaderLoader::ReadAsArrayBuffer; |
| + m_fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType); |
| break; |
| + |
| case ResponseAsFormData: |
| // FIXME: Implement this. |
| ASSERT_NOT_REACHED(); |
| - break; |
| - case ResponseAsJSON: |
| - case ResponseAsText: |
| - break; |
| + return; |
| + |
| default: |
| ASSERT_NOT_REACHED(); |
| + return; |
| } |
| - m_loader = adoptPtr(new FileReaderLoader(readType, this)); |
| - m_loader->start(m_resolver->scriptState()->executionContext(), blobHandle); |
| - |
| - return; |
| + if (buffer && buffer->handle()) |
| + m_fetchDataLoader->start(buffer->handle(), this); |
| + else |
| + m_fetchDataLoader->start(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()).get(), this); |
| } |
| + |
| + |
| ScriptPromise Body::arrayBuffer(ScriptState* scriptState) |
| { |
| return readAsync(scriptState, ResponseAsArrayBuffer); |
| @@ -452,8 +338,8 @@ bool Body::isBodyConsumed() const |
| } |
| if (m_stream->stateInternal() == ReadableStream::Closed) { |
| // Return true if the blob handle is originally not empty. |
| - RefPtr<BlobDataHandle> handle = blobDataHandle(); |
| - return handle && handle->size(); |
| + // TODO: delete isBodyConsumed() itself. |
| + return true; |
| } |
| if (m_stream->stateInternal() == ReadableStream::Errored) { |
| // The stream is errored. That means an effort to read data was made. |
| @@ -469,16 +355,17 @@ void Body::setBody(ReadableStreamSource* source) |
| m_streamSource->startStream(m_stream); |
| } |
| -BodyStreamBuffer* Body::createDrainingStream() |
| +BodyStreamBuffer2* Body::createDrainingStream() |
| { |
| return m_streamSource->createDrainingStream(); |
| } |
| void Body::stop() |
| { |
| - // Canceling the load will call didFail which will remove the resolver. |
| - if (m_loader) |
| - m_loader->cancel(); |
| + if (m_fetchDataLoader) { |
| + m_fetchDataLoader->cancel(); |
| + m_fetchDataLoader.clear(); |
| + } |
| } |
| bool Body::hasPendingActivity() const |
| @@ -492,22 +379,19 @@ bool Body::hasPendingActivity() const |
| return false; |
| } |
| -Body::ReadableStreamSource* Body::createBodySource(PassRefPtr<BlobDataHandle> handle) |
| -{ |
| - return new ReadableStreamSource(executionContext(), handle); |
| -} |
| - |
| -Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer* buffer) |
| +Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer2* buffer) |
| { |
| return new ReadableStreamSource(executionContext(), buffer); |
| } |
| DEFINE_TRACE(Body) |
| { |
| + visitor->trace(m_fetchDataLoader); |
| visitor->trace(m_resolver); |
| visitor->trace(m_stream); |
| visitor->trace(m_streamSource); |
| ActiveDOMObject::trace(visitor); |
| + FetchDataLoader::Client::trace(visitor); |
| } |
| Body::Body(ExecutionContext* context) |
| @@ -534,63 +418,76 @@ void Body::resolveJSON(const String& string) |
| m_resolver->reject(trycatch.Exception()); |
| } |
| -// FileReaderLoaderClient functions. |
| -void Body::didStartLoading() { } |
| -void Body::didReceiveData() { } |
| -void Body::didFinishLoading() |
| +// FetchDataLoader::Client functions. |
| +void Body::didFetchDataLoadFailed() |
| { |
| + ASSERT(m_fetchDataLoader); |
| + m_fetchDataLoader.clear(); |
| + |
| if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
| return; |
| - switch (m_responseType) { |
| - case ResponseAsArrayBuffer: |
| - m_resolver->resolve(m_loader->arrayBufferResult()); |
| - break; |
| - case ResponseAsBlob: { |
| - ASSERT(blobDataHandle()->size() == kuint64max); |
| - OwnPtr<BlobData> blobData = BlobData::create(); |
| - RefPtr<DOMArrayBuffer> buffer = m_loader->arrayBufferResult(); |
| - blobData->appendBytes(buffer->data(), buffer->byteLength()); |
| - blobData->setContentType(mimeType()); |
| - const size_t length = blobData->length(); |
| - m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release(), length))); |
| - break; |
| - } |
| - case ResponseAsFormData: |
| - ASSERT_NOT_REACHED(); |
| - break; |
| - case ResponseAsJSON: |
| - resolveJSON(m_loader->stringResult()); |
| - break; |
| - case ResponseAsText: |
| - m_resolver->resolve(m_loader->stringResult()); |
| - break; |
| - default: |
| - ASSERT_NOT_REACHED(); |
| + m_streamSource->error(); |
| + if (m_resolver) { |
| + if (!m_resolver->executionContext() || m_resolver->executionContext()->activeDOMObjectsAreStopped()) { |
| + m_resolver.clear(); |
| + return; |
| + } |
| + ScriptState* state = m_resolver->scriptState(); |
| + ScriptState::Scope scope(state); |
| + m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), "Failed to fetch")); |
| + m_resolver.clear(); |
| } |
| +} |
| + |
| +void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) |
| +{ |
| + ASSERT(m_fetchDataLoader); |
| + m_fetchDataLoader.clear(); |
| + |
| + if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
| + return; |
| + |
| + ASSERT(m_responseType == ResponseAsBlob); |
| + m_resolver->resolve(Blob::create(blobDataHandle)); |
| m_streamSource->close(); |
| m_resolver.clear(); |
| } |
| -void Body::didFail(FileError::ErrorCode code) |
| +void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) |
| { |
| + ASSERT(m_fetchDataLoader); |
| + m_fetchDataLoader.clear(); |
| + |
| if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
| return; |
| - m_streamSource->error(); |
| - if (m_resolver) { |
| - // FIXME: We should reject the promise. |
| - m_resolver->resolve(""); |
| - m_resolver.clear(); |
| - } |
| + ASSERT(m_responseType == ResponseAsArrayBuffer); |
| + m_resolver->resolve(arrayBuffer); |
| + m_streamSource->close(); |
| + m_resolver.clear(); |
| } |
| -void Body::didBlobHandleReceiveError(DOMException* exception) |
| +void Body::didFetchDataLoadedString(const String& str) |
| { |
| - if (!m_resolver) |
| + ASSERT(m_fetchDataLoader); |
| + m_fetchDataLoader.clear(); |
| + |
| + if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
| return; |
| - m_streamSource->error(); |
| - m_resolver->reject(exception); |
| + |
| + switch (m_responseType) { |
| + case ResponseAsJSON: |
| + resolveJSON(str); |
| + break; |
| + case ResponseAsText: |
| + m_resolver->resolve(str); |
| + break; |
| + default: |
| + ASSERT_NOT_REACHED(); |
| + } |
| + |
| + m_streamSource->close(); |
| m_resolver.clear(); |
| } |