Index: Source/modules/fetch/Body.cpp |
diff --git a/Source/modules/fetch/Body.cpp b/Source/modules/fetch/Body.cpp |
index 0be313e412339d2df44a9ff3303435e9b7786011..783e65df81458d0dbceb882e0fb17dc2a6926fee 100644 |
--- a/Source/modules/fetch/Body.cpp |
+++ b/Source/modules/fetch/Body.cpp |
@@ -12,47 +12,21 @@ |
#include "bindings/core/v8/V8ThrowException.h" |
#include "core/dom/DOMArrayBuffer.h" |
#include "core/dom/DOMTypedArray.h" |
+#include "core/dom/ExceptionCode.h" |
#include "core/fileapi/Blob.h" |
-#include "core/fileapi/FileReaderLoader.h" |
-#include "core/fileapi/FileReaderLoaderClient.h" |
#include "core/frame/UseCounter.h" |
#include "core/streams/ReadableByteStream.h" |
#include "core/streams/ReadableByteStreamReader.h" |
#include "core/streams/UnderlyingSource.h" |
#include "modules/fetch/BodyStreamBuffer.h" |
+#include "modules/fetch/DataConsumerHandleUtil.h" |
+#include "modules/fetch/FetchBlobDataConsumerHandle.h" |
namespace blink { |
-class Body::BlobHandleReceiver final : public BodyStreamBuffer::BlobHandleCreatorClient { |
-public: |
- explicit BlobHandleReceiver(Body* body) |
- : m_body(body) |
- { |
- } |
- void didCreateBlobHandle(PassRefPtr<BlobDataHandle> handle) override |
- { |
- ASSERT(m_body); |
- m_body->readAsyncFromBlob(handle); |
- m_body = nullptr; |
- } |
- void didFail(DOMException* exception) override |
- { |
- ASSERT(m_body); |
- m_body->didBlobHandleReceiveError(exception); |
- m_body = nullptr; |
- } |
- DEFINE_INLINE_VIRTUAL_TRACE() |
- { |
- BodyStreamBuffer::BlobHandleCreatorClient::trace(visitor); |
- visitor->trace(m_body); |
- } |
-private: |
- Member<Body> m_body; |
-}; |
- |
// This class is an ActiveDOMObject subclass only for holding the |
// ExecutionContext used in |pullSource|. |
-class Body::ReadableStreamSource : public BodyStreamBuffer::Observer, public UnderlyingSource, public FileReaderLoaderClient, public ActiveDOMObject { |
+class Body::ReadableStreamSource : public GarbageCollectedFinalized<Body::ReadableStreamSource>, public UnderlyingSource, public WebDataConsumerHandle::Client, public ActiveDOMObject { |
USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource); |
public: |
enum State { |
@@ -61,26 +35,21 @@ public: |
Closed, |
Errored, |
}; |
- ReadableStreamSource(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> handle) |
- : ActiveDOMObject(executionContext) |
- , m_blobDataHandle(handle ? handle : BlobDataHandle::create(BlobData::create(), 0)) |
- , m_state(Initial) |
- { |
- suspendIfNeeded(); |
- } |
- ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer* buffer) |
+ ReadableStreamSource(ExecutionContext* executionContext, BodyStreamBuffer2* buffer) |
: ActiveDOMObject(executionContext) |
- , m_bodyStreamBuffer(buffer) |
+ , m_bodyStreamBuffer(buffer ? buffer : BodyStreamBuffer2::createEmpty()) |
, m_state(Initial) |
+ , m_streamNeedsMore(false) |
{ |
suspendIfNeeded(); |
} |
explicit ReadableStreamSource(ExecutionContext* executionContext) |
: ActiveDOMObject(executionContext) |
- , m_blobDataHandle(BlobDataHandle::create(BlobData::create(), 0)) |
+ , m_bodyStreamBuffer(BodyStreamBuffer2::createEmpty()) |
, m_state(Initial) |
+ , m_streamNeedsMore(false) |
{ |
suspendIfNeeded(); |
} |
@@ -95,41 +64,30 @@ public: |
stream->didSourceStart(); |
} |
// Creates a new BodyStreamBuffer to drain the data. |
- BodyStreamBuffer* createDrainingStream() |
+ BodyStreamBuffer2* createDrainingStream() |
yhirano
2015/06/25 10:45:31
If close / error is properly handled, I think this
hiroshige
2015/06/25 11:27:28
Yes, currently, but as we discussed offline, we ha
|
{ |
ASSERT(m_state != Initial); |
- auto drainingStreamBuffer = new BodyStreamBuffer(new Canceller(this)); |
if (m_stream->stateInternal() == ReadableByteStream::Closed) { |
- drainingStreamBuffer->close(); |
- return drainingStreamBuffer; |
+ return BodyStreamBuffer2::createEmpty(); |
} |
if (m_stream->stateInternal() == ReadableByteStream::Errored) { |
- drainingStreamBuffer->error(exception()); |
- return drainingStreamBuffer; |
+ return BodyStreamBuffer2::create(createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle())); |
} |
- ASSERT(!m_drainingStreamBuffer); |
- // Take back the data in |m_stream|. |
- Deque<std::pair<RefPtr<DOMArrayBufferView>, size_t>> tmp_queue; |
- ASSERT(m_stream->stateInternal() == ReadableStream::Readable); |
- m_stream->readInternal(tmp_queue); |
- while (!tmp_queue.isEmpty()) { |
- std::pair<RefPtr<DOMArrayBufferView>, size_t> data = tmp_queue.takeFirst(); |
- drainingStreamBuffer->write(data.first->buffer()); |
- } |
+ ASSERT(!m_drained); |
+ m_drained = true; |
+ m_reader.clear(); |
if (m_state == Closed) |
yhirano
2015/06/25 10:45:31
This block can be deleted.
hiroshige
2015/06/25 11:27:28
Done.
|
- drainingStreamBuffer->close(); |
+ return BodyStreamBuffer2::createEmpty(); |
- m_drainingStreamBuffer = drainingStreamBuffer; |
- return m_drainingStreamBuffer; |
+ return m_bodyStreamBuffer; |
} |
+ |
DEFINE_INLINE_VIRTUAL_TRACE() |
{ |
visitor->trace(m_bodyStreamBuffer); |
- visitor->trace(m_drainingStreamBuffer); |
visitor->trace(m_stream); |
- BodyStreamBuffer::Observer::trace(visitor); |
UnderlyingSource::trace(visitor); |
ActiveDOMObject::trace(visitor); |
} |
@@ -143,145 +101,87 @@ public: |
return; |
} |
m_state = Closed; |
- if (m_drainingStreamBuffer) |
- m_drainingStreamBuffer->close(); |
+ m_reader.clear(); |
m_stream->close(); |
} |
void error() |
{ |
m_state = Errored; |
- if (m_drainingStreamBuffer) |
- m_drainingStreamBuffer->error(exception()); |
- m_stream->error(exception()); |
+ m_reader.clear(); |
+ m_stream->error(DOMException::create(NetworkError, "network error")); |
} |
private: |
- class Canceller : public BodyStreamBuffer::Canceller { |
- public: |
- Canceller(ReadableStreamSource* source) : m_source(source) { } |
- void cancel() override |
- { |
- m_source->cancel(); |
- } |
- |
- DEFINE_INLINE_VIRTUAL_TRACE() |
- { |
- visitor->trace(m_source); |
- BodyStreamBuffer::Canceller::trace(visitor); |
- } |
- |
- private: |
- Member<ReadableStreamSource> m_source; |
- }; |
+ void didGetReadable() override |
+ { |
+ processData(); |
+ } |
// UnderlyingSource functions. |
void pullSource() override |
{ |
- // Note that one |pull| is called only when |read| is called on the |
- // associated ReadableByteStreamReader because we create a stream with |
- // StrictStrategy. |
+ m_streamNeedsMore = true; |
yhirano
2015/06/25 10:45:31
+ASSERT(!m_streamNeedsMore);
hiroshige
2015/06/25 11:27:28
Done.
|
+ |
if (m_state == Initial) { |
m_state = Streaming; |
- if (m_bodyStreamBuffer) { |
- m_bodyStreamBuffer->registerObserver(this); |
- onWrite(); |
- if (m_bodyStreamBuffer->hasError()) |
- return onError(); |
- if (m_bodyStreamBuffer->isClosed()) |
- return onClose(); |
- } else { |
- FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsArrayBuffer; |
- m_loader = adoptPtr(new FileReaderLoader(readType, this)); |
- m_loader->start(executionContext(), m_blobDataHandle); |
- } |
+ ASSERT(!m_drained); |
+ ASSERT(!m_reader); |
+ ASSERT(m_bodyStreamBuffer); |
+ ASSERT(m_bodyStreamBuffer->handle()); |
+ m_reader = m_bodyStreamBuffer->handle()->obtainReader(this); |
} |
- } |
- ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) override |
- { |
- cancel(); |
- return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
- } |
+ if (m_drained) |
yhirano
2015/06/25 10:45:31
I believe m_drained will never be set here because
hiroshige
2015/06/25 11:27:28
Done.
|
+ return; |
- // BodyStreamBuffer::Observer functions. |
- void onWrite() override |
- { |
- ASSERT(m_state == Streaming); |
- while (RefPtr<DOMArrayBuffer> buf = m_bodyStreamBuffer->read()) { |
- write(buf); |
- } |
- } |
- void onClose() override |
- { |
- ASSERT(m_state == Streaming); |
- close(); |
- m_bodyStreamBuffer->unregisterObserver(); |
- } |
- void onError() override |
- { |
- ASSERT(m_state == Streaming); |
- error(); |
- m_bodyStreamBuffer->unregisterObserver(); |
+ processData(); |
} |
- // FileReaderLoaderClient functions. |
- void didStartLoading() override { } |
- void didReceiveData() override { } |
- void didFinishLoading() override |
+ ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) override |
{ |
- ASSERT(m_state == Streaming); |
- write(m_loader->arrayBufferResult()); |
close(); |
- } |
- void didFail(FileError::ErrorCode) override |
- { |
- ASSERT(m_state == Streaming); |
- error(); |
+ return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
} |
- void write(PassRefPtr<DOMArrayBuffer> buf) |
- { |
- if (m_drainingStreamBuffer) { |
- m_drainingStreamBuffer->write(buf); |
- } else { |
- auto size = buf->byteLength(); |
- m_stream->enqueue(DOMUint8Array::create(buf, 0, size)); |
- } |
- } |
- void cancel() |
+ // Reads data and writes the data to |m_stream|, as long as data are |
+ // available and the stream has pending reads. |
+ void processData() |
{ |
- if (m_bodyStreamBuffer) { |
- m_bodyStreamBuffer->cancel(); |
- // We should not close the stream here, because it is canceller's |
- // responsibility. |
- } else { |
- if (m_loader) |
- m_loader->cancel(); |
- close(); |
+ ASSERT(m_reader); |
+ while (m_streamNeedsMore) { |
+ const void* buffer; |
+ size_t available; |
+ WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available); |
+ switch (result) { |
+ case WebDataConsumerHandle::Ok: |
+ m_streamNeedsMore = m_stream->enqueue(DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available)); |
+ m_reader->endRead(available); |
+ break; |
+ |
+ case WebDataConsumerHandle::Done: |
+ close(); |
+ return; |
+ |
+ case WebDataConsumerHandle::ShouldWait: |
+ return; |
+ |
+ case WebDataConsumerHandle::Busy: |
+ case WebDataConsumerHandle::ResourceExhausted: |
+ case WebDataConsumerHandle::UnexpectedError: |
+ error(); |
+ return; |
+ } |
} |
} |
- DOMException* exception() |
- { |
- if (m_state != Errored) |
- return nullptr; |
- if (m_bodyStreamBuffer) { |
- ASSERT(m_bodyStreamBuffer->exception()); |
- return m_bodyStreamBuffer->exception(); |
- } |
- return DOMException::create(NetworkError, "network error"); |
- } |
+ // Source of data. |
+ Member<BodyStreamBuffer2> m_bodyStreamBuffer; |
+ OwnPtr<FetchDataConsumerHandle::Reader> m_reader; |
- // Set when the data container of the Body is a BodyStreamBuffer. |
- Member<BodyStreamBuffer> m_bodyStreamBuffer; |
- // Set when the data container of the Body is a BlobDataHandle. |
- RefPtr<BlobDataHandle> m_blobDataHandle; |
- // Used to read the data from BlobDataHandle. |
- OwnPtr<FileReaderLoader> m_loader; |
- // Created when createDrainingStream is called to drain the data. |
- Member<BodyStreamBuffer> m_drainingStreamBuffer; |
Member<ReadableByteStream> m_stream; |
State m_state; |
+ bool m_drained; |
+ bool m_streamNeedsMore; |
}; |
ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) |
@@ -340,63 +240,49 @@ ScriptPromise Body::readAsync(ScriptState* scriptState, ResponseType type) |
m_resolver->reject(m_stream->storedException()); |
m_resolver.clear(); |
} else if (isBodyConsumed()) { |
- m_streamSource->createDrainingStream()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver(this)); |
- } else if (buffer()) { |
- buffer()->readAllAndCreateBlobHandle(mimeType(), new BlobHandleReceiver(this)); |
+ readAsyncFromFetchDataConsumerHandle(m_streamSource->createDrainingStream(), mimeType()); |
} else { |
- readAsyncFromBlob(blobDataHandle()); |
+ readAsyncFromFetchDataConsumerHandle(buffer(), mimeType()); |
} |
return promise; |
} |
-void Body::readAsyncFromBlob(PassRefPtr<BlobDataHandle> handle) |
+void Body::readAsyncFromFetchDataConsumerHandle(BodyStreamBuffer2* buffer, const String& mimeType) |
{ |
- FileReaderLoader::ReadType readType = FileReaderLoader::ReadAsText; |
- RefPtr<BlobDataHandle> blobHandle = handle; |
- if (!blobHandle) |
- blobHandle = BlobDataHandle::create(BlobData::create(), 0); |
+ ASSERT(!m_fetchDataLoader); |
+ |
switch (m_responseType) { |
case ResponseAsArrayBuffer: |
- readType = FileReaderLoader::ReadAsArrayBuffer; |
+ m_fetchDataLoader = FetchDataLoader::createLoaderAsArrayBuffer(); |
+ break; |
+ |
+ case ResponseAsJSON: |
+ case ResponseAsText: |
+ m_fetchDataLoader = FetchDataLoader::createLoaderAsString(); |
break; |
+ |
case ResponseAsBlob: |
- if (blobHandle->size() != kuint64max) { |
- // If the size of |blobHandle| is set correctly, creates Blob from |
- // it. |
- if (blobHandle->type() != mimeType()) { |
- // A new BlobDataHandle is created to override the Blob's type. |
- m_resolver->resolve(Blob::create(BlobDataHandle::create(blobHandle->uuid(), mimeType(), blobHandle->size()))); |
- } else { |
- m_resolver->resolve(Blob::create(blobHandle)); |
- } |
- m_stream->close(); |
- m_resolver.clear(); |
- return; |
- } |
- // If the size is not set, read as ArrayBuffer and create a new blob to |
- // get the size. |
- // FIXME: This workaround is not good for performance. |
- // When we will stop using Blob as a base system of Body to support |
- // stream, this problem should be solved. |
- readType = FileReaderLoader::ReadAsArrayBuffer; |
+ m_fetchDataLoader = FetchDataLoader::createLoaderAsBlobHandle(mimeType); |
break; |
+ |
case ResponseAsFormData: |
// FIXME: Implement this. |
ASSERT_NOT_REACHED(); |
- break; |
- case ResponseAsJSON: |
- case ResponseAsText: |
- break; |
+ return; |
+ |
default: |
ASSERT_NOT_REACHED(); |
+ return; |
} |
- m_loader = adoptPtr(new FileReaderLoader(readType, this)); |
- m_loader->start(m_resolver->scriptState()->executionContext(), blobHandle); |
- |
- return; |
+ if (buffer && buffer->handle()) |
+ m_fetchDataLoader->start(buffer->handle(), this); |
+ else |
+ m_fetchDataLoader->start(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()).get(), this); |
} |
+ |
+ |
ScriptPromise Body::arrayBuffer(ScriptState* scriptState) |
{ |
return readAsync(scriptState, ResponseAsArrayBuffer); |
@@ -452,8 +338,8 @@ bool Body::isBodyConsumed() const |
} |
if (m_stream->stateInternal() == ReadableStream::Closed) { |
// Return true if the blob handle is originally not empty. |
- RefPtr<BlobDataHandle> handle = blobDataHandle(); |
- return handle && handle->size(); |
+ // TODO: delete isBodyConsumed() itself. |
+ return true; |
} |
if (m_stream->stateInternal() == ReadableStream::Errored) { |
// The stream is errored. That means an effort to read data was made. |
@@ -469,16 +355,17 @@ void Body::setBody(ReadableStreamSource* source) |
m_streamSource->startStream(m_stream); |
} |
-BodyStreamBuffer* Body::createDrainingStream() |
+BodyStreamBuffer2* Body::createDrainingStream() |
{ |
return m_streamSource->createDrainingStream(); |
} |
void Body::stop() |
{ |
- // Canceling the load will call didFail which will remove the resolver. |
- if (m_loader) |
- m_loader->cancel(); |
+ if (m_fetchDataLoader) { |
+ m_fetchDataLoader->cancel(); |
+ m_fetchDataLoader.clear(); |
+ } |
} |
bool Body::hasPendingActivity() const |
@@ -492,22 +379,19 @@ bool Body::hasPendingActivity() const |
return false; |
} |
-Body::ReadableStreamSource* Body::createBodySource(PassRefPtr<BlobDataHandle> handle) |
-{ |
- return new ReadableStreamSource(executionContext(), handle); |
-} |
- |
-Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer* buffer) |
+Body::ReadableStreamSource* Body::createBodySource(BodyStreamBuffer2* buffer) |
{ |
return new ReadableStreamSource(executionContext(), buffer); |
} |
DEFINE_TRACE(Body) |
{ |
+ visitor->trace(m_fetchDataLoader); |
visitor->trace(m_resolver); |
visitor->trace(m_stream); |
visitor->trace(m_streamSource); |
ActiveDOMObject::trace(visitor); |
+ FetchDataLoader::Client::trace(visitor); |
} |
Body::Body(ExecutionContext* context) |
@@ -534,63 +418,76 @@ void Body::resolveJSON(const String& string) |
m_resolver->reject(trycatch.Exception()); |
} |
-// FileReaderLoaderClient functions. |
-void Body::didStartLoading() { } |
-void Body::didReceiveData() { } |
-void Body::didFinishLoading() |
+// FetchDataLoader::Client functions. |
+void Body::didFetchDataLoadFailed() |
{ |
+ ASSERT(m_fetchDataLoader); |
+ m_fetchDataLoader.clear(); |
+ |
if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
return; |
- switch (m_responseType) { |
- case ResponseAsArrayBuffer: |
- m_resolver->resolve(m_loader->arrayBufferResult()); |
- break; |
- case ResponseAsBlob: { |
- ASSERT(blobDataHandle()->size() == kuint64max); |
- OwnPtr<BlobData> blobData = BlobData::create(); |
- RefPtr<DOMArrayBuffer> buffer = m_loader->arrayBufferResult(); |
- blobData->appendBytes(buffer->data(), buffer->byteLength()); |
- blobData->setContentType(mimeType()); |
- const size_t length = blobData->length(); |
- m_resolver->resolve(Blob::create(BlobDataHandle::create(blobData.release(), length))); |
- break; |
- } |
- case ResponseAsFormData: |
- ASSERT_NOT_REACHED(); |
- break; |
- case ResponseAsJSON: |
- resolveJSON(m_loader->stringResult()); |
- break; |
- case ResponseAsText: |
- m_resolver->resolve(m_loader->stringResult()); |
- break; |
- default: |
- ASSERT_NOT_REACHED(); |
+ m_streamSource->error(); |
+ if (m_resolver) { |
+ if (!m_resolver->executionContext() || m_resolver->executionContext()->activeDOMObjectsAreStopped()) { |
+ m_resolver.clear(); |
+ return; |
+ } |
+ ScriptState* state = m_resolver->scriptState(); |
+ ScriptState::Scope scope(state); |
+ m_resolver->reject(V8ThrowException::createTypeError(state->isolate(), "Failed to fetch")); |
+ m_resolver.clear(); |
} |
+} |
+ |
+void Body::didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) |
+{ |
+ ASSERT(m_fetchDataLoader); |
+ m_fetchDataLoader.clear(); |
+ |
+ if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
+ return; |
+ |
+ ASSERT(m_responseType == ResponseAsBlob); |
+ m_resolver->resolve(Blob::create(blobDataHandle)); |
m_streamSource->close(); |
m_resolver.clear(); |
} |
-void Body::didFail(FileError::ErrorCode code) |
+void Body::didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) |
{ |
+ ASSERT(m_fetchDataLoader); |
+ m_fetchDataLoader.clear(); |
+ |
if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
return; |
- m_streamSource->error(); |
- if (m_resolver) { |
- // FIXME: We should reject the promise. |
- m_resolver->resolve(""); |
- m_resolver.clear(); |
- } |
+ ASSERT(m_responseType == ResponseAsArrayBuffer); |
+ m_resolver->resolve(arrayBuffer); |
+ m_streamSource->close(); |
+ m_resolver.clear(); |
} |
-void Body::didBlobHandleReceiveError(DOMException* exception) |
+void Body::didFetchDataLoadedString(const String& str) |
{ |
- if (!m_resolver) |
+ ASSERT(m_fetchDataLoader); |
+ m_fetchDataLoader.clear(); |
+ |
+ if (!executionContext() || executionContext()->activeDOMObjectsAreStopped()) |
return; |
- m_streamSource->error(); |
- m_resolver->reject(exception); |
+ |
+ switch (m_responseType) { |
+ case ResponseAsJSON: |
+ resolveJSON(str); |
+ break; |
+ case ResponseAsText: |
+ m_resolver->resolve(str); |
+ break; |
+ default: |
+ ASSERT_NOT_REACHED(); |
+ } |
+ |
+ m_streamSource->close(); |
m_resolver.clear(); |
} |