| Index: blimp/net/grpc_stream_unittest.cc
|
| diff --git a/blimp/net/grpc_stream_unittest.cc b/blimp/net/grpc_stream_unittest.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..f7299eb2e8fd9ca7f9fe251f250421c32ff194a0
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_stream_unittest.cc
|
| @@ -0,0 +1,302 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include <stddef.h>
|
| +
|
| +#include <string>
|
| +#include <utility>
|
| +
|
| +#include "base/callback_helpers.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "blimp/common/public/session/assignment_options.h"
|
| +#include "blimp/net/common.h"
|
| +#include "blimp/net/connection_error_observer.h"
|
| +#include "blimp/net/grpc_client_stream.h"
|
| +#include "blimp/net/grpc_engine_stream.h"
|
| +#include "blimp/net/grpc_stream.h"
|
| +#include "blimp/net/test_common.h"
|
| +#include "net/base/completion_callback.h"
|
| +#include "net/base/test_completion_callback.h"
|
| +#include "testing/gmock/include/gmock/gmock.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +using testing::_;
|
| +using testing::InSequence;
|
| +using testing::Return;
|
| +using testing::SaveArg;
|
| +
|
| +namespace blimp {
|
| +
|
| +namespace {
|
| +// Unit-test for GrpcStream* classes - Client/Engine.
|
| +class GrpcStreamTest : public testing::Test {
|
| + public:
|
| + GrpcStreamTest() {
|
| + assignment_options_.engine_endpoint =
|
| + net::IPEndPoint(net::IPAddress(127, 0, 0, 1), 0);
|
| + }
|
| +
|
| + protected:
|
| + AssignmentOptions assignment_options_;
|
| + base::MessageLoopForIO message_loop_;
|
| +};
|
| +
|
| +// Called when a message is received in the completion queue thread (callback
|
| +// invoked in the IO thread).
|
| +void OnReceive(HeliumWrapper* helium_msg,
|
| + net::TestCompletionCallback* received,
|
| + std::unique_ptr<HeliumWrapper> received_msg,
|
| + helium::Result result) {
|
| + EXPECT_EQ(helium_msg->serialized_helium_message(),
|
| + received_msg->serialized_helium_message());
|
| + received->callback().Run(static_cast<int>(result));
|
| +}
|
| +
|
| +// Called when the message is being processed by the completion queue for
|
| +// sending (inboked in the IO thread).
|
| +void OnSend(net::TestCompletionCallback* sent, helium::Result result) {
|
| + sent->callback().Run(static_cast<int>(result));
|
| +}
|
| +
|
| +class GrpcEngineClient {
|
| + public:
|
| + std::unique_ptr<GrpcEngineStream> engine;
|
| + std::unique_ptr<GrpcClientStream> client;
|
| +
|
| + void Setup(AssignmentOptions assignment_options) {
|
| + net::TestCompletionCallback engine_callback;
|
| + engine = base::MakeUnique<GrpcEngineStream>(assignment_options,
|
| + engine_callback.callback());
|
| +
|
| + net::TestCompletionCallback client_callback;
|
| + client = base::MakeUnique<GrpcClientStream>(engine->GetAssignmentOptions(),
|
| + client_callback.callback());
|
| +
|
| + EXPECT_EQ(net::OK, engine_callback.WaitForResult());
|
| + EXPECT_EQ(net::OK, client_callback.WaitForResult());
|
| + }
|
| +};
|
| +
|
| +std::unique_ptr<HeliumWrapper> MakeMsg(std::string str_msg) {
|
| + std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
|
| + msg->set_serialized_helium_message(str_msg);
|
| + return msg;
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, SimpleConnect) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, ClientSendsData) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
|
| + msg->set_serialized_helium_message("test");
|
| + HeliumWrapper expected_msg = *msg;
|
| +
|
| + net::TestCompletionCallback sent;
|
| + net::TestCompletionCallback received;
|
| + grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg),
|
| + base::Unretained(&received)));
|
| +
|
| + grpc.client->SendMessage(std::move(msg),
|
| + base::Bind(&OnSend, base::Unretained(&sent)));
|
| + EXPECT_EQ(net::OK, sent.WaitForResult());
|
| + EXPECT_EQ(net::OK, received.WaitForResult());
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, EngineSendsData) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
|
| + msg->set_serialized_helium_message("test");
|
| + HeliumWrapper expected_msg = *msg;
|
| +
|
| + net::TestCompletionCallback sent;
|
| + net::TestCompletionCallback received;
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg),
|
| + base::Unretained(&received)));
|
| +
|
| + grpc.engine->SendMessage(std::move(msg),
|
| + base::Bind(&OnSend, base::Unretained(&sent)));
|
| + EXPECT_EQ(net::OK, sent.WaitForResult());
|
| + EXPECT_EQ(net::OK, received.WaitForResult());
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, BothSendData) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg1 = base::MakeUnique<HeliumWrapper>();
|
| + msg1->set_serialized_helium_message("test");
|
| + HeliumWrapper expected_msg1 = *msg1;
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
|
| + msg2->set_serialized_helium_message("this is message 2");
|
| + HeliumWrapper expected_msg2 = *msg2;
|
| +
|
| + net::TestCompletionCallback sent_engine;
|
| + net::TestCompletionCallback sent_client;
|
| + net::TestCompletionCallback received_engine;
|
| + net::TestCompletionCallback received_client;
|
| + grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg1),
|
| + base::Unretained(&received_engine)));
|
| +
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg2),
|
| + base::Unretained(&received_client)));
|
| + grpc.client->SendMessage(std::move(msg1),
|
| + base::Bind(&OnSend, base::Unretained(&sent_client)));
|
| + grpc.engine->SendMessage(std::move(msg2),
|
| + base::Bind(&OnSend, base::Unretained(&sent_engine)));
|
| + EXPECT_EQ(net::OK, sent_engine.WaitForResult());
|
| + EXPECT_EQ(net::OK, sent_client.WaitForResult());
|
| + EXPECT_EQ(net::OK, received_engine.WaitForResult());
|
| + EXPECT_EQ(net::OK, received_client.WaitForResult());
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, ClientFinishesStream) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
|
| + msg->set_serialized_helium_message("test");
|
| + HeliumWrapper expected_msg = *msg;
|
| +
|
| + net::TestCompletionCallback received;
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg),
|
| + base::Unretained(&received)));
|
| + net::TestCompletionCallback sent;
|
| + grpc.engine->SendMessage(std::move(msg),
|
| + base::Bind(&OnSend, base::Unretained(&sent)));
|
| + EXPECT_EQ(net::OK, sent.WaitForResult());
|
| + EXPECT_EQ(net::OK, received.WaitForResult());
|
| +
|
| + // Client is now killed!
|
| + grpc.client = nullptr;
|
| +
|
| + // Wait for engine to error-out. Note that the client completion queue is
|
| + // destroyed asynchronously which means we need to wait until the engine sees
|
| + // an error which is not immediate.
|
| + int result = net::OK;
|
| + while (result == net::OK) {
|
| + net::TestCompletionCallback sent_error;
|
| + std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
|
| + msg2->set_serialized_helium_message("test2");
|
| + grpc.engine->SendMessage(
|
| + std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
|
| + result = sent_error.WaitForResult();
|
| + }
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, EngineFinishesStream) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
|
| + msg->set_serialized_helium_message("test");
|
| + HeliumWrapper expected_msg = *msg;
|
| +
|
| + net::TestCompletionCallback received;
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg),
|
| + base::Unretained(&received)));
|
| + net::TestCompletionCallback sent;
|
| + grpc.engine->SendMessage(std::move(msg),
|
| + base::Bind(&OnSend, base::Unretained(&sent)));
|
| + EXPECT_EQ(net::OK, sent.WaitForResult());
|
| + EXPECT_EQ(net::OK, received.WaitForResult());
|
| +
|
| + // Engine is now killed!
|
| + grpc.engine = nullptr;
|
| +
|
| + // Wait for client to error-out (similarly to ClientFinishesStream).
|
| + int result = net::OK;
|
| + while (result == net::OK) {
|
| + net::TestCompletionCallback sent_error;
|
| + std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
|
| + msg2->set_serialized_helium_message("test2");
|
| + grpc.client->SendMessage(
|
| + std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
|
| + result = sent_error.WaitForResult();
|
| + }
|
| +}
|
| +
|
| +TEST_F(GrpcStreamTest, InorderDelivery) {
|
| + GrpcEngineClient grpc;
|
| + grpc.Setup(assignment_options_);
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg1 = MakeMsg("test1");
|
| + HeliumWrapper expected_msg1 = *msg1;
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg2 = MakeMsg("test2");
|
| + HeliumWrapper expected_msg2 = *msg2;
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg3 = MakeMsg("test3");
|
| + HeliumWrapper expected_msg3 = *msg3;
|
| +
|
| + std::unique_ptr<HeliumWrapper> msg4 = MakeMsg("test4");
|
| + HeliumWrapper expected_msg4 = *msg4;
|
| +
|
| + // Engine sends msg1, client sends msg2 and engine sends msg3 and finally
|
| + // client sends msg4.
|
| + net::TestCompletionCallback sent1;
|
| + grpc.engine->SendMessage(std::move(msg1),
|
| + base::Bind(&OnSend, base::Unretained(&sent1)));
|
| + EXPECT_EQ(net::OK, sent1.WaitForResult());
|
| + net::TestCompletionCallback sent2;
|
| + grpc.client->SendMessage(std::move(msg2),
|
| + base::Bind(&OnSend, base::Unretained(&sent2)));
|
| + EXPECT_EQ(net::OK, sent2.WaitForResult());
|
| + net::TestCompletionCallback sent3;
|
| + grpc.engine->SendMessage(std::move(msg3),
|
| + base::Bind(&OnSend, base::Unretained(&sent3)));
|
| + EXPECT_EQ(net::OK, sent3.WaitForResult());
|
| + net::TestCompletionCallback sent4;
|
| + grpc.client->SendMessage(std::move(msg4),
|
| + base::Bind(&OnSend, base::Unretained(&sent4)));
|
| + EXPECT_EQ(net::OK, sent4.WaitForResult());
|
| +
|
| + // Now make sure the messages are received in the same order. The completion
|
| + // queue on the other end will start processing the messages only when the tag
|
| + // for receiving a message has been added.
|
| + net::TestCompletionCallback received1;
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg1),
|
| + base::Unretained(&received1)));
|
| + net::TestCompletionCallback received2;
|
| + grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg2),
|
| + base::Unretained(&received2)));
|
| +
|
| + // Engine and client pair can be awaited upon at the same time. However, the
|
| + // next ReceiveMessage for the engine cannot be called until the engine has
|
| + // processed the previous one first.
|
| + EXPECT_EQ(net::OK, received1.WaitForResult());
|
| + EXPECT_EQ(net::OK, received2.WaitForResult());
|
| +
|
| + net::TestCompletionCallback received3;
|
| + grpc.client->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg3),
|
| + base::Unretained(&received3)));
|
| +
|
| + net::TestCompletionCallback received4;
|
| + grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
|
| + base::Unretained(&expected_msg4),
|
| + base::Unretained(&received4)));
|
| + EXPECT_EQ(net::OK, received3.WaitForResult());
|
| + EXPECT_EQ(net::OK, received4.WaitForResult());
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +} // namespace blimp
|
|
|