Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(609)

Unified Diff: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp

Issue 2269953004: Implment BytesConsumer::tee (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
index 89eb71958c9c00425f658ea58c3b47c73524fa2f..e003f2a2aebe7e8cc185cb3d8c5fb418e2a62305 100644
--- a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
+++ b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
@@ -4,11 +4,315 @@
#include "modules/fetch/BytesConsumer.h"
+#include "core/dom/ExecutionContext.h"
+#include "core/dom/TaskRunnerHelper.h"
+#include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
+#include "modules/fetch/FetchBlobDataConsumerHandle.h"
+#include "platform/blob/BlobData.h"
+#include "public/platform/WebTaskRunner.h"
+#include "wtf/Functional.h"
+#include "wtf/RefPtr.h"
#include <algorithm>
#include <string.h>
namespace blink {
+namespace {
+
+class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public BytesConsumer::Client {
+ USING_GARBAGE_COLLECTED_MIXIN(NoopClient);
+public:
+ void onStateChange() override {}
+};
+
+class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::Client {
+ USING_GARBAGE_COLLECTED_MIXIN(Tee);
+public:
+ Tee(ExecutionContext* executionContext, BytesConsumer* consumer)
+ : m_src(consumer)
+ , m_destination1(new Destination(executionContext, this))
+ , m_destination2(new Destination(executionContext, this))
+ {
+ consumer->setClient(this);
+ // As no client is set to either destinations, Destination::notify() is
+ // no-op in this function.
+ onStateChange();
+ }
+
+ void onStateChange() override
+ {
+ bool destination1WasEmpty = m_destination1->isEmpty();
+ bool destination2WasEmpty = m_destination2->isEmpty();
+ bool hasEnqueued = false;
+
+ while (true) {
+ const char* buffer = nullptr;
+ size_t available = 0;
+ switch (m_src->beginRead(&buffer, &available)) {
+ case Result::Ok: {
+ Chunk* chunk = new Chunk(buffer, available);
+ if (m_src->endRead(available) != Result::Ok) {
+ clearAndNotify();
+ return;
+ }
+ m_destination1->enqueue(chunk);
+ m_destination2->enqueue(chunk);
+ hasEnqueued = true;
+ break;
+ }
+ case Result::ShouldWait:
+ if (hasEnqueued && destination1WasEmpty)
+ m_destination1->notify();
+ if (hasEnqueued && destination2WasEmpty)
+ m_destination2->notify();
+ return;
+ case Result::Done:
+ if (destination1WasEmpty)
+ m_destination1->notify();
+ if (destination2WasEmpty)
+ m_destination2->notify();
+ return;
+ case Result::Error:
+ clearAndNotify();
+ return;
+ }
+ }
+ }
+
+ BytesConsumer::PublicState getPublicState() const
+ {
+ return m_src->getPublicState();
+ }
+
+ BytesConsumer::Error getError() const
+ {
+ return m_src->getError();
+ }
+
+ void cancel()
+ {
+ if (!m_destination1->isCancelled() || !m_destination2->isCancelled())
+ return;
+ m_src->cancel();
+ }
+
+ BytesConsumer* destination1() const { return m_destination1; }
+ BytesConsumer* destination2() const { return m_destination2; }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_src);
+ visitor->trace(m_destination1);
+ visitor->trace(m_destination2);
+ BytesConsumer::Client::trace(visitor);
+ }
+
+private:
+ using Result = BytesConsumer::Result;
+ class Chunk final : public GarbageCollectedFinalized<Chunk> {
+ public:
+ Chunk(const char* data, size_t size)
+ {
+ m_buffer.reserveInitialCapacity(size);
+ m_buffer.append(data, size);
+ }
+ const char* data() const { return m_buffer.data(); }
+ size_t size() const { return m_buffer.size(); }
+
+ DEFINE_INLINE_TRACE() {}
+
+ private:
+ Vector<char> m_buffer;
+ };
+
+ class Destination final : public BytesConsumer {
+ public:
+ Destination(ExecutionContext* executionContext, Tee* tee)
+ : m_executionContext(executionContext)
+ , m_tee(tee)
+ {
+ }
+
+ Result beginRead(const char** buffer, size_t* available) override
+ {
+ DCHECK(!m_chunkInUse);
+ *buffer = nullptr;
+ *available = 0;
+ if (m_isCancelled || m_isClosed)
+ return Result::Done;
+ if (!m_chunks.isEmpty()) {
+ Chunk* chunk = m_chunks[0];
+ DCHECK_LE(m_offset, chunk->size());
+ *buffer = chunk->data() + m_offset;
+ *available = chunk->size() - m_offset;
+ m_chunkInUse = chunk;
+ return Result::Ok;
+ }
+ switch (m_tee->getPublicState()) {
+ case PublicState::ReadableOrWaiting:
+ return Result::ShouldWait;
+ case PublicState::Closed:
+ m_isClosed = true;
+ clearClient();
+ return Result::Done;
+ case PublicState::Errored:
+ clearClient();
+ return Result::Error;
+ }
+ NOTREACHED();
+ return Result::Error;
+ }
+
+ Result endRead(size_t read) override
+ {
+ DCHECK(m_chunkInUse);
+ DCHECK(m_chunks.isEmpty() || m_chunkInUse == m_chunks[0]);
+ m_chunkInUse = nullptr;
+ if (m_chunks.isEmpty()) {
+ // This object becomes errored during the two-phase read.
+ DCHECK_EQ(PublicState::Errored, getPublicState());
+ return Result::Ok;
+ }
+ Chunk* chunk = m_chunks[0];
+ DCHECK_LE(m_offset + read, chunk->size());
+ m_offset += read;
+ if (chunk->size() == m_offset) {
+ m_offset = 0;
+ m_chunks.removeFirst();
+ }
+ if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Closed) {
+ // All data has been consumed.
+ TaskRunnerHelper::get(TaskType::Networking, m_executionContext)->postTask(BLINK_FROM_HERE, WTF::bind(&Destination::close, wrapPersistent(this)));
+ }
+ return Result::Ok;
+ }
+
+ void setClient(BytesConsumer::Client* client) override
+ {
+ DCHECK(!m_client);
+ DCHECK(client);
+ auto state = getPublicState();
+ if (state == PublicState::Closed || state == PublicState::Errored)
+ return;
+ m_client = client;
+ }
+
+ void clearClient() override
+ {
+ m_client = nullptr;
+ }
+
+ void cancel() override
+ {
+ DCHECK(!m_chunkInUse);
+ auto state = getPublicState();
+ if (state == PublicState::Closed || state == PublicState::Errored)
+ return;
+ m_isCancelled = true;
+ clearChunks();
+ clearClient();
+ m_tee->cancel();
+ }
+
+ PublicState getPublicState() const override
+ {
+ if (m_isCancelled || m_isClosed)
+ return PublicState::Closed;
+ auto state = m_tee->getPublicState();
+ // We don't say this object is closed unless m_isCancelled or
+ // m_isClosed is set.
+ return state == PublicState::Closed ? PublicState::ReadableOrWaiting : state;
+ }
+
+ Error getError() const override { return m_tee->getError(); }
+
+ String debugName() const override { return "Tee::Destination"; }
+
+ void enqueue(Chunk* chunk)
+ {
+ if (m_isCancelled)
+ return;
+ m_chunks.append(chunk);
+ }
+
+ bool isEmpty() const { return m_chunks.isEmpty(); }
+
+ void clearChunks()
+ {
+ m_chunks.clear();
+ m_offset = 0;
+ }
+
+ void notify()
+ {
+ if (m_isCancelled || m_isClosed)
+ return;
+ if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Closed) {
+ close();
+ return;
+ }
+ if (m_client) {
+ m_client->onStateChange();
+ if (getPublicState() == PublicState::Errored)
+ clearClient();
+ }
+ }
+
+ bool isCancelled() const { return m_isCancelled; }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_executionContext);
+ visitor->trace(m_tee);
+ visitor->trace(m_client);
+ visitor->trace(m_chunks);
+ visitor->trace(m_chunkInUse);
+ BytesConsumer::trace(visitor);
+ }
+
+ private:
+ void close()
+ {
+ DCHECK_EQ(PublicState::Closed, m_tee->getPublicState());
+ DCHECK(m_chunks.isEmpty());
+ if (m_isClosed || m_isCancelled) {
+ // It's possible to reach here because this function can be
+ // called asynchronously.
+ return;
+ }
+ DCHECK_EQ(PublicState::ReadableOrWaiting, getPublicState());
+ m_isClosed = true;
+ if (m_client) {
+ m_client->onStateChange();
+ clearClient();
+ }
+ }
+
+ Member<ExecutionContext> m_executionContext;
+ Member<Tee> m_tee;
+ Member<BytesConsumer::Client> m_client;
+ HeapDeque<Member<Chunk>> m_chunks;
+ Member<Chunk> m_chunkInUse;
+ size_t m_offset = 0;
+ bool m_isCancelled = false;
+ bool m_isClosed = false;
+ };
+
+ void clearAndNotify()
+ {
+ m_destination1->clearChunks();
+ m_destination2->clearChunks();
+ m_destination1->notify();
+ m_destination2->notify();
+ }
+
+ Member<BytesConsumer> m_src;
+ Member<Destination> m_destination1;
+ Member<Destination> m_destination2;
+};
+
+} // namespace
+
BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* readSize)
{
*readSize = 0;
@@ -22,4 +326,21 @@ BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea
return endRead(*readSize);
}
+void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2)
+{
+ RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizePolicy::AllowBlobWithInvalidSize);
+ if (blobDataHandle) {
+ // Register a client in order to be consistent.
+ src->setClient(new NoopClient);
+ // TODO(yhirano): Do not use FetchBlobDataConsumerHandle.
+ *dest1 = new BytesConsumerForDataConsumerHandle(executionContext, FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle));
+ *dest2 = new BytesConsumerForDataConsumerHandle(executionContext, FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle));
+ return;
+ }
+
+ Tee* tee = new Tee(executionContext, src);
+ *dest1 = tee->destination1();
+ *dest2 = tee->destination2();
+}
+
} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698