Chromium Code Reviews

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1455063004: Mojo C++ bindings: introduce MultiplexRouter and related classes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix mac compilation | Created 5 years, 1 month ago
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include "base/bind.h"
8 #include "base/memory/ref_counted.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/stl_util.h"
11 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
12
13 namespace mojo {
14 namespace internal {
15
16 // InterfaceEndpoint stores the state of an interface endpoint registered
17 // with
sky 2015/11/19 17:16:27 nit: fix wrapping.
yzshen1 2015/11/19 22:00:40 Done.
18 // the router. Always accessed under the router's lock.
sky 2015/11/19 17:16:27 To help debugging could this object take the lock
yzshen1 2015/11/19 22:00:40 Done. I added the asserts. It adds a little cost
19 //
20 // On creation, this object holds two refs to itself. Those refs are released
21 // when SetClosedMayDestruct() and SetPeerClosedMayDestruct() are called.
22 // In addition, the router's task queue may hold refs to it.
23 class MultiplexRouter::InterfaceEndpoint
24 : public base::RefCounted<InterfaceEndpoint> {
25 public:
26 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
27 : router_(router),
28 id_(id),
29 closed_(false),
30 peer_closed_(false),
31 client_(nullptr) {
32 AddRef();
33 AddRef();
sky 2015/11/19 17:16:27 I suspect a scoped_refptr to this member would be
yzshen1 2015/11/19 22:00:40 Done.
34 }
35
36 InterfaceId id() const { return id_; }
37
38 bool closed() const { return closed_; }
39 void SetClosedMayDestruct() {
40 if (closed_)
41 return;
42
43 closed_ = true;
44 Release();
45 }
46
47 bool peer_closed() const { return peer_closed_; }
48 void SetPeerClosedMayDestruct() {
49 if (peer_closed_)
50 return;
51
52 peer_closed_ = true;
53 Release();
54 }
55
56   const scoped_refptr<base::SingleThreadTaskRunner>& task_runner() const {
57 return task_runner_;
58 }
59 void set_task_runner(
60 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
61 task_runner_ = task_runner.Pass();
62 }
63
64 InterfaceEndpointClient* client() const { return client_; }
65 void set_client(InterfaceEndpointClient* client) { client_ = client; }
66
67 private:
68 friend class base::RefCounted<InterfaceEndpoint>;
69
70 ~InterfaceEndpoint() {
71 DCHECK(!client_);
72 router_->OnEndpointDestructed(id_);
73 }
74
75 MultiplexRouter* const router_;
76 const InterfaceId id_;
77
78 // Whether the endpoint has been closed.
79 bool closed_;
80 // Whether the peer endpoint has been closed.
81 bool peer_closed_;
82
83 // The task runner on which |client_| can be accessed.
84 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
85 // Not owned. It is null if no client is attached to this endpoint.
86 InterfaceEndpointClient* client_;
87
88 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
89 };
90
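The class comment above describes the lifetime scheme in isolation: the endpoint takes two references to itself on creation and drops one each time its own side or its peer side is closed, so the object dies only after both sides are gone (or after any task-queue refs are released). A minimal, standalone sketch of that pattern, with a hypothetical SelfOwned class that is not part of this patch:

#include "base/memory/ref_counted.h"

// Illustrative only; mirrors the "two self-refs" scheme used by
// MultiplexRouter::InterfaceEndpoint above.
class SelfOwned : public base::RefCounted<SelfOwned> {
 public:
  SelfOwned() : local_closed_(false), peer_closed_(false) {
    AddRef();  // Released by OnLocalClosed().
    AddRef();  // Released by OnPeerClosed().
  }

  void OnLocalClosed() {
    if (local_closed_)
      return;
    local_closed_ = true;
    Release();  // May delete |this| if the peer ref was already dropped.
  }

  void OnPeerClosed() {
    if (peer_closed_)
      return;
    peer_closed_ = true;
    Release();  // May delete |this| if the local ref was already dropped.
  }

 private:
  friend class base::RefCounted<SelfOwned>;
  ~SelfOwned() {}

  bool local_closed_;
  bool peer_closed_;
};

The review comment above suggests holding these as scoped_refptr members instead, which makes the ownership explicit; the sketch deliberately mirrors the original scheme as written in this patch set.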
91 struct MultiplexRouter::Task {
92 public:
93 // Doesn't take ownership of |message| but takes its contents.
94 static Task* CreateIncomingMessageTask(Message* message) {
95 Task* task = new Task();
96 task->message.reset(new Message);
97 message->MoveTo(task->message.get());
98 return task;
99 }
100 static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
101 Task* task = new Task();
102 task->endpoint_to_notify = endpoint;
103 return task;
104 }
105
106 ~Task() {}
107
108 bool IsIncomingMessageTask() const { return !!message; }
109 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; }
110
111 scoped_ptr<Message> message;
112 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
113
114 private:
115 Task() {}
116 };
117
118 MultiplexRouter::MultiplexRouter(bool set_interface_id_namespace_bit,
119 ScopedMessagePipeHandle message_pipe,
120 const MojoAsyncWaiter* waiter)
121 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current()
122 ->task_runner()),
123       set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
124 header_validator_(this),
125 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter),
126 control_message_handler_(this),
127 control_message_proxy_(&connector_),
128 next_interface_id_value_(1),
129 testing_mode_(false) {
130 connector_.set_incoming_receiver(&header_validator_);
131 connector_.set_connection_error_handler(
132 [this]() { OnPipeConnectionError(); });
133 }
134
135 MultiplexRouter::~MultiplexRouter() {
136 base::AutoLock locker(lock_);
137
138 while (!tasks_.empty()) {
139 delete tasks_.front();
140 tasks_.pop_front();
141 }
142
143 auto iter = endpoints_.begin();
sky 2015/11/19 17:16:27 Use a for loop (you can still increment inside the
yzshen1 2015/11/19 22:00:40 Done.
144 while (iter != endpoints_.end()) {
145 InterfaceEndpoint* endpoint = iter->second;
146 ++iter;
sky 2015/11/19 17:16:27 This is subtle and worth a comment.
yzshen1 2015/11/19 22:00:40 Done.
147
148 DCHECK(endpoint->closed());
149 endpoint->SetPeerClosedMayDestruct();
150 }
151
152 DCHECK(endpoints_.empty());
153 }
154
155 void MultiplexRouter::CreateEndpointHandlePair(
156 ScopedInterfaceEndpointHandle* local_endpoint,
157 ScopedInterfaceEndpointHandle* remote_endpoint) {
158 base::AutoLock locker(lock_);
159 uint32_t id = 0;
160 do {
161 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
162 next_interface_id_value_ = 1;
163 id = next_interface_id_value_++;
164 if (set_interface_id_namespace_bit_)
165 id |= kInterfaceIdNamespaceMask;
166 } while (ContainsKey(endpoints_, id));
167
168 endpoints_[id] = new InterfaceEndpoint(this, id);
169 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this);
170 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this);
171 }
172
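CreateEndpointHandlePair() avoids ID collisions between the two ends of the pipe by reserving the top bit of the ID as a namespace bit: only the side constructed with |set_interface_id_namespace_bit| == true sets it. A worked example of that allocation, assuming kInterfaceIdNamespaceMask is the high bit of the 32-bit ID space (0x80000000) and ID 0 is reserved for the master interface; the ContainsKey() re-check from the loop above is omitted here:

// Not part of the patch; condenses the allocation loop above.
uint32_t AllocateIdExample(uint32_t* next_value, bool set_namespace_bit) {
  if (*next_value >= kInterfaceIdNamespaceMask)
    *next_value = 1;                  // Wrap around, skipping the reserved 0.
  uint32_t id = (*next_value)++;
  if (set_namespace_bit)
    id |= kInterfaceIdNamespaceMask;  // e.g. 1 becomes 0x80000001.
  return id;
}

With the bit set this end yields 0x80000001, 0x80000002, ...; the other end, with the bit clear, yields 0x00000001, 0x00000002, ..., so both sides can mint IDs concurrently without ever producing the same value.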
173 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
174 InterfaceId id) {
175 if (!IsValidInterfaceId(id))
176 return ScopedInterfaceEndpointHandle();
177
178 base::AutoLock locker(lock_);
179 if (ContainsKey(endpoints_, id)) {
180     // If the endpoint already exists, it is because we have received a
sky 2015/11/19 17:16:27 It seems weird that you have to detect this case i
yzshen1 2015/11/19 22:00:40 This is legitimate: this method is a public method
181 // notification that the peer endpoint has closed.
182 InterfaceEndpoint* endpoint = endpoints_[id];
183 CHECK(!endpoint->closed());
184 CHECK(endpoint->peer_closed());
185 } else {
186 endpoints_[id] = new InterfaceEndpoint(this, id);
187 }
188 return ScopedInterfaceEndpointHandle(id, true, this);
189 }
190
191 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
192 if (!IsValidInterfaceId(id))
193 return;
194
195 base::AutoLock locker(lock_);
196
197 if (!is_local) {
198 DCHECK(ContainsKey(endpoints_, id));
199 DCHECK(!IsMasterInterfaceId(id));
200
201 // We will receive a NotifyPeerEndpointClosed message from the other side.
202 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
203
204 return;
205 }
206
207 DCHECK(ContainsKey(endpoints_, id));
208 InterfaceEndpoint* endpoint = endpoints_[id];
209 DCHECK(!endpoint->client());
210 DCHECK(!endpoint->closed());
211 endpoint->SetClosedMayDestruct();
212
213 if (!IsMasterInterfaceId(id))
214 control_message_proxy_.NotifyPeerEndpointClosed(id);
215
216 ProcessTasks(true);
217 }
218
219 void MultiplexRouter::AttachEndpointClient(
220 const ScopedInterfaceEndpointHandle& handle,
221 InterfaceEndpointClient* client) {
222 const InterfaceId id = handle.id();
223
224 DCHECK(IsValidInterfaceId(id));
225 DCHECK(client);
226
227 base::AutoLock locker(lock_);
228 DCHECK(ContainsKey(endpoints_, id));
229
230 InterfaceEndpoint* endpoint = endpoints_[id];
231 DCHECK(!endpoint->client());
232 DCHECK(!endpoint->closed());
233
234 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
235 endpoint->set_client(client);
236
237 if (endpoint->peer_closed())
238 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
239 ProcessTasks(true);
240 }
241
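AttachEndpointClient() and DetachEndpointClient() define the contract between the router and an endpoint client. A hedged sketch of the expected caller-side sequence, where |router|, |handle|, and |client| are placeholders and the construction of the InterfaceEndpointClient itself is out of scope for this file:

// Hypothetical caller-side sequence; not part of the patch. |handle| came from
// CreateEndpointHandlePair() or CreateLocalEndpointHandle(); |client| is an
// InterfaceEndpointClient living on the current thread.
router->AttachEndpointClient(handle, client);
// The router now dispatches messages for |handle| to |client| on this thread,
// and queues an error notification if the peer endpoint is already closed.
router->DetachEndpointClient(handle);
// Detaching must precede closing the handle: CloseEndpointHandle() DCHECKs
// that no client is attached.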
242 void MultiplexRouter::DetachEndpointClient(
243 const ScopedInterfaceEndpointHandle& handle) {
244 const InterfaceId id = handle.id();
245
246 DCHECK(IsValidInterfaceId(id));
247
248 base::AutoLock locker(lock_);
249 DCHECK(ContainsKey(endpoints_, id));
250
251 InterfaceEndpoint* endpoint = endpoints_[id];
252 DCHECK(endpoint->client());
253 DCHECK(!endpoint->closed());
254
255 endpoint->set_task_runner(nullptr);
256 endpoint->set_client(nullptr);
257 }
258
259 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
260 Message* message) {
261 const InterfaceId id = handle.id();
262
263 base::AutoLock locker(lock_);
264 if (!ContainsKey(endpoints_, id))
265 return false;
266
267 InterfaceEndpoint* endpoint = endpoints_[id];
268 if (endpoint->peer_closed())
269 return false;
270
271 message->set_interface_id(id);
272 return connector_.Accept(message);
273 }
274
275 void MultiplexRouter::RaiseError() {
276 if (task_runner_->BelongsToCurrentThread()) {
277 connector_.RaiseError();
278 } else {
279 task_runner_->PostTask(FROM_HERE,
280 base::Bind(&MultiplexRouter::RaiseError, this));
281 }
282 }
283
284 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() {
285 DCHECK(thread_checker_.CalledOnValidThread());
286 {
287 base::AutoLock locker(lock_);
288 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 &&
289 ContainsKey(endpoints_, kMasterInterfaceId)));
290 }
291 return connector_.PassMessagePipe();
292 }
293
294 void MultiplexRouter::EnableTestingMode() {
295 DCHECK(thread_checker_.CalledOnValidThread());
296 base::AutoLock locker(lock_);
297
298 testing_mode_ = true;
299 connector_.set_enforce_errors_from_incoming_receiver(false);
300 }
301
302 bool MultiplexRouter::Accept(Message* message) {
303 DCHECK(thread_checker_.CalledOnValidThread());
304
305 scoped_refptr<MultiplexRouter> protector(this);
306 base::AutoLock locker(lock_);
307 tasks_.push_back(Task::CreateIncomingMessageTask(message));
308 ProcessTasks(false);
309
310 // Always return true. If we see errors during message processing, we will
311 // explicitly call Connector::RaiseError() to disconnect the message pipe.
312 return true;
313 }
314
315 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
316 lock_.AssertAcquired();
317
318 if (IsMasterInterfaceId(id))
319 return false;
320
321 if (!ContainsKey(endpoints_, id))
322 endpoints_[id] = new InterfaceEndpoint(this, id);
323
324 InterfaceEndpoint* endpoint = endpoints_[id];
325 DCHECK(!endpoint->peer_closed());
326
327 if (endpoint->client())
328 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
329 endpoint->SetPeerClosedMayDestruct();
330
331 // No need to trigger a ProcessTasks() because it is already on the stack.
332
333 return true;
334 }
335
336 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
337 lock_.AssertAcquired();
338
339 if (IsMasterInterfaceId(id))
340 return false;
341
342 if (!ContainsKey(endpoints_, id))
343 endpoints_[id] = new InterfaceEndpoint(this, id);
344
345 InterfaceEndpoint* endpoint = endpoints_[id];
346 DCHECK(!endpoint->closed());
347 endpoint->SetClosedMayDestruct();
348
349 control_message_proxy_.NotifyPeerEndpointClosed(id);
350
351 return true;
352 }
353
354 void MultiplexRouter::OnPipeConnectionError() {
355 DCHECK(thread_checker_.CalledOnValidThread());
356
357 scoped_refptr<MultiplexRouter> protector(this);
358 base::AutoLock locker(lock_);
359
360 auto iter = endpoints_.begin();
361 while (iter != endpoints_.end()) {
362 InterfaceEndpoint* endpoint = iter->second;
363 ++iter;
364
365 if (endpoint->client())
366 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
367
368 endpoint->SetPeerClosedMayDestruct();
369 }
370
371 ProcessTasks(false);
372 }
373
374 void MultiplexRouter::ProcessTasks(bool force_async) {
375 lock_.AssertAcquired();
376
377 while (!tasks_.empty()) {
378 scoped_ptr<Task> task(tasks_.front());
379 tasks_.pop_front();
380
381 bool processed = task->IsNotifyErrorTask()
382 ? ProcessNotifyErrorTask(task.get(), &force_async)
383 : ProcessIncomingMessageTask(task.get(), &force_async);
384
385 if (!processed) {
386 tasks_.push_front(task.release());
387 break;
388 }
389 }
390 }
391
392 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) {
393 lock_.AssertAcquired();
394 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
395 if (!endpoint->client())
396 return true;
397
398 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
399 endpoint->task_runner()->PostTask(
400 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
401 return false;
402 }
403
404 *force_async = true;
405 InterfaceEndpointClient* client = endpoint->client();
406 {
407 base::AutoUnlock unlocker(lock_);
408 client->NotifyError();
sky 2015/11/19 17:16:27 Can bad things happen between the time you unlock
yzshen1 2015/11/19 22:00:40 |client| lives on a single thread. NotifyError() a
409 }
410 return true;
411 }
412
413 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task,
414 bool* force_async) {
415 lock_.AssertAcquired();
416 Message* message = task->message.get();
417
418 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
419 if (!control_message_handler_.Accept(message))
420 RaiseErrorInNonTestingMode();
421 return true;
422 }
423
424 InterfaceId id = message->interface_id();
425 DCHECK(IsValidInterfaceId(id));
426
427 if (!ContainsKey(endpoints_, id)) {
428 DCHECK(!IsMasterInterfaceId(id));
429
430 // Currently, it is legitimate to receive messages for an endpoint
431 // that is not registered. For example, the endpoint is transferred in
432     // a message that is discarded. Once we add support for specifying all
433     // enclosing endpoints in the message header, we should be able to remove
434 // this.
435 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
436 endpoints_[id] = endpoint;
437 endpoint->SetClosedMayDestruct();
438
439 control_message_proxy_.NotifyPeerEndpointClosed(id);
440 return true;
441 }
442
443 InterfaceEndpoint* endpoint = endpoints_[id];
444 if (endpoint->closed())
445 return true;
446
447 if (!endpoint->client()) {
448 // We need to wait until a client is attached in order to dispatch further
449 // messages.
450 return false;
451 }
452
453 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
454 endpoint->task_runner()->PostTask(
455 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
456 return false;
457 }
458
459 *force_async = true;
460 InterfaceEndpointClient* client = endpoint->client();
461 scoped_ptr<Message> owned_message = task->message.Pass();
462 bool result = false;
463 {
464 base::AutoUnlock unlocker(lock_);
465 result = client->HandleIncomingMessage(owned_message.get());
sky 2015/11/19 17:16:27 Similar question to above about unlocking.
yzshen1 2015/11/19 22:00:40 (Please see my comment for the previous question.)
466 }
467 if (!result)
468 RaiseErrorInNonTestingMode();
469
470 return true;
471 }
472
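ProcessNotifyErrorTask() and ProcessIncomingMessageTask() share the same dispatch pattern: if the current thread is not the endpoint's, re-post the whole ProcessTasks() pass to the endpoint's task runner and leave the task at the front of the queue; otherwise drop the lock for the duration of the client call. A condensed restatement of that pattern, with |endpoint| and DispatchToClient() as placeholders:

// Not a drop-in replacement; annotates the shared logic of the two methods above.
if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
  // Wrong thread (or nested dispatch must be avoided): bounce the whole
  // ProcessTasks() pass to the endpoint's thread; returning false puts the
  // task back at the front of |tasks_|.
  endpoint->task_runner()->PostTask(
      FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
  return false;
}
*force_async = true;  // Later tasks in this pass are re-posted, not nested.
{
  // Dropping |lock_| here is safe because the client is only touched on its
  // own thread; it also lets the client call back into the router (for
  // example, to send a reply) without deadlocking. This is the point the
  // review question above is about.
  base::AutoUnlock unlocker(lock_);
  DispatchToClient(endpoint->client());
}
return true;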
473 void MultiplexRouter::LockAndCallProcessTasks() {
474 // There is no need to hold a ref to this class in this case because this is
475 // always called using base::Bind(), which holds a ref.
476 base::AutoLock locker(lock_);
477 ProcessTasks(false);
478 }
479
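The comment above relies on a standard base::Bind() property: binding a method of a ref-counted receiver keeps the receiver alive for the lifetime of the callback, which is why RaiseError() and the Process*Task() methods can post callbacks bound to |this| without taking an explicit ref. A minimal illustration, where Foo and DoWork are hypothetical:

// Illustration only; Foo is a hypothetical ref-counted class with a DoWork().
scoped_refptr<Foo> foo(new Foo);
base::Closure cb = base::Bind(&Foo::DoWork, foo);
foo = nullptr;  // The callback still holds a reference to the object...
cb.Run();       // ...so it is alive here and destroyed only when |cb| goes away.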
480 void MultiplexRouter::OnEndpointDestructed(InterfaceId id) {
481 lock_.AssertAcquired();
482 endpoints_.erase(id);
483 }
484
485 void MultiplexRouter::RaiseErrorInNonTestingMode() {
486 lock_.AssertAcquired();
487 if (!testing_mode_)
488 RaiseError();
489 }
490
491 } // namespace internal
492 } // namespace mojo
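For context, a hedged end-to-end sketch of how this class is meant to be driven. The constructor arguments match the signature in this file; the MessagePipe helper and Environment::GetDefaultAsyncWaiter() are assumptions based on the bindings API of this era, and the InterfaceEndpointClient wiring is omitted because it is defined elsewhere:

// Hypothetical usage; not part of this patch.
MessagePipe pipe;
scoped_refptr<MultiplexRouter> router(new MultiplexRouter(
    true /* set_interface_id_namespace_bit */, pipe.handle0.Pass(),
    Environment::GetDefaultAsyncWaiter()));

ScopedInterfaceEndpointHandle local, remote;
router->CreateEndpointHandlePair(&local, &remote);
// |remote| would be serialized into a message and sent over |pipe|; |local| is
// attached to an InterfaceEndpointClient on this thread via
// AttachEndpointClient(), and eventually released via CloseEndpointHandle().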