Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1455063004: Mojo C++ bindings: introduce MultiplexRouter and related classes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: changes according to review comments Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/stl_util.h"
10 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
11
12 namespace mojo {
13 namespace internal {
14
15 // InterfaceEndpoint stores the information of an interface endpoint registered
16 // with the router. Always accessed under the router's lock.
17 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
18 // this object.
19 class MultiplexRouter::InterfaceEndpoint
20 : public base::RefCounted<InterfaceEndpoint> {
21 public:
22 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
23 : router_lock_(&router->lock_),
24 id_(id),
25 closed_(false),
26 peer_closed_(false),
27 client_(nullptr) {
28 router_lock_->AssertAcquired();
29 }
30
31 InterfaceId id() const { return id_; }
32
33 bool closed() const { return closed_; }
34 void set_closed() {
35 router_lock_->AssertAcquired();
36 closed_ = true;
37 }
38
39 bool peer_closed() const { return peer_closed_; }
40 void set_peer_closed() {
41 router_lock_->AssertAcquired();
42 peer_closed_ = true;
43 }
44
45 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const {
46 return task_runner_;
47 }
48 void set_task_runner(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
50 router_lock_->AssertAcquired();
51 task_runner_ = task_runner.Pass();
52 }
53
54 InterfaceEndpointClient* client() const { return client_; }
55 void set_client(InterfaceEndpointClient* client) {
56 router_lock_->AssertAcquired();
57 client_ = client;
58 }
59
60 private:
61 friend class base::RefCounted<InterfaceEndpoint>;
62
63 ~InterfaceEndpoint() {
64 router_lock_->AssertAcquired();
65
66 DCHECK(!client_);
67 DCHECK(closed_);
68 DCHECK(peer_closed_);
69 }
70
71 base::Lock* const router_lock_;
72 const InterfaceId id_;
73
74 // Whether the endpoint has been closed.
75 bool closed_;
76 // Whether the peer endpoint has been closed.
77 bool peer_closed_;
78
79 // The task runner on which |client_| can be accessed.
80 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
81 // Not owned. It is null if no client is attached to this endpoint.
82 InterfaceEndpointClient* client_;
83
84 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
85 };
86
87 struct MultiplexRouter::Task {
88 public:
89 // Doesn't take ownership of |message| but takes its contents.
90 static Task* CreateIncomingMessageTask(Message* message) {
91 Task* task = new Task();
92 task->message.reset(new Message);
93 message->MoveTo(task->message.get());
94 return task;
95 }
96 static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
97 Task* task = new Task();
98 task->endpoint_to_notify = endpoint;
99 return task;
100 }
101
102 ~Task() {}
103
104 bool IsIncomingMessageTask() const { return !!message; }
105 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; }
106
107 scoped_ptr<Message> message;
108 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
109
110 private:
111 Task() {}
112 };
113
114 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
115 ScopedMessagePipeHandle message_pipe,
116 const MojoAsyncWaiter* waiter)
117 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current()
118 ->task_runner()),
119 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
120 header_validator_(this),
121 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter),
122 control_message_handler_(this),
123 control_message_proxy_(&connector_),
124 next_interface_id_value_(1),
125 testing_mode_(false) {
126 connector_.set_incoming_receiver(&header_validator_);
127 connector_.set_connection_error_handler(
128 [this]() { OnPipeConnectionError(); });
129 }
130
131 MultiplexRouter::~MultiplexRouter() {
132 base::AutoLock locker(lock_);
133
134 while (!tasks_.empty()) {
135 delete tasks_.front();
136 tasks_.pop_front();
137 }
138
139 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
140 InterfaceEndpoint* endpoint = iter->second.get();
141 // Increment the iterator before calling UpdateEndpointStateMayRemove()
142 // because it may remove the corresponding value from the map.
143 ++iter;
144
145 DCHECK(endpoint->closed());
146 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
147 }
148
149 DCHECK(endpoints_.empty());
150 }
151
152 void MultiplexRouter::CreateEndpointHandlePair(
153 ScopedInterfaceEndpointHandle* local_endpoint,
154 ScopedInterfaceEndpointHandle* remote_endpoint) {
155 base::AutoLock locker(lock_);
156 uint32_t id = 0;
157 do {
158 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
159 next_interface_id_value_ = 1;
160 id = next_interface_id_value_++;
161 if (set_interface_id_namespace_bit_)
162 id |= kInterfaceIdNamespaceMask;
163 } while (ContainsKey(endpoints_, id));
164
165 endpoints_[id] = new InterfaceEndpoint(this, id);
166 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this);
167 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this);
168 }
169
170 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
171 InterfaceId id) {
172 if (!IsValidInterfaceId(id))
173 return ScopedInterfaceEndpointHandle();
174
175 base::AutoLock locker(lock_);
176 if (ContainsKey(endpoints_, id)) {
177 // If the endpoint already exist, it is because we have received a
178 // notification that the peer endpoint has closed.
179 InterfaceEndpoint* endpoint = endpoints_[id].get();
180 CHECK(!endpoint->closed());
181 CHECK(endpoint->peer_closed());
182 } else {
183 endpoints_[id] = new InterfaceEndpoint(this, id);
184 }
185 return ScopedInterfaceEndpointHandle(id, true, this);
186 }
187
188 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
189 if (!IsValidInterfaceId(id))
190 return;
191
192 base::AutoLock locker(lock_);
193
194 if (!is_local) {
195 DCHECK(ContainsKey(endpoints_, id));
196 DCHECK(!IsMasterInterfaceId(id));
197
198 // We will receive a NotifyPeerEndpointClosed message from the other side.
199 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
200
201 return;
202 }
203
204 DCHECK(ContainsKey(endpoints_, id));
205 InterfaceEndpoint* endpoint = endpoints_[id].get();
206 DCHECK(!endpoint->client());
207 DCHECK(!endpoint->closed());
208 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
209
210 if (!IsMasterInterfaceId(id))
211 control_message_proxy_.NotifyPeerEndpointClosed(id);
212
213 ProcessTasks(true);
214 }
215
216 void MultiplexRouter::AttachEndpointClient(
217 const ScopedInterfaceEndpointHandle& handle,
218 InterfaceEndpointClient* client) {
219 const InterfaceId id = handle.id();
220
221 DCHECK(IsValidInterfaceId(id));
222 DCHECK(client);
223
224 base::AutoLock locker(lock_);
225 DCHECK(ContainsKey(endpoints_, id));
226
227 InterfaceEndpoint* endpoint = endpoints_[id].get();
228 DCHECK(!endpoint->client());
229 DCHECK(!endpoint->closed());
230
231 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
232 endpoint->set_client(client);
233
234 if (endpoint->peer_closed())
235 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
236 ProcessTasks(true);
237 }
238
239 void MultiplexRouter::DetachEndpointClient(
240 const ScopedInterfaceEndpointHandle& handle) {
241 const InterfaceId id = handle.id();
242
243 DCHECK(IsValidInterfaceId(id));
244
245 base::AutoLock locker(lock_);
246 DCHECK(ContainsKey(endpoints_, id));
247
248 InterfaceEndpoint* endpoint = endpoints_[id].get();
249 DCHECK(endpoint->client());
250 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
251 DCHECK(!endpoint->closed());
252
253 endpoint->set_task_runner(nullptr);
254 endpoint->set_client(nullptr);
255 }
256
257 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
258 Message* message) {
259 const InterfaceId id = handle.id();
260
261 base::AutoLock locker(lock_);
262 if (!ContainsKey(endpoints_, id))
263 return false;
264
265 InterfaceEndpoint* endpoint = endpoints_[id].get();
266 if (endpoint->peer_closed())
267 return false;
268
269 message->set_interface_id(id);
270 return connector_.Accept(message);
271 }
272
273 void MultiplexRouter::RaiseError() {
274 if (task_runner_->BelongsToCurrentThread()) {
275 connector_.RaiseError();
276 } else {
277 task_runner_->PostTask(FROM_HERE,
278 base::Bind(&MultiplexRouter::RaiseError, this));
279 }
280 }
281
282 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() {
283 DCHECK(thread_checker_.CalledOnValidThread());
284 {
285 base::AutoLock locker(lock_);
286 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 &&
287 ContainsKey(endpoints_, kMasterInterfaceId)));
288 }
289 return connector_.PassMessagePipe();
290 }
291
292 void MultiplexRouter::EnableTestingMode() {
293 DCHECK(thread_checker_.CalledOnValidThread());
294 base::AutoLock locker(lock_);
295
296 testing_mode_ = true;
297 connector_.set_enforce_errors_from_incoming_receiver(false);
298 }
299
300 bool MultiplexRouter::Accept(Message* message) {
301 DCHECK(thread_checker_.CalledOnValidThread());
302
303 scoped_refptr<MultiplexRouter> protector(this);
304 base::AutoLock locker(lock_);
305 tasks_.push_back(Task::CreateIncomingMessageTask(message));
306 ProcessTasks(false);
307
308 // Always return true. If we see errors during message processing, we will
309 // explicitly call Connector::RaiseError() to disconnect the message pipe.
310 return true;
311 }
312
313 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
314 lock_.AssertAcquired();
315
316 if (IsMasterInterfaceId(id))
317 return false;
318
319 if (!ContainsKey(endpoints_, id))
320 endpoints_[id] = new InterfaceEndpoint(this, id);
321
322 InterfaceEndpoint* endpoint = endpoints_[id].get();
323 DCHECK(!endpoint->peer_closed());
324
325 if (endpoint->client())
326 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
327 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
328
329 // No need to trigger a ProcessTasks() because it is already on the stack.
330
331 return true;
332 }
333
334 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
335 lock_.AssertAcquired();
336
337 if (IsMasterInterfaceId(id))
338 return false;
339
340 if (!ContainsKey(endpoints_, id))
341 endpoints_[id] = new InterfaceEndpoint(this, id);
342
343 InterfaceEndpoint* endpoint = endpoints_[id].get();
344 DCHECK(!endpoint->closed());
345 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
346
347 control_message_proxy_.NotifyPeerEndpointClosed(id);
348
349 return true;
350 }
351
352 void MultiplexRouter::OnPipeConnectionError() {
353 DCHECK(thread_checker_.CalledOnValidThread());
354
355 scoped_refptr<MultiplexRouter> protector(this);
356 base::AutoLock locker(lock_);
357
358 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
359 InterfaceEndpoint* endpoint = iter->second.get();
360 // Increment the iterator before calling UpdateEndpointStateMayRemove()
361 // because it may remove the corresponding value from the map.
362 ++iter;
363
364 if (endpoint->client())
365 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
366
367 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
368 }
369
370 ProcessTasks(false);
371 }
372
373 void MultiplexRouter::ProcessTasks(bool force_async) {
374 lock_.AssertAcquired();
375
376 while (!tasks_.empty()) {
377 scoped_ptr<Task> task(tasks_.front());
378 tasks_.pop_front();
379
380 bool processed = task->IsNotifyErrorTask()
381 ? ProcessNotifyErrorTask(task.get(), &force_async)
382 : ProcessIncomingMessageTask(task.get(), &force_async);
383
384 if (!processed) {
385 tasks_.push_front(task.release());
386 break;
387 }
388 }
389 }
390
391 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) {
392 lock_.AssertAcquired();
393 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
394 if (!endpoint->client())
395 return true;
396
397 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
398 endpoint->task_runner()->PostTask(
399 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
400 return false;
401 }
402
403 *force_async = true;
404 InterfaceEndpointClient* client = endpoint->client();
405 {
406 // We must unlock before calling into |client| because it may calls this
sky 2015/11/20 01:00:19 calls->call
yzshen1 2015/11/20 17:01:00 Done.
407 // object within NotifyError(). Holding the lock will lead to deadlock.
408 //
409 // It is safe to call into |client| without the lock. Because |client| is
410 // always accessed on the same thread, including DetachEndpointClient().
411 base::AutoUnlock unlocker(lock_);
412 client->NotifyError();
413 }
414 return true;
415 }
416
417 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task,
418 bool* force_async) {
419 lock_.AssertAcquired();
420 Message* message = task->message.get();
421
422 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
423 if (!control_message_handler_.Accept(message))
424 RaiseErrorInNonTestingMode();
425 return true;
426 }
427
428 InterfaceId id = message->interface_id();
429 DCHECK(IsValidInterfaceId(id));
430
431 if (!ContainsKey(endpoints_, id)) {
432 DCHECK(!IsMasterInterfaceId(id));
433
434 // Currently, it is legitimate to receive messages for an endpoint
435 // that is not registered. For example, the endpoint is transferred in
436 // a message that is discarded. Once we add support to specify all
437 // enclosing endpoints in message header, we should be able to remove
438 // this.
439 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
440 endpoints_[id] = endpoint;
441 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
442
443 control_message_proxy_.NotifyPeerEndpointClosed(id);
444 return true;
445 }
446
447 InterfaceEndpoint* endpoint = endpoints_[id].get();
448 if (endpoint->closed())
449 return true;
450
451 if (!endpoint->client()) {
452 // We need to wait until a client is attached in order to dispatch further
453 // messages.
454 return false;
455 }
456
457 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
458 endpoint->task_runner()->PostTask(
459 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
460 return false;
461 }
462
463 *force_async = true;
464 InterfaceEndpointClient* client = endpoint->client();
465 scoped_ptr<Message> owned_message = task->message.Pass();
466 bool result = false;
467 {
468 // We must unlock before calling into |client| because it may calls this
sky 2015/11/20 01:00:19 calls->call
yzshen1 2015/11/20 17:01:00 Done.
469 // object within HandleIncomingMessage(). Holding the lock will lead to
470 // deadlock.
471 //
472 // It is safe to call into |client| without the lock. Because |client| is
473 // always accessed on the same thread, including DetachEndpointClient().
474 base::AutoUnlock unlocker(lock_);
475 result = client->HandleIncomingMessage(owned_message.get());
476 }
477 if (!result)
478 RaiseErrorInNonTestingMode();
479
480 return true;
481 }
482
483 void MultiplexRouter::LockAndCallProcessTasks() {
484 // There is no need to hold a ref to this class in this case because this is
485 // always called using base::Bind(), which holds a ref.
486 base::AutoLock locker(lock_);
487 ProcessTasks(false);
488 }
489
490 void MultiplexRouter::UpdateEndpointStateMayRemove(
491 InterfaceEndpoint* endpoint,
492 EndpointStateUpdateType type) {
493 switch (type) {
494 case ENDPOINT_CLOSED:
495 endpoint->set_closed();
496 break;
497 case PEER_ENDPOINT_CLOSED:
498 endpoint->set_peer_closed();
499 break;
500 }
501 if (endpoint->closed() && endpoint->peer_closed())
502 endpoints_.erase(endpoint->id());
503 }
504
505 void MultiplexRouter::RaiseErrorInNonTestingMode() {
506 lock_.AssertAcquired();
507 if (!testing_mode_)
508 RaiseError();
509 }
510
511 } // namespace internal
512 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698