Chromium Code Reviews| Index: Source/modules/fetch/CompositeDataConsumerHandle.cpp |
| diff --git a/Source/modules/fetch/CompositeDataConsumerHandle.cpp b/Source/modules/fetch/CompositeDataConsumerHandle.cpp |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e274b6b841e4df85643d5afb19ea62868414bd7b |
| --- /dev/null |
| +++ b/Source/modules/fetch/CompositeDataConsumerHandle.cpp |
| @@ -0,0 +1,237 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "config.h" |
| +#include "modules/fetch/CompositeDataConsumerHandle.h" |
| + |
| +#include "platform/Task.h" |
| +#include "platform/ThreadSafeFunctional.h" |
| +#include "public/platform/Platform.h" |
| +#include "public/platform/WebThread.h" |
| +#include "public/platform/WebTraceLocation.h" |
| +#include "wtf/Locker.h" |
| +#include "wtf/ThreadSafeRefCounted.h" |
| +#include "wtf/ThreadingPrimitives.h" |
| + |
| +namespace blink { |
| + |
| +using Result = WebDataConsumerHandle::Result; |
| + |
| +namespace { |
| + |
| +class RepeatingReader final : public WebDataConsumerHandle::Reader { |
| +public: |
| + explicit RepeatingReader(Result r) : m_result(r) { } |
| + Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSize) override |
| + { |
| + *readSize = 0; |
| + return m_result; |
| + } |
| + Result beginRead(const void**, WebDataConsumerHandle::Flags, size_t *available) override |
| + { |
| + *available = 0; |
| + return m_result; |
| + } |
| + Result endRead(size_t) override |
| + { |
| + return WebDataConsumerHandle::UnexpectedError; |
| + } |
| + |
| +private: |
| + const Result m_result; |
| +}; |
| + |
| +class WaitingHandle final : public WebDataConsumerHandle { |
| +private: |
| + Reader* obtainReaderInternal(Client*) override { return new RepeatingReader(ShouldWait); } |
| +}; |
| + |
| +class DoneHandle final : public WebDataConsumerHandle { |
| +private: |
| + Reader* obtainReaderInternal(Client*) override { return new RepeatingReader(Done); } |
| +}; |
| + |
| +} // namespace |
| + |
| +class CompositeDataConsumerHandle::ReaderImpl final : public WebDataConsumerHandle::Reader { |
| +public: |
| + explicit ReaderImpl(PassRefPtr<Context>); |
| + ~ReaderImpl() override; |
| + Result read(void* data, size_t /* size */, Flags, size_t* readSize) override; |
| + Result beginRead(const void** buffer, Flags, size_t* available) override; |
| + Result endRead(size_t readSize) override; |
| + |
| +private: |
| + RefPtr<Context> m_context; |
| +}; |
| + |
| +class CompositeDataConsumerHandle::Context : public ThreadSafeRefCounted<Context> { |
| +public: |
| + using Token = unsigned; |
| + static PassRefPtr<Context> create(PassOwnPtr<WebDataConsumerHandle> handle) { return adoptRef(new Context(handle)); } |
| + PassOwnPtr<ReaderImpl> obtainReader(Client* client) |
| + { |
| + MutexLocker locker(m_mutex); |
| + ASSERT(!m_readerThread); |
| + ASSERT(!m_reader); |
| + ASSERT(!m_client); |
| + ++m_token; |
| + m_client = client; |
| + m_readerThread = Platform::current()->currentThread(); |
| + m_reader = m_handle->obtainReader(m_client); |
| + return adoptPtr(new ReaderImpl(this)); |
| + } |
| + void detachReader() |
| + { |
|
hiroshige
2015/06/10 06:55:25
Add ASSERT(!m_isInTwoPhaseRead) and ASSERT(!m_isUp
yhirano
2015/06/10 11:02:43
Done.
|
| + MutexLocker locker(m_mutex); |
| + ASSERT(m_readerThread); |
| + ASSERT(m_readerThread->isCurrentThread()); |
| + ASSERT(m_reader); |
|
hiroshige
2015/06/10 06:55:26
We assume obtainReader() returns non-null pointer
yhirano
2015/06/10 11:02:44
The implementation in WebDataConsumerHandle is not
|
| + ++m_token; |
| + m_reader = nullptr; |
| + m_readerThread = nullptr; |
| + m_client = nullptr; |
| + } |
| + void update(PassOwnPtr<WebDataConsumerHandle> handle) |
| + { |
| + MutexLocker locker(m_mutex); |
| + m_handle = handle; |
| + if (!m_readerThread) { |
| + // There is no reader. |
| + return; |
| + } |
| + ++m_token; |
| + updateReaderNoLock(m_token); |
| + } |
| + |
| + Result read(void* data, size_t size, Flags flags, size_t* readSize) |
| + { |
| + ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| + return m_reader->read(data, size, flags, readSize); |
| + } |
| + Result beginRead(const void** buffer, Flags flags, size_t* available) |
| + { |
| + ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| + ASSERT(!m_isInTwoPhaseRead); |
| + m_isInTwoPhaseRead = true; |
| + return m_reader->beginRead(buffer, flags, available); |
| + } |
| + Result endRead(size_t readSize) |
| + { |
| + ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| + ASSERT(m_isInTwoPhaseRead); |
| + Result r = m_reader->endRead(readSize); |
| + m_isInTwoPhaseRead = false; |
| + if (m_isUpdateWaitingForEndRead) { |
|
hiroshige
2015/06/10 06:55:25
We need MutexLocker(m_mutex) here, because we shou
yhirano
2015/06/10 11:02:44
Done.
|
| + m_reader = nullptr; |
| + m_reader = m_handle->obtainReader(m_client); |
| + m_isUpdateWaitingForEndRead = false; |
| + } |
| + return r; |
| + } |
| + |
| +private: |
| + explicit Context(PassOwnPtr<WebDataConsumerHandle> handle) |
| + : m_handle(handle) |
| + , m_readerThread(nullptr) |
| + , m_client(nullptr) |
| + , m_token(0) |
| + , m_isUpdateWaitingForEndRead(false) |
| + , m_isInTwoPhaseRead(false) |
| + { |
| + } |
| + void updateReader(Token token) |
| + { |
| + MutexLocker locker(m_mutex); |
| + updateReaderNoLock(token); |
| + } |
| + void updateReaderNoLock(Token token) |
| + { |
| + if (token != m_token) { |
| + // This request is not fresh. Ignore it. |
| + return; |
| + } |
| + ASSERT(m_readerThread); |
| + ASSERT(m_reader); |
| + if (m_readerThread->isCurrentThread()) { |
| + if (m_isInTwoPhaseRead) { |
| + // We are waiting for the two-phase read completion. |
| + m_isUpdateWaitingForEndRead = true; |
| + return; |
| + } |
| + // Unregister the old one, then register the new one. |
| + m_reader = nullptr; |
| + m_reader = m_handle->obtainReader(m_client); |
|
hiroshige
2015/06/10 06:55:26
Should we post a task for didGetReadable() here be
yhirano
2015/06/10 11:02:43
Done.
|
| + return; |
| + } |
| + ++m_token; |
| + m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::updateReader, this, m_token))); |
| + } |
| + |
| + OwnPtr<Reader> m_reader; |
| + OwnPtr<WebDataConsumerHandle> m_handle; |
| + // Note: Holding a WebThread raw pointer is not generally safe, but we can |
| + // do that in this case because: |
| + // 1. Destructing a ReaderImpl when the bound thread ends is a user's |
| + // responsibility. |
| + // 2. |m_readerThread| will never be used after the associated reader is |
| + // detached. |
| + WebThread* m_readerThread; |
| + Client* m_client; |
| + Token m_token; |
| + // These boolean values are bound to the reader thread. |
| + bool m_isUpdateWaitingForEndRead; |
| + bool m_isInTwoPhaseRead; |
| + Mutex m_mutex; |
| +}; |
| + |
| +CompositeDataConsumerHandle::ReaderImpl::ReaderImpl(PassRefPtr<Context> context) : m_context(context) { } |
| + |
| +CompositeDataConsumerHandle::ReaderImpl::~ReaderImpl() |
| +{ |
| + m_context->detachReader(); |
| +} |
| + |
| +Result CompositeDataConsumerHandle::ReaderImpl::read(void* data, size_t size, Flags flags, size_t* readSize) |
| +{ |
| + return m_context->read(data, size, flags, readSize); |
| +} |
| + |
| +Result CompositeDataConsumerHandle::ReaderImpl::beginRead(const void** buffer, Flags flags, size_t* available) |
| +{ |
| + return m_context->beginRead(buffer, flags, available); |
| +} |
| + |
| +Result CompositeDataConsumerHandle::ReaderImpl::endRead(size_t readSize) |
| +{ |
| + return m_context->endRead(readSize); |
| +} |
| + |
| +CompositeDataConsumerHandle::CompositeDataConsumerHandle(PassOwnPtr<WebDataConsumerHandle> handle) |
| + : m_context(Context::create(handle)) { } |
| + |
| +CompositeDataConsumerHandle::~CompositeDataConsumerHandle() { } |
| + |
| +WebDataConsumerHandle::Reader* CompositeDataConsumerHandle::obtainReaderInternal(Client* client) |
| +{ |
| + return m_context->obtainReader(client).leakPtr(); |
| +} |
| + |
| +void CompositeDataConsumerHandle::update(PassOwnPtr<WebDataConsumerHandle> handle) |
| +{ |
| + ASSERT(handle); |
| + m_context->update(handle); |
| +} |
| + |
| +PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createWaitingHandle() |
| +{ |
| + return adoptPtr(new WaitingHandle); |
| +} |
| + |
| +PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createDoneHandle() |
| +{ |
| + return adoptPtr(new DoneHandle); |
| +} |
| + |
| +} // namespace blink |