OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/macros.h" | 12 #include "base/macros.h" |
13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
16 #include "mojo/public/cpp/bindings/associated_group.h" | 16 #include "mojo/public/cpp/bindings/associated_group.h" |
17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" | |
19 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" | |
18 | 20 |
19 namespace mojo { | 21 namespace mojo { |
20 namespace internal { | 22 namespace internal { |
21 | 23 |
22 // InterfaceEndpoint stores the information of an interface endpoint registered | 24 // InterfaceEndpoint stores the information of an interface endpoint registered |
23 // with the router. Always accessed under the router's lock. | 25 // with the router. |
24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 26 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
25 // this object. | 27 // this object. |
26 class MultiplexRouter::InterfaceEndpoint | 28 class MultiplexRouter::InterfaceEndpoint |
27 : public base::RefCounted<InterfaceEndpoint> { | 29 : public base::RefCounted<InterfaceEndpoint>, |
30 public InterfaceEndpointController { | |
28 public: | 31 public: |
29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
30 : router_lock_(&router->lock_), | 33 : router_(router), |
31 id_(id), | 34 id_(id), |
32 closed_(false), | 35 closed_(false), |
33 peer_closed_(false), | 36 peer_closed_(false), |
34 client_(nullptr) { | 37 client_(nullptr), |
35 router_lock_->AssertAcquired(); | 38 event_signalled_(false) {} |
36 } | 39 |
40 // --------------------------------------------------------------------------- | |
41 // The following public methods are safe to call from any threads without | |
42 // locking. | |
37 | 43 |
38 InterfaceId id() const { return id_; } | 44 InterfaceId id() const { return id_; } |
39 | 45 |
46 // --------------------------------------------------------------------------- | |
47 // The following public methods are called under the router's lock. | |
48 | |
40 bool closed() const { return closed_; } | 49 bool closed() const { return closed_; } |
41 void set_closed() { | 50 void set_closed() { |
42 router_lock_->AssertAcquired(); | 51 router_->lock_.AssertAcquired(); |
43 closed_ = true; | 52 closed_ = true; |
44 } | 53 } |
45 | 54 |
46 bool peer_closed() const { return peer_closed_; } | 55 bool peer_closed() const { return peer_closed_; } |
47 void set_peer_closed() { | 56 void set_peer_closed() { |
48 router_lock_->AssertAcquired(); | 57 router_->lock_.AssertAcquired(); |
49 peer_closed_ = true; | 58 peer_closed_ = true; |
50 } | 59 } |
51 | 60 |
52 base::SingleThreadTaskRunner* task_runner() const { | 61 base::SingleThreadTaskRunner* task_runner() const { |
53 return task_runner_.get(); | 62 return task_runner_.get(); |
54 } | 63 } |
55 void set_task_runner( | |
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | |
57 router_lock_->AssertAcquired(); | |
58 task_runner_ = std::move(task_runner); | |
59 } | |
60 | 64 |
61 InterfaceEndpointClient* client() const { return client_; } | 65 InterfaceEndpointClient* client() const { return client_; } |
62 void set_client(InterfaceEndpointClient* client) { | 66 |
63 router_lock_->AssertAcquired(); | 67 void AttachClient(InterfaceEndpointClient* client) { |
68 router_->lock_.AssertAcquired(); | |
69 DCHECK(!client_); | |
70 DCHECK(!closed_); | |
71 | |
72 task_runner_ = base::MessageLoop::current()->task_runner(); | |
64 client_ = client; | 73 client_ = client; |
65 } | 74 } |
66 | 75 |
76 // It should be called on the same thread as the corresponding AttachClient() | |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: should = must? Also perhaps "This" or "This m
yzshen1
2016/03/29 16:19:01
Done. English is hard. :)
| |
77 // call. | |
78 void DetachClient() { | |
79 router_->lock_.AssertAcquired(); | |
80 DCHECK(client_); | |
81 DCHECK(task_runner_->BelongsToCurrentThread()); | |
82 DCHECK(!closed_); | |
83 | |
84 task_runner_ = nullptr; | |
85 client_ = nullptr; | |
86 sync_watcher_.reset(); | |
87 } | |
88 | |
89 void SignalSyncMessageEvent() { | |
90 router_->lock_.AssertAcquired(); | |
91 if (event_signalled_) | |
92 return; | |
93 | |
94 EnsureEventMessagePipeExists(); | |
95 event_signalled_ = true; | |
96 char dummy_message = '\0'; | |
97 MojoResult result = | |
98 WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1, | |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It is perfectly legal to write a zero-length
yzshen1
2016/03/29 16:19:01
Done.
| |
99 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | |
100 DCHECK_EQ(MOJO_RESULT_OK, result); | |
101 } | |
102 | |
103 // --------------------------------------------------------------------------- | |
104 // The following public methods (i.e., InterfaceEndpointController | |
105 // implementation) are called by the client on the same thread as the | |
106 // AttachClient() call. They are called outside of the router's lock. | |
107 | |
108 bool SendMessage(Message* message) override { | |
109 DCHECK(task_runner_->BelongsToCurrentThread()); | |
110 message->set_interface_id(id_); | |
111 return router_->connector_.Accept(message); | |
112 } | |
113 | |
114 void AllowWokenUpBySyncWatchOnSameThread() override { | |
115 DCHECK(task_runner_->BelongsToCurrentThread()); | |
116 | |
117 EnsureSyncWatcherExists(); | |
118 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | |
119 } | |
120 | |
121 bool SyncWatch(const bool* should_stop) override { | |
122 DCHECK(task_runner_->BelongsToCurrentThread()); | |
123 | |
124 EnsureSyncWatcherExists(); | |
125 return sync_watcher_->SyncWatch(should_stop); | |
126 } | |
127 | |
67 private: | 128 private: |
68 friend class base::RefCounted<InterfaceEndpoint>; | 129 friend class base::RefCounted<InterfaceEndpoint>; |
69 | 130 |
70 ~InterfaceEndpoint() { | 131 ~InterfaceEndpoint() override { |
71 router_lock_->AssertAcquired(); | 132 router_->lock_.AssertAcquired(); |
72 | 133 |
73 DCHECK(!client_); | 134 DCHECK(!client_); |
74 DCHECK(closed_); | 135 DCHECK(closed_); |
75 DCHECK(peer_closed_); | 136 DCHECK(peer_closed_); |
76 } | 137 DCHECK(!sync_watcher_); |
77 | 138 } |
78 base::Lock* const router_lock_; | 139 |
140 void OnHandleReady(MojoResult result) { | |
141 DCHECK(task_runner_->BelongsToCurrentThread()); | |
142 scoped_refptr<InterfaceEndpoint> self_protector(this); | |
143 scoped_refptr<MultiplexRouter> router_protector(router_); | |
144 | |
145 // Because we never close |sync_message_event_{sender,receiver}_| before | |
146 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | |
147 DCHECK_EQ(MOJO_RESULT_OK, result); | |
148 bool reset_sync_watcher = false; | |
149 { | |
150 base::AutoLock locker(router_->lock_); | |
151 | |
152 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | |
153 | |
154 if (!more_to_process) | |
155 ResetSyncMessageSignal(); | |
156 | |
157 // Currently there are no queued sync messages and the peer has closed so | |
158 // there won't be incoming sync messages in the future. | |
159 reset_sync_watcher = !more_to_process && peer_closed_; | |
160 } | |
161 if (reset_sync_watcher) { | |
162 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | |
163 // on the call stack, resetting the sync watcher will allow it to exit | |
164 // when the call stack unwinds to that frame. | |
165 sync_watcher_.reset(); | |
166 } | |
167 } | |
168 | |
169 void EnsureSyncWatcherExists() { | |
170 DCHECK(task_runner_->BelongsToCurrentThread()); | |
171 if (sync_watcher_) | |
172 return; | |
173 | |
174 { | |
175 base::AutoLock locker(router_->lock_); | |
176 EnsureEventMessagePipeExists(); | |
177 | |
178 auto iter = router_->sync_message_tasks_.find(id_); | |
179 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | |
180 SignalSyncMessageEvent(); | |
181 } | |
182 | |
183 sync_watcher_.reset(new SyncHandleWatcher( | |
184 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
185 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); | |
186 } | |
187 | |
188 void EnsureEventMessagePipeExists() { | |
189 router_->lock_.AssertAcquired(); | |
190 | |
191 if (sync_message_event_receiver_.is_valid()) | |
192 return; | |
193 | |
194 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | |
195 &sync_message_event_receiver_); | |
196 DCHECK_EQ(MOJO_RESULT_OK, result); | |
197 } | |
198 | |
199 void ResetSyncMessageSignal() { | |
200 router_->lock_.AssertAcquired(); | |
201 | |
202 if (!event_signalled_) | |
203 return; | |
204 | |
205 DCHECK(sync_message_event_receiver_.is_valid()); | |
206 char dummy_message = 0; | |
207 uint32_t size = 1; | |
208 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), | |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It's fine to pass nullptr for the read buffer
yzshen1
2016/03/29 16:19:01
Done.
| |
209 &dummy_message, &size, nullptr, nullptr, | |
210 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
211 DCHECK_EQ(MOJO_RESULT_OK, result); | |
212 event_signalled_ = false; | |
213 } | |
214 | |
215 // --------------------------------------------------------------------------- | |
216 // The following members are safe to access from any threads. | |
217 | |
218 MultiplexRouter* const router_; | |
79 const InterfaceId id_; | 219 const InterfaceId id_; |
80 | 220 |
221 // --------------------------------------------------------------------------- | |
222 // The following members are accessed under the router's lock. | |
223 | |
81 // Whether the endpoint has been closed. | 224 // Whether the endpoint has been closed. |
82 bool closed_; | 225 bool closed_; |
83 // Whether the peer endpoint has been closed. | 226 // Whether the peer endpoint has been closed. |
84 bool peer_closed_; | 227 bool peer_closed_; |
85 | 228 |
86 // The task runner on which |client_| can be accessed. | 229 // The task runner on which |client_|'s methods can be called. |
87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 230 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
88 // Not owned. It is null if no client is attached to this endpoint. | 231 // Not owned. It is null if no client is attached to this endpoint. |
89 InterfaceEndpointClient* client_; | 232 InterfaceEndpointClient* client_; |
90 | 233 |
234 // A message pipe used as an event to signal that sync messages are available. | |
235 // The message pipe handles are initialized under the router's lock and remain | |
236 // unchanged afterwards. They may be accessed outside of the router's lock | |
237 // later. | |
238 ScopedMessagePipeHandle sync_message_event_sender_; | |
239 ScopedMessagePipeHandle sync_message_event_receiver_; | |
240 bool event_signalled_; | |
241 | |
242 // --------------------------------------------------------------------------- | |
243 // The following members are only valid while a client is attached. They are | |
244 // used exclusively on the client's thread. They may be accessed outside of | |
245 // the router's lock. | |
246 | |
247 scoped_ptr<SyncHandleWatcher> sync_watcher_; | |
248 | |
91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 249 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
92 }; | 250 }; |
93 | 251 |
94 struct MultiplexRouter::Task { | 252 struct MultiplexRouter::Task { |
95 public: | 253 public: |
96 // Doesn't take ownership of |message| but takes its contents. | 254 // Doesn't take ownership of |message| but takes its contents. |
97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { | 255 static scoped_ptr<Task> CreateMessageTask(Message* message) { |
98 Task* task = new Task(); | 256 Task* task = new Task(MESSAGE); |
99 task->message.reset(new Message); | 257 task->message.reset(new Message); |
100 message->MoveTo(task->message.get()); | 258 message->MoveTo(task->message.get()); |
101 return make_scoped_ptr(task); | 259 return make_scoped_ptr(task); |
102 } | 260 } |
103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | 261 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
104 Task* task = new Task(); | 262 Task* task = new Task(NOTIFY_ERROR); |
105 task->endpoint_to_notify = endpoint; | 263 task->endpoint_to_notify = endpoint; |
106 return make_scoped_ptr(task); | 264 return make_scoped_ptr(task); |
107 } | 265 } |
108 | 266 |
109 ~Task() {} | 267 ~Task() {} |
110 | 268 |
111 bool IsIncomingMessageTask() const { return !!message; } | 269 bool IsMessageTask() const { return type == MESSAGE; } |
112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | 270 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
113 | 271 |
114 scoped_ptr<Message> message; | 272 scoped_ptr<Message> message; |
115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 273 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
116 | 274 |
275 enum Type { MESSAGE, NOTIFY_ERROR }; | |
276 Type type; | |
277 | |
117 private: | 278 private: |
118 Task() {} | 279 explicit Task(Type in_type) : type(in_type) {} |
119 }; | 280 }; |
120 | 281 |
121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 282 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
122 ScopedMessagePipeHandle message_pipe) | 283 ScopedMessagePipeHandle message_pipe) |
123 : RefCountedDeleteOnMessageLoop( | 284 : RefCountedDeleteOnMessageLoop( |
124 base::MessageLoop::current()->task_runner()), | 285 base::MessageLoop::current()->task_runner()), |
125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 286 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
126 header_validator_(this), | 287 header_validator_(this), |
127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), | 288 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
128 encountered_error_(false), | |
129 control_message_handler_(this), | 289 control_message_handler_(this), |
130 control_message_proxy_(&connector_), | 290 control_message_proxy_(&connector_), |
131 next_interface_id_value_(1), | 291 next_interface_id_value_(1), |
132 posted_to_process_tasks_(false), | 292 posted_to_process_tasks_(false), |
293 encountered_error_(false), | |
133 testing_mode_(false) { | 294 testing_mode_(false) { |
295 // Always participate in sync handle watching, because even if it doesn't | |
296 // expect sync requests during sync handle watching, it may still need to | |
297 // dispatch messages to associated endpoints on a different thread. | |
298 connector_.AllowWokenUpBySyncWatchOnSameThread(); | |
134 connector_.set_incoming_receiver(&header_validator_); | 299 connector_.set_incoming_receiver(&header_validator_); |
135 connector_.set_connection_error_handler( | 300 connector_.set_connection_error_handler( |
136 [this]() { OnPipeConnectionError(); }); | 301 [this]() { OnPipeConnectionError(); }); |
137 } | 302 } |
138 | 303 |
139 MultiplexRouter::~MultiplexRouter() { | 304 MultiplexRouter::~MultiplexRouter() { |
140 base::AutoLock locker(lock_); | 305 base::AutoLock locker(lock_); |
141 | 306 |
307 sync_message_tasks_.clear(); | |
142 tasks_.clear(); | 308 tasks_.clear(); |
143 | 309 |
144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 310 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
145 InterfaceEndpoint* endpoint = iter->second.get(); | 311 InterfaceEndpoint* endpoint = iter->second.get(); |
146 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 312 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
147 // because it may remove the corresponding value from the map. | 313 // because it may remove the corresponding value from the map. |
148 ++iter; | 314 ++iter; |
149 | 315 |
150 DCHECK(endpoint->closed()); | 316 DCHECK(endpoint->closed()); |
151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 317 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
214 | 380 |
215 DCHECK(ContainsKey(endpoints_, id)); | 381 DCHECK(ContainsKey(endpoints_, id)); |
216 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 382 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
217 DCHECK(!endpoint->client()); | 383 DCHECK(!endpoint->client()); |
218 DCHECK(!endpoint->closed()); | 384 DCHECK(!endpoint->closed()); |
219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 385 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
220 | 386 |
221 if (!IsMasterInterfaceId(id)) | 387 if (!IsMasterInterfaceId(id)) |
222 control_message_proxy_.NotifyPeerEndpointClosed(id); | 388 control_message_proxy_.NotifyPeerEndpointClosed(id); |
223 | 389 |
224 ProcessTasks(true); | 390 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
225 } | 391 } |
226 | 392 |
227 void MultiplexRouter::AttachEndpointClient( | 393 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
228 const ScopedInterfaceEndpointHandle& handle, | 394 const ScopedInterfaceEndpointHandle& handle, |
229 InterfaceEndpointClient* client) { | 395 InterfaceEndpointClient* client) { |
230 const InterfaceId id = handle.id(); | 396 const InterfaceId id = handle.id(); |
231 | 397 |
232 DCHECK(IsValidInterfaceId(id)); | 398 DCHECK(IsValidInterfaceId(id)); |
233 DCHECK(client); | 399 DCHECK(client); |
234 | 400 |
235 base::AutoLock locker(lock_); | 401 base::AutoLock locker(lock_); |
236 DCHECK(ContainsKey(endpoints_, id)); | 402 DCHECK(ContainsKey(endpoints_, id)); |
237 | 403 |
238 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 404 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
239 DCHECK(!endpoint->client()); | 405 endpoint->AttachClient(client); |
240 DCHECK(!endpoint->closed()); | |
241 | |
242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | |
243 endpoint->set_client(client); | |
244 | 406 |
245 if (endpoint->peer_closed()) | 407 if (endpoint->peer_closed()) |
246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 408 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
247 ProcessTasks(true); | 409 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
410 | |
411 return endpoint; | |
248 } | 412 } |
249 | 413 |
250 void MultiplexRouter::DetachEndpointClient( | 414 void MultiplexRouter::DetachEndpointClient( |
251 const ScopedInterfaceEndpointHandle& handle) { | 415 const ScopedInterfaceEndpointHandle& handle) { |
252 const InterfaceId id = handle.id(); | 416 const InterfaceId id = handle.id(); |
253 | 417 |
254 DCHECK(IsValidInterfaceId(id)); | 418 DCHECK(IsValidInterfaceId(id)); |
255 | 419 |
256 base::AutoLock locker(lock_); | 420 base::AutoLock locker(lock_); |
257 DCHECK(ContainsKey(endpoints_, id)); | 421 DCHECK(ContainsKey(endpoints_, id)); |
258 | 422 |
259 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 423 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
260 DCHECK(endpoint->client()); | 424 endpoint->DetachClient(); |
261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | |
262 DCHECK(!endpoint->closed()); | |
263 | |
264 endpoint->set_task_runner(nullptr); | |
265 endpoint->set_client(nullptr); | |
266 } | |
267 | |
268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
269 Message* message) { | |
270 message->set_interface_id(handle.id()); | |
271 return connector_.Accept(message); | |
272 } | 425 } |
273 | 426 |
274 void MultiplexRouter::RaiseError() { | 427 void MultiplexRouter::RaiseError() { |
275 if (task_runner_->BelongsToCurrentThread()) { | 428 if (task_runner_->BelongsToCurrentThread()) { |
276 connector_.RaiseError(); | 429 connector_.RaiseError(); |
277 } else { | 430 } else { |
278 task_runner_->PostTask(FROM_HERE, | 431 task_runner_->PostTask(FROM_HERE, |
279 base::Bind(&MultiplexRouter::RaiseError, this)); | 432 base::Bind(&MultiplexRouter::RaiseError, this)); |
280 } | 433 } |
281 } | 434 } |
282 | 435 |
283 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 436 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
284 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); | 437 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); |
285 group->router_ = this; | 438 group->router_ = this; |
286 return group; | 439 return group; |
287 } | 440 } |
288 | 441 |
289 // static | 442 // static |
290 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { | 443 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
291 return associated_group->router_.get(); | 444 return associated_group->router_.get(); |
292 } | 445 } |
293 | 446 |
447 void MultiplexRouter::CloseMessagePipe() { | |
448 DCHECK(thread_checker_.CalledOnValidThread()); | |
449 connector_.CloseMessagePipe(); | |
450 // CloseMessagePipe() above won't trigger connection error handler. | |
451 // Explicitly call OnPipeConnectionError() so that associated endpoints will | |
452 // get notified. | |
453 OnPipeConnectionError(); | |
454 } | |
455 | |
294 bool MultiplexRouter::HasAssociatedEndpoints() const { | 456 bool MultiplexRouter::HasAssociatedEndpoints() const { |
295 DCHECK(thread_checker_.CalledOnValidThread()); | 457 DCHECK(thread_checker_.CalledOnValidThread()); |
296 base::AutoLock locker(lock_); | 458 base::AutoLock locker(lock_); |
297 | 459 |
298 if (endpoints_.size() > 1) | 460 if (endpoints_.size() > 1) |
299 return true; | 461 return true; |
300 if (endpoints_.size() == 0) | 462 if (endpoints_.size() == 0) |
301 return false; | 463 return false; |
302 | 464 |
303 return !ContainsKey(endpoints_, kMasterInterfaceId); | 465 return !ContainsKey(endpoints_, kMasterInterfaceId); |
304 } | 466 } |
305 | 467 |
306 void MultiplexRouter::EnableTestingMode() { | 468 void MultiplexRouter::EnableTestingMode() { |
307 DCHECK(thread_checker_.CalledOnValidThread()); | 469 DCHECK(thread_checker_.CalledOnValidThread()); |
308 base::AutoLock locker(lock_); | 470 base::AutoLock locker(lock_); |
309 | 471 |
310 testing_mode_ = true; | 472 testing_mode_ = true; |
311 connector_.set_enforce_errors_from_incoming_receiver(false); | 473 connector_.set_enforce_errors_from_incoming_receiver(false); |
312 } | 474 } |
313 | 475 |
314 bool MultiplexRouter::Accept(Message* message) { | 476 bool MultiplexRouter::Accept(Message* message) { |
315 DCHECK(thread_checker_.CalledOnValidThread()); | 477 DCHECK(thread_checker_.CalledOnValidThread()); |
316 | 478 |
317 scoped_refptr<MultiplexRouter> protector(this); | 479 scoped_refptr<MultiplexRouter> protector(this); |
318 base::AutoLock locker(lock_); | 480 base::AutoLock locker(lock_); |
319 | 481 |
320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); | 482 ClientCallBehavior client_call_behavior = |
483 connector_.during_sync_handle_watcher_callback() | |
484 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | |
485 : ALLOW_DIRECT_CLIENT_CALLS; | |
486 | |
487 bool processed = | |
488 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); | |
321 | 489 |
322 if (!processed) { | 490 if (!processed) { |
323 // Either the task queue is not empty or we cannot process the message | 491 // Either the task queue is not empty or we cannot process the message |
324 // directly. In both cases, there is no need to call ProcessTasks(). | 492 // directly. In both cases, there is no need to call ProcessTasks(). |
325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | 493 tasks_.push_back(Task::CreateMessageTask(message)); |
494 Task* task = tasks_.back().get(); | |
495 | |
496 if (task->message->has_flag(kMessageIsSync)) { | |
497 InterfaceId id = task->message->interface_id(); | |
498 sync_message_tasks_[id].push_back(task); | |
499 auto iter = endpoints_.find(id); | |
500 if (iter != endpoints_.end()) | |
501 iter->second->SignalSyncMessageEvent(); | |
502 } | |
326 } else if (!tasks_.empty()) { | 503 } else if (!tasks_.empty()) { |
327 // Processing the message may result in new tasks (for error notification) | 504 // Processing the message may result in new tasks (for error notification) |
328 // being added to the queue. In this case, we have to attempt to process the | 505 // being added to the queue. In this case, we have to attempt to process the |
329 // tasks. | 506 // tasks. |
330 ProcessTasks(false); | 507 ProcessTasks(client_call_behavior); |
331 } | 508 } |
332 | 509 |
333 // Always return true. If we see errors during message processing, we will | 510 // Always return true. If we see errors during message processing, we will |
334 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 511 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
335 return true; | 512 return true; |
336 } | 513 } |
337 | 514 |
338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 515 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
339 lock_.AssertAcquired(); | 516 lock_.AssertAcquired(); |
340 | 517 |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
381 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 558 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
382 // because it may remove the corresponding value from the map. | 559 // because it may remove the corresponding value from the map. |
383 ++iter; | 560 ++iter; |
384 | 561 |
385 if (endpoint->client()) | 562 if (endpoint->client()) |
386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 563 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
387 | 564 |
388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 565 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
389 } | 566 } |
390 | 567 |
391 ProcessTasks(false); | 568 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
569 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | |
570 : ALLOW_DIRECT_CLIENT_CALLS); | |
392 } | 571 } |
393 | 572 |
394 void MultiplexRouter::ProcessTasks(bool force_async) { | 573 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
395 lock_.AssertAcquired(); | 574 lock_.AssertAcquired(); |
396 | 575 |
397 if (posted_to_process_tasks_) | 576 if (posted_to_process_tasks_) |
398 return; | 577 return; |
399 | 578 |
400 while (!tasks_.empty()) { | 579 while (!tasks_.empty()) { |
401 scoped_ptr<Task> task(std::move(tasks_.front())); | 580 scoped_ptr<Task> task(std::move(tasks_.front())); |
402 tasks_.pop_front(); | 581 tasks_.pop_front(); |
403 | 582 |
583 InterfaceId id = kInvalidInterfaceId; | |
584 bool sync_message = task->IsMessageTask() && task->message && | |
585 task->message->has_flag(kMessageIsSync); | |
586 if (sync_message) { | |
587 InterfaceId id = task->message->interface_id(); | |
588 auto& sync_message_queue = sync_message_tasks_[id]; | |
589 DCHECK_EQ(task.get(), sync_message_queue.front()); | |
590 sync_message_queue.pop_front(); | |
591 } | |
592 | |
404 bool processed = | 593 bool processed = |
405 task->IsNotifyErrorTask() | 594 task->IsNotifyErrorTask() |
406 ? ProcessNotifyErrorTask(task.get(), force_async) | 595 ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
407 : ProcessIncomingMessage(task->message.get(), force_async); | 596 : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
408 | 597 |
409 if (!processed) { | 598 if (!processed) { |
410 tasks_.push_front(std::move(task)); | 599 tasks_.push_front(std::move(task)); |
600 if (sync_message) { | |
601 auto& sync_message_queue = sync_message_tasks_[id]; | |
602 sync_message_queue.push_front(task.get()); | |
603 } | |
411 break; | 604 break; |
605 } else { | |
606 if (sync_message) { | |
607 auto iter = sync_message_tasks_.find(id); | |
608 if (iter != sync_message_tasks_.end() && iter->second.empty()) | |
609 sync_message_tasks_.erase(iter); | |
610 } | |
412 } | 611 } |
413 } | 612 } |
414 } | 613 } |
415 | 614 |
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { | 615 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
616 lock_.AssertAcquired(); | |
617 | |
618 auto iter = sync_message_tasks_.find(id); | |
619 if (iter == sync_message_tasks_.end()) | |
620 return false; | |
621 | |
622 MultiplexRouter::Task* task = iter->second.front(); | |
623 iter->second.pop_front(); | |
624 | |
625 DCHECK(task->IsMessageTask()); | |
626 scoped_ptr<Message> message(std::move(task->message)); | |
627 | |
628 // Note: after this call, |task| and |iter| may be invalidated. | |
629 bool processed = ProcessIncomingMessage( | |
630 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); | |
631 DCHECK(processed); | |
632 | |
633 iter = sync_message_tasks_.find(id); | |
634 return iter != sync_message_tasks_.end() && !iter->second.empty(); | |
635 } | |
636 | |
637 bool MultiplexRouter::ProcessNotifyErrorTask( | |
638 Task* task, | |
639 ClientCallBehavior client_call_behavior) { | |
417 lock_.AssertAcquired(); | 640 lock_.AssertAcquired(); |
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 641 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
419 if (!endpoint->client()) | 642 if (!endpoint->client()) |
420 return true; | 643 return true; |
421 | 644 |
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 645 if (!endpoint->task_runner()->BelongsToCurrentThread() || |
646 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { | |
423 MaybePostToProcessTasks(endpoint->task_runner()); | 647 MaybePostToProcessTasks(endpoint->task_runner()); |
424 return false; | 648 return false; |
425 } | 649 } |
426 | 650 |
427 InterfaceEndpointClient* client = endpoint->client(); | 651 InterfaceEndpointClient* client = endpoint->client(); |
428 { | 652 { |
429 // We must unlock before calling into |client| because it may call this | 653 // We must unlock before calling into |client| because it may call this |
430 // object within NotifyError(). Holding the lock will lead to deadlock. | 654 // object within NotifyError(). Holding the lock will lead to deadlock. |
431 // | 655 // |
432 // It is safe to call into |client| without the lock. Because |client| is | 656 // It is safe to call into |client| without the lock. Because |client| is |
433 // always accessed on the same thread, including DetachEndpointClient(). | 657 // always accessed on the same thread, including DetachEndpointClient(). |
434 base::AutoUnlock unlocker(lock_); | 658 base::AutoUnlock unlocker(lock_); |
435 client->NotifyError(); | 659 client->NotifyError(); |
436 } | 660 } |
437 return true; | 661 return true; |
438 } | 662 } |
439 | 663 |
440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, | 664 bool MultiplexRouter::ProcessIncomingMessage( |
441 bool force_async) { | 665 Message* message, |
666 ClientCallBehavior client_call_behavior) { | |
442 lock_.AssertAcquired(); | 667 lock_.AssertAcquired(); |
668 | |
669 if (!message) { | |
670 // This is a sync message and has been processed during sync handle | |
671 // watching. | |
672 return true; | |
673 } | |
674 | |
443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 675 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
444 if (!control_message_handler_.Accept(message)) | 676 if (!control_message_handler_.Accept(message)) |
445 RaiseErrorInNonTestingMode(); | 677 RaiseErrorInNonTestingMode(); |
446 return true; | 678 return true; |
447 } | 679 } |
448 | 680 |
449 InterfaceId id = message->interface_id(); | 681 InterfaceId id = message->interface_id(); |
450 DCHECK(IsValidInterfaceId(id)); | 682 DCHECK(IsValidInterfaceId(id)); |
451 | 683 |
452 bool inserted = false; | 684 bool inserted = false; |
(...skipping 14 matching lines...) Expand all Loading... | |
467 | 699 |
468 if (endpoint->closed()) | 700 if (endpoint->closed()) |
469 return true; | 701 return true; |
470 | 702 |
471 if (!endpoint->client()) { | 703 if (!endpoint->client()) { |
472 // We need to wait until a client is attached in order to dispatch further | 704 // We need to wait until a client is attached in order to dispatch further |
473 // messages. | 705 // messages. |
474 return false; | 706 return false; |
475 } | 707 } |
476 | 708 |
477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 709 bool can_direct_call = |
710 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || | |
711 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && | |
712 message->has_flag(kMessageIsSync)); | |
713 | |
714 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { | |
478 MaybePostToProcessTasks(endpoint->task_runner()); | 715 MaybePostToProcessTasks(endpoint->task_runner()); |
479 return false; | 716 return false; |
480 } | 717 } |
481 | 718 |
482 InterfaceEndpointClient* client = endpoint->client(); | 719 InterfaceEndpointClient* client = endpoint->client(); |
483 bool result = false; | 720 bool result = false; |
484 { | 721 { |
485 // We must unlock before calling into |client| because it may call this | 722 // We must unlock before calling into |client| because it may call this |
486 // object within HandleIncomingMessage(). Holding the lock will lead to | 723 // object within HandleIncomingMessage(). Holding the lock will lead to |
487 // deadlock. | 724 // deadlock. |
(...skipping 18 matching lines...) Expand all Loading... | |
506 posted_to_process_tasks_ = true; | 743 posted_to_process_tasks_ = true; |
507 task_runner->PostTask( | 744 task_runner->PostTask( |
508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 745 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
509 } | 746 } |
510 | 747 |
511 void MultiplexRouter::LockAndCallProcessTasks() { | 748 void MultiplexRouter::LockAndCallProcessTasks() { |
512 // There is no need to hold a ref to this class in this case because this is | 749 // There is no need to hold a ref to this class in this case because this is |
513 // always called using base::Bind(), which holds a ref. | 750 // always called using base::Bind(), which holds a ref. |
514 base::AutoLock locker(lock_); | 751 base::AutoLock locker(lock_); |
515 posted_to_process_tasks_ = false; | 752 posted_to_process_tasks_ = false; |
516 ProcessTasks(false); | 753 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
517 } | 754 } |
518 | 755 |
519 void MultiplexRouter::UpdateEndpointStateMayRemove( | 756 void MultiplexRouter::UpdateEndpointStateMayRemove( |
520 InterfaceEndpoint* endpoint, | 757 InterfaceEndpoint* endpoint, |
521 EndpointStateUpdateType type) { | 758 EndpointStateUpdateType type) { |
522 switch (type) { | 759 switch (type) { |
523 case ENDPOINT_CLOSED: | 760 case ENDPOINT_CLOSED: |
524 endpoint->set_closed(); | 761 endpoint->set_closed(); |
525 break; | 762 break; |
526 case PEER_ENDPOINT_CLOSED: | 763 case PEER_ENDPOINT_CLOSED: |
527 endpoint->set_peer_closed(); | 764 endpoint->set_peer_closed(); |
765 // If the interface endpoint is performing a sync watch, this makes sure | |
766 // it is notified and eventually exits the sync watch. | |
767 endpoint->SignalSyncMessageEvent(); | |
528 break; | 768 break; |
529 } | 769 } |
530 if (endpoint->closed() && endpoint->peer_closed()) | 770 if (endpoint->closed() && endpoint->peer_closed()) |
531 endpoints_.erase(endpoint->id()); | 771 endpoints_.erase(endpoint->id()); |
532 } | 772 } |
533 | 773 |
534 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 774 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
535 lock_.AssertAcquired(); | 775 lock_.AssertAcquired(); |
536 if (!testing_mode_) | 776 if (!testing_mode_) |
537 RaiseError(); | 777 RaiseError(); |
(...skipping 16 matching lines...) Expand all Loading... | |
554 *inserted = true; | 794 *inserted = true; |
555 } else { | 795 } else { |
556 endpoint = iter->second.get(); | 796 endpoint = iter->second.get(); |
557 } | 797 } |
558 | 798 |
559 return endpoint; | 799 return endpoint; |
560 } | 800 } |
561 | 801 |
562 } // namespace internal | 802 } // namespace internal |
563 } // namespace mojo | 803 } // namespace mojo |
OLD | NEW |