Index: Source/modules/fetch/DataConsumerHandleTestUtil.cpp |
diff --git a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp |
index 19c76b9dbae1b1c574896af456652052acbea50c..6ff3b5451dba929e7072bfa0109b8a9146d6a255 100644 |
--- a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp |
+++ b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp |
@@ -53,4 +53,205 @@ void DataConsumerHandleTestUtil::Thread::shutdown() |
m_waitableEvent->signal(); |
} |
+class DataConsumerHandleTestUtil::ReplayingHandle::Context final : public ThreadSafeRefCounted<Context> { |
+public: |
+ static PassRefPtr<Context> create() { return adoptRef(new Context); } |
+ |
+ // This function cannot be called after creating a tee. |
+ void add(const Command& command) |
+ { |
+ MutexLocker locker(m_mutex); |
+ m_commands.append(command); |
+ } |
+ |
+ void attachReader(WebDataConsumerHandle::Client* client) |
+ { |
+ MutexLocker locker(m_mutex); |
+ ASSERT(!m_readerThread); |
+ ASSERT(!m_client); |
+ m_readerThread = Platform::current()->currentThread(); |
+ m_client = client; |
+ |
+ if (m_client && !(isEmpty() && m_result == ShouldWait)) |
+ notify(); |
+ } |
+ void detachReader() |
+ { |
+ MutexLocker locker(m_mutex); |
+ ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
+ m_readerThread = nullptr; |
+ m_client = nullptr; |
+ if (!m_isHandleAttached) |
+ m_detached->signal(); |
+ } |
+ |
+ void detachHandle() |
+ { |
+ MutexLocker locker(m_mutex); |
+ m_isHandleAttached = false; |
+ if (!m_readerThread) |
+ m_detached->signal(); |
+ } |
+ |
+ Result beginRead(const void** buffer, Flags, size_t* available) |
+ { |
+ MutexLocker locker(m_mutex); |
+ *buffer = nullptr; |
+ *available = 0; |
+ if (isEmpty()) |
+ return m_result; |
+ |
+ const Command& command = top(); |
+ Result result = Ok; |
+ switch (command.name()) { |
+ case Command::Data: { |
+ auto& body = command.body(); |
+ *available = body.size() - offset(); |
+ *buffer = body.data() + offset(); |
+ result = Ok; |
+ break; |
+ } |
+ case Command::Done: |
+ m_result = result = Done; |
+ consume(0); |
+ break; |
+ case Command::Wait: |
+ consume(0); |
+ result = ShouldWait; |
+ notify(); |
+ break; |
+ case Command::Error: |
+ m_result = result = UnexpectedError; |
+ consume(0); |
+ break; |
+ } |
+ return result; |
+ } |
+ Result endRead(size_t readSize) |
+ { |
+ MutexLocker locker(m_mutex); |
+ consume(readSize); |
+ return Ok; |
+ } |
+ |
+ WebWaitableEvent* detached() { return m_detached.get(); } |
+ |
+private: |
+ Context() |
+ : m_offset(0) |
+ , m_readerThread(nullptr) |
+ , m_client(nullptr) |
+ , m_result(ShouldWait) |
+ , m_isHandleAttached(true) |
+ , m_detached(adoptPtr(Platform::current()->createWaitableEvent())) |
+ { |
+ } |
+ |
+ bool isEmpty() const { return m_commands.isEmpty(); } |
+ const Command& top() |
+ { |
+ ASSERT(!isEmpty()); |
+ return m_commands.first(); |
+ } |
+ |
+ void consume(size_t size) |
+ { |
+ ASSERT(!isEmpty()); |
+ ASSERT(size + m_offset <= top().body().size()); |
+ bool fullyConsumed = (size + m_offset >= top().body().size()); |
+ if (fullyConsumed) { |
+ m_offset = 0; |
+ m_commands.removeFirst(); |
+ } else { |
+ m_offset += size; |
+ } |
+ } |
+ |
+ size_t offset() const { return m_offset; } |
+ |
+ void notify() |
+ { |
+ if (!m_client) |
+ return; |
+ ASSERT(m_readerThread); |
+ m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::notifyInternal, this))); |
+ } |
+ |
+ void notifyInternal() |
+ { |
+ { |
+ MutexLocker locker(m_mutex); |
+ if (!m_client || !m_readerThread->isCurrentThread()) { |
+ // There is no client, or a new reader is attached. |
+ return; |
+ } |
+ } |
+ // The reading thread is the current thread. |
+ m_client->didGetReadable(); |
+ } |
+ |
+ Deque<Command> m_commands; |
+ size_t m_offset; |
+ WebThread* m_readerThread; |
+ Client* m_client; |
+ Result m_result; |
+ bool m_isHandleAttached; |
+ Mutex m_mutex; |
+ OwnPtr<WebWaitableEvent> m_detached; |
+}; |
+ |
+class DataConsumerHandleTestUtil::ReplayingHandle::ReaderImpl final : public Reader { |
+public: |
+ ReaderImpl(PassRefPtr<Context> context, Client* client) |
+ : m_context(context) |
+ { |
+ m_context->attachReader(client); |
+ } |
+ ~ReaderImpl() |
+ { |
+ m_context->detachReader(); |
+ } |
+ |
+ Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override |
+ { |
+ const void* src = nullptr; |
+ Result result = beginRead(&src, flags, readSize); |
+ if (result != Ok) |
+ return result; |
+ *readSize = std::min(*readSize, size); |
+ memcpy(buffer, src, *readSize); |
+ return endRead(*readSize); |
+ } |
+ Result beginRead(const void** buffer, Flags flags, size_t* available) override |
+ { |
+ return m_context->beginRead(buffer, flags, available); |
+ } |
+ Result endRead(size_t readSize) override |
+ { |
+ return m_context->endRead(readSize); |
+ } |
+ |
+private: |
+ RefPtr<Context> m_context; |
+}; |
+ |
+DataConsumerHandleTestUtil::ReplayingHandle::ReplayingHandle() : m_context(Context::create()) |
+{ |
+} |
+ |
+DataConsumerHandleTestUtil::ReplayingHandle::~ReplayingHandle() |
+{ |
+ m_context->detachHandle(); |
+} |
+ |
+WebDataConsumerHandle::Reader* DataConsumerHandleTestUtil::ReplayingHandle::obtainReaderInternal(Client* client) |
+{ |
+ return new ReaderImpl(m_context, client); |
+} |
+ |
+void DataConsumerHandleTestUtil::ReplayingHandle::add(const Command& command) |
+{ |
+ m_context->add(command); |
+} |
+ |
} // namespace blink |