Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(182)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1455063004: Mojo C++ bindings: introduce MultiplexRouter and related classes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/stl_util.h"
10 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
11
12 namespace mojo {
13 namespace internal {
14
15 // InterfaceEndpoint stores the information of an interface endpoint registered
16 // with the router. Always accessed under the router's lock.
17 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
18 // this object.
19 class MultiplexRouter::InterfaceEndpoint
20 : public base::RefCounted<InterfaceEndpoint> {
21 public:
22 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
23 : router_lock_(&router->lock_),
24 id_(id),
25 closed_(false),
26 peer_closed_(false),
27 client_(nullptr) {
28 router_lock_->AssertAcquired();
29 }
30
31 InterfaceId id() const { return id_; }
32
33 bool closed() const { return closed_; }
34 void set_closed() {
35 router_lock_->AssertAcquired();
36 closed_ = true;
37 }
38
39 bool peer_closed() const { return peer_closed_; }
40 void set_peer_closed() {
41 router_lock_->AssertAcquired();
42 peer_closed_ = true;
43 }
44
45 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const {
46 return task_runner_;
47 }
48 void set_task_runner(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
50 router_lock_->AssertAcquired();
51 task_runner_ = task_runner.Pass();
52 }
53
54 InterfaceEndpointClient* client() const { return client_; }
55 void set_client(InterfaceEndpointClient* client) {
56 router_lock_->AssertAcquired();
57 client_ = client;
58 }
59
60 private:
61 friend class base::RefCounted<InterfaceEndpoint>;
62
63 ~InterfaceEndpoint() {
64 router_lock_->AssertAcquired();
65
66 DCHECK(!client_);
67 DCHECK(closed_);
68 DCHECK(peer_closed_);
69 }
70
71 base::Lock* const router_lock_;
72 const InterfaceId id_;
73
74 // Whether the endpoint has been closed.
75 bool closed_;
76 // Whether the peer endpoint has been closed.
77 bool peer_closed_;
78
79 // The task runner on which |client_| can be accessed.
80 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
81 // Not owned. It is null if no client is attached to this endpoint.
82 InterfaceEndpointClient* client_;
83
84 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
85 };
86
87 struct MultiplexRouter::Task {
88 public:
89 // Doesn't take ownership of |message| but takes its contents.
90 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) {
91 Task* task = new Task();
92 task->message.reset(new Message);
93 message->MoveTo(task->message.get());
94 return make_scoped_ptr(task);
95 }
96 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
97 Task* task = new Task();
98 task->endpoint_to_notify = endpoint;
99 return make_scoped_ptr(task);
100 }
101
102 ~Task() {}
103
104 bool IsIncomingMessageTask() const { return !!message; }
105 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; }
106
107 scoped_ptr<Message> message;
108 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
109
110 private:
111 Task() {}
112 };
113
114 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
115 ScopedMessagePipeHandle message_pipe,
116 const MojoAsyncWaiter* waiter)
117 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current()
118 ->task_runner()),
119 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
120 header_validator_(this),
121 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter),
122 encountered_error_(false),
123 control_message_handler_(this),
124 control_message_proxy_(&connector_),
125 next_interface_id_value_(1),
126 testing_mode_(false) {
127 connector_.set_incoming_receiver(&header_validator_);
128 connector_.set_connection_error_handler(
129 [this]() { OnPipeConnectionError(); });
130 }
131
132 MultiplexRouter::~MultiplexRouter() {
133 base::AutoLock locker(lock_);
134
135 tasks_.clear();
136
137 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
138 InterfaceEndpoint* endpoint = iter->second.get();
139 // Increment the iterator before calling UpdateEndpointStateMayRemove()
140 // because it may remove the corresponding value from the map.
141 ++iter;
142
143 DCHECK(endpoint->closed());
144 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
145 }
146
147 DCHECK(endpoints_.empty());
148 }
149
150 void MultiplexRouter::CreateEndpointHandlePair(
151 ScopedInterfaceEndpointHandle* local_endpoint,
152 ScopedInterfaceEndpointHandle* remote_endpoint) {
153 base::AutoLock locker(lock_);
154 uint32_t id = 0;
155 do {
156 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
157 next_interface_id_value_ = 1;
158 id = next_interface_id_value_++;
159 if (set_interface_id_namespace_bit_)
160 id |= kInterfaceIdNamespaceMask;
161 } while (ContainsKey(endpoints_, id));
162
163 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
164 endpoints_[id] = endpoint;
165 if (encountered_error_)
166 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
167
168 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this);
169 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this);
170 }
171
172 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
173 InterfaceId id) {
174 if (!IsValidInterfaceId(id))
175 return ScopedInterfaceEndpointHandle();
176
177 base::AutoLock locker(lock_);
178 if (ContainsKey(endpoints_, id)) {
179 // If the endpoint already exist, it is because we have received a
180 // notification that the peer endpoint has closed.
181 InterfaceEndpoint* endpoint = endpoints_[id].get();
182 CHECK(!endpoint->closed());
183 CHECK(endpoint->peer_closed());
184 } else {
185 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
186 endpoints_[id] = endpoint;
187 if (encountered_error_)
188 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
189 }
190 return ScopedInterfaceEndpointHandle(id, true, this);
191 }
192
193 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
194 if (!IsValidInterfaceId(id))
195 return;
196
197 base::AutoLock locker(lock_);
198
199 if (!is_local) {
200 DCHECK(ContainsKey(endpoints_, id));
201 DCHECK(!IsMasterInterfaceId(id));
202
203 // We will receive a NotifyPeerEndpointClosed message from the other side.
204 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
205
206 return;
207 }
208
209 DCHECK(ContainsKey(endpoints_, id));
210 InterfaceEndpoint* endpoint = endpoints_[id].get();
211 DCHECK(!endpoint->client());
212 DCHECK(!endpoint->closed());
213 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
214
215 if (!IsMasterInterfaceId(id))
216 control_message_proxy_.NotifyPeerEndpointClosed(id);
217
218 ProcessTasks(true);
219 }
220
221 void MultiplexRouter::AttachEndpointClient(
222 const ScopedInterfaceEndpointHandle& handle,
223 InterfaceEndpointClient* client) {
224 const InterfaceId id = handle.id();
225
226 DCHECK(IsValidInterfaceId(id));
227 DCHECK(client);
228
229 base::AutoLock locker(lock_);
230 DCHECK(ContainsKey(endpoints_, id));
231
232 InterfaceEndpoint* endpoint = endpoints_[id].get();
233 DCHECK(!endpoint->client());
234 DCHECK(!endpoint->closed());
235
236 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
237 endpoint->set_client(client);
238
239 if (endpoint->peer_closed())
240 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
241 ProcessTasks(true);
242 }
243
244 void MultiplexRouter::DetachEndpointClient(
245 const ScopedInterfaceEndpointHandle& handle) {
246 const InterfaceId id = handle.id();
247
248 DCHECK(IsValidInterfaceId(id));
249
250 base::AutoLock locker(lock_);
251 DCHECK(ContainsKey(endpoints_, id));
252
253 InterfaceEndpoint* endpoint = endpoints_[id].get();
254 DCHECK(endpoint->client());
255 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
256 DCHECK(!endpoint->closed());
257
258 endpoint->set_task_runner(nullptr);
259 endpoint->set_client(nullptr);
260 }
261
262 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
263 Message* message) {
264 const InterfaceId id = handle.id();
265
266 base::AutoLock locker(lock_);
267 if (!ContainsKey(endpoints_, id))
268 return false;
269
270 InterfaceEndpoint* endpoint = endpoints_[id].get();
271 if (endpoint->peer_closed())
272 return false;
273
274 message->set_interface_id(id);
275 return connector_.Accept(message);
276 }
277
278 void MultiplexRouter::RaiseError() {
279 if (task_runner_->BelongsToCurrentThread()) {
280 connector_.RaiseError();
281 } else {
282 task_runner_->PostTask(FROM_HERE,
283 base::Bind(&MultiplexRouter::RaiseError, this));
284 }
285 }
286
287 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() {
288 DCHECK(thread_checker_.CalledOnValidThread());
289 {
290 base::AutoLock locker(lock_);
291 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 &&
292 ContainsKey(endpoints_, kMasterInterfaceId)));
293 }
294 return connector_.PassMessagePipe();
295 }
296
297 void MultiplexRouter::EnableTestingMode() {
298 DCHECK(thread_checker_.CalledOnValidThread());
299 base::AutoLock locker(lock_);
300
301 testing_mode_ = true;
302 connector_.set_enforce_errors_from_incoming_receiver(false);
303 }
304
305 bool MultiplexRouter::Accept(Message* message) {
306 DCHECK(thread_checker_.CalledOnValidThread());
307
308 scoped_refptr<MultiplexRouter> protector(this);
309 base::AutoLock locker(lock_);
310 tasks_.push_back(Task::CreateIncomingMessageTask(message));
311 ProcessTasks(false);
312
313 // Always return true. If we see errors during message processing, we will
314 // explicitly call Connector::RaiseError() to disconnect the message pipe.
315 return true;
316 }
317
318 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
319 lock_.AssertAcquired();
320
321 if (IsMasterInterfaceId(id))
322 return false;
323
324 if (!ContainsKey(endpoints_, id))
325 endpoints_[id] = new InterfaceEndpoint(this, id);
326
327 InterfaceEndpoint* endpoint = endpoints_[id].get();
328 DCHECK(!endpoint->peer_closed());
329
330 if (endpoint->client())
331 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
332 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
333
334 // No need to trigger a ProcessTasks() because it is already on the stack.
335
336 return true;
337 }
338
339 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
340 lock_.AssertAcquired();
341
342 if (IsMasterInterfaceId(id))
343 return false;
344
345 if (!ContainsKey(endpoints_, id))
346 endpoints_[id] = new InterfaceEndpoint(this, id);
347
348 InterfaceEndpoint* endpoint = endpoints_[id].get();
349 DCHECK(!endpoint->closed());
350 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
351
352 control_message_proxy_.NotifyPeerEndpointClosed(id);
353
354 return true;
355 }
356
357 void MultiplexRouter::OnPipeConnectionError() {
358 DCHECK(thread_checker_.CalledOnValidThread());
359
360 scoped_refptr<MultiplexRouter> protector(this);
361 base::AutoLock locker(lock_);
362
363 encountered_error_ = true;
364
365 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
366 InterfaceEndpoint* endpoint = iter->second.get();
367 // Increment the iterator before calling UpdateEndpointStateMayRemove()
368 // because it may remove the corresponding value from the map.
369 ++iter;
370
371 if (endpoint->client())
372 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
373
374 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
375 }
376
377 ProcessTasks(false);
378 }
379
380 void MultiplexRouter::ProcessTasks(bool force_async) {
381 lock_.AssertAcquired();
382
383 while (!tasks_.empty()) {
384 scoped_ptr<Task> task(tasks_.front().Pass());
385 tasks_.pop_front();
386
387 bool processed = task->IsNotifyErrorTask()
388 ? ProcessNotifyErrorTask(task.get(), &force_async)
389 : ProcessIncomingMessageTask(task.get(), &force_async);
390
391 if (!processed) {
392 tasks_.push_front(task.Pass());
393 break;
394 }
395 }
396 }
397
398 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) {
399 lock_.AssertAcquired();
400 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
401 if (!endpoint->client())
402 return true;
403
404 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
405 endpoint->task_runner()->PostTask(
406 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
407 return false;
408 }
409
410 *force_async = true;
411 InterfaceEndpointClient* client = endpoint->client();
412 {
413 // We must unlock before calling into |client| because it may call this
414 // object within NotifyError(). Holding the lock will lead to deadlock.
415 //
416 // It is safe to call into |client| without the lock. Because |client| is
417 // always accessed on the same thread, including DetachEndpointClient().
418 base::AutoUnlock unlocker(lock_);
419 client->NotifyError();
420 }
421 return true;
422 }
423
424 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task,
425 bool* force_async) {
426 lock_.AssertAcquired();
427 Message* message = task->message.get();
428
429 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
430 if (!control_message_handler_.Accept(message))
431 RaiseErrorInNonTestingMode();
432 return true;
433 }
434
435 InterfaceId id = message->interface_id();
436 DCHECK(IsValidInterfaceId(id));
437
438 if (!ContainsKey(endpoints_, id)) {
439 DCHECK(!IsMasterInterfaceId(id));
440
441 // Currently, it is legitimate to receive messages for an endpoint
442 // that is not registered. For example, the endpoint is transferred in
443 // a message that is discarded. Once we add support to specify all
444 // enclosing endpoints in message header, we should be able to remove
445 // this.
446 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
447 endpoints_[id] = endpoint;
448 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
449
450 control_message_proxy_.NotifyPeerEndpointClosed(id);
451 return true;
452 }
453
454 InterfaceEndpoint* endpoint = endpoints_[id].get();
455 if (endpoint->closed())
456 return true;
457
458 if (!endpoint->client()) {
459 // We need to wait until a client is attached in order to dispatch further
460 // messages.
461 return false;
462 }
463
464 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
465 endpoint->task_runner()->PostTask(
466 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
467 return false;
468 }
469
470 *force_async = true;
471 InterfaceEndpointClient* client = endpoint->client();
472 scoped_ptr<Message> owned_message = task->message.Pass();
473 bool result = false;
474 {
475 // We must unlock before calling into |client| because it may call this
476 // object within HandleIncomingMessage(). Holding the lock will lead to
477 // deadlock.
478 //
479 // It is safe to call into |client| without the lock. Because |client| is
480 // always accessed on the same thread, including DetachEndpointClient().
481 base::AutoUnlock unlocker(lock_);
482 result = client->HandleIncomingMessage(owned_message.get());
483 }
484 if (!result)
485 RaiseErrorInNonTestingMode();
486
487 return true;
488 }
489
490 void MultiplexRouter::LockAndCallProcessTasks() {
491 // There is no need to hold a ref to this class in this case because this is
492 // always called using base::Bind(), which holds a ref.
493 base::AutoLock locker(lock_);
494 ProcessTasks(false);
495 }
496
497 void MultiplexRouter::UpdateEndpointStateMayRemove(
498 InterfaceEndpoint* endpoint,
499 EndpointStateUpdateType type) {
500 switch (type) {
501 case ENDPOINT_CLOSED:
502 endpoint->set_closed();
503 break;
504 case PEER_ENDPOINT_CLOSED:
505 endpoint->set_peer_closed();
506 break;
507 }
508 if (endpoint->closed() && endpoint->peer_closed())
509 endpoints_.erase(endpoint->id());
510 }
511
512 void MultiplexRouter::RaiseErrorInNonTestingMode() {
513 lock_.AssertAcquired();
514 if (!testing_mode_)
515 RaiseError();
516 }
517
518 } // namespace internal
519 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698