OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/memory/ref_counted.h" | |
9 #include "base/message_loop/message_loop.h" | |
10 #include "base/stl_util.h" | |
11 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | |
12 | |
13 namespace mojo { | |
14 namespace internal { | |
15 | |
16 // InterfaceEndpoint stores the endpoint of an interface endpoint registered | |
17 // with | |
sky
2015/11/19 17:16:27
nit: fix wrapping.
yzshen1
2015/11/19 22:00:40
Done.
| |
18 // the router. Always accessed under the router's lock. | |
sky
2015/11/19 17:16:27
To help debugging could this object take the lock
yzshen1
2015/11/19 22:00:40
Done. I added the asserts.
It adds a little cost
| |
19 // | |
20 // On creation, this object holds two refs to itself. Those refs are released | |
21 // when SetClosedMayDestruct() and SetPeerClosedMayDestruct() are called. | |
22 // Other than that, the task queue in the router may hold refs to it, too. | |
23 class MultiplexRouter::InterfaceEndpoint | |
24 : public base::RefCounted<InterfaceEndpoint> { | |
25 public: | |
26 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | |
27 : router_(router), | |
28 id_(id), | |
29 closed_(false), | |
30 peer_closed_(false), | |
31 client_(nullptr) { | |
32 AddRef(); | |
33 AddRef(); | |
sky
2015/11/19 17:16:27
I suspect a scoped_refptr to this member would be
yzshen1
2015/11/19 22:00:40
Done.
| |
34 } | |
35 | |
36 InterfaceId id() const { return id_; } | |
37 | |
38 bool closed() const { return closed_; } | |
39 void SetClosedMayDestruct() { | |
40 if (closed_) | |
41 return; | |
42 | |
43 closed_ = true; | |
44 Release(); | |
45 } | |
46 | |
47 bool peer_closed() const { return peer_closed_; } | |
48 void SetPeerClosedMayDestruct() { | |
49 if (peer_closed_) | |
50 return; | |
51 | |
52 peer_closed_ = true; | |
53 Release(); | |
54 } | |
55 | |
56 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { | |
57 return task_runner_; | |
58 } | |
59 void set_task_runner( | |
60 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | |
61 task_runner_ = task_runner.Pass(); | |
62 } | |
63 | |
64 InterfaceEndpointClient* client() const { return client_; } | |
65 void set_client(InterfaceEndpointClient* client) { client_ = client; } | |
66 | |
67 private: | |
68 friend class base::RefCounted<InterfaceEndpoint>; | |
69 | |
70 ~InterfaceEndpoint() { | |
71 DCHECK(!client_); | |
72 router_->OnEndpointDestructed(id_); | |
73 } | |
74 | |
75 MultiplexRouter* const router_; | |
76 const InterfaceId id_; | |
77 | |
78 // Whether the endpoint has been closed. | |
79 bool closed_; | |
80 // Whether the peer endpoint has been closed. | |
81 bool peer_closed_; | |
82 | |
83 // The task runner on which |client_| can be accessed. | |
84 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | |
85 // Not owned. It is null if no client is attached to this endpoint. | |
86 InterfaceEndpointClient* client_; | |
87 | |
88 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | |
89 }; | |
90 | |
91 struct MultiplexRouter::Task { | |
92 public: | |
93 // Doesn't take ownership of |message| but takes its contents. | |
94 static Task* CreateIncomingMessageTask(Message* message) { | |
95 Task* task = new Task(); | |
96 task->message.reset(new Message); | |
97 message->MoveTo(task->message.get()); | |
98 return task; | |
99 } | |
100 static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | |
101 Task* task = new Task(); | |
102 task->endpoint_to_notify = endpoint; | |
103 return task; | |
104 } | |
105 | |
106 ~Task() {} | |
107 | |
108 bool IsIncomingMessageTask() const { return !!message; } | |
109 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | |
110 | |
111 scoped_ptr<Message> message; | |
112 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | |
113 | |
114 private: | |
115 Task() {} | |
116 }; | |
117 | |
118 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | |
119 ScopedMessagePipeHandle message_pipe, | |
120 const MojoAsyncWaiter* waiter) | |
121 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() | |
122 ->task_runner()), | |
123 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | |
124 header_validator_(this), | |
125 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), | |
126 control_message_handler_(this), | |
127 control_message_proxy_(&connector_), | |
128 next_interface_id_value_(1), | |
129 testing_mode_(false) { | |
130 connector_.set_incoming_receiver(&header_validator_); | |
131 connector_.set_connection_error_handler( | |
132 [this]() { OnPipeConnectionError(); }); | |
133 } | |
134 | |
135 MultiplexRouter::~MultiplexRouter() { | |
136 base::AutoLock locker(lock_); | |
137 | |
138 while (!tasks_.empty()) { | |
139 delete tasks_.front(); | |
140 tasks_.pop_front(); | |
141 } | |
142 | |
143 auto iter = endpoints_.begin(); | |
sky
2015/11/19 17:16:27
Use a for loop (you can still increment inside the
yzshen1
2015/11/19 22:00:40
Done.
| |
144 while (iter != endpoints_.end()) { | |
145 InterfaceEndpoint* endpoint = iter->second; | |
146 ++iter; | |
sky
2015/11/19 17:16:27
This is subtle and worth a comment.
yzshen1
2015/11/19 22:00:40
Done.
| |
147 | |
148 DCHECK(endpoint->closed()); | |
149 endpoint->SetPeerClosedMayDestruct(); | |
150 } | |
151 | |
152 DCHECK(endpoints_.empty()); | |
153 } | |
154 | |
155 void MultiplexRouter::CreateEndpointHandlePair( | |
156 ScopedInterfaceEndpointHandle* local_endpoint, | |
157 ScopedInterfaceEndpointHandle* remote_endpoint) { | |
158 base::AutoLock locker(lock_); | |
159 uint32_t id = 0; | |
160 do { | |
161 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) | |
162 next_interface_id_value_ = 1; | |
163 id = next_interface_id_value_++; | |
164 if (set_interface_id_namespace_bit_) | |
165 id |= kInterfaceIdNamespaceMask; | |
166 } while (ContainsKey(endpoints_, id)); | |
167 | |
168 endpoints_[id] = new InterfaceEndpoint(this, id); | |
169 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); | |
170 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); | |
171 } | |
172 | |
173 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | |
174 InterfaceId id) { | |
175 if (!IsValidInterfaceId(id)) | |
176 return ScopedInterfaceEndpointHandle(); | |
177 | |
178 base::AutoLock locker(lock_); | |
179 if (ContainsKey(endpoints_, id)) { | |
180 // If the endpoint already exist, it is because we have received a | |
sky
2015/11/19 17:16:27
It seems weird that you have to detect this case i
yzshen1
2015/11/19 22:00:40
This is legitimate:
this method is a public method
| |
181 // notification that the peer endpoint has closed. | |
182 InterfaceEndpoint* endpoint = endpoints_[id]; | |
183 CHECK(!endpoint->closed()); | |
184 CHECK(endpoint->peer_closed()); | |
185 } else { | |
186 endpoints_[id] = new InterfaceEndpoint(this, id); | |
187 } | |
188 return ScopedInterfaceEndpointHandle(id, true, this); | |
189 } | |
190 | |
191 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { | |
192 if (!IsValidInterfaceId(id)) | |
193 return; | |
194 | |
195 base::AutoLock locker(lock_); | |
196 | |
197 if (!is_local) { | |
198 DCHECK(ContainsKey(endpoints_, id)); | |
199 DCHECK(!IsMasterInterfaceId(id)); | |
200 | |
201 // We will receive a NotifyPeerEndpointClosed message from the other side. | |
202 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); | |
203 | |
204 return; | |
205 } | |
206 | |
207 DCHECK(ContainsKey(endpoints_, id)); | |
208 InterfaceEndpoint* endpoint = endpoints_[id]; | |
209 DCHECK(!endpoint->client()); | |
210 DCHECK(!endpoint->closed()); | |
211 endpoint->SetClosedMayDestruct(); | |
212 | |
213 if (!IsMasterInterfaceId(id)) | |
214 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
215 | |
216 ProcessTasks(true); | |
217 } | |
218 | |
219 void MultiplexRouter::AttachEndpointClient( | |
220 const ScopedInterfaceEndpointHandle& handle, | |
221 InterfaceEndpointClient* client) { | |
222 const InterfaceId id = handle.id(); | |
223 | |
224 DCHECK(IsValidInterfaceId(id)); | |
225 DCHECK(client); | |
226 | |
227 base::AutoLock locker(lock_); | |
228 DCHECK(ContainsKey(endpoints_, id)); | |
229 | |
230 InterfaceEndpoint* endpoint = endpoints_[id]; | |
231 DCHECK(!endpoint->client()); | |
232 DCHECK(!endpoint->closed()); | |
233 | |
234 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | |
235 endpoint->set_client(client); | |
236 | |
237 if (endpoint->peer_closed()) | |
238 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
239 ProcessTasks(true); | |
240 } | |
241 | |
242 void MultiplexRouter::DetachEndpointClient( | |
243 const ScopedInterfaceEndpointHandle& handle) { | |
244 const InterfaceId id = handle.id(); | |
245 | |
246 DCHECK(IsValidInterfaceId(id)); | |
247 | |
248 base::AutoLock locker(lock_); | |
249 DCHECK(ContainsKey(endpoints_, id)); | |
250 | |
251 InterfaceEndpoint* endpoint = endpoints_[id]; | |
252 DCHECK(endpoint->client()); | |
253 DCHECK(!endpoint->closed()); | |
254 | |
255 endpoint->set_task_runner(nullptr); | |
256 endpoint->set_client(nullptr); | |
257 } | |
258 | |
259 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
260 Message* message) { | |
261 const InterfaceId id = handle.id(); | |
262 | |
263 base::AutoLock locker(lock_); | |
264 if (!ContainsKey(endpoints_, id)) | |
265 return false; | |
266 | |
267 InterfaceEndpoint* endpoint = endpoints_[id]; | |
268 if (endpoint->peer_closed()) | |
269 return false; | |
270 | |
271 message->set_interface_id(id); | |
272 return connector_.Accept(message); | |
273 } | |
274 | |
275 void MultiplexRouter::RaiseError() { | |
276 if (task_runner_->BelongsToCurrentThread()) { | |
277 connector_.RaiseError(); | |
278 } else { | |
279 task_runner_->PostTask(FROM_HERE, | |
280 base::Bind(&MultiplexRouter::RaiseError, this)); | |
281 } | |
282 } | |
283 | |
284 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() { | |
285 DCHECK(thread_checker_.CalledOnValidThread()); | |
286 { | |
287 base::AutoLock locker(lock_); | |
288 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 && | |
289 ContainsKey(endpoints_, kMasterInterfaceId))); | |
290 } | |
291 return connector_.PassMessagePipe(); | |
292 } | |
293 | |
294 void MultiplexRouter::EnableTestingMode() { | |
295 DCHECK(thread_checker_.CalledOnValidThread()); | |
296 base::AutoLock locker(lock_); | |
297 | |
298 testing_mode_ = true; | |
299 connector_.set_enforce_errors_from_incoming_receiver(false); | |
300 } | |
301 | |
302 bool MultiplexRouter::Accept(Message* message) { | |
303 DCHECK(thread_checker_.CalledOnValidThread()); | |
304 | |
305 scoped_refptr<MultiplexRouter> protector(this); | |
306 base::AutoLock locker(lock_); | |
307 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | |
308 ProcessTasks(false); | |
309 | |
310 // Always return true. If we see errors during message processing, we will | |
311 // explicitly call Connector::RaiseError() to disconnect the message pipe. | |
312 return true; | |
313 } | |
314 | |
315 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | |
316 lock_.AssertAcquired(); | |
317 | |
318 if (IsMasterInterfaceId(id)) | |
319 return false; | |
320 | |
321 if (!ContainsKey(endpoints_, id)) | |
322 endpoints_[id] = new InterfaceEndpoint(this, id); | |
323 | |
324 InterfaceEndpoint* endpoint = endpoints_[id]; | |
325 DCHECK(!endpoint->peer_closed()); | |
326 | |
327 if (endpoint->client()) | |
328 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
329 endpoint->SetPeerClosedMayDestruct(); | |
330 | |
331 // No need to trigger a ProcessTasks() because it is already on the stack. | |
332 | |
333 return true; | |
334 } | |
335 | |
336 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | |
337 lock_.AssertAcquired(); | |
338 | |
339 if (IsMasterInterfaceId(id)) | |
340 return false; | |
341 | |
342 if (!ContainsKey(endpoints_, id)) | |
343 endpoints_[id] = new InterfaceEndpoint(this, id); | |
344 | |
345 InterfaceEndpoint* endpoint = endpoints_[id]; | |
346 DCHECK(!endpoint->closed()); | |
347 endpoint->SetClosedMayDestruct(); | |
348 | |
349 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
350 | |
351 return true; | |
352 } | |
353 | |
354 void MultiplexRouter::OnPipeConnectionError() { | |
355 DCHECK(thread_checker_.CalledOnValidThread()); | |
356 | |
357 scoped_refptr<MultiplexRouter> protector(this); | |
358 base::AutoLock locker(lock_); | |
359 | |
360 auto iter = endpoints_.begin(); | |
361 while (iter != endpoints_.end()) { | |
362 InterfaceEndpoint* endpoint = iter->second; | |
363 ++iter; | |
364 | |
365 if (endpoint->client()) | |
366 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | |
367 | |
368 endpoint->SetPeerClosedMayDestruct(); | |
369 } | |
370 | |
371 ProcessTasks(false); | |
372 } | |
373 | |
374 void MultiplexRouter::ProcessTasks(bool force_async) { | |
375 lock_.AssertAcquired(); | |
376 | |
377 while (!tasks_.empty()) { | |
378 scoped_ptr<Task> task(tasks_.front()); | |
379 tasks_.pop_front(); | |
380 | |
381 bool processed = task->IsNotifyErrorTask() | |
382 ? ProcessNotifyErrorTask(task.get(), &force_async) | |
383 : ProcessIncomingMessageTask(task.get(), &force_async); | |
384 | |
385 if (!processed) { | |
386 tasks_.push_front(task.release()); | |
387 break; | |
388 } | |
389 } | |
390 } | |
391 | |
392 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { | |
393 lock_.AssertAcquired(); | |
394 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | |
395 if (!endpoint->client()) | |
396 return true; | |
397 | |
398 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | |
399 endpoint->task_runner()->PostTask( | |
400 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
401 return false; | |
402 } | |
403 | |
404 *force_async = true; | |
405 InterfaceEndpointClient* client = endpoint->client(); | |
406 { | |
407 base::AutoUnlock unlocker(lock_); | |
408 client->NotifyError(); | |
sky
2015/11/19 17:16:27
Can bad things happen between the time you unlock
yzshen1
2015/11/19 22:00:40
|client| lives on a single thread. NotifyError() a
| |
409 } | |
410 return true; | |
411 } | |
412 | |
413 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, | |
414 bool* force_async) { | |
415 lock_.AssertAcquired(); | |
416 Message* message = task->message.get(); | |
417 | |
418 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | |
419 if (!control_message_handler_.Accept(message)) | |
420 RaiseErrorInNonTestingMode(); | |
421 return true; | |
422 } | |
423 | |
424 InterfaceId id = message->interface_id(); | |
425 DCHECK(IsValidInterfaceId(id)); | |
426 | |
427 if (!ContainsKey(endpoints_, id)) { | |
428 DCHECK(!IsMasterInterfaceId(id)); | |
429 | |
430 // Currently, it is legitimate to receive messages for an endpoint | |
431 // that is not registered. For example, the endpoint is transferred in | |
432 // a message that is discarded. Once we add support to specify all | |
433 // enclosing endpoints in message header, we should be able to remove | |
434 // this. | |
435 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | |
436 endpoints_[id] = endpoint; | |
437 endpoint->SetClosedMayDestruct(); | |
438 | |
439 control_message_proxy_.NotifyPeerEndpointClosed(id); | |
440 return true; | |
441 } | |
442 | |
443 InterfaceEndpoint* endpoint = endpoints_[id]; | |
444 if (endpoint->closed()) | |
445 return true; | |
446 | |
447 if (!endpoint->client()) { | |
448 // We need to wait until a client is attached in order to dispatch further | |
449 // messages. | |
450 return false; | |
451 } | |
452 | |
453 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | |
454 endpoint->task_runner()->PostTask( | |
455 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
456 return false; | |
457 } | |
458 | |
459 *force_async = true; | |
460 InterfaceEndpointClient* client = endpoint->client(); | |
461 scoped_ptr<Message> owned_message = task->message.Pass(); | |
462 bool result = false; | |
463 { | |
464 base::AutoUnlock unlocker(lock_); | |
465 result = client->HandleIncomingMessage(owned_message.get()); | |
sky
2015/11/19 17:16:27
Similar question to above about unlocking.
yzshen1
2015/11/19 22:00:40
(Please see my comment for the previous question.)
| |
466 } | |
467 if (!result) | |
468 RaiseErrorInNonTestingMode(); | |
469 | |
470 return true; | |
471 } | |
472 | |
473 void MultiplexRouter::LockAndCallProcessTasks() { | |
474 // There is no need to hold a ref to this class in this case because this is | |
475 // always called using base::Bind(), which holds a ref. | |
476 base::AutoLock locker(lock_); | |
477 ProcessTasks(false); | |
478 } | |
479 | |
480 void MultiplexRouter::OnEndpointDestructed(InterfaceId id) { | |
481 lock_.AssertAcquired(); | |
482 endpoints_.erase(id); | |
483 } | |
484 | |
485 void MultiplexRouter::RaiseErrorInNonTestingMode() { | |
486 lock_.AssertAcquired(); | |
487 if (!testing_mode_) | |
488 RaiseError(); | |
489 } | |
490 | |
491 } // namespace internal | |
492 } // namespace mojo | |
OLD | NEW |