Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(854)

Unified Diff: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp

Issue 2269953004: Implment BytesConsumer::tee (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
index 89eb71958c9c00425f658ea58c3b47c73524fa2f..b716e84a37e9627d6db788d7ea1782e72cf8eb83 100644
--- a/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
+++ b/third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp
@@ -4,11 +4,274 @@
#include "modules/fetch/BytesConsumer.h"
+#include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
+#include "modules/fetch/FetchBlobDataConsumerHandle.h"
+#include "platform/blob/BlobData.h"
+#include "wtf/RefPtr.h"
#include <algorithm>
#include <string.h>
namespace blink {
+namespace {
+
+class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public BytesConsumer::Client {
+ USING_GARBAGE_COLLECTED_MIXIN(NoopClient);
+public:
+ void onStateChange() override {}
+};
+
+class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::Client {
+ USING_GARBAGE_COLLECTED_MIXIN(Tee);
+public:
+ explicit Tee(BytesConsumer* consumer)
+ : m_src(consumer)
+ , m_destination1(new Destination(this))
+ , m_destination2(new Destination(this))
+ {
+ consumer->setClient(this);
+ // As no client is set to either destinations, Destination::notify() is
+ // no-op in this function.
+ onStateChange();
+ }
+
+ void onStateChange() override
+ {
+ bool destination1WasEmpty = m_destination1->isEmpty();
+ bool destination2WasEmpty = m_destination2->isEmpty();
+ bool hasEnqueued = false;
+
+ while (true) {
+ const char* buffer = nullptr;
+ size_t available = 0;
+ Result result = m_src->beginRead(&buffer, &available);
+ if (result == Result::ShouldWait) {
+ if (hasEnqueued && destination1WasEmpty)
+ m_destination1->notify();
+ if (hasEnqueued && destination2WasEmpty)
+ m_destination2->notify();
+ return;
+ }
+ if (result == Result::Done) {
+ if (destination1WasEmpty)
+ m_destination1->notify();
+ if (destination2WasEmpty)
+ m_destination2->notify();
+ return;
+ }
+ if (result == Result::Error) {
+ clearAndNotify();
+ return;
+ }
+ DCHECK_EQ(Result::Ok, result);
+ Bytes* bytes = new Bytes(buffer, available);
+ if (m_src->endRead(available) != Result::Ok) {
+ clearAndNotify();
+ return;
+ }
+ m_destination1->enqueue(bytes);
+ m_destination2->enqueue(bytes);
+ hasEnqueued = true;
+ }
+ }
+
+ BytesConsumer::PublicState getPublicState() const
+ {
+ return m_src->getPublicState();
+ }
+
+ BytesConsumer::Error getError() const
+ {
+ return m_src->getError();
+ }
+
+ void cancel()
+ {
+ if (!m_destination1->isCancelled() || !m_destination2->isCancelled())
+ return;
+ m_src->cancel();
+ }
+
+ BytesConsumer* destination1() const { return m_destination1; }
+ BytesConsumer* destination2() const { return m_destination2; }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_src);
+ visitor->trace(m_destination1);
+ visitor->trace(m_destination2);
+ BytesConsumer::Client::trace(visitor);
+ }
+
+private:
+ using Result = BytesConsumer::Result;
+ class Bytes final : public GarbageCollectedFinalized<Bytes> {
+ public:
+ Bytes(const char* data, size_t size)
+ {
+ m_buffer.reserveCapacity(size);
+ m_buffer.append(data, size);
+ }
+ const char* data() const { return m_buffer.data(); }
+ size_t size() const { return m_buffer.size(); }
+
+ DEFINE_INLINE_TRACE() {}
+
+ private:
+ Vector<char> m_buffer;
+ };
+
+ class Destination final : public BytesConsumer {
+ public:
+ explicit Destination(Tee* tee)
+ : m_tee(tee)
+ {
+ }
+
+ Result beginRead(const char** buffer, size_t* available) override
+ {
+ *buffer = nullptr;
+ *available = 0;
+ if (m_isCancelled)
+ return Result::Done;
+ if (m_chunks.isEmpty()) {
hiroshige 2016/08/30 07:42:47 How about turning this if statement into |if (!m_c
yhirano 2016/09/02 11:17:19 Done.
+ switch (m_tee->getPublicState()) {
+ case BytesConsumer::PublicState::ReadableOrWaiting:
+ return Result::ShouldWait;
+ case BytesConsumer::PublicState::Closed:
+ clearClientIfClosedOrErrored();
hiroshige 2016/08/30 07:42:47 Is clearClient() sufficient?
yhirano 2016/09/02 11:17:19 Done.
+ return Result::Done;
+ case BytesConsumer::PublicState::Errored:
+ clearClientIfClosedOrErrored();
hiroshige 2016/08/30 07:42:47 ditto.
yhirano 2016/09/02 11:17:19 Done.
+ return Result::Error;
+ }
+ }
+ Bytes* bytes = m_chunks[0];
+ DCHECK_LE(m_offset, bytes->size());
+ *buffer = bytes->data() + m_offset;
+ *available = bytes->size() - m_offset;
+ return Result::Ok;
+ }
+
+ Result endRead(size_t read) override
+ {
+ DCHECK(!m_chunks.isEmpty());
hiroshige 2016/08/30 07:47:38 This DCHECK means clearChunks() and thus onStateCh
yhirano 2016/09/02 11:17:19 Done.
+ Bytes* bytes = m_chunks[0];
+ DCHECK_LE(m_offset + read, bytes->size());
+ if (bytes->size() == m_offset + read) {
+ m_offset = 0;
+ m_chunks.removeFirst();
+ return Result::Ok;
+ }
+ m_offset += read;
+ return Result::Ok;
+ }
+
+ void setClient(BytesConsumer::Client* client) override
+ {
+ DCHECK(!m_client);
+ DCHECK(client);
+ auto state = getPublicState();
+ if (state == PublicState::Closed || state == PublicState::Errored)
+ return;
+ m_client = client;
+ }
+
+ void clearClient() override
+ {
+ DCHECK(m_client || getPublicState() == PublicState::Closed || getPublicState() == PublicState::Errored);
+ m_client = nullptr;
+ }
+
+ void cancel() override
+ {
+ auto state = getPublicState();
+ if (state == PublicState::Closed || state == PublicState::Errored)
hiroshige 2016/08/30 07:42:47 When |m_isCancelled| is false and |m_chunks.isEmpt
yhirano 2016/09/02 11:17:19 Done.
+ return;
+ m_isCancelled = true;
+ m_chunks.clear();
+ clearClient();
+ m_tee->cancel();
+ }
+
+ PublicState getPublicState() const override
+ {
+ if (m_isCancelled)
+ return PublicState::Closed;
+ if (!m_chunks.isEmpty())
+ return PublicState::ReadableOrWaiting;
+ return m_tee->getPublicState();
+ }
+
+ Error getError() const override { return m_tee->getError(); }
+
+ String debugName() const override { return "Tee::Destination"; }
+
+ void enqueue(Bytes* chunk)
+ {
+ if (m_isCancelled)
+ return;
+ m_chunks.append(chunk);
+ }
+
+ bool isEmpty() const { return m_chunks.isEmpty(); }
+
+ void clearChunks()
+ {
+ m_chunks.clear();
+ m_offset = 0;
+ }
+
+ void notify()
+ {
+ if (m_client) {
+ DCHECK(!m_isCancelled);
+ m_client->onStateChange();
+ clearClientIfClosedOrErrored();
+ }
+ }
+
+ bool isCancelled() const { return m_isCancelled; }
+
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_tee);
+ visitor->trace(m_client);
+ visitor->trace(m_chunks);
+ BytesConsumer::trace(visitor);
+ }
+
+ private:
+ void clearClientIfClosedOrErrored()
+ {
+ auto state = getPublicState();
+ if (state == PublicState::Closed || state == PublicState::Errored)
+ clearClient();
+ }
+
+ Member<Tee> m_tee;
+ Member<BytesConsumer::Client> m_client;
+ HeapDeque<Member<Bytes>> m_chunks;
+ size_t m_offset = 0;
+ bool m_isCancelled = false;
+ };
+
+
+ void clearAndNotify()
+ {
+ m_destination1->clearChunks();
+ m_destination2->clearChunks();
+ m_destination1->notify();
+ m_destination2->notify();
+ }
+
+ Member<BytesConsumer> m_src;
+ Member<Destination> m_destination1;
+ Member<Destination> m_destination2;
+};
+
+} // namespace
+
BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* readSize)
{
*readSize = 0;
@@ -22,4 +285,21 @@ BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea
return endRead(*readSize);
}
+void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2)
+{
+ RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizePolicy::AllowBlobWithInvalidSize);
+ if (blobDataHandle) {
+ // Register a client in order to be consistent.
+ src->setClient(new NoopClient);
+ // TODO(yhirano): Do not use FetchBlobDataConsumerHandle.
+ *dest1 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle));
+ *dest2 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle));
+ return;
+ }
+
+ Tee* tee = new Tee(src);
+ *dest1 = tee->destination1();
+ *dest2 = tee->destination2();
+}
+
} // namespace blink
« no previous file with comments | « third_party/WebKit/Source/modules/fetch/BytesConsumer.h ('k') | third_party/WebKit/Source/modules/fetch/BytesConsumerTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698