Index: Source/modules/fetch/FetchBlobDataConsumerHandle.cpp |
diff --git a/Source/modules/fetch/FetchBlobDataConsumerHandle.cpp b/Source/modules/fetch/FetchBlobDataConsumerHandle.cpp |
new file mode 100644 |
index 0000000000000000000000000000000000000000..c91d3130e8dd44f6e7812e53c080c907076023df |
--- /dev/null |
+++ b/Source/modules/fetch/FetchBlobDataConsumerHandle.cpp |
@@ -0,0 +1,493 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "config.h" |
+#include "modules/fetch/FetchBlobDataConsumerHandle.h" |
+ |
+#include "core/dom/ActiveDOMObject.h" |
+#include "core/dom/CrossThreadTask.h" |
+#include "core/dom/ExecutionContext.h" |
+#include "core/fetch/FetchInitiatorTypeNames.h" |
+#include "core/loader/ThreadableLoaderClient.h" |
+#include "modules/fetch/CompositeDataConsumerHandle.h" |
+#include "modules/fetch/DataConsumerHandleUtil.h" |
+#include "platform/Task.h" |
+#include "platform/blob/BlobRegistry.h" |
+#include "platform/blob/BlobURL.h" |
+#include "platform/network/ResourceRequest.h" |
+#include "public/platform/Platform.h" |
+#include "public/platform/WebTraceLocation.h" |
+#include "wtf/Locker.h" |
+#include "wtf/ThreadingPrimitives.h" |
+ |
+namespace blink { |
+ |
+using Result = FetchBlobDataConsumerHandle::Result; |
+ |
+namespace { |
+ |
+// CrossThreadHolder<T> provides cross-thread access to |obj| of class T. |
+// CrossThreadHolder<T> can be passed across threads |
+// while |obj|'s methods are called on the thread of |executionContext| |
+// via CrossThreadHolder<T>::postTask(). |
+// |obj| is destructed on the thread of |executionContext| |
+// when |executionContext| is stopped or |
+// CrossThreadHolder is destructed (earlier of them). |
+template<typename T> |
+class CrossThreadHolder { |
+public: |
+ static PassOwnPtr<CrossThreadHolder<T>> create(ExecutionContext* executionContext, PassOwnPtr<T> obj) |
+ { |
+ ASSERT(executionContext->isContextThread()); |
+ return adoptPtr(new CrossThreadHolder(executionContext, obj)); |
+ } |
+ |
+ // Can be called from any thread. |
+ // Executes |task| with |obj| and |executionContext| on the thread of |
+ // |executionContext|. |
+ // NOTE: |task| might be silently ignored (i.e. not executed) and |
+ // destructed (possibly on the calling thread or on the thread of |
+ // |executionContext|) when |executionContext| is stopped. |
+ void postTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> task) |
+ { |
+ m_peer->postTask(task); |
+ } |
+ |
+ ~CrossThreadHolder() |
+ { |
+ m_peer->clear(); |
+ } |
+ |
+private: |
+ CrossThreadHolder(ExecutionContext* executionContext, PassOwnPtr<T> obj) |
+ : m_peer(Peer::create()) |
+ { |
+ m_peer->initialize(new Bridge(executionContext, m_peer, obj)); |
+ } |
+ |
+ // Bridge and Peer have a reference cycle: |
+ // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer| <-(RefPtr)- CTH |
+ // | | <-CrossThreadPersistent- | | |
+ // CTH: CrossThreadHolder<T> |
+ // The reference from Peer to Bridge is protected by |Peer::m_mutex| and |
+ // cleared when Peer::clear() is called, i.e.: |
+ // [1] when |executionContext| is stopped: |
+ // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer| <-(RefPtr)- CTH |
+ // [2] or when CrossThreadHolder is destructed: |
+ // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer| |
+ // in either case, Bridge is shortly garbage collected. |
+ |
+ class Peer; |
+ |
+ // All methods must be called on |executionContext|'s thread. |
+ class Bridge |
+ : public GarbageCollectedFinalized<Bridge> |
+ , public ActiveDOMObject { |
+ WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(Bridge); |
+ public: |
+ Bridge(ExecutionContext* executionContext, PassRefPtr<Peer> peer, PassOwnPtr<T> obj) |
+ : ActiveDOMObject(executionContext) |
+ , m_peer(peer) |
+ , m_obj(obj) |
+ { |
+ suspendIfNeeded(); |
+ } |
+ |
+ DEFINE_INLINE_TRACE() |
+ { |
+ ActiveDOMObject::trace(visitor); |
+ } |
+ |
+ T* getObject() const { return m_obj.get(); } |
+ |
+ private: |
+ // ActiveDOMObject |
+ void stop() override |
+ { |
+ ASSERT(executionContext()->isContextThread()); |
+ m_obj.clear(); |
+ m_peer->clear(); |
+ } |
+ |
+ RefPtr<Peer> m_peer; |
+ OwnPtr<T> m_obj; |
+ }; |
+ |
+ class Peer : public ThreadSafeRefCounted<Peer> { |
+ public: |
+ static PassRefPtr<Peer> create() |
+ { |
+ return adoptRef(new Peer()); |
+ } |
+ |
+ // initialize() must be called only just after construction to create a |
+ // reference cycle. |
+ void initialize(Bridge* bridge) |
+ { |
+ m_bridge = bridge; |
+ } |
+ |
+ void clear() |
+ { |
+ MutexLocker locker(m_mutex); |
+ m_bridge.clear(); |
+ } |
+ |
+ void postTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> task) |
+ { |
+ MutexLocker locker(m_mutex); |
+ if (!m_bridge) { |
+ // The bridge has already disappeared. |
+ return; |
+ } |
+ m_bridge->executionContext()->postTask(FROM_HERE, createCrossThreadTask(&Peer::runTask, this, task)); |
+ } |
+ |
+ private: |
+ Peer() = default; |
+ |
+ // Must be called on the thread of |executionContext|. |
+ void runTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> task) |
+ { |
+ Bridge* bridge; |
+ { |
+ MutexLocker locker(m_mutex); |
+ bridge = m_bridge.get(); |
+ if (!bridge) { |
+ // The bridge has already disappeared. |
+ return; |
+ } |
+ ASSERT(bridge->executionContext()->isContextThread()); |
+ } |
+ |
+ // |bridge| can be set to nullptr by another thread after the |
+ // mutex is unlocked, but we can still use |loaderContext| here |
+ // because the LoaderContext belongs to the current thread |
+ // (==the loader thread). |
+ (*task)(bridge->getObject(), bridge->executionContext()); |
+ } |
+ |
+ // |m_brige| is protected by |m_mutex|. |
+ CrossThreadPersistent<Bridge> m_bridge; |
+ Mutex m_mutex; |
+ }; |
+ |
+ RefPtr<Peer> m_peer; |
+}; |
+ |
+// LoaderContext is created and destructed on the same thread |
+// (call this thread loader thread). |
+// All methods must be called on the loader thread. |
+// CommonContext and LoaderContext have a reference cycle: |
+// CTH <-----(OwnPtr)----+ |
+// | | |
+// (CrossThreadPersistent) | |
+// v | |
+// LC ----(RefPtr)----> CC <-- RC |
+// LC: LoaderContext |
+// CC: CommonContext |
+// RC: ReaderContext |
+// CTH: CrossThreadHolder<LoaderContext> |
+// This cycle is broken when CommonContext::clearLoaderContext() is called |
+// (which clears the reference to CrossThreadHolder), |
+// or ExecutionContext of the loader thread is stopped |
+// (CrossThreadHolder clears the reference to LoaderContext). |
+ |
+class LoaderContext { |
+public: |
+ virtual ~LoaderContext() { } |
+ virtual void start(ExecutionContext*) = 0; |
+}; |
+ |
+class CommonContext |
+ : public ThreadSafeRefCounted<CommonContext> { |
+public: |
+ CommonContext() |
+ : m_handle(CompositeDataConsumerHandle::create(createWaitingDataConsumerHandle())) { } |
+ |
+ // initialize() must be called only just after construction to create a |
+ // reference cycle. |
+ void initialize(ExecutionContext* executionContext, PassOwnPtr<LoaderContext> loaderContext) |
+ { |
+ m_loaderContextHolder = CrossThreadHolder<LoaderContext>::create(executionContext, loaderContext); |
+ } |
+ |
+ CompositeDataConsumerHandle* handle() { return m_handle.get(); } |
+ |
+ void startLoader() |
+ { |
+ MutexLocker locker(m_mutex); |
+ if (m_loaderContextHolder) |
+ m_loaderContextHolder->postTask(threadSafeBind<LoaderContext*, ExecutionContext*>(&LoaderContext::start)); |
+ } |
+ |
+ void clearLoaderContext() |
+ { |
+ MutexLocker locker(m_mutex); |
+ m_loaderContextHolder.clear(); |
+ } |
+ |
+ // Must be called on the loader thread. |
+ void error() |
+ { |
+ clearLoaderContext(); |
+ handle()->update(createUnexpectedErrorDataConsumerHandle()); |
+ } |
+ |
+private: |
+ OwnPtr<CompositeDataConsumerHandle> m_handle; |
+ |
+ // |m_loaderContextHolder| is protected by |m_mutex|. |
+ OwnPtr<CrossThreadHolder<LoaderContext>> m_loaderContextHolder; |
+ Mutex m_mutex; |
+}; |
+ |
+// All methods must be called on the loader thread. |
+class BlobLoaderContext final |
+ : public LoaderContext |
+ , public ThreadableLoaderClient { |
+public: |
+ BlobLoaderContext(PassRefPtr<CommonContext> commonContext, PassRefPtr<BlobDataHandle> blobDataHandle, FetchBlobDataConsumerHandle::LoaderFactory* loaderFactory) |
+ : m_commonContext(commonContext) |
+ , m_blobDataHandle(blobDataHandle) |
+ , m_loaderFactory(loaderFactory) |
+ , m_receivedResponse(false) { } |
+ |
+ ~BlobLoaderContext() override |
+ { |
+ if (m_loader && !m_receivedResponse) |
+ m_commonContext->handle()->update(createUnexpectedErrorDataConsumerHandle()); |
+ if (m_loader) { |
+ m_loader->cancel(); |
+ m_loader.clear(); |
+ } |
+ } |
+ |
+ void start(ExecutionContext* executionContext) override |
+ { |
+ ASSERT(executionContext->isContextThread()); |
+ ASSERT(!m_loader); |
+ |
+ m_loader = createLoader(executionContext, this); |
+ if (!m_loader) |
+ m_commonContext->error(); |
+ } |
+ |
+private: |
+ PassRefPtr<ThreadableLoader> createLoader(ExecutionContext* executionContext, ThreadableLoaderClient* client) const |
+ { |
+ KURL url = BlobURL::createPublicURL(executionContext->securityOrigin()); |
+ if (url.isEmpty()) { |
+ return nullptr; |
+ } |
+ BlobRegistry::registerPublicBlobURL(executionContext->securityOrigin(), url, m_blobDataHandle); |
+ |
+ ResourceRequest request(url); |
+ request.setRequestContext(WebURLRequest::RequestContextInternal); |
+ request.setUseStreamOnResponse(true); |
+ |
+ ThreadableLoaderOptions options; |
+ options.preflightPolicy = ConsiderPreflight; |
+ options.crossOriginRequestPolicy = DenyCrossOriginRequests; |
+ options.contentSecurityPolicyEnforcement = DoNotEnforceContentSecurityPolicy; |
+ options.initiator = FetchInitiatorTypeNames::internal; |
+ |
+ ResourceLoaderOptions resourceLoaderOptions; |
+ resourceLoaderOptions.dataBufferingPolicy = DoNotBufferData; |
+ |
+ return m_loaderFactory->create(*executionContext, client, request, options, resourceLoaderOptions); |
+ } |
+ |
+ // ThreadableLoaderClient |
+ void didReceiveResponse(unsigned long, const ResourceResponse&, PassOwnPtr<WebDataConsumerHandle> handle) override |
+ { |
+ ASSERT(!m_receivedResponse); |
+ m_receivedResponse = true; |
+ if (!handle) { |
+ // Here we assume WebURLLoader must return the response body as |
+ // |WebDataConsumerHandle| since we call |
+ // request.setUseStreamOnResponse(). |
+ m_commonContext->handle()->update(createUnexpectedErrorDataConsumerHandle()); |
+ return; |
+ } |
+ m_commonContext->handle()->update(handle); |
+ } |
+ void didFinishLoading(unsigned long, double) override |
+ { |
+ m_commonContext->clearLoaderContext(); |
+ } |
+ void didFail(const ResourceError&) override |
+ { |
+ if (m_receivedResponse) |
+ m_commonContext->clearLoaderContext(); |
+ else |
+ m_commonContext->error(); |
+ } |
+ void didFailRedirectCheck() override |
+ { |
+ // We don't expect redirects for Blob loading. |
+ ASSERT_NOT_REACHED(); |
+ } |
+ |
+ RefPtr<CommonContext> m_commonContext; |
+ RefPtr<BlobDataHandle> m_blobDataHandle; |
+ Persistent<FetchBlobDataConsumerHandle::LoaderFactory> m_loaderFactory; |
+ RefPtr<ThreadableLoader> m_loader; |
+ |
+ bool m_receivedResponse; |
+}; |
+ |
+class DefaultLoaderFactory final : public FetchBlobDataConsumerHandle::LoaderFactory { |
+public: |
+ PassRefPtr<ThreadableLoader> create( |
+ ExecutionContext& executionContext, |
+ ThreadableLoaderClient* client, |
+ const ResourceRequest& request, |
+ const ThreadableLoaderOptions& options, |
+ const ResourceLoaderOptions& resourceLoaderOptions) override |
+ { |
+ return ThreadableLoader::create(executionContext, client, request, options, resourceLoaderOptions); |
+ } |
+}; |
+ |
+} // namespace |
+ |
+// ReaderContext is referenced from FetchBlobDataConsumerHandle and |
+// ReaderContext::ReaderImpl. |
+// All functions/members must be called/accessed only on the reader thread, |
+// except for constructor, destructor, obtainReader() and |m_commonContext| |
+// accessed from them. |
+class FetchBlobDataConsumerHandle::ReaderContext final : public ThreadSafeRefCounted<ReaderContext> { |
+public: |
+ class ReaderImpl : public FetchDataConsumerHandle::Reader { |
+ public: |
+ ReaderImpl(Client* client, PassRefPtr<ReaderContext> readerContext, PassOwnPtr<WebDataConsumerHandle::Reader> reader) |
+ : m_readerContext(readerContext) |
+ , m_reader(reader) |
+ , m_notifier(client) { } |
+ ~ReaderImpl() override { } |
+ |
+ Result read(void* data, size_t size, Flags flags, size_t* readSize) override |
+ { |
+ if (m_readerContext->drained()) |
+ return Done; |
+ m_readerContext->ensureStartLoader(); |
+ Result r = m_reader->read(data, size, flags, readSize); |
+ if (r != ShouldWait) { |
+ // We read non-empty data, so we cannot use the blob data |
+ // handle which represents the whole data. |
+ m_readerContext->clearBlobDataHandleForDrain(); |
+ } |
+ return r; |
+ } |
+ |
+ Result beginRead(const void** buffer, Flags flags, size_t* available) override |
+ { |
+ if (m_readerContext->drained()) |
+ return Done; |
+ m_readerContext->ensureStartLoader(); |
+ Result r = m_reader->beginRead(buffer, flags, available); |
+ if (r != ShouldWait) { |
+ // We read non-empty data, so we cannot use the blob data |
+ // handle which represents the whole data. |
+ m_readerContext->clearBlobDataHandleForDrain(); |
+ } |
+ return r; |
+ } |
+ |
+ Result endRead(size_t readSize) override |
+ { |
+ return m_reader->endRead(readSize); |
+ } |
+ |
+ PassRefPtr<BlobDataHandle> drainAsBlobDataHandle() override |
+ { |
+ if (!m_readerContext->m_blobDataHandleForDrain || m_readerContext->m_blobDataHandleForDrain->size() == kuint64max) |
+ return nullptr; |
+ RefPtr<BlobDataHandle> blobDataHandle = m_readerContext->m_blobDataHandleForDrain; |
+ m_readerContext->setDrained(); |
+ m_readerContext->clearBlobDataHandleForDrain(); |
+ return blobDataHandle.release(); |
+ } |
+ |
+ private: |
+ RefPtr<ReaderContext> m_readerContext; |
+ OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
+ NotifyOnReaderCreationHelper m_notifier; |
+ }; |
+ |
+ ReaderContext(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle, FetchBlobDataConsumerHandle::LoaderFactory* loaderFactory) |
+ : m_commonContext(adoptRef(new CommonContext())) |
+ , m_loaderStarted(false) |
+ , m_drained(false) |
+ , m_blobDataHandleForDrain(blobDataHandle) |
+ { |
+ m_commonContext->initialize(executionContext, adoptPtr(new BlobLoaderContext(m_commonContext, m_blobDataHandleForDrain, loaderFactory))); |
+ } |
+ |
+ ~ReaderContext() |
+ { |
+ m_commonContext->clearLoaderContext(); |
+ } |
+ |
+ PassOwnPtr<FetchDataConsumerHandle::Reader> obtainReader(WebDataConsumerHandle::Client* client) |
+ { |
+ return adoptPtr(new ReaderImpl(client, this, m_commonContext->handle()->obtainReader(client))); |
+ } |
+ |
+private: |
+ void ensureStartLoader() |
+ { |
+ if (m_loaderStarted) |
+ return; |
+ m_loaderStarted = true; |
+ m_commonContext->startLoader(); |
+ } |
+ |
+ void clearBlobDataHandleForDrain() |
+ { |
+ m_blobDataHandleForDrain.clear(); |
+ } |
+ |
+ bool drained() const { return m_drained; } |
+ void setDrained() { m_drained = true; } |
+ |
+ RefPtr<CommonContext> m_commonContext; |
+ |
+ bool m_loaderStarted; |
+ bool m_drained; |
+ RefPtr<BlobDataHandle> m_blobDataHandleForDrain; |
+}; |
+ |
+FetchBlobDataConsumerHandle::FetchBlobDataConsumerHandle(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle, LoaderFactory* loaderFactory) |
+ : m_readerContext(adoptRef(new ReaderContext(executionContext, blobDataHandle, loaderFactory))) |
+{ |
+} |
+ |
+FetchBlobDataConsumerHandle::~FetchBlobDataConsumerHandle() |
+{ |
+} |
+ |
+PassOwnPtr<FetchDataConsumerHandle> FetchBlobDataConsumerHandle::create(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle, LoaderFactory* loaderFactory) |
+{ |
+ if (!blobDataHandle) |
+ return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()); |
+ |
+ return adoptPtr(new FetchBlobDataConsumerHandle(executionContext, blobDataHandle, loaderFactory)); |
+} |
+ |
+PassOwnPtr<FetchDataConsumerHandle> FetchBlobDataConsumerHandle::create(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle) |
+{ |
+ if (!blobDataHandle) |
+ return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()); |
+ |
+ return adoptPtr(new FetchBlobDataConsumerHandle(executionContext, blobDataHandle, new DefaultLoaderFactory)); |
+} |
+ |
+FetchDataConsumerHandle::Reader* FetchBlobDataConsumerHandle::obtainReaderInternal(Client* client) |
+{ |
+ return m_readerContext->obtainReader(client).leakPtr(); |
+} |
+ |
+} // namespace blink |