Index: blimp/net/grpc_stream_unittest.cc |
diff --git a/blimp/net/grpc_stream_unittest.cc b/blimp/net/grpc_stream_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f7299eb2e8fd9ca7f9fe251f250421c32ff194a0 |
--- /dev/null |
+++ b/blimp/net/grpc_stream_unittest.cc |
@@ -0,0 +1,302 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include <stddef.h> |
+ |
+#include <string> |
+#include <utility> |
+ |
+#include "base/callback_helpers.h" |
+#include "base/memory/ptr_util.h" |
+#include "base/message_loop/message_loop.h" |
+#include "blimp/common/public/session/assignment_options.h" |
+#include "blimp/net/common.h" |
+#include "blimp/net/connection_error_observer.h" |
+#include "blimp/net/grpc_client_stream.h" |
+#include "blimp/net/grpc_engine_stream.h" |
+#include "blimp/net/grpc_stream.h" |
+#include "blimp/net/test_common.h" |
+#include "net/base/completion_callback.h" |
+#include "net/base/test_completion_callback.h" |
+#include "testing/gmock/include/gmock/gmock.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+using testing::_; |
+using testing::InSequence; |
+using testing::Return; |
+using testing::SaveArg; |
+ |
+namespace blimp { |
+ |
+namespace { |
+// Unit-test for GrpcStream* classes - Client/Engine. |
+class GrpcStreamTest : public testing::Test { |
+ public: |
+ GrpcStreamTest() { |
+ assignment_options_.engine_endpoint = |
+ net::IPEndPoint(net::IPAddress(127, 0, 0, 1), 0); |
+ } |
+ |
+ protected: |
+ AssignmentOptions assignment_options_; |
+ base::MessageLoopForIO message_loop_; |
+}; |
+ |
+// Called when a message is received in the completion queue thread (callback |
+// invoked in the IO thread). |
+void OnReceive(HeliumWrapper* helium_msg, |
+ net::TestCompletionCallback* received, |
+ std::unique_ptr<HeliumWrapper> received_msg, |
+ helium::Result result) { |
+ EXPECT_EQ(helium_msg->serialized_helium_message(), |
+ received_msg->serialized_helium_message()); |
+ received->callback().Run(static_cast<int>(result)); |
+} |
+ |
+// Called when the message is being processed by the completion queue for |
+// sending (inboked in the IO thread). |
+void OnSend(net::TestCompletionCallback* sent, helium::Result result) { |
+ sent->callback().Run(static_cast<int>(result)); |
+} |
+ |
+class GrpcEngineClient { |
+ public: |
+ std::unique_ptr<GrpcEngineStream> engine; |
+ std::unique_ptr<GrpcClientStream> client; |
+ |
+ void Setup(AssignmentOptions assignment_options) { |
+ net::TestCompletionCallback engine_callback; |
+ engine = base::MakeUnique<GrpcEngineStream>(assignment_options, |
+ engine_callback.callback()); |
+ |
+ net::TestCompletionCallback client_callback; |
+ client = base::MakeUnique<GrpcClientStream>(engine->GetAssignmentOptions(), |
+ client_callback.callback()); |
+ |
+ EXPECT_EQ(net::OK, engine_callback.WaitForResult()); |
+ EXPECT_EQ(net::OK, client_callback.WaitForResult()); |
+ } |
+}; |
+ |
+std::unique_ptr<HeliumWrapper> MakeMsg(std::string str_msg) { |
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>(); |
+ msg->set_serialized_helium_message(str_msg); |
+ return msg; |
+} |
+ |
+TEST_F(GrpcStreamTest, SimpleConnect) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+} |
+ |
+TEST_F(GrpcStreamTest, ClientSendsData) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>(); |
+ msg->set_serialized_helium_message("test"); |
+ HeliumWrapper expected_msg = *msg; |
+ |
+ net::TestCompletionCallback sent; |
+ net::TestCompletionCallback received; |
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg), |
+ base::Unretained(&received))); |
+ |
+ grpc.client->SendMessage(std::move(msg), |
+ base::Bind(&OnSend, base::Unretained(&sent))); |
+ EXPECT_EQ(net::OK, sent.WaitForResult()); |
+ EXPECT_EQ(net::OK, received.WaitForResult()); |
+} |
+ |
+TEST_F(GrpcStreamTest, EngineSendsData) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>(); |
+ msg->set_serialized_helium_message("test"); |
+ HeliumWrapper expected_msg = *msg; |
+ |
+ net::TestCompletionCallback sent; |
+ net::TestCompletionCallback received; |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg), |
+ base::Unretained(&received))); |
+ |
+ grpc.engine->SendMessage(std::move(msg), |
+ base::Bind(&OnSend, base::Unretained(&sent))); |
+ EXPECT_EQ(net::OK, sent.WaitForResult()); |
+ EXPECT_EQ(net::OK, received.WaitForResult()); |
+} |
+ |
+TEST_F(GrpcStreamTest, BothSendData) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg1 = base::MakeUnique<HeliumWrapper>(); |
+ msg1->set_serialized_helium_message("test"); |
+ HeliumWrapper expected_msg1 = *msg1; |
+ |
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>(); |
+ msg2->set_serialized_helium_message("this is message 2"); |
+ HeliumWrapper expected_msg2 = *msg2; |
+ |
+ net::TestCompletionCallback sent_engine; |
+ net::TestCompletionCallback sent_client; |
+ net::TestCompletionCallback received_engine; |
+ net::TestCompletionCallback received_client; |
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg1), |
+ base::Unretained(&received_engine))); |
+ |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg2), |
+ base::Unretained(&received_client))); |
+ grpc.client->SendMessage(std::move(msg1), |
+ base::Bind(&OnSend, base::Unretained(&sent_client))); |
+ grpc.engine->SendMessage(std::move(msg2), |
+ base::Bind(&OnSend, base::Unretained(&sent_engine))); |
+ EXPECT_EQ(net::OK, sent_engine.WaitForResult()); |
+ EXPECT_EQ(net::OK, sent_client.WaitForResult()); |
+ EXPECT_EQ(net::OK, received_engine.WaitForResult()); |
+ EXPECT_EQ(net::OK, received_client.WaitForResult()); |
+} |
+ |
+TEST_F(GrpcStreamTest, ClientFinishesStream) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>(); |
+ msg->set_serialized_helium_message("test"); |
+ HeliumWrapper expected_msg = *msg; |
+ |
+ net::TestCompletionCallback received; |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg), |
+ base::Unretained(&received))); |
+ net::TestCompletionCallback sent; |
+ grpc.engine->SendMessage(std::move(msg), |
+ base::Bind(&OnSend, base::Unretained(&sent))); |
+ EXPECT_EQ(net::OK, sent.WaitForResult()); |
+ EXPECT_EQ(net::OK, received.WaitForResult()); |
+ |
+ // Client is now killed! |
+ grpc.client = nullptr; |
+ |
+ // Wait for engine to error-out. Note that the client completion queue is |
+ // destroyed asynchronously which means we need to wait until the engine sees |
+ // an error which is not immediate. |
+ int result = net::OK; |
+ while (result == net::OK) { |
+ net::TestCompletionCallback sent_error; |
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>(); |
+ msg2->set_serialized_helium_message("test2"); |
+ grpc.engine->SendMessage( |
+ std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error))); |
+ result = sent_error.WaitForResult(); |
+ } |
+} |
+ |
+TEST_F(GrpcStreamTest, EngineFinishesStream) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>(); |
+ msg->set_serialized_helium_message("test"); |
+ HeliumWrapper expected_msg = *msg; |
+ |
+ net::TestCompletionCallback received; |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg), |
+ base::Unretained(&received))); |
+ net::TestCompletionCallback sent; |
+ grpc.engine->SendMessage(std::move(msg), |
+ base::Bind(&OnSend, base::Unretained(&sent))); |
+ EXPECT_EQ(net::OK, sent.WaitForResult()); |
+ EXPECT_EQ(net::OK, received.WaitForResult()); |
+ |
+ // Engine is now killed! |
+ grpc.engine = nullptr; |
+ |
+ // Wait for client to error-out (similarly to ClientFinishesStream). |
+ int result = net::OK; |
+ while (result == net::OK) { |
+ net::TestCompletionCallback sent_error; |
+ std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>(); |
+ msg2->set_serialized_helium_message("test2"); |
+ grpc.client->SendMessage( |
+ std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error))); |
+ result = sent_error.WaitForResult(); |
+ } |
+} |
+ |
+TEST_F(GrpcStreamTest, InorderDelivery) { |
+ GrpcEngineClient grpc; |
+ grpc.Setup(assignment_options_); |
+ |
+ std::unique_ptr<HeliumWrapper> msg1 = MakeMsg("test1"); |
+ HeliumWrapper expected_msg1 = *msg1; |
+ |
+ std::unique_ptr<HeliumWrapper> msg2 = MakeMsg("test2"); |
+ HeliumWrapper expected_msg2 = *msg2; |
+ |
+ std::unique_ptr<HeliumWrapper> msg3 = MakeMsg("test3"); |
+ HeliumWrapper expected_msg3 = *msg3; |
+ |
+ std::unique_ptr<HeliumWrapper> msg4 = MakeMsg("test4"); |
+ HeliumWrapper expected_msg4 = *msg4; |
+ |
+ // Engine sends msg1, client sends msg2 and engine sends msg3 and finally |
+ // client sends msg4. |
+ net::TestCompletionCallback sent1; |
+ grpc.engine->SendMessage(std::move(msg1), |
+ base::Bind(&OnSend, base::Unretained(&sent1))); |
+ EXPECT_EQ(net::OK, sent1.WaitForResult()); |
+ net::TestCompletionCallback sent2; |
+ grpc.client->SendMessage(std::move(msg2), |
+ base::Bind(&OnSend, base::Unretained(&sent2))); |
+ EXPECT_EQ(net::OK, sent2.WaitForResult()); |
+ net::TestCompletionCallback sent3; |
+ grpc.engine->SendMessage(std::move(msg3), |
+ base::Bind(&OnSend, base::Unretained(&sent3))); |
+ EXPECT_EQ(net::OK, sent3.WaitForResult()); |
+ net::TestCompletionCallback sent4; |
+ grpc.client->SendMessage(std::move(msg4), |
+ base::Bind(&OnSend, base::Unretained(&sent4))); |
+ EXPECT_EQ(net::OK, sent4.WaitForResult()); |
+ |
+ // Now make sure the messages are received in the same order. The completion |
+ // queue on the other end will start processing the messages only when the tag |
+ // for receiving a message has been added. |
+ net::TestCompletionCallback received1; |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg1), |
+ base::Unretained(&received1))); |
+ net::TestCompletionCallback received2; |
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg2), |
+ base::Unretained(&received2))); |
+ |
+ // Engine and client pair can be awaited upon at the same time. However, the |
+ // next ReceiveMessage for the engine cannot be called until the engine has |
+ // processed the previous one first. |
+ EXPECT_EQ(net::OK, received1.WaitForResult()); |
+ EXPECT_EQ(net::OK, received2.WaitForResult()); |
+ |
+ net::TestCompletionCallback received3; |
+ grpc.client->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg3), |
+ base::Unretained(&received3))); |
+ |
+ net::TestCompletionCallback received4; |
+ grpc.engine->ReceiveMessage(base::Bind(&OnReceive, |
+ base::Unretained(&expected_msg4), |
+ base::Unretained(&received4))); |
+ EXPECT_EQ(net::OK, received3.WaitForResult()); |
+ EXPECT_EQ(net::OK, received4.WaitForResult()); |
+} |
+ |
+} // namespace |
+ |
+} // namespace blimp |