Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: mojo/system/remote_message_pipe_unittest.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include <fcntl.h>
8 #include <stdint.h>
9 #include <string.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12 #include <unistd.h>
13
14 #include "base/basictypes.h"
15 #include "base/bind.h"
16 #include "base/callback.h"
17 #include "base/location.h"
18 #include "base/logging.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/threading/thread.h"
22 #include "mojo/system/channel.h"
23 #include "mojo/system/local_message_pipe_endpoint.h"
24 #include "mojo/system/platform_channel_handle.h"
25 #include "mojo/system/proxy_message_pipe_endpoint.h"
26 #include "mojo/system/test_utils.h"
27 #include "mojo/system/waiter.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29
30 namespace mojo {
31 namespace system {
32 namespace {
33
34 class RemoteMessagePipeTest : public testing::Test {
35 public:
36 RemoteMessagePipeTest() : io_thread_("io_thread") {
37 }
38
39 virtual ~RemoteMessagePipeTest() {
40 }
41
42 virtual void SetUp() OVERRIDE {
43 io_thread_.StartWithOptions(
44 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
45
46 test::PostTaskAndWait(io_thread_task_runner(),
47 FROM_HERE,
48 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
49 base::Unretained(this)));
50 }
51
52 virtual void TearDown() OVERRIDE {
53 test::PostTaskAndWait(io_thread_task_runner(),
54 FROM_HERE,
55 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
56 base::Unretained(this)));
57 io_thread_.Stop();
58 }
59
60 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
61 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
62 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
63 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,
64 scoped_refptr<MessagePipe> mp_1) {
65 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
66
67 MessageInTransit::EndpointId local_id_0 =
68 channels_[0]->AttachMessagePipeEndpoint(mp_0, 1);
69 MessageInTransit::EndpointId local_id_1 =
70 channels_[1]->AttachMessagePipeEndpoint(mp_1, 0);
71
72 channels_[0]->RunMessagePipeEndpoint(local_id_0, local_id_1);
73 channels_[1]->RunMessagePipeEndpoint(local_id_1, local_id_0);
74 }
75
76 protected:
77 base::MessageLoop* io_thread_message_loop() {
78 return io_thread_.message_loop();
79 }
80
81 scoped_refptr<base::TaskRunner> io_thread_task_runner() {
82 return io_thread_message_loop()->message_loop_proxy();
83 }
84
85 private:
86 void SetUpOnIOThread() {
87 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
88
89 // Create the socket.
90 int fds[2];
91 PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
92
93 // Set the ends to non-blocking.
94 PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
95 PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
96
97 // Create |Channel|s.
98 channels_[0] = new Channel(PlatformChannelHandle(fds[0]));
99 channels_[1] = new Channel(PlatformChannelHandle(fds[1]));
100 }
101
102 void TearDownOnIOThread() {
103 channels_[1]->Shutdown();
104 channels_[1] = NULL;
105 channels_[0]->Shutdown();
106 channels_[0] = NULL;
107 }
108
109 base::Thread io_thread_;
110 scoped_refptr<Channel> channels_[2];
111
112 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
113 };
114
115 TEST_F(RemoteMessagePipeTest, Basic) {
116 const char hello[] = "hello";
117 const char world[] = "world!!!1!!!1!";
118 char buffer[100] = { 0 };
119 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
120 Waiter waiter;
121
122 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
123 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
124 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
125
126 scoped_refptr<MessagePipe> mp_0(new MessagePipe(
127 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
128 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
129 scoped_refptr<MessagePipe> mp_1(new MessagePipe(
130 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
131 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
132 test::PostTaskAndWait(
133 io_thread_task_runner(),
134 FROM_HERE,
135 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
136 base::Unretained(this), mp_0, mp_1));
137
138 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
139
140 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
141 // it later, it might already be readable.)
142 waiter.Init();
143 EXPECT_EQ(MOJO_RESULT_OK,
144 mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
145
146 // Write to MP 0, port 0.
147 EXPECT_EQ(MOJO_RESULT_OK,
148 mp_0->WriteMessage(0,
149 hello, sizeof(hello),
150 NULL, 0,
151 MOJO_WRITE_MESSAGE_FLAG_NONE));
152
153 // Wait.
154 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
155 mp_1->RemoveWaiter(1, &waiter);
156
157 // Read from MP 1, port 1.
158 EXPECT_EQ(MOJO_RESULT_OK,
159 mp_1->ReadMessage(1,
160 buffer, &buffer_size,
161 NULL, NULL,
162 MOJO_READ_MESSAGE_FLAG_NONE));
163 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
164 EXPECT_EQ(0, strcmp(buffer, hello));
165
166 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
167
168 waiter.Init();
169 EXPECT_EQ(MOJO_RESULT_OK,
170 mp_0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
171
172 EXPECT_EQ(MOJO_RESULT_OK,
173 mp_1->WriteMessage(1,
174 world, sizeof(world),
175 NULL, 0,
176 MOJO_WRITE_MESSAGE_FLAG_NONE));
177
178 EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
179 mp_0->RemoveWaiter(0, &waiter);
180
181 buffer_size = static_cast<uint32_t>(sizeof(buffer));
182 EXPECT_EQ(MOJO_RESULT_OK,
183 mp_0->ReadMessage(0,
184 buffer, &buffer_size,
185 NULL, NULL,
186 MOJO_READ_MESSAGE_FLAG_NONE));
187 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
188 EXPECT_EQ(0, strcmp(buffer, world));
189
190 // Close MP 0, port 0.
191 mp_0->Close(0);
192
193 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
194 // when it realizes that MP 0, port 0 has been closed. (It may also fail
195 // immediately.)
196 waiter.Init();
197 MojoResult result = mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
198 if (result == MOJO_RESULT_OK) {
199 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
200 waiter.Wait(MOJO_DEADLINE_INDEFINITE));
201 mp_1->RemoveWaiter(1, &waiter);
202 } else {
203 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
204 }
205
206 // And MP 1, port 1.
207 mp_1->Close(1);
208 }
209
210 TEST_F(RemoteMessagePipeTest, Multiplex) {
211 const char hello[] = "hello";
212 const char world[] = "world!!!1!!!1!";
213 char buffer[100] = { 0 };
214 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
215 Waiter waiter;
216
217 // Connect message pipes as in the |Basic| test.
218
219 scoped_refptr<MessagePipe> mp_0(new MessagePipe(
220 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
221 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
222 scoped_refptr<MessagePipe> mp_1(new MessagePipe(
223 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
224 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
225 test::PostTaskAndWait(
226 io_thread_task_runner(),
227 FROM_HERE,
228 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
229 base::Unretained(this), mp_0, mp_1));
230
231 // Now put another message pipe on the channel.
232
233 scoped_refptr<MessagePipe> mp_2(new MessagePipe(
234 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
235 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
236 scoped_refptr<MessagePipe> mp_3(new MessagePipe(
237 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
238 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
239 test::PostTaskAndWait(
240 io_thread_task_runner(),
241 FROM_HERE,
242 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
243 base::Unretained(this), mp_2, mp_3));
244
245 // Write: MP 2, port 0 -> MP 3, port 1.
246
247 waiter.Init();
248 EXPECT_EQ(MOJO_RESULT_OK,
249 mp_3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
250
251 EXPECT_EQ(MOJO_RESULT_OK,
252 mp_2->WriteMessage(0,
253 hello, sizeof(hello),
254 NULL, 0,
255 MOJO_WRITE_MESSAGE_FLAG_NONE));
256
257 EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
258 mp_3->RemoveWaiter(1, &waiter);
259
260 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
261 buffer_size = static_cast<uint32_t>(sizeof(buffer));
262 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
263 mp_0->ReadMessage(0,
264 buffer, &buffer_size,
265 NULL, NULL,
266 MOJO_READ_MESSAGE_FLAG_NONE));
267 buffer_size = static_cast<uint32_t>(sizeof(buffer));
268 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
269 mp_1->ReadMessage(1,
270 buffer, &buffer_size,
271 NULL, NULL,
272 MOJO_READ_MESSAGE_FLAG_NONE));
273 buffer_size = static_cast<uint32_t>(sizeof(buffer));
274 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
275 mp_2->ReadMessage(0,
276 buffer, &buffer_size,
277 NULL, NULL,
278 MOJO_READ_MESSAGE_FLAG_NONE));
279
280 // Read from MP 3, port 1.
281 buffer_size = static_cast<uint32_t>(sizeof(buffer));
282 EXPECT_EQ(MOJO_RESULT_OK,
283 mp_3->ReadMessage(1,
284 buffer, &buffer_size,
285 NULL, NULL,
286 MOJO_READ_MESSAGE_FLAG_NONE));
287 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
288 EXPECT_EQ(0, strcmp(buffer, hello));
289
290 // Write: MP 0, port 0 -> MP 1, port 1 again.
291
292 waiter.Init();
293 EXPECT_EQ(MOJO_RESULT_OK,
294 mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
295
296 EXPECT_EQ(MOJO_RESULT_OK,
297 mp_0->WriteMessage(0,
298 world, sizeof(world),
299 NULL, 0,
300 MOJO_WRITE_MESSAGE_FLAG_NONE));
301
302 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
303 mp_1->RemoveWaiter(1, &waiter);
304
305 // Make sure there's nothing on the other ports.
306 buffer_size = static_cast<uint32_t>(sizeof(buffer));
307 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
308 mp_0->ReadMessage(0,
309 buffer, &buffer_size,
310 NULL, NULL,
311 MOJO_READ_MESSAGE_FLAG_NONE));
312 buffer_size = static_cast<uint32_t>(sizeof(buffer));
313 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
314 mp_2->ReadMessage(0,
315 buffer, &buffer_size,
316 NULL, NULL,
317 MOJO_READ_MESSAGE_FLAG_NONE));
318 buffer_size = static_cast<uint32_t>(sizeof(buffer));
319 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
320 mp_3->ReadMessage(1,
321 buffer, &buffer_size,
322 NULL, NULL,
323 MOJO_READ_MESSAGE_FLAG_NONE));
324
325 buffer_size = static_cast<uint32_t>(sizeof(buffer));
326 EXPECT_EQ(MOJO_RESULT_OK,
327 mp_1->ReadMessage(1,
328 buffer, &buffer_size,
329 NULL, NULL,
330 MOJO_READ_MESSAGE_FLAG_NONE));
331 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
332 EXPECT_EQ(0, strcmp(buffer, world));
333 }
334
335 } // namespace
336 } // namespace system
337 } // namespace mojo
OLDNEW
« mojo/system/proxy_message_pipe_endpoint.cc ('K') | « mojo/system/raw_channel_posix_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698