OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/macros.h" | 12 #include "base/macros.h" |
13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
16 #include "mojo/public/cpp/bindings/associated_group.h" | 16 #include "mojo/public/cpp/bindings/associated_group.h" |
17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" |
| 19 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
18 | 20 |
19 namespace mojo { | 21 namespace mojo { |
20 namespace internal { | 22 namespace internal { |
21 | 23 |
22 // InterfaceEndpoint stores the information of an interface endpoint registered | 24 // InterfaceEndpoint stores the information of an interface endpoint registered |
23 // with the router. Always accessed under the router's lock. | 25 // with the router. |
24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 26 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
25 // this object. | 27 // this object. |
26 class MultiplexRouter::InterfaceEndpoint | 28 class MultiplexRouter::InterfaceEndpoint |
27 : public base::RefCounted<InterfaceEndpoint> { | 29 : public base::RefCounted<InterfaceEndpoint>, |
| 30 public InterfaceEndpointController { |
28 public: | 31 public: |
29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
30 : router_lock_(&router->lock_), | 33 : router_(router), |
31 id_(id), | 34 id_(id), |
32 closed_(false), | 35 closed_(false), |
33 peer_closed_(false), | 36 peer_closed_(false), |
34 client_(nullptr) { | 37 client_(nullptr), |
35 router_lock_->AssertAcquired(); | 38 event_signalled_(false) {} |
36 } | 39 |
| 40 // --------------------------------------------------------------------------- |
| 41 // The following public methods are safe to call from any threads without |
| 42 // locking. |
37 | 43 |
38 InterfaceId id() const { return id_; } | 44 InterfaceId id() const { return id_; } |
39 | 45 |
| 46 // --------------------------------------------------------------------------- |
| 47 // The following public methods are called under the router's lock. |
| 48 |
40 bool closed() const { return closed_; } | 49 bool closed() const { return closed_; } |
41 void set_closed() { | 50 void set_closed() { |
42 router_lock_->AssertAcquired(); | 51 router_->lock_.AssertAcquired(); |
43 closed_ = true; | 52 closed_ = true; |
44 } | 53 } |
45 | 54 |
46 bool peer_closed() const { return peer_closed_; } | 55 bool peer_closed() const { return peer_closed_; } |
47 void set_peer_closed() { | 56 void set_peer_closed() { |
48 router_lock_->AssertAcquired(); | 57 router_->lock_.AssertAcquired(); |
49 peer_closed_ = true; | 58 peer_closed_ = true; |
50 } | 59 } |
51 | 60 |
52 base::SingleThreadTaskRunner* task_runner() const { | 61 base::SingleThreadTaskRunner* task_runner() const { |
53 return task_runner_.get(); | 62 return task_runner_.get(); |
54 } | 63 } |
55 void set_task_runner( | |
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | |
57 router_lock_->AssertAcquired(); | |
58 task_runner_ = std::move(task_runner); | |
59 } | |
60 | 64 |
61 InterfaceEndpointClient* client() const { return client_; } | 65 InterfaceEndpointClient* client() const { return client_; } |
62 void set_client(InterfaceEndpointClient* client) { | 66 |
63 router_lock_->AssertAcquired(); | 67 void AttachClient(InterfaceEndpointClient* client) { |
| 68 router_->lock_.AssertAcquired(); |
| 69 DCHECK(!client_); |
| 70 DCHECK(!closed_); |
| 71 |
| 72 task_runner_ = base::MessageLoop::current()->task_runner(); |
64 client_ = client; | 73 client_ = client; |
65 } | 74 } |
66 | 75 |
| 76 // This method must be called on the same thread as the corresponding |
| 77 // AttachClient() call. |
| 78 void DetachClient() { |
| 79 router_->lock_.AssertAcquired(); |
| 80 DCHECK(client_); |
| 81 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 82 DCHECK(!closed_); |
| 83 |
| 84 task_runner_ = nullptr; |
| 85 client_ = nullptr; |
| 86 sync_watcher_.reset(); |
| 87 } |
| 88 |
| 89 void SignalSyncMessageEvent() { |
| 90 router_->lock_.AssertAcquired(); |
| 91 if (event_signalled_) |
| 92 return; |
| 93 |
| 94 EnsureEventMessagePipeExists(); |
| 95 event_signalled_ = true; |
| 96 MojoResult result = |
| 97 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, |
| 98 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
| 99 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 100 } |
| 101 |
| 102 // --------------------------------------------------------------------------- |
| 103 // The following public methods (i.e., InterfaceEndpointController |
| 104 // implementation) are called by the client on the same thread as the |
| 105 // AttachClient() call. They are called outside of the router's lock. |
| 106 |
| 107 bool SendMessage(Message* message) override { |
| 108 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 109 message->set_interface_id(id_); |
| 110 return router_->connector_.Accept(message); |
| 111 } |
| 112 |
| 113 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 114 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 115 |
| 116 EnsureSyncWatcherExists(); |
| 117 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 118 } |
| 119 |
| 120 bool SyncWatch(const bool* should_stop) override { |
| 121 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 122 |
| 123 EnsureSyncWatcherExists(); |
| 124 return sync_watcher_->SyncWatch(should_stop); |
| 125 } |
| 126 |
67 private: | 127 private: |
68 friend class base::RefCounted<InterfaceEndpoint>; | 128 friend class base::RefCounted<InterfaceEndpoint>; |
69 | 129 |
70 ~InterfaceEndpoint() { | 130 ~InterfaceEndpoint() override { |
71 router_lock_->AssertAcquired(); | 131 router_->lock_.AssertAcquired(); |
72 | 132 |
73 DCHECK(!client_); | 133 DCHECK(!client_); |
74 DCHECK(closed_); | 134 DCHECK(closed_); |
75 DCHECK(peer_closed_); | 135 DCHECK(peer_closed_); |
76 } | 136 DCHECK(!sync_watcher_); |
77 | 137 } |
78 base::Lock* const router_lock_; | 138 |
| 139 void OnHandleReady(MojoResult result) { |
| 140 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 141 scoped_refptr<InterfaceEndpoint> self_protector(this); |
| 142 scoped_refptr<MultiplexRouter> router_protector(router_); |
| 143 |
| 144 // Because we never close |sync_message_event_{sender,receiver}_| before |
| 145 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
| 146 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 147 bool reset_sync_watcher = false; |
| 148 { |
| 149 base::AutoLock locker(router_->lock_); |
| 150 |
| 151 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| 152 |
| 153 if (!more_to_process) |
| 154 ResetSyncMessageSignal(); |
| 155 |
| 156 // Currently there are no queued sync messages and the peer has closed so |
| 157 // there won't be incoming sync messages in the future. |
| 158 reset_sync_watcher = !more_to_process && peer_closed_; |
| 159 } |
| 160 if (reset_sync_watcher) { |
| 161 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
| 162 // on the call stack, resetting the sync watcher will allow it to exit |
| 163 // when the call stack unwinds to that frame. |
| 164 sync_watcher_.reset(); |
| 165 } |
| 166 } |
| 167 |
| 168 void EnsureSyncWatcherExists() { |
| 169 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 170 if (sync_watcher_) |
| 171 return; |
| 172 |
| 173 { |
| 174 base::AutoLock locker(router_->lock_); |
| 175 EnsureEventMessagePipeExists(); |
| 176 |
| 177 auto iter = router_->sync_message_tasks_.find(id_); |
| 178 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| 179 SignalSyncMessageEvent(); |
| 180 } |
| 181 |
| 182 sync_watcher_.reset(new SyncHandleWatcher( |
| 183 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 184 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
| 185 } |
| 186 |
| 187 void EnsureEventMessagePipeExists() { |
| 188 router_->lock_.AssertAcquired(); |
| 189 |
| 190 if (sync_message_event_receiver_.is_valid()) |
| 191 return; |
| 192 |
| 193 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
| 194 &sync_message_event_receiver_); |
| 195 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 196 } |
| 197 |
| 198 void ResetSyncMessageSignal() { |
| 199 router_->lock_.AssertAcquired(); |
| 200 |
| 201 if (!event_signalled_) |
| 202 return; |
| 203 |
| 204 DCHECK(sync_message_event_receiver_.is_valid()); |
| 205 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), |
| 206 nullptr, nullptr, nullptr, nullptr, |
| 207 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| 208 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 209 event_signalled_ = false; |
| 210 } |
| 211 |
| 212 // --------------------------------------------------------------------------- |
| 213 // The following members are safe to access from any threads. |
| 214 |
| 215 MultiplexRouter* const router_; |
79 const InterfaceId id_; | 216 const InterfaceId id_; |
80 | 217 |
| 218 // --------------------------------------------------------------------------- |
| 219 // The following members are accessed under the router's lock. |
| 220 |
81 // Whether the endpoint has been closed. | 221 // Whether the endpoint has been closed. |
82 bool closed_; | 222 bool closed_; |
83 // Whether the peer endpoint has been closed. | 223 // Whether the peer endpoint has been closed. |
84 bool peer_closed_; | 224 bool peer_closed_; |
85 | 225 |
86 // The task runner on which |client_| can be accessed. | 226 // The task runner on which |client_|'s methods can be called. |
87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 227 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
88 // Not owned. It is null if no client is attached to this endpoint. | 228 // Not owned. It is null if no client is attached to this endpoint. |
89 InterfaceEndpointClient* client_; | 229 InterfaceEndpointClient* client_; |
90 | 230 |
| 231 // A message pipe used as an event to signal that sync messages are available. |
| 232 // The message pipe handles are initialized under the router's lock and remain |
| 233 // unchanged afterwards. They may be accessed outside of the router's lock |
| 234 // later. |
| 235 ScopedMessagePipeHandle sync_message_event_sender_; |
| 236 ScopedMessagePipeHandle sync_message_event_receiver_; |
| 237 bool event_signalled_; |
| 238 |
| 239 // --------------------------------------------------------------------------- |
| 240 // The following members are only valid while a client is attached. They are |
| 241 // used exclusively on the client's thread. They may be accessed outside of |
| 242 // the router's lock. |
| 243 |
| 244 scoped_ptr<SyncHandleWatcher> sync_watcher_; |
| 245 |
91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 246 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
92 }; | 247 }; |
93 | 248 |
94 struct MultiplexRouter::Task { | 249 struct MultiplexRouter::Task { |
95 public: | 250 public: |
96 // Doesn't take ownership of |message| but takes its contents. | 251 // Doesn't take ownership of |message| but takes its contents. |
97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { | 252 static scoped_ptr<Task> CreateMessageTask(Message* message) { |
98 Task* task = new Task(); | 253 Task* task = new Task(MESSAGE); |
99 task->message.reset(new Message); | 254 task->message.reset(new Message); |
100 message->MoveTo(task->message.get()); | 255 message->MoveTo(task->message.get()); |
101 return make_scoped_ptr(task); | 256 return make_scoped_ptr(task); |
102 } | 257 } |
103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | 258 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
104 Task* task = new Task(); | 259 Task* task = new Task(NOTIFY_ERROR); |
105 task->endpoint_to_notify = endpoint; | 260 task->endpoint_to_notify = endpoint; |
106 return make_scoped_ptr(task); | 261 return make_scoped_ptr(task); |
107 } | 262 } |
108 | 263 |
109 ~Task() {} | 264 ~Task() {} |
110 | 265 |
111 bool IsIncomingMessageTask() const { return !!message; } | 266 bool IsMessageTask() const { return type == MESSAGE; } |
112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | 267 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
113 | 268 |
114 scoped_ptr<Message> message; | 269 scoped_ptr<Message> message; |
115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 270 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
116 | 271 |
| 272 enum Type { MESSAGE, NOTIFY_ERROR }; |
| 273 Type type; |
| 274 |
117 private: | 275 private: |
118 Task() {} | 276 explicit Task(Type in_type) : type(in_type) {} |
119 }; | 277 }; |
120 | 278 |
121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 279 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
122 ScopedMessagePipeHandle message_pipe) | 280 ScopedMessagePipeHandle message_pipe) |
123 : RefCountedDeleteOnMessageLoop( | 281 : RefCountedDeleteOnMessageLoop( |
124 base::MessageLoop::current()->task_runner()), | 282 base::MessageLoop::current()->task_runner()), |
125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 283 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
126 header_validator_(this), | 284 header_validator_(this), |
127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), | 285 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
128 encountered_error_(false), | |
129 control_message_handler_(this), | 286 control_message_handler_(this), |
130 control_message_proxy_(&connector_), | 287 control_message_proxy_(&connector_), |
131 next_interface_id_value_(1), | 288 next_interface_id_value_(1), |
132 posted_to_process_tasks_(false), | 289 posted_to_process_tasks_(false), |
| 290 encountered_error_(false), |
133 testing_mode_(false) { | 291 testing_mode_(false) { |
| 292 // Always participate in sync handle watching, because even if it doesn't |
| 293 // expect sync requests during sync handle watching, it may still need to |
| 294 // dispatch messages to associated endpoints on a different thread. |
| 295 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
134 connector_.set_incoming_receiver(&header_validator_); | 296 connector_.set_incoming_receiver(&header_validator_); |
135 connector_.set_connection_error_handler( | 297 connector_.set_connection_error_handler( |
136 [this]() { OnPipeConnectionError(); }); | 298 [this]() { OnPipeConnectionError(); }); |
137 } | 299 } |
138 | 300 |
139 MultiplexRouter::~MultiplexRouter() { | 301 MultiplexRouter::~MultiplexRouter() { |
140 base::AutoLock locker(lock_); | 302 base::AutoLock locker(lock_); |
141 | 303 |
| 304 sync_message_tasks_.clear(); |
142 tasks_.clear(); | 305 tasks_.clear(); |
143 | 306 |
144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 307 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
145 InterfaceEndpoint* endpoint = iter->second.get(); | 308 InterfaceEndpoint* endpoint = iter->second.get(); |
146 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 309 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
147 // because it may remove the corresponding value from the map. | 310 // because it may remove the corresponding value from the map. |
148 ++iter; | 311 ++iter; |
149 | 312 |
150 DCHECK(endpoint->closed()); | 313 DCHECK(endpoint->closed()); |
151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 314 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
214 | 377 |
215 DCHECK(ContainsKey(endpoints_, id)); | 378 DCHECK(ContainsKey(endpoints_, id)); |
216 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 379 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
217 DCHECK(!endpoint->client()); | 380 DCHECK(!endpoint->client()); |
218 DCHECK(!endpoint->closed()); | 381 DCHECK(!endpoint->closed()); |
219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 382 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
220 | 383 |
221 if (!IsMasterInterfaceId(id)) | 384 if (!IsMasterInterfaceId(id)) |
222 control_message_proxy_.NotifyPeerEndpointClosed(id); | 385 control_message_proxy_.NotifyPeerEndpointClosed(id); |
223 | 386 |
224 ProcessTasks(true); | 387 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
225 } | 388 } |
226 | 389 |
227 void MultiplexRouter::AttachEndpointClient( | 390 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
228 const ScopedInterfaceEndpointHandle& handle, | 391 const ScopedInterfaceEndpointHandle& handle, |
229 InterfaceEndpointClient* client) { | 392 InterfaceEndpointClient* client) { |
230 const InterfaceId id = handle.id(); | 393 const InterfaceId id = handle.id(); |
231 | 394 |
232 DCHECK(IsValidInterfaceId(id)); | 395 DCHECK(IsValidInterfaceId(id)); |
233 DCHECK(client); | 396 DCHECK(client); |
234 | 397 |
235 base::AutoLock locker(lock_); | 398 base::AutoLock locker(lock_); |
236 DCHECK(ContainsKey(endpoints_, id)); | 399 DCHECK(ContainsKey(endpoints_, id)); |
237 | 400 |
238 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 401 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
239 DCHECK(!endpoint->client()); | 402 endpoint->AttachClient(client); |
240 DCHECK(!endpoint->closed()); | |
241 | |
242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | |
243 endpoint->set_client(client); | |
244 | 403 |
245 if (endpoint->peer_closed()) | 404 if (endpoint->peer_closed()) |
246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 405 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
247 ProcessTasks(true); | 406 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| 407 |
| 408 return endpoint; |
248 } | 409 } |
249 | 410 |
250 void MultiplexRouter::DetachEndpointClient( | 411 void MultiplexRouter::DetachEndpointClient( |
251 const ScopedInterfaceEndpointHandle& handle) { | 412 const ScopedInterfaceEndpointHandle& handle) { |
252 const InterfaceId id = handle.id(); | 413 const InterfaceId id = handle.id(); |
253 | 414 |
254 DCHECK(IsValidInterfaceId(id)); | 415 DCHECK(IsValidInterfaceId(id)); |
255 | 416 |
256 base::AutoLock locker(lock_); | 417 base::AutoLock locker(lock_); |
257 DCHECK(ContainsKey(endpoints_, id)); | 418 DCHECK(ContainsKey(endpoints_, id)); |
258 | 419 |
259 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 420 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
260 DCHECK(endpoint->client()); | 421 endpoint->DetachClient(); |
261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | |
262 DCHECK(!endpoint->closed()); | |
263 | |
264 endpoint->set_task_runner(nullptr); | |
265 endpoint->set_client(nullptr); | |
266 } | |
267 | |
268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
269 Message* message) { | |
270 message->set_interface_id(handle.id()); | |
271 return connector_.Accept(message); | |
272 } | 422 } |
273 | 423 |
274 void MultiplexRouter::RaiseError() { | 424 void MultiplexRouter::RaiseError() { |
275 if (task_runner_->BelongsToCurrentThread()) { | 425 if (task_runner_->BelongsToCurrentThread()) { |
276 connector_.RaiseError(); | 426 connector_.RaiseError(); |
277 } else { | 427 } else { |
278 task_runner_->PostTask(FROM_HERE, | 428 task_runner_->PostTask(FROM_HERE, |
279 base::Bind(&MultiplexRouter::RaiseError, this)); | 429 base::Bind(&MultiplexRouter::RaiseError, this)); |
280 } | 430 } |
281 } | 431 } |
282 | 432 |
283 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 433 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
284 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); | 434 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); |
285 group->router_ = this; | 435 group->router_ = this; |
286 return group; | 436 return group; |
287 } | 437 } |
288 | 438 |
289 // static | 439 // static |
290 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { | 440 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
291 return associated_group->router_.get(); | 441 return associated_group->router_.get(); |
292 } | 442 } |
293 | 443 |
| 444 void MultiplexRouter::CloseMessagePipe() { |
| 445 DCHECK(thread_checker_.CalledOnValidThread()); |
| 446 connector_.CloseMessagePipe(); |
| 447 // CloseMessagePipe() above won't trigger connection error handler. |
| 448 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| 449 // get notified. |
| 450 OnPipeConnectionError(); |
| 451 } |
| 452 |
294 bool MultiplexRouter::HasAssociatedEndpoints() const { | 453 bool MultiplexRouter::HasAssociatedEndpoints() const { |
295 DCHECK(thread_checker_.CalledOnValidThread()); | 454 DCHECK(thread_checker_.CalledOnValidThread()); |
296 base::AutoLock locker(lock_); | 455 base::AutoLock locker(lock_); |
297 | 456 |
298 if (endpoints_.size() > 1) | 457 if (endpoints_.size() > 1) |
299 return true; | 458 return true; |
300 if (endpoints_.size() == 0) | 459 if (endpoints_.size() == 0) |
301 return false; | 460 return false; |
302 | 461 |
303 return !ContainsKey(endpoints_, kMasterInterfaceId); | 462 return !ContainsKey(endpoints_, kMasterInterfaceId); |
304 } | 463 } |
305 | 464 |
306 void MultiplexRouter::EnableTestingMode() { | 465 void MultiplexRouter::EnableTestingMode() { |
307 DCHECK(thread_checker_.CalledOnValidThread()); | 466 DCHECK(thread_checker_.CalledOnValidThread()); |
308 base::AutoLock locker(lock_); | 467 base::AutoLock locker(lock_); |
309 | 468 |
310 testing_mode_ = true; | 469 testing_mode_ = true; |
311 connector_.set_enforce_errors_from_incoming_receiver(false); | 470 connector_.set_enforce_errors_from_incoming_receiver(false); |
312 } | 471 } |
313 | 472 |
314 bool MultiplexRouter::Accept(Message* message) { | 473 bool MultiplexRouter::Accept(Message* message) { |
315 DCHECK(thread_checker_.CalledOnValidThread()); | 474 DCHECK(thread_checker_.CalledOnValidThread()); |
316 | 475 |
317 scoped_refptr<MultiplexRouter> protector(this); | 476 scoped_refptr<MultiplexRouter> protector(this); |
318 base::AutoLock locker(lock_); | 477 base::AutoLock locker(lock_); |
319 | 478 |
320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); | 479 ClientCallBehavior client_call_behavior = |
| 480 connector_.during_sync_handle_watcher_callback() |
| 481 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 482 : ALLOW_DIRECT_CLIENT_CALLS; |
| 483 |
| 484 bool processed = |
| 485 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); |
321 | 486 |
322 if (!processed) { | 487 if (!processed) { |
323 // Either the task queue is not empty or we cannot process the message | 488 // Either the task queue is not empty or we cannot process the message |
324 // directly. In both cases, there is no need to call ProcessTasks(). | 489 // directly. In both cases, there is no need to call ProcessTasks(). |
325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | 490 tasks_.push_back(Task::CreateMessageTask(message)); |
| 491 Task* task = tasks_.back().get(); |
| 492 |
| 493 if (task->message->has_flag(kMessageIsSync)) { |
| 494 InterfaceId id = task->message->interface_id(); |
| 495 sync_message_tasks_[id].push_back(task); |
| 496 auto iter = endpoints_.find(id); |
| 497 if (iter != endpoints_.end()) |
| 498 iter->second->SignalSyncMessageEvent(); |
| 499 } |
326 } else if (!tasks_.empty()) { | 500 } else if (!tasks_.empty()) { |
327 // Processing the message may result in new tasks (for error notification) | 501 // Processing the message may result in new tasks (for error notification) |
328 // being added to the queue. In this case, we have to attempt to process the | 502 // being added to the queue. In this case, we have to attempt to process the |
329 // tasks. | 503 // tasks. |
330 ProcessTasks(false); | 504 ProcessTasks(client_call_behavior); |
331 } | 505 } |
332 | 506 |
333 // Always return true. If we see errors during message processing, we will | 507 // Always return true. If we see errors during message processing, we will |
334 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 508 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
335 return true; | 509 return true; |
336 } | 510 } |
337 | 511 |
338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 512 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
339 lock_.AssertAcquired(); | 513 lock_.AssertAcquired(); |
340 | 514 |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
381 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 555 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
382 // because it may remove the corresponding value from the map. | 556 // because it may remove the corresponding value from the map. |
383 ++iter; | 557 ++iter; |
384 | 558 |
385 if (endpoint->client()) | 559 if (endpoint->client()) |
386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 560 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
387 | 561 |
388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 562 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
389 } | 563 } |
390 | 564 |
391 ProcessTasks(false); | 565 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| 566 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 567 : ALLOW_DIRECT_CLIENT_CALLS); |
392 } | 568 } |
393 | 569 |
394 void MultiplexRouter::ProcessTasks(bool force_async) { | 570 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
395 lock_.AssertAcquired(); | 571 lock_.AssertAcquired(); |
396 | 572 |
397 if (posted_to_process_tasks_) | 573 if (posted_to_process_tasks_) |
398 return; | 574 return; |
399 | 575 |
400 while (!tasks_.empty()) { | 576 while (!tasks_.empty()) { |
401 scoped_ptr<Task> task(std::move(tasks_.front())); | 577 scoped_ptr<Task> task(std::move(tasks_.front())); |
402 tasks_.pop_front(); | 578 tasks_.pop_front(); |
403 | 579 |
| 580 InterfaceId id = kInvalidInterfaceId; |
| 581 bool sync_message = task->IsMessageTask() && task->message && |
| 582 task->message->has_flag(kMessageIsSync); |
| 583 if (sync_message) { |
| 584 InterfaceId id = task->message->interface_id(); |
| 585 auto& sync_message_queue = sync_message_tasks_[id]; |
| 586 DCHECK_EQ(task.get(), sync_message_queue.front()); |
| 587 sync_message_queue.pop_front(); |
| 588 } |
| 589 |
404 bool processed = | 590 bool processed = |
405 task->IsNotifyErrorTask() | 591 task->IsNotifyErrorTask() |
406 ? ProcessNotifyErrorTask(task.get(), force_async) | 592 ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
407 : ProcessIncomingMessage(task->message.get(), force_async); | 593 : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
408 | 594 |
409 if (!processed) { | 595 if (!processed) { |
410 tasks_.push_front(std::move(task)); | 596 tasks_.push_front(std::move(task)); |
| 597 if (sync_message) { |
| 598 auto& sync_message_queue = sync_message_tasks_[id]; |
| 599 sync_message_queue.push_front(task.get()); |
| 600 } |
411 break; | 601 break; |
| 602 } else { |
| 603 if (sync_message) { |
| 604 auto iter = sync_message_tasks_.find(id); |
| 605 if (iter != sync_message_tasks_.end() && iter->second.empty()) |
| 606 sync_message_tasks_.erase(iter); |
| 607 } |
412 } | 608 } |
413 } | 609 } |
414 } | 610 } |
415 | 611 |
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { | 612 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| 613 lock_.AssertAcquired(); |
| 614 |
| 615 auto iter = sync_message_tasks_.find(id); |
| 616 if (iter == sync_message_tasks_.end()) |
| 617 return false; |
| 618 |
| 619 MultiplexRouter::Task* task = iter->second.front(); |
| 620 iter->second.pop_front(); |
| 621 |
| 622 DCHECK(task->IsMessageTask()); |
| 623 scoped_ptr<Message> message(std::move(task->message)); |
| 624 |
| 625 // Note: after this call, |task| and |iter| may be invalidated. |
| 626 bool processed = ProcessIncomingMessage( |
| 627 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); |
| 628 DCHECK(processed); |
| 629 |
| 630 iter = sync_message_tasks_.find(id); |
| 631 return iter != sync_message_tasks_.end() && !iter->second.empty(); |
| 632 } |
| 633 |
| 634 bool MultiplexRouter::ProcessNotifyErrorTask( |
| 635 Task* task, |
| 636 ClientCallBehavior client_call_behavior) { |
417 lock_.AssertAcquired(); | 637 lock_.AssertAcquired(); |
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 638 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
419 if (!endpoint->client()) | 639 if (!endpoint->client()) |
420 return true; | 640 return true; |
421 | 641 |
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 642 if (!endpoint->task_runner()->BelongsToCurrentThread() || |
| 643 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { |
423 MaybePostToProcessTasks(endpoint->task_runner()); | 644 MaybePostToProcessTasks(endpoint->task_runner()); |
424 return false; | 645 return false; |
425 } | 646 } |
426 | 647 |
427 InterfaceEndpointClient* client = endpoint->client(); | 648 InterfaceEndpointClient* client = endpoint->client(); |
428 { | 649 { |
429 // We must unlock before calling into |client| because it may call this | 650 // We must unlock before calling into |client| because it may call this |
430 // object within NotifyError(). Holding the lock will lead to deadlock. | 651 // object within NotifyError(). Holding the lock will lead to deadlock. |
431 // | 652 // |
432 // It is safe to call into |client| without the lock. Because |client| is | 653 // It is safe to call into |client| without the lock. Because |client| is |
433 // always accessed on the same thread, including DetachEndpointClient(). | 654 // always accessed on the same thread, including DetachEndpointClient(). |
434 base::AutoUnlock unlocker(lock_); | 655 base::AutoUnlock unlocker(lock_); |
435 client->NotifyError(); | 656 client->NotifyError(); |
436 } | 657 } |
437 return true; | 658 return true; |
438 } | 659 } |
439 | 660 |
440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, | 661 bool MultiplexRouter::ProcessIncomingMessage( |
441 bool force_async) { | 662 Message* message, |
| 663 ClientCallBehavior client_call_behavior) { |
442 lock_.AssertAcquired(); | 664 lock_.AssertAcquired(); |
| 665 |
| 666 if (!message) { |
| 667 // This is a sync message and has been processed during sync handle |
| 668 // watching. |
| 669 return true; |
| 670 } |
| 671 |
443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 672 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
444 if (!control_message_handler_.Accept(message)) | 673 if (!control_message_handler_.Accept(message)) |
445 RaiseErrorInNonTestingMode(); | 674 RaiseErrorInNonTestingMode(); |
446 return true; | 675 return true; |
447 } | 676 } |
448 | 677 |
449 InterfaceId id = message->interface_id(); | 678 InterfaceId id = message->interface_id(); |
450 DCHECK(IsValidInterfaceId(id)); | 679 DCHECK(IsValidInterfaceId(id)); |
451 | 680 |
452 bool inserted = false; | 681 bool inserted = false; |
(...skipping 14 matching lines...) Expand all Loading... |
467 | 696 |
468 if (endpoint->closed()) | 697 if (endpoint->closed()) |
469 return true; | 698 return true; |
470 | 699 |
471 if (!endpoint->client()) { | 700 if (!endpoint->client()) { |
472 // We need to wait until a client is attached in order to dispatch further | 701 // We need to wait until a client is attached in order to dispatch further |
473 // messages. | 702 // messages. |
474 return false; | 703 return false; |
475 } | 704 } |
476 | 705 |
477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 706 bool can_direct_call = |
| 707 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || |
| 708 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && |
| 709 message->has_flag(kMessageIsSync)); |
| 710 |
| 711 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { |
478 MaybePostToProcessTasks(endpoint->task_runner()); | 712 MaybePostToProcessTasks(endpoint->task_runner()); |
479 return false; | 713 return false; |
480 } | 714 } |
481 | 715 |
482 InterfaceEndpointClient* client = endpoint->client(); | 716 InterfaceEndpointClient* client = endpoint->client(); |
483 bool result = false; | 717 bool result = false; |
484 { | 718 { |
485 // We must unlock before calling into |client| because it may call this | 719 // We must unlock before calling into |client| because it may call this |
486 // object within HandleIncomingMessage(). Holding the lock will lead to | 720 // object within HandleIncomingMessage(). Holding the lock will lead to |
487 // deadlock. | 721 // deadlock. |
(...skipping 18 matching lines...) Expand all Loading... |
506 posted_to_process_tasks_ = true; | 740 posted_to_process_tasks_ = true; |
507 task_runner->PostTask( | 741 task_runner->PostTask( |
508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 742 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
509 } | 743 } |
510 | 744 |
511 void MultiplexRouter::LockAndCallProcessTasks() { | 745 void MultiplexRouter::LockAndCallProcessTasks() { |
512 // There is no need to hold a ref to this class in this case because this is | 746 // There is no need to hold a ref to this class in this case because this is |
513 // always called using base::Bind(), which holds a ref. | 747 // always called using base::Bind(), which holds a ref. |
514 base::AutoLock locker(lock_); | 748 base::AutoLock locker(lock_); |
515 posted_to_process_tasks_ = false; | 749 posted_to_process_tasks_ = false; |
516 ProcessTasks(false); | 750 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
517 } | 751 } |
518 | 752 |
519 void MultiplexRouter::UpdateEndpointStateMayRemove( | 753 void MultiplexRouter::UpdateEndpointStateMayRemove( |
520 InterfaceEndpoint* endpoint, | 754 InterfaceEndpoint* endpoint, |
521 EndpointStateUpdateType type) { | 755 EndpointStateUpdateType type) { |
522 switch (type) { | 756 switch (type) { |
523 case ENDPOINT_CLOSED: | 757 case ENDPOINT_CLOSED: |
524 endpoint->set_closed(); | 758 endpoint->set_closed(); |
525 break; | 759 break; |
526 case PEER_ENDPOINT_CLOSED: | 760 case PEER_ENDPOINT_CLOSED: |
527 endpoint->set_peer_closed(); | 761 endpoint->set_peer_closed(); |
| 762 // If the interface endpoint is performing a sync watch, this makes sure |
| 763 // it is notified and eventually exits the sync watch. |
| 764 endpoint->SignalSyncMessageEvent(); |
528 break; | 765 break; |
529 } | 766 } |
530 if (endpoint->closed() && endpoint->peer_closed()) | 767 if (endpoint->closed() && endpoint->peer_closed()) |
531 endpoints_.erase(endpoint->id()); | 768 endpoints_.erase(endpoint->id()); |
532 } | 769 } |
533 | 770 |
534 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 771 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
535 lock_.AssertAcquired(); | 772 lock_.AssertAcquired(); |
536 if (!testing_mode_) | 773 if (!testing_mode_) |
537 RaiseError(); | 774 RaiseError(); |
(...skipping 16 matching lines...) Expand all Loading... |
554 *inserted = true; | 791 *inserted = true; |
555 } else { | 792 } else { |
556 endpoint = iter->second.get(); | 793 endpoint = iter->second.get(); |
557 } | 794 } |
558 | 795 |
559 return endpoint; | 796 return endpoint; |
560 } | 797 } |
561 | 798 |
562 } // namespace internal | 799 } // namespace internal |
563 } // namespace mojo | 800 } // namespace mojo |
OLD | NEW |