Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1276)

Unified Diff: Source/modules/fetch/DataConsumerHandleTestUtil.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: Source/modules/fetch/DataConsumerHandleTestUtil.cpp
diff --git a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
index 19c76b9dbae1b1c574896af456652052acbea50c..6ff3b5451dba929e7072bfa0109b8a9146d6a255 100644
--- a/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
+++ b/Source/modules/fetch/DataConsumerHandleTestUtil.cpp
@@ -53,4 +53,205 @@ void DataConsumerHandleTestUtil::Thread::shutdown()
m_waitableEvent->signal();
}
+class DataConsumerHandleTestUtil::ReplayingHandle::Context final : public ThreadSafeRefCounted<Context> {
+public:
+ static PassRefPtr<Context> create() { return adoptRef(new Context); }
+
+ // This function cannot be called after creating a tee.
+ void add(const Command& command)
+ {
+ MutexLocker locker(m_mutex);
+ m_commands.append(command);
+ }
+
+ void attachReader(WebDataConsumerHandle::Client* client)
+ {
+ MutexLocker locker(m_mutex);
+ ASSERT(!m_readerThread);
+ ASSERT(!m_client);
+ m_readerThread = Platform::current()->currentThread();
+ m_client = client;
+
+ if (m_client && !(isEmpty() && m_result == ShouldWait))
+ notify();
+ }
+ void detachReader()
+ {
+ MutexLocker locker(m_mutex);
+ ASSERT(m_readerThread && m_readerThread->isCurrentThread());
+ m_readerThread = nullptr;
+ m_client = nullptr;
+ if (!m_isHandleAttached)
+ m_detached->signal();
+ }
+
+ void detachHandle()
+ {
+ MutexLocker locker(m_mutex);
+ m_isHandleAttached = false;
+ if (!m_readerThread)
+ m_detached->signal();
+ }
+
+ Result beginRead(const void** buffer, Flags, size_t* available)
+ {
+ MutexLocker locker(m_mutex);
+ *buffer = nullptr;
+ *available = 0;
+ if (isEmpty())
+ return m_result;
+
+ const Command& command = top();
+ Result result = Ok;
+ switch (command.name()) {
+ case Command::Data: {
+ auto& body = command.body();
+ *available = body.size() - offset();
+ *buffer = body.data() + offset();
+ result = Ok;
+ break;
+ }
+ case Command::Done:
+ m_result = result = Done;
+ consume(0);
+ break;
+ case Command::Wait:
+ consume(0);
+ result = ShouldWait;
+ notify();
+ break;
+ case Command::Error:
+ m_result = result = UnexpectedError;
+ consume(0);
+ break;
+ }
+ return result;
+ }
+ Result endRead(size_t readSize)
+ {
+ MutexLocker locker(m_mutex);
+ consume(readSize);
+ return Ok;
+ }
+
+ WebWaitableEvent* detached() { return m_detached.get(); }
+
+private:
+ Context()
+ : m_offset(0)
+ , m_readerThread(nullptr)
+ , m_client(nullptr)
+ , m_result(ShouldWait)
+ , m_isHandleAttached(true)
+ , m_detached(adoptPtr(Platform::current()->createWaitableEvent()))
+ {
+ }
+
+ bool isEmpty() const { return m_commands.isEmpty(); }
+ const Command& top()
+ {
+ ASSERT(!isEmpty());
+ return m_commands.first();
+ }
+
+ void consume(size_t size)
+ {
+ ASSERT(!isEmpty());
+ ASSERT(size + m_offset <= top().body().size());
+ bool fullyConsumed = (size + m_offset >= top().body().size());
+ if (fullyConsumed) {
+ m_offset = 0;
+ m_commands.removeFirst();
+ } else {
+ m_offset += size;
+ }
+ }
+
+ size_t offset() const { return m_offset; }
+
+ void notify()
+ {
+ if (!m_client)
+ return;
+ ASSERT(m_readerThread);
+ m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::notifyInternal, this)));
+ }
+
+ void notifyInternal()
+ {
+ {
+ MutexLocker locker(m_mutex);
+ if (!m_client || !m_readerThread->isCurrentThread()) {
+ // There is no client, or a new reader is attached.
+ return;
+ }
+ }
+ // The reading thread is the current thread.
+ m_client->didGetReadable();
+ }
+
+ Deque<Command> m_commands;
+ size_t m_offset;
+ WebThread* m_readerThread;
+ Client* m_client;
+ Result m_result;
+ bool m_isHandleAttached;
+ Mutex m_mutex;
+ OwnPtr<WebWaitableEvent> m_detached;
+};
+
+class DataConsumerHandleTestUtil::ReplayingHandle::ReaderImpl final : public Reader {
+public:
+ ReaderImpl(PassRefPtr<Context> context, Client* client)
+ : m_context(context)
+ {
+ m_context->attachReader(client);
+ }
+ ~ReaderImpl()
+ {
+ m_context->detachReader();
+ }
+
+ Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override
+ {
+ const void* src = nullptr;
+ Result result = beginRead(&src, flags, readSize);
+ if (result != Ok)
+ return result;
+ *readSize = std::min(*readSize, size);
+ memcpy(buffer, src, *readSize);
+ return endRead(*readSize);
+ }
+ Result beginRead(const void** buffer, Flags flags, size_t* available) override
+ {
+ return m_context->beginRead(buffer, flags, available);
+ }
+ Result endRead(size_t readSize) override
+ {
+ return m_context->endRead(readSize);
+ }
+
+private:
+ RefPtr<Context> m_context;
+};
+
+DataConsumerHandleTestUtil::ReplayingHandle::ReplayingHandle() : m_context(Context::create())
+{
+}
+
+DataConsumerHandleTestUtil::ReplayingHandle::~ReplayingHandle()
+{
+ m_context->detachHandle();
+}
+
+WebDataConsumerHandle::Reader* DataConsumerHandleTestUtil::ReplayingHandle::obtainReaderInternal(Client* client)
+{
+ return new ReaderImpl(m_context, client);
+}
+
+void DataConsumerHandleTestUtil::ReplayingHandle::add(const Command& command)
+{
+ m_context->add(command);
+}
+
} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698