Chromium Code Reviews| Index: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| diff --git a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| index 89eb71958c9c00425f658ea58c3b47c73524fa2f..c14cacabf1c539cc93f560fce8540a29cce5ddd4 100644 |
| --- a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| +++ b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| @@ -4,11 +4,315 @@ |
| #include "modules/fetch/BytesConsumer.h" |
| +#include "core/dom/ExecutionContext.h" |
| +#include "core/dom/TaskRunnerHelper.h" |
| +#include "modules/fetch/BytesConsumerForDataConsumerHandle.h" |
| +#include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| +#include "platform/blob/BlobData.h" |
| +#include "public/platform/WebTaskRunner.h" |
| +#include "wtf/Functional.h" |
| +#include "wtf/RefPtr.h" |
| #include <algorithm> |
| #include <string.h> |
| namespace blink { |
| +namespace { |
| + |
| +class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public BytesConsumer::Client { |
| + USING_GARBAGE_COLLECTED_MIXIN(NoopClient); |
| +public: |
| + void onStateChange() override {} |
| +}; |
| + |
| +class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::Client { |
| + USING_GARBAGE_COLLECTED_MIXIN(Tee); |
| +public: |
| + Tee(ExecutionContext* executionContext, BytesConsumer* consumer) |
| + : m_src(consumer) |
| + , m_destination1(new Destination(executionContext, this)) |
| + , m_destination2(new Destination(executionContext, this)) |
| + { |
| + consumer->setClient(this); |
| + // As no client is set to either destinations, Destination::notify() is |
| + // no-op in this function. |
| + onStateChange(); |
| + } |
| + |
| + void onStateChange() override |
| + { |
| + bool destination1WasEmpty = m_destination1->isEmpty(); |
| + bool destination2WasEmpty = m_destination2->isEmpty(); |
| + bool hasEnqueued = false; |
| + |
| + while (true) { |
| + const char* buffer = nullptr; |
| + size_t available = 0; |
| + Result result = m_src->beginRead(&buffer, &available); |
| + if (result == Result::ShouldWait) { |
| + if (hasEnqueued && destination1WasEmpty) |
| + m_destination1->notify(); |
| + if (hasEnqueued && destination2WasEmpty) |
| + m_destination2->notify(); |
| + return; |
| + } |
| + if (result == Result::Done) { |
| + if (destination1WasEmpty) |
| + m_destination1->notify(); |
| + if (destination2WasEmpty) |
| + m_destination2->notify(); |
| + return; |
| + } |
| + if (result == Result::Error) { |
| + clearAndNotify(); |
| + return; |
| + } |
| + DCHECK_EQ(Result::Ok, result); |
|
hiroshige
2016/09/08 08:25:54
nit: how about using switch() and |case Result::Ok
yhirano
2016/09/08 09:41:34
Done.
|
| + Chunk* chunk = new Chunk(buffer, available); |
| + if (m_src->endRead(available) != Result::Ok) { |
| + clearAndNotify(); |
| + return; |
| + } |
| + m_destination1->enqueue(chunk); |
| + m_destination2->enqueue(chunk); |
| + hasEnqueued = true; |
| + } |
| + } |
| + |
| + BytesConsumer::PublicState getPublicState() const |
| + { |
| + return m_src->getPublicState(); |
| + } |
| + |
| + BytesConsumer::Error getError() const |
| + { |
| + return m_src->getError(); |
| + } |
| + |
| + void cancel() |
| + { |
| + if (!m_destination1->isCancelled() || !m_destination2->isCancelled()) |
| + return; |
| + m_src->cancel(); |
| + } |
| + |
| + BytesConsumer* destination1() const { return m_destination1; } |
| + BytesConsumer* destination2() const { return m_destination2; } |
| + |
| + DEFINE_INLINE_TRACE() |
| + { |
| + visitor->trace(m_src); |
| + visitor->trace(m_destination1); |
| + visitor->trace(m_destination2); |
| + BytesConsumer::Client::trace(visitor); |
| + } |
| + |
| +private: |
| + using Result = BytesConsumer::Result; |
| + class Chunk final : public GarbageCollectedFinalized<Chunk> { |
| + public: |
| + Chunk(const char* data, size_t size) |
| + { |
| + m_buffer.reserveCapacity(size); |
|
hiroshige
2016/09/08 08:25:54
reserveInitialCapacity()? (I'm not a WTF::Vector s
yhirano
2016/09/08 09:41:34
Done.
|
| + m_buffer.append(data, size); |
| + } |
| + const char* data() const { return m_buffer.data(); } |
| + size_t size() const { return m_buffer.size(); } |
| + |
| + DEFINE_INLINE_TRACE() {} |
| + |
| + private: |
| + Vector<char> m_buffer; |
| + }; |
| + |
| + class Destination final : public BytesConsumer { |
| + public: |
| + Destination(ExecutionContext* executionContext, Tee* tee) |
| + : m_executionContext(executionContext) |
| + , m_tee(tee) |
| + { |
| + } |
| + |
| + Result beginRead(const char** buffer, size_t* available) override |
| + { |
| + DCHECK(!m_chunkInUse); |
| + *buffer = nullptr; |
| + *available = 0; |
| + if (m_isCancelled || m_isClosed) |
| + return Result::Done; |
| + if (!m_chunks.isEmpty()) { |
| + Chunk* chunk = m_chunks[0]; |
| + DCHECK_LE(m_offset, chunk->size()); |
| + *buffer = chunk->data() + m_offset; |
| + *available = chunk->size() - m_offset; |
| + m_chunkInUse = chunk; |
| + return Result::Ok; |
| + } |
| + switch (m_tee->getPublicState()) { |
| + case PublicState::ReadableOrWaiting: |
| + return Result::ShouldWait; |
| + case PublicState::Closed: |
| + m_isClosed = true; |
| + clearClient(); |
| + return Result::Done; |
| + case PublicState::Errored: |
| + clearClient(); |
| + return Result::Error; |
| + } |
| + NOTREACHED(); |
| + return Result::Error; |
| + } |
| + |
| + Result endRead(size_t read) override |
| + { |
| + DCHECK(m_chunkInUse); |
| + DCHECK(m_chunks.isEmpty() || m_chunkInUse == m_chunks[0]); |
| + m_chunkInUse = nullptr; |
| + if (m_chunks.isEmpty()) { |
| + // This object becomes errored during the two-phase read. |
| + DCHECK_EQ(PublicState::Errored, getPublicState()); |
| + return Result::Ok; |
| + } |
| + Chunk* chunk = m_chunks[0]; |
| + DCHECK_LE(m_offset + read, chunk->size()); |
| + m_offset += read; |
| + if (chunk->size() == m_offset) { |
| + m_offset = 0; |
| + m_chunks.removeFirst(); |
| + } |
| + if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Closed) { |
| + // All data has been consumed. |
| + TaskRunnerHelper::get(TaskType::Networking, m_executionContext)->postTask(BLINK_FROM_HERE, WTF::bind(&Destination::close, wrapPersistent(this))); |
| + } |
| + return Result::Ok; |
| + } |
| + |
| + void setClient(BytesConsumer::Client* client) override |
| + { |
| + DCHECK(!m_client); |
| + DCHECK(client); |
| + auto state = getPublicState(); |
| + if (state == PublicState::Closed || state == PublicState::Errored) |
| + return; |
| + m_client = client; |
| + } |
| + |
| + void clearClient() override |
| + { |
| + m_client = nullptr; |
| + } |
| + |
| + void cancel() override |
| + { |
| + DCHECK(!m_chunkInUse); |
| + auto state = getPublicState(); |
| + if (state == PublicState::Closed || state == PublicState::Errored) |
| + return; |
| + m_isCancelled = true; |
| + m_chunks.clear(); |
|
hiroshige
2016/09/08 08:25:54
How about calling clearChunks() to make |m_offset
yhirano
2016/09/08 09:41:34
Done.
|
| + clearClient(); |
| + m_tee->cancel(); |
| + } |
| + |
| + PublicState getPublicState() const override |
| + { |
| + if (m_isCancelled || m_isClosed) |
| + return PublicState::Closed; |
| + auto state = m_tee->getPublicState(); |
| + // We don't say this object is closed unless m_isCancelled or |
| + // m_isClosed is set. |
| + return state == PublicState::Closed ? PublicState::ReadableOrWaiting : state; |
| + } |
| + |
| + Error getError() const override { return m_tee->getError(); } |
| + |
| + String debugName() const override { return "Tee::Destination"; } |
| + |
| + void enqueue(Chunk* chunk) |
| + { |
| + if (m_isCancelled) |
| + return; |
| + m_chunks.append(chunk); |
| + } |
| + |
| + bool isEmpty() const { return m_chunks.isEmpty(); } |
| + |
| + void clearChunks() |
| + { |
| + m_chunks.clear(); |
| + m_offset = 0; |
| + } |
| + |
| + void notify() |
| + { |
| + if (m_isCancelled || m_isClosed) |
| + return; |
| + if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Closed) { |
| + close(); |
| + return; |
| + } |
| + if (m_client) { |
| + m_client->onStateChange(); |
| + if (getPublicState() == PublicState::Errored) |
| + clearClient(); |
| + } |
| + } |
| + |
| + bool isCancelled() const { return m_isCancelled; } |
| + |
| + DEFINE_INLINE_TRACE() |
| + { |
| + visitor->trace(m_executionContext); |
| + visitor->trace(m_tee); |
| + visitor->trace(m_client); |
| + visitor->trace(m_chunks); |
| + visitor->trace(m_chunkInUse); |
| + BytesConsumer::trace(visitor); |
| + } |
| + |
| + private: |
| + void close() |
| + { |
| + DCHECK_EQ(PublicState::Closed, m_tee->getPublicState()); |
| + DCHECK(m_chunks.isEmpty()); |
| + if (m_isClosed || m_isCancelled) { |
| + // It's possible to reach here because this function can be |
| + // called asynchronously. |
| + return; |
| + } |
| + DCHECK_EQ(PublicState::ReadableOrWaiting, getPublicState()); |
| + m_isClosed = true; |
| + if (m_client) { |
| + m_client->onStateChange(); |
| + clearClient(); |
| + } |
| + } |
| + |
| + Member<ExecutionContext> m_executionContext; |
| + Member<Tee> m_tee; |
| + Member<BytesConsumer::Client> m_client; |
| + HeapDeque<Member<Chunk>> m_chunks; |
| + Member<Chunk> m_chunkInUse; |
| + size_t m_offset = 0; |
| + bool m_isCancelled = false; |
| + bool m_isClosed = false; |
| + }; |
| + |
| + void clearAndNotify() |
| + { |
| + m_destination1->clearChunks(); |
| + m_destination2->clearChunks(); |
| + m_destination1->notify(); |
| + m_destination2->notify(); |
| + } |
| + |
| + Member<BytesConsumer> m_src; |
| + Member<Destination> m_destination1; |
| + Member<Destination> m_destination2; |
| +}; |
| + |
| +} // namespace |
| + |
| BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* readSize) |
| { |
| *readSize = 0; |
| @@ -22,4 +326,21 @@ BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea |
| return endRead(*readSize); |
| } |
| +void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2) |
| +{ |
| + RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizePolicy::AllowBlobWithInvalidSize); |
| + if (blobDataHandle) { |
| + // Register a client in order to be consistent. |
| + src->setClient(new NoopClient); |
| + // TODO(yhirano): Do not use FetchBlobDataConsumerHandle. |
| + *dest1 = new BytesConsumerForDataConsumerHandle(executionContext, FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle)); |
| + *dest2 = new BytesConsumerForDataConsumerHandle(executionContext, FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle)); |
| + return; |
| + } |
| + |
| + Tee* tee = new Tee(executionContext, src); |
| + *dest1 = tee->destination1(); |
| + *dest2 = tee->destination2(); |
| +} |
| + |
| } // namespace blink |