| Index: Source/modules/fetch/DataConsumerHandleTestUtil.cpp
|
| diff --git a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
|
| index 19c76b9dbae1b1c574896af456652052acbea50c..6ff3b5451dba929e7072bfa0109b8a9146d6a255 100644
|
| --- a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
|
| +++ b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
|
| @@ -53,4 +53,205 @@ void DataConsumerHandleTestUtil::Thread::shutdown()
|
| m_waitableEvent->signal();
|
| }
|
|
|
| +class DataConsumerHandleTestUtil::ReplayingHandle::Context final : public ThreadSafeRefCounted<Context> {
|
| +public:
|
| + static PassRefPtr<Context> create() { return adoptRef(new Context); }
|
| +
|
| + // This function cannot be called after creating a tee.
|
| + void add(const Command& command)
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + m_commands.append(command);
|
| + }
|
| +
|
| + void attachReader(WebDataConsumerHandle::Client* client)
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + ASSERT(!m_readerThread);
|
| + ASSERT(!m_client);
|
| + m_readerThread = Platform::current()->currentThread();
|
| + m_client = client;
|
| +
|
| + if (m_client && !(isEmpty() && m_result == ShouldWait))
|
| + notify();
|
| + }
|
| + void detachReader()
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + ASSERT(m_readerThread && m_readerThread->isCurrentThread());
|
| + m_readerThread = nullptr;
|
| + m_client = nullptr;
|
| + if (!m_isHandleAttached)
|
| + m_detached->signal();
|
| + }
|
| +
|
| + void detachHandle()
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + m_isHandleAttached = false;
|
| + if (!m_readerThread)
|
| + m_detached->signal();
|
| + }
|
| +
|
| + Result beginRead(const void** buffer, Flags, size_t* available)
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + *buffer = nullptr;
|
| + *available = 0;
|
| + if (isEmpty())
|
| + return m_result;
|
| +
|
| + const Command& command = top();
|
| + Result result = Ok;
|
| + switch (command.name()) {
|
| + case Command::Data: {
|
| + auto& body = command.body();
|
| + *available = body.size() - offset();
|
| + *buffer = body.data() + offset();
|
| + result = Ok;
|
| + break;
|
| + }
|
| + case Command::Done:
|
| + m_result = result = Done;
|
| + consume(0);
|
| + break;
|
| + case Command::Wait:
|
| + consume(0);
|
| + result = ShouldWait;
|
| + notify();
|
| + break;
|
| + case Command::Error:
|
| + m_result = result = UnexpectedError;
|
| + consume(0);
|
| + break;
|
| + }
|
| + return result;
|
| + }
|
| + Result endRead(size_t readSize)
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + consume(readSize);
|
| + return Ok;
|
| + }
|
| +
|
| + WebWaitableEvent* detached() { return m_detached.get(); }
|
| +
|
| +private:
|
| + Context()
|
| + : m_offset(0)
|
| + , m_readerThread(nullptr)
|
| + , m_client(nullptr)
|
| + , m_result(ShouldWait)
|
| + , m_isHandleAttached(true)
|
| + , m_detached(adoptPtr(Platform::current()->createWaitableEvent()))
|
| + {
|
| + }
|
| +
|
| + bool isEmpty() const { return m_commands.isEmpty(); }
|
| + const Command& top()
|
| + {
|
| + ASSERT(!isEmpty());
|
| + return m_commands.first();
|
| + }
|
| +
|
| + void consume(size_t size)
|
| + {
|
| + ASSERT(!isEmpty());
|
| + ASSERT(size + m_offset <= top().body().size());
|
| + bool fullyConsumed = (size + m_offset >= top().body().size());
|
| + if (fullyConsumed) {
|
| + m_offset = 0;
|
| + m_commands.removeFirst();
|
| + } else {
|
| + m_offset += size;
|
| + }
|
| + }
|
| +
|
| + size_t offset() const { return m_offset; }
|
| +
|
| + void notify()
|
| + {
|
| + if (!m_client)
|
| + return;
|
| + ASSERT(m_readerThread);
|
| + m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::notifyInternal, this)));
|
| + }
|
| +
|
| + void notifyInternal()
|
| + {
|
| + {
|
| + MutexLocker locker(m_mutex);
|
| + if (!m_client || !m_readerThread->isCurrentThread()) {
|
| + // There is no client, or a new reader is attached.
|
| + return;
|
| + }
|
| + }
|
| + // The reading thread is the current thread.
|
| + m_client->didGetReadable();
|
| + }
|
| +
|
| + Deque<Command> m_commands;
|
| + size_t m_offset;
|
| + WebThread* m_readerThread;
|
| + Client* m_client;
|
| + Result m_result;
|
| + bool m_isHandleAttached;
|
| + Mutex m_mutex;
|
| + OwnPtr<WebWaitableEvent> m_detached;
|
| +};
|
| +
|
| +class DataConsumerHandleTestUtil::ReplayingHandle::ReaderImpl final : public Reader {
|
| +public:
|
| + ReaderImpl(PassRefPtr<Context> context, Client* client)
|
| + : m_context(context)
|
| + {
|
| + m_context->attachReader(client);
|
| + }
|
| + ~ReaderImpl()
|
| + {
|
| + m_context->detachReader();
|
| + }
|
| +
|
| + Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override
|
| + {
|
| + const void* src = nullptr;
|
| + Result result = beginRead(&src, flags, readSize);
|
| + if (result != Ok)
|
| + return result;
|
| + *readSize = std::min(*readSize, size);
|
| + memcpy(buffer, src, *readSize);
|
| + return endRead(*readSize);
|
| + }
|
| + Result beginRead(const void** buffer, Flags flags, size_t* available) override
|
| + {
|
| + return m_context->beginRead(buffer, flags, available);
|
| + }
|
| + Result endRead(size_t readSize) override
|
| + {
|
| + return m_context->endRead(readSize);
|
| + }
|
| +
|
| +private:
|
| + RefPtr<Context> m_context;
|
| +};
|
| +
|
| +DataConsumerHandleTestUtil::ReplayingHandle::ReplayingHandle() : m_context(Context::create())
|
| +{
|
| +}
|
| +
|
| +DataConsumerHandleTestUtil::ReplayingHandle::~ReplayingHandle()
|
| +{
|
| + m_context->detachHandle();
|
| +}
|
| +
|
| +WebDataConsumerHandle::Reader* DataConsumerHandleTestUtil::ReplayingHandle::obtainReaderInternal(Client* client)
|
| +{
|
| + return new ReaderImpl(m_context, client);
|
| +}
|
| +
|
| +void DataConsumerHandleTestUtil::ReplayingHandle::add(const Command& command)
|
| +{
|
| + m_context->add(command);
|
| +}
|
| +
|
| } // namespace blink
|
|
|