Chromium Code Reviews| Index: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| diff --git a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| index 89eb71958c9c00425f658ea58c3b47c73524fa2f..b716e84a37e9627d6db788d7ea1782e72cf8eb83 100644 |
| --- a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| +++ b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp |
| @@ -4,11 +4,274 @@ |
| #include "modules/fetch/BytesConsumer.h" |
| +#include "modules/fetch/BytesConsumerForDataConsumerHandle.h" |
| +#include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| +#include "platform/blob/BlobData.h" |
| +#include "wtf/RefPtr.h" |
| #include <algorithm> |
| #include <string.h> |
| namespace blink { |
| +namespace { |
| + |
| +class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public BytesConsumer::Client { |
| + USING_GARBAGE_COLLECTED_MIXIN(NoopClient); |
| +public: |
| + void onStateChange() override {} |
| +}; |
| + |
| +class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::Client { |
| + USING_GARBAGE_COLLECTED_MIXIN(Tee); |
| +public: |
| + explicit Tee(BytesConsumer* consumer) |
| + : m_src(consumer) |
| + , m_destination1(new Destination(this)) |
| + , m_destination2(new Destination(this)) |
| + { |
| + consumer->setClient(this); |
| + // As no client is set to either destinations, Destination::notify() is |
| + // no-op in this function. |
| + onStateChange(); |
| + } |
| + |
| + void onStateChange() override |
| + { |
| + bool destination1WasEmpty = m_destination1->isEmpty(); |
| + bool destination2WasEmpty = m_destination2->isEmpty(); |
| + bool hasEnqueued = false; |
| + |
| + while (true) { |
| + const char* buffer = nullptr; |
| + size_t available = 0; |
| + Result result = m_src->beginRead(&buffer, &available); |
| + if (result == Result::ShouldWait) { |
| + if (hasEnqueued && destination1WasEmpty) |
| + m_destination1->notify(); |
| + if (hasEnqueued && destination2WasEmpty) |
| + m_destination2->notify(); |
| + return; |
| + } |
| + if (result == Result::Done) { |
| + if (destination1WasEmpty) |
| + m_destination1->notify(); |
| + if (destination2WasEmpty) |
| + m_destination2->notify(); |
| + return; |
| + } |
| + if (result == Result::Error) { |
| + clearAndNotify(); |
| + return; |
| + } |
| + DCHECK_EQ(Result::Ok, result); |
| + Bytes* bytes = new Bytes(buffer, available); |
| + if (m_src->endRead(available) != Result::Ok) { |
| + clearAndNotify(); |
| + return; |
| + } |
| + m_destination1->enqueue(bytes); |
| + m_destination2->enqueue(bytes); |
| + hasEnqueued = true; |
| + } |
| + } |
| + |
| + BytesConsumer::PublicState getPublicState() const |
| + { |
| + return m_src->getPublicState(); |
| + } |
| + |
| + BytesConsumer::Error getError() const |
| + { |
| + return m_src->getError(); |
| + } |
| + |
| + void cancel() |
| + { |
| + if (!m_destination1->isCancelled() || !m_destination2->isCancelled()) |
| + return; |
| + m_src->cancel(); |
| + } |
| + |
| + BytesConsumer* destination1() const { return m_destination1; } |
| + BytesConsumer* destination2() const { return m_destination2; } |
| + |
| + DEFINE_INLINE_TRACE() |
| + { |
| + visitor->trace(m_src); |
| + visitor->trace(m_destination1); |
| + visitor->trace(m_destination2); |
| + BytesConsumer::Client::trace(visitor); |
| + } |
| + |
| +private: |
| + using Result = BytesConsumer::Result; |
| + class Bytes final : public GarbageCollectedFinalized<Bytes> { |
| + public: |
| + Bytes(const char* data, size_t size) |
| + { |
| + m_buffer.reserveCapacity(size); |
| + m_buffer.append(data, size); |
| + } |
| + const char* data() const { return m_buffer.data(); } |
| + size_t size() const { return m_buffer.size(); } |
| + |
| + DEFINE_INLINE_TRACE() {} |
| + |
| + private: |
| + Vector<char> m_buffer; |
| + }; |
| + |
| + class Destination final : public BytesConsumer { |
| + public: |
| + explicit Destination(Tee* tee) |
| + : m_tee(tee) |
| + { |
| + } |
| + |
| + Result beginRead(const char** buffer, size_t* available) override |
| + { |
| + *buffer = nullptr; |
| + *available = 0; |
| + if (m_isCancelled) |
| + return Result::Done; |
| + if (m_chunks.isEmpty()) { |
|
hiroshige
2016/08/30 07:42:47
How about turning this if statement into |if (!m_c
yhirano
2016/09/02 11:17:19
Done.
|
| + switch (m_tee->getPublicState()) { |
| + case BytesConsumer::PublicState::ReadableOrWaiting: |
| + return Result::ShouldWait; |
| + case BytesConsumer::PublicState::Closed: |
| + clearClientIfClosedOrErrored(); |
|
hiroshige
2016/08/30 07:42:47
Is clearClient() sufficient?
yhirano
2016/09/02 11:17:19
Done.
|
| + return Result::Done; |
| + case BytesConsumer::PublicState::Errored: |
| + clearClientIfClosedOrErrored(); |
|
hiroshige
2016/08/30 07:42:47
ditto.
yhirano
2016/09/02 11:17:19
Done.
|
| + return Result::Error; |
| + } |
| + } |
| + Bytes* bytes = m_chunks[0]; |
| + DCHECK_LE(m_offset, bytes->size()); |
| + *buffer = bytes->data() + m_offset; |
| + *available = bytes->size() - m_offset; |
| + return Result::Ok; |
| + } |
| + |
| + Result endRead(size_t read) override |
| + { |
| + DCHECK(!m_chunks.isEmpty()); |
|
hiroshige
2016/08/30 07:47:38
This DCHECK means clearChunks() and thus onStateCh
yhirano
2016/09/02 11:17:19
Done.
|
| + Bytes* bytes = m_chunks[0]; |
| + DCHECK_LE(m_offset + read, bytes->size()); |
| + if (bytes->size() == m_offset + read) { |
| + m_offset = 0; |
| + m_chunks.removeFirst(); |
| + return Result::Ok; |
| + } |
| + m_offset += read; |
| + return Result::Ok; |
| + } |
| + |
| + void setClient(BytesConsumer::Client* client) override |
| + { |
| + DCHECK(!m_client); |
| + DCHECK(client); |
| + auto state = getPublicState(); |
| + if (state == PublicState::Closed || state == PublicState::Errored) |
| + return; |
| + m_client = client; |
| + } |
| + |
| + void clearClient() override |
| + { |
| + DCHECK(m_client || getPublicState() == PublicState::Closed || getPublicState() == PublicState::Errored); |
| + m_client = nullptr; |
| + } |
| + |
| + void cancel() override |
| + { |
| + auto state = getPublicState(); |
| + if (state == PublicState::Closed || state == PublicState::Errored) |
|
hiroshige
2016/08/30 07:42:47
When |m_isCancelled| is false and |m_chunks.isEmpt
yhirano
2016/09/02 11:17:19
Done.
|
| + return; |
| + m_isCancelled = true; |
| + m_chunks.clear(); |
| + clearClient(); |
| + m_tee->cancel(); |
| + } |
| + |
| + PublicState getPublicState() const override |
| + { |
| + if (m_isCancelled) |
| + return PublicState::Closed; |
| + if (!m_chunks.isEmpty()) |
| + return PublicState::ReadableOrWaiting; |
| + return m_tee->getPublicState(); |
| + } |
| + |
| + Error getError() const override { return m_tee->getError(); } |
| + |
| + String debugName() const override { return "Tee::Destination"; } |
| + |
| + void enqueue(Bytes* chunk) |
| + { |
| + if (m_isCancelled) |
| + return; |
| + m_chunks.append(chunk); |
| + } |
| + |
| + bool isEmpty() const { return m_chunks.isEmpty(); } |
| + |
| + void clearChunks() |
| + { |
| + m_chunks.clear(); |
| + m_offset = 0; |
| + } |
| + |
| + void notify() |
| + { |
| + if (m_client) { |
| + DCHECK(!m_isCancelled); |
| + m_client->onStateChange(); |
| + clearClientIfClosedOrErrored(); |
| + } |
| + } |
| + |
| + bool isCancelled() const { return m_isCancelled; } |
| + |
| + DEFINE_INLINE_TRACE() |
| + { |
| + visitor->trace(m_tee); |
| + visitor->trace(m_client); |
| + visitor->trace(m_chunks); |
| + BytesConsumer::trace(visitor); |
| + } |
| + |
| + private: |
| + void clearClientIfClosedOrErrored() |
| + { |
| + auto state = getPublicState(); |
| + if (state == PublicState::Closed || state == PublicState::Errored) |
| + clearClient(); |
| + } |
| + |
| + Member<Tee> m_tee; |
| + Member<BytesConsumer::Client> m_client; |
| + HeapDeque<Member<Bytes>> m_chunks; |
| + size_t m_offset = 0; |
| + bool m_isCancelled = false; |
| + }; |
| + |
| + |
| + void clearAndNotify() |
| + { |
| + m_destination1->clearChunks(); |
| + m_destination2->clearChunks(); |
| + m_destination1->notify(); |
| + m_destination2->notify(); |
| + } |
| + |
| + Member<BytesConsumer> m_src; |
| + Member<Destination> m_destination1; |
| + Member<Destination> m_destination2; |
| +}; |
| + |
| +} // namespace |
| + |
| BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* readSize) |
| { |
| *readSize = 0; |
| @@ -22,4 +285,21 @@ BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea |
| return endRead(*readSize); |
| } |
| +void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2) |
| +{ |
| + RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizePolicy::AllowBlobWithInvalidSize); |
| + if (blobDataHandle) { |
| + // Register a client in order to be consistent. |
| + src->setClient(new NoopClient); |
| + // TODO(yhirano): Do not use FetchBlobDataConsumerHandle. |
| + *dest1 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle)); |
| + *dest2 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle)); |
| + return; |
| + } |
| + |
| + Tee* tee = new Tee(src); |
| + *dest1 = tee->destination1(); |
| + *dest2 = tee->destination2(); |
| +} |
| + |
| } // namespace blink |