Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(493)

Side by Side Diff: blimp/net/grpc_stream_unittest.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/grpc_stream.cc ('k') | blimp/net/tcp_engine_transport.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stddef.h>
6
7 #include <string>
8 #include <utility>
9
10 #include "base/callback_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/message_loop/message_loop.h"
13 #include "blimp/common/public/session/assignment_options.h"
14 #include "blimp/net/common.h"
15 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/grpc_client_stream.h"
17 #include "blimp/net/grpc_engine_stream.h"
18 #include "blimp/net/grpc_stream.h"
19 #include "blimp/net/test_common.h"
20 #include "net/base/completion_callback.h"
21 #include "net/base/test_completion_callback.h"
22 #include "testing/gmock/include/gmock/gmock.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24
25 using testing::_;
26 using testing::InSequence;
27 using testing::Return;
28 using testing::SaveArg;
29
30 namespace blimp {
31
32 namespace {
33 // Unit-test for GrpcStream* classes - Client/Engine.
34 class GrpcStreamTest : public testing::Test {
35 public:
36 GrpcStreamTest() {
37 assignment_options_.engine_endpoint =
38 net::IPEndPoint(net::IPAddress(127, 0, 0, 1), 0);
39 }
40
41 protected:
42 AssignmentOptions assignment_options_;
43 base::MessageLoopForIO message_loop_;
44 };
45
46 // Called when a message is received in the completion queue thread (callback
47 // invoked in the IO thread).
48 void OnReceive(HeliumWrapper* helium_msg,
49 net::TestCompletionCallback* received,
50 std::unique_ptr<HeliumWrapper> received_msg,
51 helium::Result result) {
52 EXPECT_EQ(helium_msg->serialized_helium_message(),
53 received_msg->serialized_helium_message());
54 received->callback().Run(static_cast<int>(result));
55 }
56
57 // Called when the message is being processed by the completion queue for
58 // sending (inboked in the IO thread).
59 void OnSend(net::TestCompletionCallback* sent, helium::Result result) {
60 sent->callback().Run(static_cast<int>(result));
61 }
62
63 class GrpcEngineClient {
64 public:
65 std::unique_ptr<GrpcEngineStream> engine;
66 std::unique_ptr<GrpcClientStream> client;
67
68 void Setup(AssignmentOptions assignment_options) {
69 net::TestCompletionCallback engine_callback;
70 engine = base::MakeUnique<GrpcEngineStream>(assignment_options,
71 engine_callback.callback());
72
73 net::TestCompletionCallback client_callback;
74 client = base::MakeUnique<GrpcClientStream>(engine->GetAssignmentOptions(),
75 client_callback.callback());
76
77 EXPECT_EQ(net::OK, engine_callback.WaitForResult());
78 EXPECT_EQ(net::OK, client_callback.WaitForResult());
79 }
80 };
81
82 std::unique_ptr<HeliumWrapper> MakeMsg(std::string str_msg) {
83 std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
84 msg->set_serialized_helium_message(str_msg);
85 return msg;
86 }
87
88 TEST_F(GrpcStreamTest, SimpleConnect) {
89 GrpcEngineClient grpc;
90 grpc.Setup(assignment_options_);
91 }
92
93 TEST_F(GrpcStreamTest, ClientSendsData) {
94 GrpcEngineClient grpc;
95 grpc.Setup(assignment_options_);
96
97 std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
98 msg->set_serialized_helium_message("test");
99 HeliumWrapper expected_msg = *msg;
100
101 net::TestCompletionCallback sent;
102 net::TestCompletionCallback received;
103 grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
104 base::Unretained(&expected_msg),
105 base::Unretained(&received)));
106
107 grpc.client->SendMessage(std::move(msg),
108 base::Bind(&OnSend, base::Unretained(&sent)));
109 EXPECT_EQ(net::OK, sent.WaitForResult());
110 EXPECT_EQ(net::OK, received.WaitForResult());
111 }
112
113 TEST_F(GrpcStreamTest, EngineSendsData) {
114 GrpcEngineClient grpc;
115 grpc.Setup(assignment_options_);
116
117 std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
118 msg->set_serialized_helium_message("test");
119 HeliumWrapper expected_msg = *msg;
120
121 net::TestCompletionCallback sent;
122 net::TestCompletionCallback received;
123 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
124 base::Unretained(&expected_msg),
125 base::Unretained(&received)));
126
127 grpc.engine->SendMessage(std::move(msg),
128 base::Bind(&OnSend, base::Unretained(&sent)));
129 EXPECT_EQ(net::OK, sent.WaitForResult());
130 EXPECT_EQ(net::OK, received.WaitForResult());
131 }
132
133 TEST_F(GrpcStreamTest, BothSendData) {
134 GrpcEngineClient grpc;
135 grpc.Setup(assignment_options_);
136
137 std::unique_ptr<HeliumWrapper> msg1 = base::MakeUnique<HeliumWrapper>();
138 msg1->set_serialized_helium_message("test");
139 HeliumWrapper expected_msg1 = *msg1;
140
141 std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
142 msg2->set_serialized_helium_message("this is message 2");
143 HeliumWrapper expected_msg2 = *msg2;
144
145 net::TestCompletionCallback sent_engine;
146 net::TestCompletionCallback sent_client;
147 net::TestCompletionCallback received_engine;
148 net::TestCompletionCallback received_client;
149 grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
150 base::Unretained(&expected_msg1),
151 base::Unretained(&received_engine)));
152
153 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
154 base::Unretained(&expected_msg2),
155 base::Unretained(&received_client)));
156 grpc.client->SendMessage(std::move(msg1),
157 base::Bind(&OnSend, base::Unretained(&sent_client)));
158 grpc.engine->SendMessage(std::move(msg2),
159 base::Bind(&OnSend, base::Unretained(&sent_engine)));
160 EXPECT_EQ(net::OK, sent_engine.WaitForResult());
161 EXPECT_EQ(net::OK, sent_client.WaitForResult());
162 EXPECT_EQ(net::OK, received_engine.WaitForResult());
163 EXPECT_EQ(net::OK, received_client.WaitForResult());
164 }
165
166 TEST_F(GrpcStreamTest, ClientFinishesStream) {
167 GrpcEngineClient grpc;
168 grpc.Setup(assignment_options_);
169
170 std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
171 msg->set_serialized_helium_message("test");
172 HeliumWrapper expected_msg = *msg;
173
174 net::TestCompletionCallback received;
175 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
176 base::Unretained(&expected_msg),
177 base::Unretained(&received)));
178 net::TestCompletionCallback sent;
179 grpc.engine->SendMessage(std::move(msg),
180 base::Bind(&OnSend, base::Unretained(&sent)));
181 EXPECT_EQ(net::OK, sent.WaitForResult());
182 EXPECT_EQ(net::OK, received.WaitForResult());
183
184 // Client is now killed!
185 grpc.client = nullptr;
186
187 // Wait for engine to error-out. Note that the client completion queue is
188 // destroyed asynchronously which means we need to wait until the engine sees
189 // an error which is not immediate.
190 int result = net::OK;
191 while (result == net::OK) {
192 net::TestCompletionCallback sent_error;
193 std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
194 msg2->set_serialized_helium_message("test2");
195 grpc.engine->SendMessage(
196 std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
197 result = sent_error.WaitForResult();
198 }
199 }
200
201 TEST_F(GrpcStreamTest, EngineFinishesStream) {
202 GrpcEngineClient grpc;
203 grpc.Setup(assignment_options_);
204
205 std::unique_ptr<HeliumWrapper> msg = base::MakeUnique<HeliumWrapper>();
206 msg->set_serialized_helium_message("test");
207 HeliumWrapper expected_msg = *msg;
208
209 net::TestCompletionCallback received;
210 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
211 base::Unretained(&expected_msg),
212 base::Unretained(&received)));
213 net::TestCompletionCallback sent;
214 grpc.engine->SendMessage(std::move(msg),
215 base::Bind(&OnSend, base::Unretained(&sent)));
216 EXPECT_EQ(net::OK, sent.WaitForResult());
217 EXPECT_EQ(net::OK, received.WaitForResult());
218
219 // Engine is now killed!
220 grpc.engine = nullptr;
221
222 // Wait for client to error-out (similarly to ClientFinishesStream).
223 int result = net::OK;
224 while (result == net::OK) {
225 net::TestCompletionCallback sent_error;
226 std::unique_ptr<HeliumWrapper> msg2 = base::MakeUnique<HeliumWrapper>();
227 msg2->set_serialized_helium_message("test2");
228 grpc.client->SendMessage(
229 std::move(msg2), base::Bind(&OnSend, base::Unretained(&sent_error)));
230 result = sent_error.WaitForResult();
231 }
232 }
233
234 TEST_F(GrpcStreamTest, InorderDelivery) {
235 GrpcEngineClient grpc;
236 grpc.Setup(assignment_options_);
237
238 std::unique_ptr<HeliumWrapper> msg1 = MakeMsg("test1");
239 HeliumWrapper expected_msg1 = *msg1;
240
241 std::unique_ptr<HeliumWrapper> msg2 = MakeMsg("test2");
242 HeliumWrapper expected_msg2 = *msg2;
243
244 std::unique_ptr<HeliumWrapper> msg3 = MakeMsg("test3");
245 HeliumWrapper expected_msg3 = *msg3;
246
247 std::unique_ptr<HeliumWrapper> msg4 = MakeMsg("test4");
248 HeliumWrapper expected_msg4 = *msg4;
249
250 // Engine sends msg1, client sends msg2 and engine sends msg3 and finally
251 // client sends msg4.
252 net::TestCompletionCallback sent1;
253 grpc.engine->SendMessage(std::move(msg1),
254 base::Bind(&OnSend, base::Unretained(&sent1)));
255 EXPECT_EQ(net::OK, sent1.WaitForResult());
256 net::TestCompletionCallback sent2;
257 grpc.client->SendMessage(std::move(msg2),
258 base::Bind(&OnSend, base::Unretained(&sent2)));
259 EXPECT_EQ(net::OK, sent2.WaitForResult());
260 net::TestCompletionCallback sent3;
261 grpc.engine->SendMessage(std::move(msg3),
262 base::Bind(&OnSend, base::Unretained(&sent3)));
263 EXPECT_EQ(net::OK, sent3.WaitForResult());
264 net::TestCompletionCallback sent4;
265 grpc.client->SendMessage(std::move(msg4),
266 base::Bind(&OnSend, base::Unretained(&sent4)));
267 EXPECT_EQ(net::OK, sent4.WaitForResult());
268
269 // Now make sure the messages are received in the same order. The completion
270 // queue on the other end will start processing the messages only when the tag
271 // for receiving a message has been added.
272 net::TestCompletionCallback received1;
273 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
274 base::Unretained(&expected_msg1),
275 base::Unretained(&received1)));
276 net::TestCompletionCallback received2;
277 grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
278 base::Unretained(&expected_msg2),
279 base::Unretained(&received2)));
280
281 // Engine and client pair can be awaited upon at the same time. However, the
282 // next ReceiveMessage for the engine cannot be called until the engine has
283 // processed the previous one first.
284 EXPECT_EQ(net::OK, received1.WaitForResult());
285 EXPECT_EQ(net::OK, received2.WaitForResult());
286
287 net::TestCompletionCallback received3;
288 grpc.client->ReceiveMessage(base::Bind(&OnReceive,
289 base::Unretained(&expected_msg3),
290 base::Unretained(&received3)));
291
292 net::TestCompletionCallback received4;
293 grpc.engine->ReceiveMessage(base::Bind(&OnReceive,
294 base::Unretained(&expected_msg4),
295 base::Unretained(&received4)));
296 EXPECT_EQ(net::OK, received3.WaitForResult());
297 EXPECT_EQ(net::OK, received4.WaitForResult());
298 }
299
300 } // namespace
301
302 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/grpc_stream.cc ('k') | blimp/net/tcp_engine_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698