Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/macros.h" | 12 #include "base/macros.h" |
| 13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
| 14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
| 15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
| 16 #include "mojo/public/cpp/bindings/associated_group.h" | 16 #include "mojo/public/cpp/bindings/associated_group.h" |
| 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" | |
| 19 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" | |
| 18 | 20 |
| 19 namespace mojo { | 21 namespace mojo { |
| 20 namespace internal { | 22 namespace internal { |
| 21 | 23 |
| 22 // InterfaceEndpoint stores the information of an interface endpoint registered | 24 // InterfaceEndpoint stores the information of an interface endpoint registered |
| 23 // with the router. Always accessed under the router's lock. | 25 // with the router. |
| 24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 26 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| 25 // this object. | 27 // this object. |
| 26 class MultiplexRouter::InterfaceEndpoint | 28 class MultiplexRouter::InterfaceEndpoint |
| 27 : public base::RefCounted<InterfaceEndpoint> { | 29 : public base::RefCounted<InterfaceEndpoint>, |
| 30 public InterfaceEndpointController { | |
| 28 public: | 31 public: |
| 29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| 30 : router_lock_(&router->lock_), | 33 : router_(router), |
| 31 id_(id), | 34 id_(id), |
| 32 closed_(false), | 35 closed_(false), |
| 33 peer_closed_(false), | 36 peer_closed_(false), |
| 34 client_(nullptr) { | 37 client_(nullptr), |
| 35 router_lock_->AssertAcquired(); | 38 event_signalled_(false) {} |
| 36 } | 39 |
| 40 // --------------------------------------------------------------------------- | |
| 41 // The following public methods are safe to call from any threads without | |
| 42 // locking. | |
| 37 | 43 |
| 38 InterfaceId id() const { return id_; } | 44 InterfaceId id() const { return id_; } |
| 39 | 45 |
| 46 // --------------------------------------------------------------------------- | |
| 47 // The following public methods are called under the router's lock. | |
| 48 | |
| 40 bool closed() const { return closed_; } | 49 bool closed() const { return closed_; } |
| 41 void set_closed() { | 50 void set_closed() { |
| 42 router_lock_->AssertAcquired(); | 51 router_->lock_.AssertAcquired(); |
| 43 closed_ = true; | 52 closed_ = true; |
| 44 } | 53 } |
| 45 | 54 |
| 46 bool peer_closed() const { return peer_closed_; } | 55 bool peer_closed() const { return peer_closed_; } |
| 47 void set_peer_closed() { | 56 void set_peer_closed() { |
| 48 router_lock_->AssertAcquired(); | 57 router_->lock_.AssertAcquired(); |
| 49 peer_closed_ = true; | 58 peer_closed_ = true; |
| 50 } | 59 } |
| 51 | 60 |
| 52 base::SingleThreadTaskRunner* task_runner() const { | 61 base::SingleThreadTaskRunner* task_runner() const { |
| 53 return task_runner_.get(); | 62 return task_runner_.get(); |
| 54 } | 63 } |
| 55 void set_task_runner( | |
| 56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | |
| 57 router_lock_->AssertAcquired(); | |
| 58 task_runner_ = std::move(task_runner); | |
| 59 } | |
| 60 | 64 |
| 61 InterfaceEndpointClient* client() const { return client_; } | 65 InterfaceEndpointClient* client() const { return client_; } |
| 62 void set_client(InterfaceEndpointClient* client) { | 66 |
| 63 router_lock_->AssertAcquired(); | 67 void AttachClient(InterfaceEndpointClient* client) { |
| 68 router_->lock_.AssertAcquired(); | |
| 69 DCHECK(!client_); | |
| 70 DCHECK(!closed_); | |
| 71 | |
| 72 task_runner_ = base::MessageLoop::current()->task_runner(); | |
| 64 client_ = client; | 73 client_ = client; |
| 65 } | 74 } |
| 66 | 75 |
| 76 // It should be called on the same thread as the corresponding AttachClient() | |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: should = must? Also perhaps "This" or "This m
yzshen1
2016/03/29 16:19:01
Done. English is hard. :)
| |
| 77 // call. | |
| 78 void DetachClient() { | |
| 79 router_->lock_.AssertAcquired(); | |
| 80 DCHECK(client_); | |
| 81 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 82 DCHECK(!closed_); | |
| 83 | |
| 84 task_runner_ = nullptr; | |
| 85 client_ = nullptr; | |
| 86 sync_watcher_.reset(); | |
| 87 } | |
| 88 | |
| 89 void SignalSyncMessageEvent() { | |
| 90 router_->lock_.AssertAcquired(); | |
| 91 if (event_signalled_) | |
| 92 return; | |
| 93 | |
| 94 EnsureEventMessagePipeExists(); | |
| 95 event_signalled_ = true; | |
| 96 char dummy_message = '\0'; | |
| 97 MojoResult result = | |
| 98 WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1, | |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It is perfectly legal to write a zero-length
yzshen1
2016/03/29 16:19:01
Done.
| |
| 99 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | |
| 100 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 101 } | |
| 102 | |
| 103 // --------------------------------------------------------------------------- | |
| 104 // The following public methods (i.e., InterfaceEndpointController | |
| 105 // implementation) are called by the client on the same thread as the | |
| 106 // AttachClient() call. They are called outside of the router's lock. | |
| 107 | |
| 108 bool SendMessage(Message* message) override { | |
| 109 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 110 message->set_interface_id(id_); | |
| 111 return router_->connector_.Accept(message); | |
| 112 } | |
| 113 | |
| 114 void AllowWokenUpBySyncWatchOnSameThread() override { | |
| 115 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 116 | |
| 117 EnsureSyncWatcherExists(); | |
| 118 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | |
| 119 } | |
| 120 | |
| 121 bool SyncWatch(const bool* should_stop) override { | |
| 122 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 123 | |
| 124 EnsureSyncWatcherExists(); | |
| 125 return sync_watcher_->SyncWatch(should_stop); | |
| 126 } | |
| 127 | |
| 67 private: | 128 private: |
| 68 friend class base::RefCounted<InterfaceEndpoint>; | 129 friend class base::RefCounted<InterfaceEndpoint>; |
| 69 | 130 |
| 70 ~InterfaceEndpoint() { | 131 ~InterfaceEndpoint() override { |
| 71 router_lock_->AssertAcquired(); | 132 router_->lock_.AssertAcquired(); |
| 72 | 133 |
| 73 DCHECK(!client_); | 134 DCHECK(!client_); |
| 74 DCHECK(closed_); | 135 DCHECK(closed_); |
| 75 DCHECK(peer_closed_); | 136 DCHECK(peer_closed_); |
| 76 } | 137 DCHECK(!sync_watcher_); |
| 77 | 138 } |
| 78 base::Lock* const router_lock_; | 139 |
| 140 void OnHandleReady(MojoResult result) { | |
| 141 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 142 scoped_refptr<InterfaceEndpoint> self_protector(this); | |
| 143 scoped_refptr<MultiplexRouter> router_protector(router_); | |
| 144 | |
| 145 // Because we never close |sync_message_event_{sender,receiver}_| before | |
| 146 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | |
| 147 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 148 bool reset_sync_watcher = false; | |
| 149 { | |
| 150 base::AutoLock locker(router_->lock_); | |
| 151 | |
| 152 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | |
| 153 | |
| 154 if (!more_to_process) | |
| 155 ResetSyncMessageSignal(); | |
| 156 | |
| 157 // Currently there are no queued sync messages and the peer has closed so | |
| 158 // there won't be incoming sync messages in the future. | |
| 159 reset_sync_watcher = !more_to_process && peer_closed_; | |
| 160 } | |
| 161 if (reset_sync_watcher) { | |
| 162 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | |
| 163 // on the call stack, resetting the sync watcher will allow it to exit | |
| 164 // when the call stack unwinds to that frame. | |
| 165 sync_watcher_.reset(); | |
| 166 } | |
| 167 } | |
| 168 | |
| 169 void EnsureSyncWatcherExists() { | |
| 170 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 171 if (sync_watcher_) | |
| 172 return; | |
| 173 | |
| 174 { | |
| 175 base::AutoLock locker(router_->lock_); | |
| 176 EnsureEventMessagePipeExists(); | |
| 177 | |
| 178 auto iter = router_->sync_message_tasks_.find(id_); | |
| 179 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | |
| 180 SignalSyncMessageEvent(); | |
| 181 } | |
| 182 | |
| 183 sync_watcher_.reset(new SyncHandleWatcher( | |
| 184 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 185 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); | |
| 186 } | |
| 187 | |
| 188 void EnsureEventMessagePipeExists() { | |
| 189 router_->lock_.AssertAcquired(); | |
| 190 | |
| 191 if (sync_message_event_receiver_.is_valid()) | |
| 192 return; | |
| 193 | |
| 194 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | |
| 195 &sync_message_event_receiver_); | |
| 196 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 197 } | |
| 198 | |
| 199 void ResetSyncMessageSignal() { | |
| 200 router_->lock_.AssertAcquired(); | |
| 201 | |
| 202 if (!event_signalled_) | |
| 203 return; | |
| 204 | |
| 205 DCHECK(sync_message_event_receiver_.is_valid()); | |
| 206 char dummy_message = 0; | |
| 207 uint32_t size = 1; | |
| 208 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), | |
|
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It's fine to pass nullptr for the read buffer
yzshen1
2016/03/29 16:19:01
Done.
| |
| 209 &dummy_message, &size, nullptr, nullptr, | |
| 210 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
| 211 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 212 event_signalled_ = false; | |
| 213 } | |
| 214 | |
| 215 // --------------------------------------------------------------------------- | |
| 216 // The following members are safe to access from any threads. | |
| 217 | |
| 218 MultiplexRouter* const router_; | |
| 79 const InterfaceId id_; | 219 const InterfaceId id_; |
| 80 | 220 |
| 221 // --------------------------------------------------------------------------- | |
| 222 // The following members are accessed under the router's lock. | |
| 223 | |
| 81 // Whether the endpoint has been closed. | 224 // Whether the endpoint has been closed. |
| 82 bool closed_; | 225 bool closed_; |
| 83 // Whether the peer endpoint has been closed. | 226 // Whether the peer endpoint has been closed. |
| 84 bool peer_closed_; | 227 bool peer_closed_; |
| 85 | 228 |
| 86 // The task runner on which |client_| can be accessed. | 229 // The task runner on which |client_|'s methods can be called. |
| 87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 230 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 88 // Not owned. It is null if no client is attached to this endpoint. | 231 // Not owned. It is null if no client is attached to this endpoint. |
| 89 InterfaceEndpointClient* client_; | 232 InterfaceEndpointClient* client_; |
| 90 | 233 |
| 234 // A message pipe used as an event to signal that sync messages are available. | |
| 235 // The message pipe handles are initialized under the router's lock and remain | |
| 236 // unchanged afterwards. They may be accessed outside of the router's lock | |
| 237 // later. | |
| 238 ScopedMessagePipeHandle sync_message_event_sender_; | |
| 239 ScopedMessagePipeHandle sync_message_event_receiver_; | |
| 240 bool event_signalled_; | |
| 241 | |
| 242 // --------------------------------------------------------------------------- | |
| 243 // The following members are only valid while a client is attached. They are | |
| 244 // used exclusively on the client's thread. They may be accessed outside of | |
| 245 // the router's lock. | |
| 246 | |
| 247 scoped_ptr<SyncHandleWatcher> sync_watcher_; | |
| 248 | |
| 91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 249 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| 92 }; | 250 }; |
| 93 | 251 |
| 94 struct MultiplexRouter::Task { | 252 struct MultiplexRouter::Task { |
| 95 public: | 253 public: |
| 96 // Doesn't take ownership of |message| but takes its contents. | 254 // Doesn't take ownership of |message| but takes its contents. |
| 97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { | 255 static scoped_ptr<Task> CreateMessageTask(Message* message) { |
| 98 Task* task = new Task(); | 256 Task* task = new Task(MESSAGE); |
| 99 task->message.reset(new Message); | 257 task->message.reset(new Message); |
| 100 message->MoveTo(task->message.get()); | 258 message->MoveTo(task->message.get()); |
| 101 return make_scoped_ptr(task); | 259 return make_scoped_ptr(task); |
| 102 } | 260 } |
| 103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | 261 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
| 104 Task* task = new Task(); | 262 Task* task = new Task(NOTIFY_ERROR); |
| 105 task->endpoint_to_notify = endpoint; | 263 task->endpoint_to_notify = endpoint; |
| 106 return make_scoped_ptr(task); | 264 return make_scoped_ptr(task); |
| 107 } | 265 } |
| 108 | 266 |
| 109 ~Task() {} | 267 ~Task() {} |
| 110 | 268 |
| 111 bool IsIncomingMessageTask() const { return !!message; } | 269 bool IsMessageTask() const { return type == MESSAGE; } |
| 112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | 270 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
| 113 | 271 |
| 114 scoped_ptr<Message> message; | 272 scoped_ptr<Message> message; |
| 115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 273 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 116 | 274 |
| 275 enum Type { MESSAGE, NOTIFY_ERROR }; | |
| 276 Type type; | |
| 277 | |
| 117 private: | 278 private: |
| 118 Task() {} | 279 explicit Task(Type in_type) : type(in_type) {} |
| 119 }; | 280 }; |
| 120 | 281 |
| 121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 282 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| 122 ScopedMessagePipeHandle message_pipe) | 283 ScopedMessagePipeHandle message_pipe) |
| 123 : RefCountedDeleteOnMessageLoop( | 284 : RefCountedDeleteOnMessageLoop( |
| 124 base::MessageLoop::current()->task_runner()), | 285 base::MessageLoop::current()->task_runner()), |
| 125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 286 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 126 header_validator_(this), | 287 header_validator_(this), |
| 127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), | 288 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
| 128 encountered_error_(false), | |
| 129 control_message_handler_(this), | 289 control_message_handler_(this), |
| 130 control_message_proxy_(&connector_), | 290 control_message_proxy_(&connector_), |
| 131 next_interface_id_value_(1), | 291 next_interface_id_value_(1), |
| 132 posted_to_process_tasks_(false), | 292 posted_to_process_tasks_(false), |
| 293 encountered_error_(false), | |
| 133 testing_mode_(false) { | 294 testing_mode_(false) { |
| 295 // Always participate in sync handle watching, because even if it doesn't | |
| 296 // expect sync requests during sync handle watching, it may still need to | |
| 297 // dispatch messages to associated endpoints on a different thread. | |
| 298 connector_.AllowWokenUpBySyncWatchOnSameThread(); | |
| 134 connector_.set_incoming_receiver(&header_validator_); | 299 connector_.set_incoming_receiver(&header_validator_); |
| 135 connector_.set_connection_error_handler( | 300 connector_.set_connection_error_handler( |
| 136 [this]() { OnPipeConnectionError(); }); | 301 [this]() { OnPipeConnectionError(); }); |
| 137 } | 302 } |
| 138 | 303 |
| 139 MultiplexRouter::~MultiplexRouter() { | 304 MultiplexRouter::~MultiplexRouter() { |
| 140 base::AutoLock locker(lock_); | 305 base::AutoLock locker(lock_); |
| 141 | 306 |
| 307 sync_message_tasks_.clear(); | |
| 142 tasks_.clear(); | 308 tasks_.clear(); |
| 143 | 309 |
| 144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 310 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 145 InterfaceEndpoint* endpoint = iter->second.get(); | 311 InterfaceEndpoint* endpoint = iter->second.get(); |
| 146 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 312 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 147 // because it may remove the corresponding value from the map. | 313 // because it may remove the corresponding value from the map. |
| 148 ++iter; | 314 ++iter; |
| 149 | 315 |
| 150 DCHECK(endpoint->closed()); | 316 DCHECK(endpoint->closed()); |
| 151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 317 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 214 | 380 |
| 215 DCHECK(ContainsKey(endpoints_, id)); | 381 DCHECK(ContainsKey(endpoints_, id)); |
| 216 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 382 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 217 DCHECK(!endpoint->client()); | 383 DCHECK(!endpoint->client()); |
| 218 DCHECK(!endpoint->closed()); | 384 DCHECK(!endpoint->closed()); |
| 219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 385 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 220 | 386 |
| 221 if (!IsMasterInterfaceId(id)) | 387 if (!IsMasterInterfaceId(id)) |
| 222 control_message_proxy_.NotifyPeerEndpointClosed(id); | 388 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 223 | 389 |
| 224 ProcessTasks(true); | 390 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| 225 } | 391 } |
| 226 | 392 |
| 227 void MultiplexRouter::AttachEndpointClient( | 393 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| 228 const ScopedInterfaceEndpointHandle& handle, | 394 const ScopedInterfaceEndpointHandle& handle, |
| 229 InterfaceEndpointClient* client) { | 395 InterfaceEndpointClient* client) { |
| 230 const InterfaceId id = handle.id(); | 396 const InterfaceId id = handle.id(); |
| 231 | 397 |
| 232 DCHECK(IsValidInterfaceId(id)); | 398 DCHECK(IsValidInterfaceId(id)); |
| 233 DCHECK(client); | 399 DCHECK(client); |
| 234 | 400 |
| 235 base::AutoLock locker(lock_); | 401 base::AutoLock locker(lock_); |
| 236 DCHECK(ContainsKey(endpoints_, id)); | 402 DCHECK(ContainsKey(endpoints_, id)); |
| 237 | 403 |
| 238 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 404 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 239 DCHECK(!endpoint->client()); | 405 endpoint->AttachClient(client); |
| 240 DCHECK(!endpoint->closed()); | |
| 241 | |
| 242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | |
| 243 endpoint->set_client(client); | |
| 244 | 406 |
| 245 if (endpoint->peer_closed()) | 407 if (endpoint->peer_closed()) |
| 246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 408 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 247 ProcessTasks(true); | 409 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| 410 | |
| 411 return endpoint; | |
| 248 } | 412 } |
| 249 | 413 |
| 250 void MultiplexRouter::DetachEndpointClient( | 414 void MultiplexRouter::DetachEndpointClient( |
| 251 const ScopedInterfaceEndpointHandle& handle) { | 415 const ScopedInterfaceEndpointHandle& handle) { |
| 252 const InterfaceId id = handle.id(); | 416 const InterfaceId id = handle.id(); |
| 253 | 417 |
| 254 DCHECK(IsValidInterfaceId(id)); | 418 DCHECK(IsValidInterfaceId(id)); |
| 255 | 419 |
| 256 base::AutoLock locker(lock_); | 420 base::AutoLock locker(lock_); |
| 257 DCHECK(ContainsKey(endpoints_, id)); | 421 DCHECK(ContainsKey(endpoints_, id)); |
| 258 | 422 |
| 259 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 423 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 260 DCHECK(endpoint->client()); | 424 endpoint->DetachClient(); |
| 261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | |
| 262 DCHECK(!endpoint->closed()); | |
| 263 | |
| 264 endpoint->set_task_runner(nullptr); | |
| 265 endpoint->set_client(nullptr); | |
| 266 } | |
| 267 | |
| 268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
| 269 Message* message) { | |
| 270 message->set_interface_id(handle.id()); | |
| 271 return connector_.Accept(message); | |
| 272 } | 425 } |
| 273 | 426 |
| 274 void MultiplexRouter::RaiseError() { | 427 void MultiplexRouter::RaiseError() { |
| 275 if (task_runner_->BelongsToCurrentThread()) { | 428 if (task_runner_->BelongsToCurrentThread()) { |
| 276 connector_.RaiseError(); | 429 connector_.RaiseError(); |
| 277 } else { | 430 } else { |
| 278 task_runner_->PostTask(FROM_HERE, | 431 task_runner_->PostTask(FROM_HERE, |
| 279 base::Bind(&MultiplexRouter::RaiseError, this)); | 432 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 280 } | 433 } |
| 281 } | 434 } |
| 282 | 435 |
| 283 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 436 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
| 284 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); | 437 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); |
| 285 group->router_ = this; | 438 group->router_ = this; |
| 286 return group; | 439 return group; |
| 287 } | 440 } |
| 288 | 441 |
| 289 // static | 442 // static |
| 290 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { | 443 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
| 291 return associated_group->router_.get(); | 444 return associated_group->router_.get(); |
| 292 } | 445 } |
| 293 | 446 |
| 447 void MultiplexRouter::CloseMessagePipe() { | |
| 448 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 449 connector_.CloseMessagePipe(); | |
| 450 // CloseMessagePipe() above won't trigger connection error handler. | |
| 451 // Explicitly call OnPipeConnectionError() so that associated endpoints will | |
| 452 // get notified. | |
| 453 OnPipeConnectionError(); | |
| 454 } | |
| 455 | |
| 294 bool MultiplexRouter::HasAssociatedEndpoints() const { | 456 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 295 DCHECK(thread_checker_.CalledOnValidThread()); | 457 DCHECK(thread_checker_.CalledOnValidThread()); |
| 296 base::AutoLock locker(lock_); | 458 base::AutoLock locker(lock_); |
| 297 | 459 |
| 298 if (endpoints_.size() > 1) | 460 if (endpoints_.size() > 1) |
| 299 return true; | 461 return true; |
| 300 if (endpoints_.size() == 0) | 462 if (endpoints_.size() == 0) |
| 301 return false; | 463 return false; |
| 302 | 464 |
| 303 return !ContainsKey(endpoints_, kMasterInterfaceId); | 465 return !ContainsKey(endpoints_, kMasterInterfaceId); |
| 304 } | 466 } |
| 305 | 467 |
| 306 void MultiplexRouter::EnableTestingMode() { | 468 void MultiplexRouter::EnableTestingMode() { |
| 307 DCHECK(thread_checker_.CalledOnValidThread()); | 469 DCHECK(thread_checker_.CalledOnValidThread()); |
| 308 base::AutoLock locker(lock_); | 470 base::AutoLock locker(lock_); |
| 309 | 471 |
| 310 testing_mode_ = true; | 472 testing_mode_ = true; |
| 311 connector_.set_enforce_errors_from_incoming_receiver(false); | 473 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 312 } | 474 } |
| 313 | 475 |
| 314 bool MultiplexRouter::Accept(Message* message) { | 476 bool MultiplexRouter::Accept(Message* message) { |
| 315 DCHECK(thread_checker_.CalledOnValidThread()); | 477 DCHECK(thread_checker_.CalledOnValidThread()); |
| 316 | 478 |
| 317 scoped_refptr<MultiplexRouter> protector(this); | 479 scoped_refptr<MultiplexRouter> protector(this); |
| 318 base::AutoLock locker(lock_); | 480 base::AutoLock locker(lock_); |
| 319 | 481 |
| 320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); | 482 ClientCallBehavior client_call_behavior = |
| 483 connector_.during_sync_handle_watcher_callback() | |
| 484 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | |
| 485 : ALLOW_DIRECT_CLIENT_CALLS; | |
| 486 | |
| 487 bool processed = | |
| 488 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); | |
| 321 | 489 |
| 322 if (!processed) { | 490 if (!processed) { |
| 323 // Either the task queue is not empty or we cannot process the message | 491 // Either the task queue is not empty or we cannot process the message |
| 324 // directly. In both cases, there is no need to call ProcessTasks(). | 492 // directly. In both cases, there is no need to call ProcessTasks(). |
| 325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | 493 tasks_.push_back(Task::CreateMessageTask(message)); |
| 494 Task* task = tasks_.back().get(); | |
| 495 | |
| 496 if (task->message->has_flag(kMessageIsSync)) { | |
| 497 InterfaceId id = task->message->interface_id(); | |
| 498 sync_message_tasks_[id].push_back(task); | |
| 499 auto iter = endpoints_.find(id); | |
| 500 if (iter != endpoints_.end()) | |
| 501 iter->second->SignalSyncMessageEvent(); | |
| 502 } | |
| 326 } else if (!tasks_.empty()) { | 503 } else if (!tasks_.empty()) { |
| 327 // Processing the message may result in new tasks (for error notification) | 504 // Processing the message may result in new tasks (for error notification) |
| 328 // being added to the queue. In this case, we have to attempt to process the | 505 // being added to the queue. In this case, we have to attempt to process the |
| 329 // tasks. | 506 // tasks. |
| 330 ProcessTasks(false); | 507 ProcessTasks(client_call_behavior); |
| 331 } | 508 } |
| 332 | 509 |
| 333 // Always return true. If we see errors during message processing, we will | 510 // Always return true. If we see errors during message processing, we will |
| 334 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 511 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 335 return true; | 512 return true; |
| 336 } | 513 } |
| 337 | 514 |
| 338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 515 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 339 lock_.AssertAcquired(); | 516 lock_.AssertAcquired(); |
| 340 | 517 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 381 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 558 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 382 // because it may remove the corresponding value from the map. | 559 // because it may remove the corresponding value from the map. |
| 383 ++iter; | 560 ++iter; |
| 384 | 561 |
| 385 if (endpoint->client()) | 562 if (endpoint->client()) |
| 386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 563 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 387 | 564 |
| 388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 565 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 389 } | 566 } |
| 390 | 567 |
| 391 ProcessTasks(false); | 568 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| 569 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | |
| 570 : ALLOW_DIRECT_CLIENT_CALLS); | |
| 392 } | 571 } |
| 393 | 572 |
| 394 void MultiplexRouter::ProcessTasks(bool force_async) { | 573 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
| 395 lock_.AssertAcquired(); | 574 lock_.AssertAcquired(); |
| 396 | 575 |
| 397 if (posted_to_process_tasks_) | 576 if (posted_to_process_tasks_) |
| 398 return; | 577 return; |
| 399 | 578 |
| 400 while (!tasks_.empty()) { | 579 while (!tasks_.empty()) { |
| 401 scoped_ptr<Task> task(std::move(tasks_.front())); | 580 scoped_ptr<Task> task(std::move(tasks_.front())); |
| 402 tasks_.pop_front(); | 581 tasks_.pop_front(); |
| 403 | 582 |
| 583 InterfaceId id = kInvalidInterfaceId; | |
| 584 bool sync_message = task->IsMessageTask() && task->message && | |
| 585 task->message->has_flag(kMessageIsSync); | |
| 586 if (sync_message) { | |
| 587 InterfaceId id = task->message->interface_id(); | |
| 588 auto& sync_message_queue = sync_message_tasks_[id]; | |
| 589 DCHECK_EQ(task.get(), sync_message_queue.front()); | |
| 590 sync_message_queue.pop_front(); | |
| 591 } | |
| 592 | |
| 404 bool processed = | 593 bool processed = |
| 405 task->IsNotifyErrorTask() | 594 task->IsNotifyErrorTask() |
| 406 ? ProcessNotifyErrorTask(task.get(), force_async) | 595 ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
| 407 : ProcessIncomingMessage(task->message.get(), force_async); | 596 : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
| 408 | 597 |
| 409 if (!processed) { | 598 if (!processed) { |
| 410 tasks_.push_front(std::move(task)); | 599 tasks_.push_front(std::move(task)); |
| 600 if (sync_message) { | |
| 601 auto& sync_message_queue = sync_message_tasks_[id]; | |
| 602 sync_message_queue.push_front(task.get()); | |
| 603 } | |
| 411 break; | 604 break; |
| 605 } else { | |
| 606 if (sync_message) { | |
| 607 auto iter = sync_message_tasks_.find(id); | |
| 608 if (iter != sync_message_tasks_.end() && iter->second.empty()) | |
| 609 sync_message_tasks_.erase(iter); | |
| 610 } | |
| 412 } | 611 } |
| 413 } | 612 } |
| 414 } | 613 } |
| 415 | 614 |
| 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { | 615 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| 616 lock_.AssertAcquired(); | |
| 617 | |
| 618 auto iter = sync_message_tasks_.find(id); | |
| 619 if (iter == sync_message_tasks_.end()) | |
| 620 return false; | |
| 621 | |
| 622 MultiplexRouter::Task* task = iter->second.front(); | |
| 623 iter->second.pop_front(); | |
| 624 | |
| 625 DCHECK(task->IsMessageTask()); | |
| 626 scoped_ptr<Message> message(std::move(task->message)); | |
| 627 | |
| 628 // Note: after this call, |task| and |iter| may be invalidated. | |
| 629 bool processed = ProcessIncomingMessage( | |
| 630 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); | |
| 631 DCHECK(processed); | |
| 632 | |
| 633 iter = sync_message_tasks_.find(id); | |
| 634 return iter != sync_message_tasks_.end() && !iter->second.empty(); | |
| 635 } | |
| 636 | |
| 637 bool MultiplexRouter::ProcessNotifyErrorTask( | |
| 638 Task* task, | |
| 639 ClientCallBehavior client_call_behavior) { | |
| 417 lock_.AssertAcquired(); | 640 lock_.AssertAcquired(); |
| 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 641 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 419 if (!endpoint->client()) | 642 if (!endpoint->client()) |
| 420 return true; | 643 return true; |
| 421 | 644 |
| 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 645 if (!endpoint->task_runner()->BelongsToCurrentThread() || |
| 646 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { | |
| 423 MaybePostToProcessTasks(endpoint->task_runner()); | 647 MaybePostToProcessTasks(endpoint->task_runner()); |
| 424 return false; | 648 return false; |
| 425 } | 649 } |
| 426 | 650 |
| 427 InterfaceEndpointClient* client = endpoint->client(); | 651 InterfaceEndpointClient* client = endpoint->client(); |
| 428 { | 652 { |
| 429 // We must unlock before calling into |client| because it may call this | 653 // We must unlock before calling into |client| because it may call this |
| 430 // object within NotifyError(). Holding the lock will lead to deadlock. | 654 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 431 // | 655 // |
| 432 // It is safe to call into |client| without the lock. Because |client| is | 656 // It is safe to call into |client| without the lock. Because |client| is |
| 433 // always accessed on the same thread, including DetachEndpointClient(). | 657 // always accessed on the same thread, including DetachEndpointClient(). |
| 434 base::AutoUnlock unlocker(lock_); | 658 base::AutoUnlock unlocker(lock_); |
| 435 client->NotifyError(); | 659 client->NotifyError(); |
| 436 } | 660 } |
| 437 return true; | 661 return true; |
| 438 } | 662 } |
| 439 | 663 |
| 440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, | 664 bool MultiplexRouter::ProcessIncomingMessage( |
| 441 bool force_async) { | 665 Message* message, |
| 666 ClientCallBehavior client_call_behavior) { | |
| 442 lock_.AssertAcquired(); | 667 lock_.AssertAcquired(); |
| 668 | |
| 669 if (!message) { | |
| 670 // This is a sync message and has been processed during sync handle | |
| 671 // watching. | |
| 672 return true; | |
| 673 } | |
| 674 | |
| 443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 675 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 444 if (!control_message_handler_.Accept(message)) | 676 if (!control_message_handler_.Accept(message)) |
| 445 RaiseErrorInNonTestingMode(); | 677 RaiseErrorInNonTestingMode(); |
| 446 return true; | 678 return true; |
| 447 } | 679 } |
| 448 | 680 |
| 449 InterfaceId id = message->interface_id(); | 681 InterfaceId id = message->interface_id(); |
| 450 DCHECK(IsValidInterfaceId(id)); | 682 DCHECK(IsValidInterfaceId(id)); |
| 451 | 683 |
| 452 bool inserted = false; | 684 bool inserted = false; |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 467 | 699 |
| 468 if (endpoint->closed()) | 700 if (endpoint->closed()) |
| 469 return true; | 701 return true; |
| 470 | 702 |
| 471 if (!endpoint->client()) { | 703 if (!endpoint->client()) { |
| 472 // We need to wait until a client is attached in order to dispatch further | 704 // We need to wait until a client is attached in order to dispatch further |
| 473 // messages. | 705 // messages. |
| 474 return false; | 706 return false; |
| 475 } | 707 } |
| 476 | 708 |
| 477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 709 bool can_direct_call = |
| 710 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || | |
| 711 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && | |
| 712 message->has_flag(kMessageIsSync)); | |
| 713 | |
| 714 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { | |
| 478 MaybePostToProcessTasks(endpoint->task_runner()); | 715 MaybePostToProcessTasks(endpoint->task_runner()); |
| 479 return false; | 716 return false; |
| 480 } | 717 } |
| 481 | 718 |
| 482 InterfaceEndpointClient* client = endpoint->client(); | 719 InterfaceEndpointClient* client = endpoint->client(); |
| 483 bool result = false; | 720 bool result = false; |
| 484 { | 721 { |
| 485 // We must unlock before calling into |client| because it may call this | 722 // We must unlock before calling into |client| because it may call this |
| 486 // object within HandleIncomingMessage(). Holding the lock will lead to | 723 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 487 // deadlock. | 724 // deadlock. |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 506 posted_to_process_tasks_ = true; | 743 posted_to_process_tasks_ = true; |
| 507 task_runner->PostTask( | 744 task_runner->PostTask( |
| 508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 745 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 509 } | 746 } |
| 510 | 747 |
| 511 void MultiplexRouter::LockAndCallProcessTasks() { | 748 void MultiplexRouter::LockAndCallProcessTasks() { |
| 512 // There is no need to hold a ref to this class in this case because this is | 749 // There is no need to hold a ref to this class in this case because this is |
| 513 // always called using base::Bind(), which holds a ref. | 750 // always called using base::Bind(), which holds a ref. |
| 514 base::AutoLock locker(lock_); | 751 base::AutoLock locker(lock_); |
| 515 posted_to_process_tasks_ = false; | 752 posted_to_process_tasks_ = false; |
| 516 ProcessTasks(false); | 753 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
| 517 } | 754 } |
| 518 | 755 |
| 519 void MultiplexRouter::UpdateEndpointStateMayRemove( | 756 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 520 InterfaceEndpoint* endpoint, | 757 InterfaceEndpoint* endpoint, |
| 521 EndpointStateUpdateType type) { | 758 EndpointStateUpdateType type) { |
| 522 switch (type) { | 759 switch (type) { |
| 523 case ENDPOINT_CLOSED: | 760 case ENDPOINT_CLOSED: |
| 524 endpoint->set_closed(); | 761 endpoint->set_closed(); |
| 525 break; | 762 break; |
| 526 case PEER_ENDPOINT_CLOSED: | 763 case PEER_ENDPOINT_CLOSED: |
| 527 endpoint->set_peer_closed(); | 764 endpoint->set_peer_closed(); |
| 765 // If the interface endpoint is performing a sync watch, this makes sure | |
| 766 // it is notified and eventually exits the sync watch. | |
| 767 endpoint->SignalSyncMessageEvent(); | |
| 528 break; | 768 break; |
| 529 } | 769 } |
| 530 if (endpoint->closed() && endpoint->peer_closed()) | 770 if (endpoint->closed() && endpoint->peer_closed()) |
| 531 endpoints_.erase(endpoint->id()); | 771 endpoints_.erase(endpoint->id()); |
| 532 } | 772 } |
| 533 | 773 |
| 534 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 774 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 535 lock_.AssertAcquired(); | 775 lock_.AssertAcquired(); |
| 536 if (!testing_mode_) | 776 if (!testing_mode_) |
| 537 RaiseError(); | 777 RaiseError(); |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 554 *inserted = true; | 794 *inserted = true; |
| 555 } else { | 795 } else { |
| 556 endpoint = iter->second.get(); | 796 endpoint = iter->second.get(); |
| 557 } | 797 } |
| 558 | 798 |
| 559 return endpoint; | 799 return endpoint; |
| 560 } | 800 } |
| 561 | 801 |
| 562 } // namespace internal | 802 } // namespace internal |
| 563 } // namespace mojo | 803 } // namespace mojo |
| OLD | NEW |