Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(931)

Unified Diff: third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp

Issue 2277143002: Use BytesConsumer in BodyStreamBuffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@bytes-consumer-tee
Patch Set: rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp b/third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp
deleted file mode 100644
index 30f5d0d5dc2f39a761d45b03def3414ed41d0a85..0000000000000000000000000000000000000000
--- a/third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp
+++ /dev/null
@@ -1,439 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "modules/fetch/DataConsumerTee.h"
-
-#include "core/dom/ActiveDOMObject.h"
-#include "core/dom/ExecutionContext.h"
-#include "modules/fetch/DataConsumerHandleUtil.h"
-#include "modules/fetch/FetchBlobDataConsumerHandle.h"
-#include "platform/CrossThreadFunctional.h"
-#include "platform/heap/Handle.h"
-#include "public/platform/Platform.h"
-#include "public/platform/WebTaskRunner.h"
-#include "public/platform/WebThread.h"
-#include "public/platform/WebTraceLocation.h"
-#include "wtf/Deque.h"
-#include "wtf/Functional.h"
-#include "wtf/PtrUtil.h"
-#include "wtf/ThreadSafeRefCounted.h"
-#include "wtf/ThreadingPrimitives.h"
-#include "wtf/Vector.h"
-#include <memory>
-
-namespace blink {
-
-using Result = WebDataConsumerHandle::Result;
-using Flags = WebDataConsumerHandle::Flags;
-
-namespace {
-
-// This file contains the "tee" implementation. There are several classes and
-// their relationship is complicated, so let me describe here.
-//
-// Tee::create function creates two DestinationHandles (destinations) from one
-// WebDataConsumerHandle (source). In fact, it uses a reader of the source
-// handle.
-//
-// SourceContext reads data from the source reader and enques it to two
-// destination contexts. Destination readers read data from its associated
-// contexts. Here is an object graph.
-//
-// R: the root object
-// SR: the source reader
-// SC: the SourceContext
-// DCn: nth DestinationContext
-// DRn: nth DestinationReader
-// DHn: nth DestinationHandle
-// ---------
-// (normal)
-// ---> DC1 <--- DR1 / DH1
-// |
-// |
-// SR <--SC <-> R
-// |
-// |
-// ---> DC2 <--- DR2 / DH2
-//
-// ---------
-//
-// The root object (R) refers to the SourceContext, and is referred by many
-// objects including the SourceContext. As the root object is a
-// ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
-// The root object only has "stop" function that breaks the reference cycle.
-// It will be called when:
-// - The source context finishes reading,
-// - The source context gets errored while reading,
-// - The execution context associated with the source context is stopped or
-// - All destination handles and readers are gone.
-//
-// ---------
-// (stopped)
-// ---> DC1 <--- DR1 / DH1
-// |
-// |
-// SR <--SC --> R
-// |
-// |
-// ---> DC2 <--- DR2 / DH2
-//
-// -------
-// When |stop| is called, no one has a strong reference to the source context
-// and it will be collected.
-//
-
-class SourceContext;
-
-class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
-public:
- static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObject()); }
-
- void initialize(SourceContext* sourceContext)
- {
- m_sourceContext = sourceContext;
- }
-
- // This function can be called from any thread.
- void stop()
- {
- m_sourceContext = nullptr;
- }
-
-private:
- TeeRootObject() = default;
-
- CrossThreadPersistent<SourceContext> m_sourceContext;
-};
-
-class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
-public:
- static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(std::move(root))); }
- ~DestinationTracker()
- {
- m_root->stop();
- }
-
-private:
- explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) { }
-
- RefPtr<TeeRootObject> m_root;
-};
-
-class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
-public:
- class Proxy : public ThreadSafeRefCounted<Proxy> {
- public:
- static PassRefPtr<Proxy> create(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker)
- {
- return adoptRef(new Proxy(std::move(context), std::move(tracker)));
- }
- ~Proxy()
- {
- m_context->detach();
- }
-
- DestinationContext* context() { return m_context.get(); }
-
- private:
- Proxy(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker) : m_context(context), m_tracker(tracker) { }
-
- RefPtr<DestinationContext> m_context;
- RefPtr<DestinationTracker> m_tracker;
- };
-
- static PassRefPtr<DestinationContext> create() { return adoptRef(new DestinationContext()); }
-
- void enqueue(const char* buffer, size_t size)
- {
- bool needsNotification = false;
- {
- MutexLocker locker(m_mutex);
- needsNotification = m_queue.isEmpty();
- std::unique_ptr<Vector<char>> data = wrapUnique(new Vector<char>);
- data->append(buffer, size);
- m_queue.append(std::move(data));
- }
- if (needsNotification)
- notify();
- }
-
- void setResult(Result r)
- {
- DCHECK(r != WebDataConsumerHandle::Ok);
- DCHECK(r != WebDataConsumerHandle::ShouldWait);
- {
- MutexLocker locker(m_mutex);
- if (m_result != WebDataConsumerHandle::ShouldWait) {
- // The result was already set.
- return;
- }
- m_result = r;
- if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
- m_queue.clear();
- }
- notify();
- }
-
- void notify()
- {
- {
- MutexLocker locker(m_mutex);
- if (!m_client) {
- // No client is registered.
- return;
- }
- DCHECK(m_readerThread);
- if (!m_readerThread->isCurrentThread()) {
- m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this)));
- return;
- }
- }
- // The reading thread is the current thread.
- if (m_client)
- m_client->didGetReadable();
- }
-
- Mutex& mutex() { return m_mutex; }
-
- // The following functions don't use lock. They should be protected by the
- // caller.
- void attachReader(WebDataConsumerHandle::Client* client)
- {
- DCHECK(!m_readerThread);
- DCHECK(!m_client);
- m_readerThread = Platform::current()->currentThread();
- m_client = client;
- }
- void detachReader()
- {
- DCHECK(m_readerThread && m_readerThread->isCurrentThread());
- m_readerThread = nullptr;
- m_client = nullptr;
- }
- const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); }
- bool isEmpty() const { return m_queue.isEmpty(); }
- size_t offset() const { return m_offset; }
- void consume(size_t size)
- {
- const auto& top = m_queue.first();
- DCHECK(m_offset <= m_offset + size);
- DCHECK(m_offset + size <= top->size());
- if (top->size() <= m_offset + size) {
- m_offset = 0;
- m_queue.removeFirst();
- } else {
- m_offset += size;
- }
- }
- Result getResult() { return m_result; }
-
-private:
- DestinationContext()
- : m_result(WebDataConsumerHandle::ShouldWait)
- , m_readerThread(nullptr)
- , m_client(nullptr)
- , m_offset(0)
- , m_isTwoPhaseReadInProgress(false)
- {
- }
-
- void detach()
- {
- MutexLocker locker(m_mutex);
- DCHECK(!m_client);
- DCHECK(!m_readerThread);
- m_queue.clear();
- }
-
- Result m_result;
- Deque<std::unique_ptr<Vector<char>>> m_queue;
- // Note: Holding a WebThread raw pointer is not generally safe, but we can
- // do that in this case because:
- // 1. Destructing a ReaderImpl when the bound thread ends is a user's
- // responsibility.
- // 2. |m_readerThread| will never be used after the associated reader is
- // detached.
- WebThread* m_readerThread;
- WebDataConsumerHandle::Client* m_client;
- size_t m_offset;
- bool m_isTwoPhaseReadInProgress;
- Mutex m_mutex;
-};
-
-class DestinationReader final : public WebDataConsumerHandle::Reader {
-public:
- DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDataConsumerHandle::Client* client)
- : m_contextProxy(contextProxy)
- {
- MutexLocker locker(context()->mutex());
- context()->attachReader(client);
- if (client) {
- // We need to use crossThreadBind here to retain the context. Note
- // |context()| return value is of type DestinationContext*, not
- // PassRefPtr<DestinationContext>.
- Platform::current()->currentThread()->getWebTaskRunner()->postTask(BLINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(context())));
- }
- }
- ~DestinationReader() override
- {
- MutexLocker locker(context()->mutex());
- context()->detachReader();
- }
-
- Result beginRead(const void** buffer, Flags, size_t* available) override
- {
- MutexLocker locker(context()->mutex());
- *available = 0;
- *buffer = nullptr;
- if (context()->isEmpty())
- return context()->getResult();
-
- const std::unique_ptr<Vector<char>>& chunk = context()->top();
- *available = chunk->size() - context()->offset();
- *buffer = chunk->data() + context()->offset();
- return WebDataConsumerHandle::Ok;
- }
-
- Result endRead(size_t readSize) override
- {
- MutexLocker locker(context()->mutex());
- if (context()->isEmpty())
- return WebDataConsumerHandle::UnexpectedError;
- context()->consume(readSize);
- return WebDataConsumerHandle::Ok;
- }
-
-private:
- DestinationContext* context() { return m_contextProxy->context(); }
-
- RefPtr<DestinationContext::Proxy> m_contextProxy;
-};
-
-class DestinationHandle final : public WebDataConsumerHandle {
-public:
- static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationContext::Proxy> contextProxy)
- {
- return wrapUnique(new DestinationHandle(std::move(contextProxy)));
- }
-
- std::unique_ptr<Reader> obtainReader(Client* client)
- {
- return wrapUnique(new DestinationReader(m_contextProxy, client));
- }
-
-private:
- DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_contextProxy(contextProxy) { }
- const char* debugName() const override { return "DestinationHandle"; }
-
- RefPtr<DestinationContext::Proxy> m_contextProxy;
-};
-
-// Bound to the created thread.
-class SourceContext final : public GarbageCollectedFinalized<SourceContext>, public ActiveDOMObject, public WebDataConsumerHandle::Client {
- USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
-public:
- SourceContext(
- PassRefPtr<TeeRootObject> root,
- std::unique_ptr<WebDataConsumerHandle> src,
- PassRefPtr<DestinationContext> dest1,
- PassRefPtr<DestinationContext> dest2,
- ExecutionContext* executionContext)
- : ActiveDOMObject(executionContext)
- , m_root(root)
- , m_reader(src->obtainReader(this))
- , m_dest1(dest1)
- , m_dest2(dest2)
- {
- suspendIfNeeded();
- }
- ~SourceContext() override
- {
- stopInternal();
- }
-
- void didGetReadable() override
- {
- DCHECK(m_reader);
- Result r = WebDataConsumerHandle::Ok;
- while (true) {
- const void* buffer = nullptr;
- size_t available = 0;
- r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
- if (r == WebDataConsumerHandle::ShouldWait)
- return;
- if (r != WebDataConsumerHandle::Ok)
- break;
- m_dest1->enqueue(static_cast<const char*>(buffer), available);
- m_dest2->enqueue(static_cast<const char*>(buffer), available);
- m_reader->endRead(available);
- }
- m_dest1->setResult(r);
- m_dest2->setResult(r);
- stopInternal();
- }
-
- void stop() override
- {
- stopInternal();
- ActiveDOMObject::stop();
- }
-
- DEFINE_INLINE_VIRTUAL_TRACE()
- {
- ActiveDOMObject::trace(visitor);
- }
-
-private:
- void stopInternal()
- {
- if (!m_root)
- return;
- // When we already set a result, this result setting will be ignored.
- m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
- m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
- m_root->stop();
- m_root = nullptr;
- m_reader = nullptr;
- m_dest1 = nullptr;
- m_dest2 = nullptr;
- }
-
- RefPtr<TeeRootObject> m_root;
- std::unique_ptr<WebDataConsumerHandle::Reader> m_reader;
- RefPtr<DestinationContext> m_dest1;
- RefPtr<DestinationContext> m_dest2;
-};
-
-} // namespace
-
-void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr<WebDataConsumerHandle> src, std::unique_ptr<WebDataConsumerHandle>* dest1, std::unique_ptr<WebDataConsumerHandle>* dest2)
-{
- RefPtr<TeeRootObject> root = TeeRootObject::create();
- RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
- RefPtr<DestinationContext> context1 = DestinationContext::create();
- RefPtr<DestinationContext> context2 = DestinationContext::create();
-
- root->initialize(new SourceContext(root, std::move(src), context1, context2, executionContext));
-
- *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context1, tracker));
- *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context2, tracker));
-}
-
-void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr<FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1, std::unique_ptr<FetchDataConsumerHandle>* dest2)
-{
- RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr)->drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize);
- if (blobDataHandle) {
- *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle);
- *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataHandle);
- return;
- }
-
- std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2;
- DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDataConsumerHandle>>(std::move(src)), &webDest1, &webDest2);
- *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1));
- *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2));
- return;
-}
-
-} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698