Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3461)

Unified Diff: blimp/net/grpc_stream_unittest.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « blimp/net/grpc_stream.cc ('k') | blimp/net/tcp_engine_transport.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: blimp/net/grpc_stream_unittest.cc
diff --git a/blimp/net/grpc_stream_unittest.cc b/blimp/net/grpc_stream_unittest.cc
new file mode 100644
index 0000000000000000000000000000000000000000..f7299eb2e8fd9ca7f9fe251f250421c32ff194a0
--- /dev/null
+++ b/blimp/net/grpc_stream_unittest.cc
@@ -0,0 +1,302 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <stddef.h>
+
+#include <string>
+#include <utility>
+
+#include "base/callback_helpers.h"
+#include "base/memory/ptr_util.h"
+#include "base/message_loop/message_loop.h"
+#include "blimp/common/public/session/assignment_options.h"
+#include "blimp/net/common.h"
+#include "blimp/net/connection_error_observer.h"
+#include "blimp/net/grpc_client_stream.h"
+#include "blimp/net/grpc_engine_stream.h"
+#include "blimp/net/grpc_stream.h"
+#include "blimp/net/test_common.h"
+#include "net/base/completion_callback.h"
+#include "net/base/test_completion_callback.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::InSequence;
+using testing::Return;
+using testing::SaveArg;
+
+namespace blimp {
+
+namespace {
+// Unit-test for GrpcStream* classes - Client/Engine.
+class GrpcStreamTest : public testing::Test {
+ public:
+ GrpcStreamTest() {
+ assignment_options_.engine_endpoint =
+ net::IPEndPoint(net::IPAddress(127, 0, 0, 1), 0);
+ }
+
+ protected:
+ AssignmentOptions assignment_options_;
+ base::MessageLoopForIO message_loop_;
+};
+
+// Called when a message is received in the completion queue thread (callback
+// invoked in the IO thread).
+void OnReceive(HeliumWrapper* helium_msg,
+ net::TestCompletionCallback* received,
+ std::unique_ptr<HeliumWrapper> received_msg,
+ helium::Result result) {
+ EXPECT_EQ(helium_msg->serialized_helium_message(),
+ received_msg->serialized_helium_message());
+ received->callback().Run(static_cast<int>(result));
+}
+
+// Called when the message is being processed by the completion queue for
+// sending (inboked in the IO thread).
+void OnSend(net::TestCompletionCallback* sent, helium::Result result) {
+ sent->callback().Run(static_cast<int>(result));
+}
+
+class GrpcEngineClient {
+ public:
+ std::unique_ptr<GrpcEngineStream> engine;
+ std::unique_ptr<GrpcClientStream> client;
+
+ void Setup(AssignmentOptions assignment_options) {
+ net::TestCompletionCallback engine_callback;
+ engine = base::MakeUnique<GrpcEngineStream>(assignment_options,
+ engine_callback.callback());
+
+ net::TestCompletionCallback client_callback;
+ client = base::MakeUnique<GrpcClientStream>(engine->GetAssignmentOptions(),
+ client_callback.callback());
+
+ EXPECT_EQ(net::OK, engine_callback.WaitForResult());
+ EXPECT_EQ(net::OK, client_callback.WaitForResult());
+ }
+};
+
+std::unique_ptr<HeliumWrapper> MakeMsg(std::string str_msg) {
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
+ msg->set_serialized_helium_message(str_msg);
+ return msg;
+}
+
+TEST_F(GrpcStreamTest, SimpleConnect) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+}
+
+TEST_F(GrpcStreamTest, ClientSendsData) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
+ msg->set_serialized_helium_message("test");
+ HeliumWrapper expected_msg = *msg;
+
+ net::TestCompletionCallback sent;
+ net::TestCompletionCallback received;
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg),
+ base::Unretained(&received)));
+
+ grpc.client->SendMessage(std::move(msg),
+ base::Bind(&OnSend, base::Unretained(&sent)));
+ EXPECT_EQ(net::OK, sent.WaitForResult());
+ EXPECT_EQ(net::OK, received.WaitForResult());
+}
+
+TEST_F(GrpcStreamTest, EngineSendsData) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
+ msg->set_serialized_helium_message("test");
+ HeliumWrapper expected_msg = *msg;
+
+ net::TestCompletionCallback sent;
+ net::TestCompletionCallback received;
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg),
+ base::Unretained(&received)));
+
+ grpc.engine->SendMessage(std::move(msg),
+ base::Bind(&OnSend, base::Unretained(&sent)));
+ EXPECT_EQ(net::OK, sent.WaitForResult());
+ EXPECT_EQ(net::OK, received.WaitForResult());
+}
+
+TEST_F(GrpcStreamTest, BothSendData) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg1 = base::MakeUnique<HeliumWrapper>();
+ msg1->set_serialized_helium_message("test");
+ HeliumWrapper expected_msg1 = *msg1;
+
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
+ msg2->set_serialized_helium_message("this is message 2");
+ HeliumWrapper expected_msg2 = *msg2;
+
+ net::TestCompletionCallback sent_engine;
+ net::TestCompletionCallback sent_client;
+ net::TestCompletionCallback received_engine;
+ net::TestCompletionCallback received_client;
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg1),
+ base::Unretained(&received_engine)));
+
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg2),
+ base::Unretained(&received_client)));
+ grpc.client->SendMessage(std::move(msg1),
+ base::Bind(&OnSend, base::Unretained(&sent_client)));
+ grpc.engine->SendMessage(std::move(msg2),
+ base::Bind(&OnSend, base::Unretained(&sent_engine)));
+ EXPECT_EQ(net::OK, sent_engine.WaitForResult());
+ EXPECT_EQ(net::OK, sent_client.WaitForResult());
+ EXPECT_EQ(net::OK, received_engine.WaitForResult());
+ EXPECT_EQ(net::OK, received_client.WaitForResult());
+}
+
+TEST_F(GrpcStreamTest, ClientFinishesStream) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
+ msg->set_serialized_helium_message("test");
+ HeliumWrapper expected_msg = *msg;
+
+ net::TestCompletionCallback received;
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg),
+ base::Unretained(&received)));
+ net::TestCompletionCallback sent;
+ grpc.engine->SendMessage(std::move(msg),
+ base::Bind(&OnSend, base::Unretained(&sent)));
+ EXPECT_EQ(net::OK, sent.WaitForResult());
+ EXPECT_EQ(net::OK, received.WaitForResult());
+
+ // Client is now killed!
+ grpc.client = nullptr;
+
+ // Wait for engine to error-out. Note that the client completion queue is
+ // destroyed asynchronously which means we need to wait until the engine sees
+ // an error which is not immediate.
+ int result = net::OK;
+ while (result == net::OK) {
+ net::TestCompletionCallback sent_error;
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
+ msg2->set_serialized_helium_message("test2");
+ grpc.engine->SendMessage(
+ std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
+ result = sent_error.WaitForResult();
+ }
+}
+
+TEST_F(GrpcStreamTest, EngineFinishesStream) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
+ msg->set_serialized_helium_message("test");
+ HeliumWrapper expected_msg = *msg;
+
+ net::TestCompletionCallback received;
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg),
+ base::Unretained(&received)));
+ net::TestCompletionCallback sent;
+ grpc.engine->SendMessage(std::move(msg),
+ base::Bind(&OnSend, base::Unretained(&sent)));
+ EXPECT_EQ(net::OK, sent.WaitForResult());
+ EXPECT_EQ(net::OK, received.WaitForResult());
+
+ // Engine is now killed!
+ grpc.engine = nullptr;
+
+ // Wait for client to error-out (similarly to ClientFinishesStream).
+ int result = net::OK;
+ while (result == net::OK) {
+ net::TestCompletionCallback sent_error;
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
+ msg2->set_serialized_helium_message("test2");
+ grpc.client->SendMessage(
+ std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
+ result = sent_error.WaitForResult();
+ }
+}
+
+TEST_F(GrpcStreamTest, InorderDelivery) {
+ GrpcEngineClient grpc;
+ grpc.Setup(assignment_options_);
+
+ std::unique_ptr<HeliumWrapper> msg1 = MakeMsg("test1");
+ HeliumWrapper expected_msg1 = *msg1;
+
+ std::unique_ptr<HeliumWrapper> msg2 = MakeMsg("test2");
+ HeliumWrapper expected_msg2 = *msg2;
+
+ std::unique_ptr<HeliumWrapper> msg3 = MakeMsg("test3");
+ HeliumWrapper expected_msg3 = *msg3;
+
+ std::unique_ptr<HeliumWrapper> msg4 = MakeMsg("test4");
+ HeliumWrapper expected_msg4 = *msg4;
+
+ // Engine sends msg1, client sends msg2 and engine sends msg3 and finally
+ // client sends msg4.
+ net::TestCompletionCallback sent1;
+ grpc.engine->SendMessage(std::move(msg1),
+ base::Bind(&OnSend, base::Unretained(&sent1)));
+ EXPECT_EQ(net::OK, sent1.WaitForResult());
+ net::TestCompletionCallback sent2;
+ grpc.client->SendMessage(std::move(msg2),
+ base::Bind(&OnSend, base::Unretained(&sent2)));
+ EXPECT_EQ(net::OK, sent2.WaitForResult());
+ net::TestCompletionCallback sent3;
+ grpc.engine->SendMessage(std::move(msg3),
+ base::Bind(&OnSend, base::Unretained(&sent3)));
+ EXPECT_EQ(net::OK, sent3.WaitForResult());
+ net::TestCompletionCallback sent4;
+ grpc.client->SendMessage(std::move(msg4),
+ base::Bind(&OnSend, base::Unretained(&sent4)));
+ EXPECT_EQ(net::OK, sent4.WaitForResult());
+
+ // Now make sure the messages are received in the same order. The completion
+ // queue on the other end will start processing the messages only when the tag
+ // for receiving a message has been added.
+ net::TestCompletionCallback received1;
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg1),
+ base::Unretained(&received1)));
+ net::TestCompletionCallback received2;
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg2),
+ base::Unretained(&received2)));
+
+ // Engine and client pair can be awaited upon at the same time. However, the
+ // next ReceiveMessage for the engine cannot be called until the engine has
+ // processed the previous one first.
+ EXPECT_EQ(net::OK, received1.WaitForResult());
+ EXPECT_EQ(net::OK, received2.WaitForResult());
+
+ net::TestCompletionCallback received3;
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg3),
+ base::Unretained(&received3)));
+
+ net::TestCompletionCallback received4;
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
+ base::Unretained(&expected_msg4),
+ base::Unretained(&received4)));
+ EXPECT_EQ(net::OK, received3.WaitForResult());
+ EXPECT_EQ(net::OK, received4.WaitForResult());
+}
+
+} // namespace
+
+} // namespace blimp
« no previous file with comments | « blimp/net/grpc_stream.cc ('k') | blimp/net/tcp_engine_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698