OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/message_loop/message_loop.h" | |
9 #include "base/stl_util.h" | |
10 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | |
11 | |
12 namespace mojo { | |
13 namespace internal { | |
14 | |
15 // InterfaceEndpoint stores the information of an interface endpoint registered | |
16 // with the router. Always accessed under the router's lock. | |
17 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | |
18 // this object. | |
19 class MultiplexRouter::InterfaceEndpoint | |
20 : public base::RefCounted<InterfaceEndpoint> { | |
21 public: | |
22 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | |
23 : router_lock_(&router->lock_), | |
24 id_(id), | |
25 closed_(false), | |
26 peer_closed_(false), | |
27 client_(nullptr) { | |
28 router_lock_->AssertAcquired(); | |
29 } | |
30 | |
31 InterfaceId id() const { return id_; } | |
32 | |
33 bool closed() const { return closed_; } | |
34 void set_closed() { | |
35 router_lock_->AssertAcquired(); | |
36 closed_ = true; | |
37 } | |
38 | |
39 bool peer_closed() const { return peer_closed_; } | |
40 void set_peer_closed() { | |
41 router_lock_->AssertAcquired(); | |
42 peer_closed_ = true; | |
43 } | |
44 | |
45 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { | |
46 return task_runner_; | |
47 } | |
48 void set_task_runner( | |
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | |
50 router_lock_->AssertAcquired(); | |
51 task_runner_ = task_runner.Pass(); | |
52 } | |
53 | |
54 InterfaceEndpointClient* client() const { return client_; } | |
55 void set_client(InterfaceEndpointClient* client) { | |
56 router_lock_->AssertAcquired(); | |
57 client_ = client; | |
58 } | |
59 | |
60 private: | |
61 friend class base::RefCounted<InterfaceEndpoint>; | |
62 | |
63 ~InterfaceEndpoint() { | |
64 router_lock_->AssertAcquired(); | |
65 | |
66 DCHECK(!client_); | |
67 DCHECK(closed_); | |
68 DCHECK(peer_closed_); | |
69 } | |
70 | |
71 base::Lock* const router_lock_; | |
72 const InterfaceId id_; | |
73 | |
74 // Whether the endpoint has been closed. | |
75 bool closed_; | |
76 // Whether the peer endpoint has been closed. | |
77 bool peer_closed_; | |
78 | |
79 // The task runner on which |client_| can be accessed. | |
80 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | |
81 // Not owned. It is null if no client is attached to this endpoint. | |
82 InterfaceEndpointClient* client_; | |
83 | |
84 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | |
85 }; | |
86 | |
87 struct MultiplexRouter::Task { | |
88 public: | |
89 // Doesn't take ownership of |message| but takes its contents. | |
90 static Task* CreateIncomingMessageTask(Message* message) { | |
91 Task* task = new Task(); | |
92 task->message.reset(new Message); | |
93 message->MoveTo(task->message.get()); | |
94 return task; | |
95 } | |
96 static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | |
97 Task* task = new Task(); | |
98 task->endpoint_to_notify = endpoint; | |
99 return task; | |
100 } | |
101 | |
102 ~Task() {} | |
103 | |
104 bool IsIncomingMessageTask() const { return !!message; } | |
105 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | |
106 | |
107 scoped_ptr<Message> message; | |
108 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | |
109 | |
110 private: | |
111 Task() {} | |
112 }; | |
113 | |
114 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | |
115 ScopedMessagePipeHandle message_pipe, | |
116 const MojoAsyncWaiter* waiter) | |
117 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() | |
118 ->task_runner()), | |
119 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | |
120 header_validator_(this), | |
121 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), | |
122 control_message_handler_(this), | |
123 control_message_proxy_(&connector_), | |
124 next_interface_id_value_(1), | |
125 testing_mode_(false) { | |
126 connector_.set_incoming_receiver(&header_validator_); | |
127 connector_.set_connection_error_handler( | |
128 [this]() { OnPipeConnectionError(); }); | |
129 } | |
130 | |
131 MultiplexRouter::~MultiplexRouter() { | |
132 base::AutoLock locker(lock_); | |
133 | |
134 while (!tasks_.empty()) { | |
135 delete tasks_.front(); | |
136 tasks_.pop_front(); | |
137 } | |
138 | |
139 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | |
140 InterfaceEndpoint* endpoint = iter->second.get(); | |
141 // Increment the iterator before calling UpdateEndpointStateMayRemove() | |
142 // because it may remove the corresponding value from the map. | |
143 ++iter; | |
144 | |
145 DCHECK(endpoint->closed()); | |
146 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | |
147 } | |
148 | |
149 DCHECK(endpoints_.empty()); | |
150 } | |
151 | |
152 void MultiplexRouter::CreateEndpointHandlePair( | |
153 ScopedInterfaceEndpointHandle* local_endpoint, | |
154 ScopedInterfaceEndpointHandle* remote_endpoint) { | |
155 base::AutoLock locker(lock_); | |
156 uint32_t id = 0; | |
157 do { | |
158 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) | |
159 next_interface_id_value_ = 1; | |
160 id = next_interface_id_value_++; | |
161 if (set_interface_id_namespace_bit_) | |
162 id |= kInterfaceIdNamespaceMask; | |
163 } while (ContainsKey(endpoints_, id)); | |
164 | |
165 endpoints_[id] = new InterfaceEndpoint(this, id); | |
166 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); | |
167 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); | |
168 } | |
169 | |
170 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | |
171 InterfaceId id) { | |
172 if (!IsValidInterfaceId(id)) | |
173 return ScopedInterfaceEndpointHandle(); | |
174 | |
175 base::AutoLock locker(lock_); | |
176 if (ContainsKey(endpoints_, id)) { | |
177 // If the endpoint already exist, it is because we have received a | |
178 // notification that the peer endpoint has closed. | |
179 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
180 CHECK(!endpoint->closed()); | |
181 CHECK(endpoint->peer_closed()); | |
182 } else { | |
183 endpoints_[id] = new InterfaceEndpoint(this, id); | |
184 } | |
185 return ScopedInterfaceEndpointHandle(id, true, this); | |
186 } | |
187 | |
188 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { | |
189 if (!IsValidInterfaceId(id)) | |
190 return; | |
191 | |
192 base::AutoLock locker(lock_); | |
193 | |
194 if (!is_local) { | |
195 DCHECK(ContainsKey(endpoints_, id)); | |
196 DCHECK(!IsMasterInterfaceId(id)); | |
197 | |
198 // We will receive a NotifyPeerEndpointClosed message from the other side. | |
199 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); | |
200 | |
201 return; | |
202 } | |
203 | |
204 DCHECK(ContainsKey(endpoints_, id)); | |
205 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
206 DCHECK(!endpoint->client()); | |
207 DCHECK(!endpoint->closed()); | |
208 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | |
209 | |
210 if (!IsMasterInterfaceId(id)) | |
211 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
212 | |
213 ProcessTasks(true); | |
214 } | |
215 | |
216 void MultiplexRouter::AttachEndpointClient( | |
217 const ScopedInterfaceEndpointHandle& handle, | |
218 InterfaceEndpointClient* client) { | |
219 const InterfaceId id = handle.id(); | |
220 | |
221 DCHECK(IsValidInterfaceId(id)); | |
222 DCHECK(client); | |
223 | |
224 base::AutoLock locker(lock_); | |
225 DCHECK(ContainsKey(endpoints_, id)); | |
226 | |
227 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
228 DCHECK(!endpoint->client()); | |
229 DCHECK(!endpoint->closed()); | |
230 | |
231 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | |
232 endpoint->set_client(client); | |
233 | |
234 if (endpoint->peer_closed()) | |
235 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
236 ProcessTasks(true); | |
237 } | |
238 | |
239 void MultiplexRouter::DetachEndpointClient( | |
240 const ScopedInterfaceEndpointHandle& handle) { | |
241 const InterfaceId id = handle.id(); | |
242 | |
243 DCHECK(IsValidInterfaceId(id)); | |
244 | |
245 base::AutoLock locker(lock_); | |
246 DCHECK(ContainsKey(endpoints_, id)); | |
247 | |
248 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
249 DCHECK(endpoint->client()); | |
250 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | |
251 DCHECK(!endpoint->closed()); | |
252 | |
253 endpoint->set_task_runner(nullptr); | |
254 endpoint->set_client(nullptr); | |
255 } | |
256 | |
257 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
258 Message* message) { | |
259 const InterfaceId id = handle.id(); | |
260 | |
261 base::AutoLock locker(lock_); | |
262 if (!ContainsKey(endpoints_, id)) | |
263 return false; | |
264 | |
265 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
266 if (endpoint->peer_closed()) | |
267 return false; | |
268 | |
269 message->set_interface_id(id); | |
270 return connector_.Accept(message); | |
271 } | |
272 | |
273 void MultiplexRouter::RaiseError() { | |
274 if (task_runner_->BelongsToCurrentThread()) { | |
275 connector_.RaiseError(); | |
276 } else { | |
277 task_runner_->PostTask(FROM_HERE, | |
278 base::Bind(&MultiplexRouter::RaiseError, this)); | |
279 } | |
280 } | |
281 | |
282 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() { | |
283 DCHECK(thread_checker_.CalledOnValidThread()); | |
284 { | |
285 base::AutoLock locker(lock_); | |
286 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 && | |
287 ContainsKey(endpoints_, kMasterInterfaceId))); | |
288 } | |
289 return connector_.PassMessagePipe(); | |
290 } | |
291 | |
292 void MultiplexRouter::EnableTestingMode() { | |
293 DCHECK(thread_checker_.CalledOnValidThread()); | |
294 base::AutoLock locker(lock_); | |
295 | |
296 testing_mode_ = true; | |
297 connector_.set_enforce_errors_from_incoming_receiver(false); | |
298 } | |
299 | |
300 bool MultiplexRouter::Accept(Message* message) { | |
301 DCHECK(thread_checker_.CalledOnValidThread()); | |
302 | |
303 scoped_refptr<MultiplexRouter> protector(this); | |
304 base::AutoLock locker(lock_); | |
305 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | |
306 ProcessTasks(false); | |
307 | |
308 // Always return true. If we see errors during message processing, we will | |
309 // explicitly call Connector::RaiseError() to disconnect the message pipe. | |
310 return true; | |
311 } | |
312 | |
313 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | |
314 lock_.AssertAcquired(); | |
315 | |
316 if (IsMasterInterfaceId(id)) | |
317 return false; | |
318 | |
319 if (!ContainsKey(endpoints_, id)) | |
320 endpoints_[id] = new InterfaceEndpoint(this, id); | |
321 | |
322 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
323 DCHECK(!endpoint->peer_closed()); | |
324 | |
325 if (endpoint->client()) | |
326 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
327 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | |
328 | |
329 // No need to trigger a ProcessTasks() because it is already on the stack. | |
330 | |
331 return true; | |
332 } | |
333 | |
334 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | |
335 lock_.AssertAcquired(); | |
336 | |
337 if (IsMasterInterfaceId(id)) | |
338 return false; | |
339 | |
340 if (!ContainsKey(endpoints_, id)) | |
341 endpoints_[id] = new InterfaceEndpoint(this, id); | |
342 | |
343 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
344 DCHECK(!endpoint->closed()); | |
345 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | |
346 | |
347 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
348 | |
349 return true; | |
350 } | |
351 | |
352 void MultiplexRouter::OnPipeConnectionError() { | |
353 DCHECK(thread_checker_.CalledOnValidThread()); | |
354 | |
355 scoped_refptr<MultiplexRouter> protector(this); | |
356 base::AutoLock locker(lock_); | |
357 | |
358 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | |
359 InterfaceEndpoint* endpoint = iter->second.get(); | |
360 // Increment the iterator before calling UpdateEndpointStateMayRemove() | |
361 // because it may remove the corresponding value from the map. | |
362 ++iter; | |
363 | |
364 if (endpoint->client()) | |
365 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
366 | |
367 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | |
368 } | |
369 | |
370 ProcessTasks(false); | |
371 } | |
372 | |
373 void MultiplexRouter::ProcessTasks(bool force_async) { | |
374 lock_.AssertAcquired(); | |
375 | |
376 while (!tasks_.empty()) { | |
377 scoped_ptr<Task> task(tasks_.front()); | |
378 tasks_.pop_front(); | |
379 | |
380 bool processed = task->IsNotifyErrorTask() | |
381 ? ProcessNotifyErrorTask(task.get(), &force_async) | |
382 : ProcessIncomingMessageTask(task.get(), &force_async); | |
383 | |
384 if (!processed) { | |
385 tasks_.push_front(task.release()); | |
386 break; | |
387 } | |
388 } | |
389 } | |
390 | |
391 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { | |
392 lock_.AssertAcquired(); | |
393 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | |
394 if (!endpoint->client()) | |
395 return true; | |
396 | |
397 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | |
398 endpoint->task_runner()->PostTask( | |
399 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
400 return false; | |
401 } | |
402 | |
403 *force_async = true; | |
404 InterfaceEndpointClient* client = endpoint->client(); | |
405 { | |
406 // We must unlock before calling into |client| because it may calls this | |
sky
2015/11/20 01:00:19
calls->call
yzshen1
2015/11/20 17:01:00
Done.
| |
407 // object within NotifyError(). Holding the lock will lead to deadlock. | |
408 // | |
409 // It is safe to call into |client| without the lock. Because |client| is | |
410 // always accessed on the same thread, including DetachEndpointClient(). | |
411 base::AutoUnlock unlocker(lock_); | |
412 client->NotifyError(); | |
413 } | |
414 return true; | |
415 } | |
416 | |
417 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, | |
418 bool* force_async) { | |
419 lock_.AssertAcquired(); | |
420 Message* message = task->message.get(); | |
421 | |
422 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | |
423 if (!control_message_handler_.Accept(message)) | |
424 RaiseErrorInNonTestingMode(); | |
425 return true; | |
426 } | |
427 | |
428 InterfaceId id = message->interface_id(); | |
429 DCHECK(IsValidInterfaceId(id)); | |
430 | |
431 if (!ContainsKey(endpoints_, id)) { | |
432 DCHECK(!IsMasterInterfaceId(id)); | |
433 | |
434 // Currently, it is legitimate to receive messages for an endpoint | |
435 // that is not registered. For example, the endpoint is transferred in | |
436 // a message that is discarded. Once we add support to specify all | |
437 // enclosing endpoints in message header, we should be able to remove | |
438 // this. | |
439 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | |
440 endpoints_[id] = endpoint; | |
441 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | |
442 | |
443 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
444 return true; | |
445 } | |
446 | |
447 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
448 if (endpoint->closed()) | |
449 return true; | |
450 | |
451 if (!endpoint->client()) { | |
452 // We need to wait until a client is attached in order to dispatch further | |
453 // messages. | |
454 return false; | |
455 } | |
456 | |
457 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | |
458 endpoint->task_runner()->PostTask( | |
459 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
460 return false; | |
461 } | |
462 | |
463 *force_async = true; | |
464 InterfaceEndpointClient* client = endpoint->client(); | |
465 scoped_ptr<Message> owned_message = task->message.Pass(); | |
466 bool result = false; | |
467 { | |
468 // We must unlock before calling into |client| because it may calls this | |
sky
2015/11/20 01:00:19
calls->call
yzshen1
2015/11/20 17:01:00
Done.
| |
469 // object within HandleIncomingMessage(). Holding the lock will lead to | |
470 // deadlock. | |
471 // | |
472 // It is safe to call into |client| without the lock. Because |client| is | |
473 // always accessed on the same thread, including DetachEndpointClient(). | |
474 base::AutoUnlock unlocker(lock_); | |
475 result = client->HandleIncomingMessage(owned_message.get()); | |
476 } | |
477 if (!result) | |
478 RaiseErrorInNonTestingMode(); | |
479 | |
480 return true; | |
481 } | |
482 | |
483 void MultiplexRouter::LockAndCallProcessTasks() { | |
484 // There is no need to hold a ref to this class in this case because this is | |
485 // always called using base::Bind(), which holds a ref. | |
486 base::AutoLock locker(lock_); | |
487 ProcessTasks(false); | |
488 } | |
489 | |
490 void MultiplexRouter::UpdateEndpointStateMayRemove( | |
491 InterfaceEndpoint* endpoint, | |
492 EndpointStateUpdateType type) { | |
493 switch (type) { | |
494 case ENDPOINT_CLOSED: | |
495 endpoint->set_closed(); | |
496 break; | |
497 case PEER_ENDPOINT_CLOSED: | |
498 endpoint->set_peer_closed(); | |
499 break; | |
500 } | |
501 if (endpoint->closed() && endpoint->peer_closed()) | |
502 endpoints_.erase(endpoint->id()); | |
503 } | |
504 | |
505 void MultiplexRouter::RaiseErrorInNonTestingMode() { | |
506 lock_.AssertAcquired(); | |
507 if (!testing_mode_) | |
508 RaiseError(); | |
509 } | |
510 | |
511 } // namespace internal | |
512 } // namespace mojo | |
OLD | NEW |