Chromium Code Reviews| Index: Source/modules/fetch/DataConsumerTeeTest.cpp |
| diff --git a/Source/modules/fetch/DataConsumerTeeTest.cpp b/Source/modules/fetch/DataConsumerTeeTest.cpp |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ae812d6220b869bf9a55865c554defae571b32af |
| --- /dev/null |
| +++ b/Source/modules/fetch/DataConsumerTeeTest.cpp |
| @@ -0,0 +1,664 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "config.h" |
| +#include "modules/fetch/DataConsumerTee.h" |
| + |
| +#include "core/testing/DummyPageHolder.h" |
| +#include "core/testing/NullExecutionContext.h" |
| +#include "platform/Task.h" |
| +#include "platform/ThreadSafeFunctional.h" |
| +#include "platform/WebThreadSupportingGC.h" |
| +#include "public/platform/Platform.h" |
| +#include "public/platform/WebThread.h" |
| +#include "public/platform/WebTraceLocation.h" |
| +#include "public/platform/WebWaitableEvent.h" |
| +#include "wtf/Deque.h" |
| +#include "wtf/PassRefPtr.h" |
| +#include "wtf/RefPtr.h" |
| +#include "wtf/ThreadSafeRefCounted.h" |
| +#include "wtf/ThreadingPrimitives.h" |
| +#include "wtf/Vector.h" |
| + |
| +#include <gtest/gtest.h> |
| +#include <string.h> |
| +#include <v8.h> |
| + |
| +namespace blink { |
| +namespace { |
| + |
| +using Result = WebDataConsumerHandle::Result; |
| +const WebDataConsumerHandle::Flags kNone = WebDataConsumerHandle::FlagNone; |
| +const Result kOk = WebDataConsumerHandle::Ok; |
| +const Result kShouldWait = WebDataConsumerHandle::ShouldWait; |
| +const Result kDone = WebDataConsumerHandle::Done; |
| +const Result kUnexpectedError = WebDataConsumerHandle::UnexpectedError; |
| + |
| +class Command final { |
| +public: |
| + enum Name { |
| + Data, |
| + Done, |
| + Error, |
| + Wait, |
| + }; |
| + |
| + Command(Name name) : m_name(name) { } |
| + Command(Name name, const Vector<char>& body) : m_name(name), m_body(body) { } |
| + Command(Name name, const char* body, size_t size) : m_name(name) |
| + { |
| + m_body.append(body, size); |
| + } |
| + Command(Name name, const char* body) : Command(name, body, strlen(body)) { } |
| + Name name() const { return m_name; } |
| + const Vector<char>& body() const { return m_body; } |
| + |
| +private: |
| + const Name m_name; |
| + Vector<char> m_body; |
| +}; |
| + |
| +class Handle final : public WebDataConsumerHandle { |
|
hiroshige
2015/06/23 09:20:44
Please add a comment that says Handle is a WebData
yhirano
2015/06/23 10:18:19
Done.
|
| +public: |
| + class Context final : public ThreadSafeRefCounted<Context> { |
| + public: |
| + static PassRefPtr<Context> create() { return adoptRef(new Context); } |
| + |
| + // This function cannot be called after creating a tee. |
| + void add(const Command& command) |
| + { |
| + m_commands.append(command); |
| + } |
| + |
| + void notify() |
| + { |
| + if (!m_client) |
| + return; |
| + ASSERT(m_readerThread); |
| + m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::notifyInternal, this))); |
| + } |
| + |
| + void attachReader(WebDataConsumerHandle::Client* client) |
| + { |
| + ASSERT(!m_readerThread); |
| + ASSERT(!m_client); |
| + m_readerThread = Platform::current()->currentThread(); |
| + m_client = client; |
| + |
| + if (m_client && !(isEmpty() && m_result == kShouldWait)) |
| + notify(); |
| + } |
| + void detachReader() |
| + { |
| + ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| + m_readerThread = nullptr; |
| + m_client = nullptr; |
| + if (!m_isHandleAttached) |
| + m_detachEvent->signal(); |
|
hiroshige
2015/06/23 09:20:44
Can we move |m_detachEvent->signal()| to the dtor
yhirano
2015/06/23 10:18:20
That assumption does not hold in DetachBothDestina
|
| + } |
| + |
| + void detachHandle() |
| + { |
| + m_isHandleAttached = false; |
| + if (!m_readerThread) |
| + m_detachEvent->signal(); |
|
hiroshige
2015/06/23 09:20:44
ditto, and can we remove detachHandle()?
yhirano
2015/06/23 10:18:19
Ditto
|
| + } |
| + |
| + bool isEmpty() const { return m_commands.isEmpty(); } |
| + const Command& top() |
| + { |
| + ASSERT(!isEmpty()); |
| + return m_commands.first(); |
| + } |
| + |
| + void consume(size_t size) |
| + { |
| + ASSERT(!isEmpty()); |
| + ASSERT(size + m_offset <= top().body().size()); |
| + bool fullyConsumed = (size + m_offset == top().body().size()); |
|
hiroshige
2015/06/23 09:20:45
">=" instead of "==" is a little more robust and c
yhirano
2015/06/23 10:18:20
Done.
|
| + if (fullyConsumed) { |
| + m_offset = 0; |
| + m_commands.removeFirst(); |
| + } else { |
| + m_offset += size; |
| + } |
| + } |
| + |
| + size_t offset() const { return m_offset; } |
| + Result result() const { return m_result; } |
| + void setDone() { m_result = Done; } |
| + void setError() { m_result = UnexpectedError; } |
| + |
| + Mutex& mutex() |
| + { |
| + return m_mutex; |
| + } |
| + |
| + WebWaitableEvent* detachEvent() { return m_detachEvent.get(); } |
|
hiroshige
2015/06/23 09:20:45
nit optional: detachEvent() seems "to detach an ev
yhirano
2015/06/23 10:18:19
Done.
|
| + |
| + private: |
| + Context() |
| + : m_offset(0) |
| + , m_readerThread(nullptr) |
| + , m_client(nullptr) |
| + , m_result(ShouldWait) |
| + , m_isHandleAttached(true) |
| + , m_detachEvent(adoptPtr(Platform::current()->createWaitableEvent())) |
| + { |
| + } |
| + |
| + void notifyInternal() |
| + { |
| + { |
| + MutexLocker locker(m_mutex); |
| + if (!m_client || !m_readerThread->isCurrentThread()) { |
| + // There is no client, or a new reader is attached. |
| + return; |
| + } |
| + } |
| + // The reading thread is the current thread. |
| + m_client->didGetReadable(); |
| + } |
| + |
| + Deque<Command> m_commands; |
| + size_t m_offset; |
| + WebThread* m_readerThread; |
| + Client* m_client; |
| + Result m_result; |
| + bool m_isHandleAttached; |
| + Mutex m_mutex; |
| + OwnPtr<WebWaitableEvent> m_detachEvent; |
| + }; |
| + |
| + class ReaderImpl final : public Reader { |
| + public: |
| + ReaderImpl(PassRefPtr<Context> context, Client* client) |
| + : m_context(context) |
| + { |
| + MutexLocker locker(m_context->mutex()); |
|
hiroshige
2015/06/23 09:20:44
I prefer taking lock in Context, because it is cle
yhirano
2015/06/23 10:18:19
Done.
|
| + m_context->attachReader(client); |
| + } |
| + ~ReaderImpl() |
| + { |
| + MutexLocker locker(m_context->mutex()); |
|
hiroshige
2015/06/23 09:20:44
ditto.
yhirano
2015/06/23 10:18:19
Done.
|
| + m_context->detachReader(); |
| + } |
| + |
| + Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override |
| + { |
| + const void* src = nullptr; |
| + Result result = beginRead(&src, flags, readSize); |
| + if (result != Ok) |
| + return result; |
| + memcpy(buffer, src, *readSize); |
|
hiroshige
2015/06/23 09:20:44
We should take min(size, *readSize).
yhirano
2015/06/23 10:18:20
Done.
|
| + return endRead(*readSize); |
| + } |
| + Result beginRead(const void** buffer, Flags, size_t* available) override |
| + { |
| + MutexLocker locker(m_context->mutex()); |
|
hiroshige
2015/06/23 09:20:44
ditto, and can we move this implementation to Cont
yhirano
2015/06/23 10:18:19
Done.
|
| + *buffer = nullptr; |
| + *available = 0; |
| + if (m_context->isEmpty()) |
| + return m_context->result(); |
| + |
| + const Command& command = m_context->top(); |
| + Result result = Ok; |
| + switch (command.name()) { |
| + case Command::Data: { |
| + auto& body = command.body(); |
| + *available = body.size() - m_context->offset(); |
| + *buffer = body.data() + m_context->offset(); |
| + result = Ok; |
| + break; |
| + } |
| + case Command::Done: |
| + m_context->setDone(); |
| + m_context->consume(0); |
| + result = Done; |
| + break; |
| + case Command::Wait: |
| + m_context->consume(0); |
| + result = ShouldWait; |
| + m_context->notify(); |
| + break; |
| + case Command::Error: |
| + m_context->setError(); |
| + m_context->consume(0); |
| + result = UnexpectedError; |
| + break; |
| + } |
| + return result; |
| + } |
| + Result endRead(size_t readSize) override |
| + { |
| + m_context->consume(readSize); |
| + return Ok; |
| + } |
| + |
| + private: |
| + RefPtr<Context> m_context; |
| + }; |
| + |
| + Handle() : m_context(Context::create()) { } |
| + ~Handle() |
| + { |
| + MutexLocker locker(m_context->mutex()); |
| + m_context->detachHandle(); |
| + } |
| + |
| + ReaderImpl* obtainReaderInternal(Client* client) override { return new ReaderImpl(m_context, client); } |
| + |
| + // Add a command to this handle. This function must be called on the |
| + // creator thread. This function must be called BEFORE any reader is |
| + // obtained. |
| + void add(const Command& command) |
| + { |
| + MutexLocker locker(m_context->mutex()); |
| + m_context->add(command); |
| + } |
| + |
| + Context* context() { return m_context.get(); }; |
| + |
| +private: |
| + RefPtr<Context> m_context; |
| +}; |
| + |
| +class TestingThread final { |
| +public: |
| + explicit TestingThread(const char* name) |
| + : m_thread(WebThreadSupportingGC::create(name)) |
| + , m_waitableEvent(adoptPtr(Platform::current()->createWaitableEvent())) |
| + , m_isolate(nullptr) |
| + { |
| + m_thread->postTask(FROM_HERE, new Task(threadSafeBind(&TestingThread::initialize, AllowCrossThreadAccess(this)))); |
| + m_waitableEvent->wait(); |
| + } |
| + |
| + ~TestingThread() |
| + { |
| + m_thread->postTask(FROM_HERE, new Task(threadSafeBind(&TestingThread::shutdown, AllowCrossThreadAccess(this)))); |
| + m_waitableEvent->wait(); |
| + } |
| + |
| + WebThreadSupportingGC* thread() { return m_thread.get(); } |
| + ExecutionContext* executionContext() { return m_executionContext.get(); } |
| + |
| +private: |
| + void initialize() |
| + { |
| + m_isolate = v8::Isolate::New(v8::Isolate::CreateParams()); |
| + m_isolate->Enter(); |
| + m_thread->initialize(); |
| + m_executionContext = adoptRefWillBeNoop(new NullExecutionContext()); |
| + m_waitableEvent->signal(); |
| + } |
| + |
| + void shutdown() |
| + { |
| + m_executionContext = nullptr; |
| + m_thread->shutdown(); |
| + m_isolate->Exit(); |
| + m_isolate->Dispose(); |
| + m_isolate = nullptr; |
| + m_waitableEvent->signal(); |
| + } |
| + |
| + OwnPtr<WebThreadSupportingGC> m_thread; |
| + OwnPtr<WebWaitableEvent> m_waitableEvent; |
| + RefPtrWillBePersistent<NullExecutionContext> m_executionContext; |
| + v8::Isolate* m_isolate; |
| +}; |
| + |
| +class HandleReader : public WebDataConsumerHandle::Client { |
| +public: |
| + HandleReader() : m_finalResult(kOk) { } |
| + |
| + // Need to wait for the event signal after this function is called. |
| + void start(PassOwnPtr<WebDataConsumerHandle> handle) |
| + { |
| + m_thread = adoptPtr(new TestingThread("reading thread")); |
| + m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleReader::obtainReader, AllowCrossThreadAccess(this), handle))); |
| + m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent()); |
|
hiroshige
2015/06/23 09:20:44
L322 sould be before L321, because didGetReadable(
yhirano
2015/06/23 10:18:19
Done.
|
| + } |
| + |
| + void didGetReadable() override |
| + { |
| + Result r = kOk; |
| + char buffer[3]; |
| + while (true) { |
| + size_t size; |
| + r = m_reader->read(buffer, sizeof(buffer), kNone, &size); |
| + if (r == kShouldWait) |
| + return; |
| + if (r != kOk) |
| + break; |
| + m_readString.append(String(buffer, size)); |
| + } |
| + m_finalResult = r; |
| + m_reader = nullptr; |
| + m_waitableEvent->signal(); |
| + } |
| + |
| + WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); } |
| + |
| + // These should be accessed after the thread joines. |
|
hiroshige
2015/06/23 09:20:44
nit: s/joines/joins/?
yhirano
2015/06/23 10:18:20
Done.
|
| + const String& readString() const { return m_readString; } |
| + Result finalResult() const { return m_finalResult; } |
| + |
| +private: |
| + void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle) |
| + { |
| + m_reader = handle->obtainReader(this); |
| + } |
| + |
| + OwnPtr<TestingThread> m_thread; |
| + OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
| + String m_readString; |
| + Result m_finalResult; |
| + OwnPtr<WebWaitableEvent> m_waitableEvent; |
| +}; |
| + |
| +class HandleTwoPhaseReader : public WebDataConsumerHandle::Client { |
| +public: |
| + HandleTwoPhaseReader() : m_finalResult(kOk) { } |
| + |
| + // Need to wait for the event signal after this function is called. |
| + void start(PassOwnPtr<WebDataConsumerHandle> handle) |
| + { |
| + m_thread = adoptPtr(new TestingThread("reading thread")); |
| + m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleTwoPhaseReader::obtainReader, AllowCrossThreadAccess(this), handle))); |
| + m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent()); |
|
hiroshige
2015/06/23 09:20:44
ditto.
yhirano
2015/06/23 10:18:19
Done.
|
| + } |
| + |
| + void didGetReadable() override |
| + { |
| + Result r = kOk; |
| + while (true) { |
| + const void* buffer = nullptr; |
| + size_t size; |
| + r = m_reader->beginRead(&buffer, kNone, &size); |
| + if (r == kShouldWait) |
| + return; |
| + if (r != kOk) |
| + break; |
| + // Read smaller than availabe in order to test |endRead|. |
| + size_t readSize = std::max(size * 2 / 3, static_cast<size_t>(1)); |
|
hiroshige
2015/06/23 09:20:45
BTW, can we assume beginRead()'s size > 0?
yhirano
2015/06/23 10:18:20
No I think, calling read or beginRead with zero si
|
| + m_readString.append(String(static_cast<const char*>(buffer), readSize)); |
| + m_reader->endRead(readSize); |
| + } |
| + m_finalResult = r; |
| + m_reader = nullptr; |
| + m_waitableEvent->signal(); |
| + } |
| + |
| + WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); } |
| + |
| + // These should be accessed after the thread joines. |
|
hiroshige
2015/06/23 09:20:45
nit: joins.
yhirano
2015/06/23 10:18:19
Done.
|
| + const String& readString() const { return m_readString; } |
| + Result finalResult() const { return m_finalResult; } |
| + |
| +private: |
| + void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle) |
| + { |
| + m_reader = handle->obtainReader(this); |
| + } |
| + |
| + OwnPtr<TestingThread> m_thread; |
| + OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
| + String m_readString; |
| + Result m_finalResult; |
| + OwnPtr<WebWaitableEvent> m_waitableEvent; |
| +}; |
| + |
| +class TeeCreationThread { |
| +public: |
| + void run(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataConsumerHandle>* dest2) |
| + { |
| + m_thread = adoptPtr(new TestingThread("src thread")); |
| + m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent()); |
| + m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&TeeCreationThread::runInternal, AllowCrossThreadAccess(this), src, AllowCrossThreadAccess(dest1), AllowCrossThreadAccess(dest2)))); |
| + m_waitableEvent->wait(); |
| + } |
| + |
| + TestingThread* thread() { return m_thread.get(); } |
| + |
| +private: |
| + void runInternal(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataConsumerHandle>* dest2) |
| + { |
| + DataConsumerTee::create(m_thread->executionContext(), src, dest1, dest2); |
| + m_waitableEvent->signal(); |
| + } |
| + |
| + OwnPtr<TestingThread> m_thread; |
| + OwnPtr<WebWaitableEvent> m_waitableEvent; |
| +}; |
| + |
| +TEST(DataConsumerTeeTest, CreateDone) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Done)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleReader r1, r2; |
| + r1.start(dest1.release()); |
| + r2.start(dest2.release()); |
| + |
| + r1.waitableEvent()->wait(); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r1.finalResult()); |
| + EXPECT_EQ(String(), r1.readString()); |
| + |
| + EXPECT_EQ(kDone, r2.finalResult()); |
| + EXPECT_EQ(String(), r2.readString()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, Read) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Data, "world")); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Done)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleReader r1, r2; |
| + r1.start(dest1.release()); |
| + r2.start(dest2.release()); |
| + |
| + r1.waitableEvent()->wait(); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r1.finalResult()); |
| + EXPECT_EQ("hello, world", r1.readString()); |
| + |
| + EXPECT_EQ(kDone, r2.finalResult()); |
| + EXPECT_EQ("hello, world", r2.readString()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, TwoPhaseRead) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Data, "world")); |
| + src->add(Command(Command::Wait)); |
| + src->add(Command(Command::Done)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleTwoPhaseReader r1, r2; |
| + r1.start(dest1.release()); |
| + r2.start(dest2.release()); |
| + |
| + r1.waitableEvent()->wait(); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r1.finalResult()); |
| + EXPECT_EQ("hello, world", r1.readString()); |
| + |
| + EXPECT_EQ(kDone, r2.finalResult()); |
| + EXPECT_EQ("hello, world", r2.readString()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, Error) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Data, "world")); |
| + src->add(Command(Command::Error)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleReader r1, r2; |
| + r1.start(dest1.release()); |
| + r2.start(dest2.release()); |
| + |
| + r1.waitableEvent()->wait(); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kUnexpectedError, r1.finalResult()); |
| + EXPECT_EQ(kUnexpectedError, r2.finalResult()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, DetachSource) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Data, "world")); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleReader r1, r2; |
| + r1.start(dest1.release()); |
| + r2.start(dest2.release()); |
| + |
| + t = nullptr; |
| + |
| + r1.waitableEvent()->wait(); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kUnexpectedError, r1.finalResult()); |
| + EXPECT_EQ(kUnexpectedError, r2.finalResult()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, DetachSourceAfterReadingDone) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Data, "world")); |
| + src->add(Command(Command::Done)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + HandleReader r1, r2; |
| + r1.start(dest1.release()); |
| + r1.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r1.finalResult()); |
| + EXPECT_EQ("hello, world", r1.readString()); |
| + |
| + t = nullptr; |
| + |
| + r2.start(dest2.release()); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r2.finalResult()); |
| + EXPECT_EQ("hello, world", r2.readString()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, DetachOneDestination) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Data, "world")); |
| + src->add(Command(Command::Done)); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + dest1 = nullptr; |
| + |
| + HandleReader r2; |
| + r2.start(dest2.release()); |
| + r2.waitableEvent()->wait(); |
| + |
| + EXPECT_EQ(kDone, r2.finalResult()); |
| + EXPECT_EQ("hello, world", r2.readString()); |
| +} |
| + |
| +TEST(DataConsumerTeeTest, DetachBothDestinationsShouldStopSourceReader) |
| +{ |
| + OwnPtr<Handle> src(adoptPtr(new Handle)); |
| + RefPtr<Handle::Context> context(src->context()); |
| + OwnPtr<WebDataConsumerHandle> dest1, dest2; |
| + |
| + src->add(Command(Command::Data, "hello, ")); |
| + src->add(Command(Command::Data, "world")); |
| + |
| + OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread()); |
| + t->run(src.release(), &dest1, &dest2); |
| + |
| + ASSERT_TRUE(dest1); |
| + ASSERT_TRUE(dest2); |
| + |
| + dest1 = nullptr; |
| + dest2 = nullptr; |
| + |
| + // Collect garbage to finalize the source reader. |
| + Heap::collectGarbage(ThreadState::HeapPointersOnStack, ThreadState::GCWithSweep, Heap::ForcedGC); |
| + context->detachEvent()->wait(); |
| +} |
| + |
| +} // namespace |
| +} // namespace blink |