| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/macros.h" | 12 #include "base/macros.h" |
| 13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
| 14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
| 15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
| 16 #include "mojo/public/cpp/bindings/associated_group.h" | 16 #include "mojo/public/cpp/bindings/associated_group.h" |
| 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 18 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
| 18 | 19 |
| 19 namespace mojo { | 20 namespace mojo { |
| 20 namespace internal { | 21 namespace internal { |
| 21 | 22 |
| 22 // InterfaceEndpoint stores the information of an interface endpoint registered | 23 // InterfaceEndpoint stores the information of an interface endpoint registered |
| 23 // with the router. Always accessed under the router's lock. | 24 // with the router. Always accessed under the router's lock. |
| 24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 25 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| 25 // this object. | 26 // this object. |
| 27 // TODO(yzshen): update the comment about lock. |
| 26 class MultiplexRouter::InterfaceEndpoint | 28 class MultiplexRouter::InterfaceEndpoint |
| 27 : public base::RefCounted<InterfaceEndpoint> { | 29 : public base::RefCounted<InterfaceEndpoint>, |
| 30 public InterfaceEndpointController { |
| 28 public: | 31 public: |
| 29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| 30 : router_lock_(&router->lock_), | 33 : router_(router), |
| 31 id_(id), | 34 id_(id), |
| 32 closed_(false), | 35 closed_(false), |
| 33 peer_closed_(false), | 36 peer_closed_(false), |
| 34 client_(nullptr) { | 37 client_(nullptr), |
| 35 router_lock_->AssertAcquired(); | 38 event_signalled_(false) { |
| 39 router_->lock_.AssertAcquired(); |
| 40 |
| 41 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
| 42 &sync_message_event_receiver_); |
| 43 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 36 } | 44 } |
| 37 | 45 |
| 38 InterfaceId id() const { return id_; } | 46 InterfaceId id() const { return id_; } |
| 39 | 47 |
| 40 bool closed() const { return closed_; } | 48 bool closed() const { return closed_; } |
| 41 void set_closed() { | 49 void set_closed() { |
| 42 router_lock_->AssertAcquired(); | 50 router_->lock_.AssertAcquired(); |
| 43 closed_ = true; | 51 closed_ = true; |
| 44 } | 52 } |
| 45 | 53 |
| 46 bool peer_closed() const { return peer_closed_; } | 54 bool peer_closed() const { return peer_closed_; } |
| 47 void set_peer_closed() { | 55 void set_peer_closed() { |
| 48 router_lock_->AssertAcquired(); | 56 router_->lock_.AssertAcquired(); |
| 49 peer_closed_ = true; | 57 peer_closed_ = true; |
| 50 } | 58 } |
| 51 | 59 |
| 52 base::SingleThreadTaskRunner* task_runner() const { | 60 base::SingleThreadTaskRunner* task_runner() const { |
| 53 return task_runner_.get(); | 61 return task_runner_.get(); |
| 54 } | 62 } |
| 55 void set_task_runner( | 63 void set_task_runner( |
| 56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | 64 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| 57 router_lock_->AssertAcquired(); | 65 router_->lock_.AssertAcquired(); |
| 58 task_runner_ = std::move(task_runner); | 66 task_runner_ = std::move(task_runner); |
| 59 } | 67 } |
| 60 | 68 |
| 61 InterfaceEndpointClient* client() const { return client_; } | 69 InterfaceEndpointClient* client() const { return client_; } |
| 62 void set_client(InterfaceEndpointClient* client) { | 70 void set_client(InterfaceEndpointClient* client) { |
| 63 router_lock_->AssertAcquired(); | 71 router_->lock_.AssertAcquired(); |
| 64 client_ = client; | 72 client_ = client; |
| 65 } | 73 } |
| 66 | 74 |
| 75 void ClearSyncHandleWatcher() { |
| 76 router_->lock_.AssertAcquired(); |
| 77 sync_watcher_.reset(); |
| 78 } |
| 79 |
| 80 void SignalSyncMessageReceived() { |
| 81 router_->lock_.AssertAcquired(); |
| 82 |
| 83 if (event_signalled_) |
| 84 return; |
| 85 |
| 86 event_signalled_ = true; |
| 87 char dummy_message = '\0'; |
| 88 MojoResult result = |
| 89 WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1, |
| 90 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
| 91 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 92 } |
| 93 |
| 94 void ResetSyncMessageSignal() { |
| 95 router_->lock_.AssertAcquired(); |
| 96 |
| 97 if (!event_signalled_) |
| 98 return; |
| 99 |
| 100 char dummy_message = 0; |
| 101 uint32_t size = 1; |
| 102 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), |
| 103 &dummy_message, &size, nullptr, nullptr, |
| 104 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| 105 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 106 event_signalled_ = false; |
| 107 } |
| 108 |
| 109 bool SendMessage(Message* message) override { |
| 110 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 111 message->set_interface_id(id_); |
| 112 return router_->connector_.Accept(message); |
| 113 } |
| 114 |
| 115 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 116 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 117 |
| 118 { |
| 119 base::AutoLock locker(router_->lock_); |
| 120 |
| 121 auto iter = router_->sync_message_tasks_.find(id_); |
| 122 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| 123 SignalSyncMessageReceived(); |
| 124 } |
| 125 |
| 126 EnsureSyncWatcherExists(); |
| 127 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 128 } |
| 129 |
| 130 bool SyncWatch(const bool* should_stop) override { |
| 131 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 132 |
| 133 EnsureSyncWatcherExists(); |
| 134 return sync_watcher_->SyncWatch(should_stop); |
| 135 } |
| 136 |
| 67 private: | 137 private: |
| 68 friend class base::RefCounted<InterfaceEndpoint>; | 138 friend class base::RefCounted<InterfaceEndpoint>; |
| 69 | 139 |
| 70 ~InterfaceEndpoint() { | 140 ~InterfaceEndpoint() override { |
| 71 router_lock_->AssertAcquired(); | 141 router_->lock_.AssertAcquired(); |
| 72 | 142 |
| 73 DCHECK(!client_); | 143 DCHECK(!client_); |
| 74 DCHECK(closed_); | 144 DCHECK(closed_); |
| 75 DCHECK(peer_closed_); | 145 DCHECK(peer_closed_); |
| 146 DCHECK(!sync_watcher_); |
| 76 } | 147 } |
| 77 | 148 |
| 78 base::Lock* const router_lock_; | 149 void OnHandleReady(MojoResult result); |
| 150 |
| 151 void EnsureSyncWatcherExists() { |
| 152 if (sync_watcher_) |
| 153 return; |
| 154 |
| 155 sync_watcher_.reset(new SyncHandleWatcher( |
| 156 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 157 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
| 158 } |
| 159 |
| 160 MultiplexRouter* const router_; |
| 79 const InterfaceId id_; | 161 const InterfaceId id_; |
| 80 | 162 |
| 81 // Whether the endpoint has been closed. | 163 // Whether the endpoint has been closed. |
| 82 bool closed_; | 164 bool closed_; |
| 83 // Whether the peer endpoint has been closed. | 165 // Whether the peer endpoint has been closed. |
| 84 bool peer_closed_; | 166 bool peer_closed_; |
| 85 | 167 |
| 86 // The task runner on which |client_| can be accessed. | 168 // The task runner on which |client_| can be accessed. |
| 87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 169 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 88 // Not owned. It is null if no client is attached to this endpoint. | 170 // Not owned. It is null if no client is attached to this endpoint. |
| 89 InterfaceEndpointClient* client_; | 171 InterfaceEndpointClient* client_; |
| 90 | 172 |
| 173 // TODO(yzshen): is it too expensive? Do I need to lazy init? |
| 174 ScopedMessagePipeHandle sync_message_event_sender_; |
| 175 ScopedMessagePipeHandle sync_message_event_receiver_; |
| 176 bool event_signalled_; |
| 177 scoped_ptr<SyncHandleWatcher> sync_watcher_; |
| 178 |
| 179 // TODO(yzshen): The handling of sync watching is quite similar to what |
| 180 // Connector does. Consider unifying them. |
| 181 // |
| 182 // If non-zero, |sync_message_event_receiver_| should be registered with |
| 183 // SyncHandleRegistry. |
| 184 size_t register_sync_handle_watch_count_; |
| 185 scoped_refptr<base::RefCountedData<bool>> should_stop_sync_handle_watch_; |
| 186 |
| 91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 187 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| 92 }; | 188 }; |
| 93 | 189 |
| 94 struct MultiplexRouter::Task { | 190 struct MultiplexRouter::Task { |
| 95 public: | 191 public: |
| 96 // Doesn't take ownership of |message| but takes its contents. | 192 // Doesn't take ownership of |message| but takes its contents. |
| 97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { | 193 static scoped_ptr<Task> CreateMessageTask(Message* message) { |
| 98 Task* task = new Task(); | 194 Task* task = new Task(MESSAGE); |
| 99 task->message.reset(new Message); | 195 task->message.reset(new Message); |
| 100 message->MoveTo(task->message.get()); | 196 message->MoveTo(task->message.get()); |
| 101 return make_scoped_ptr(task); | 197 return make_scoped_ptr(task); |
| 102 } | 198 } |
| 103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { | 199 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
| 104 Task* task = new Task(); | 200 Task* task = new Task(NOTIFY_ERROR); |
| 105 task->endpoint_to_notify = endpoint; | 201 task->endpoint_to_notify = endpoint; |
| 106 return make_scoped_ptr(task); | 202 return make_scoped_ptr(task); |
| 107 } | 203 } |
| 108 | 204 |
| 109 ~Task() {} | 205 ~Task() {} |
| 110 | 206 |
| 111 bool IsIncomingMessageTask() const { return !!message; } | 207 bool IsMessageTask() const { return type == MESSAGE; } |
| 112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } | 208 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
| 113 | 209 |
| 114 scoped_ptr<Message> message; | 210 scoped_ptr<Message> message; |
| 115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 211 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 116 | 212 |
| 213 enum Type { MESSAGE, NOTIFY_ERROR }; |
| 214 |
| 215 Type type; |
| 216 |
| 117 private: | 217 private: |
| 118 Task() {} | 218 explicit Task(Type in_type) : type(in_type) {} |
| 119 }; | 219 }; |
| 120 | 220 |
| 221 void MultiplexRouter::InterfaceEndpoint::OnHandleReady(MojoResult result) { |
| 222 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 223 scoped_refptr<InterfaceEndpoint> self_protector(this); |
| 224 scoped_refptr<MultiplexRouter> router_protector(router_); |
| 225 |
| 226 { |
| 227 base::AutoLock locker(router_->lock_); |
| 228 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| 229 |
| 230 if (!more_to_process) |
| 231 ResetSyncMessageSignal(); |
| 232 |
| 233 bool no_more_sync_messages = !more_to_process && peer_closed_; |
| 234 bool sync_handle_watch_failed = result != MOJO_RESULT_OK; |
| 235 |
| 236 if (no_more_sync_messages || sync_handle_watch_failed) |
| 237 sync_watcher_.reset(); |
| 238 } |
| 239 } |
| 240 |
| 121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 241 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| 122 ScopedMessagePipeHandle message_pipe) | 242 ScopedMessagePipeHandle message_pipe) |
| 123 : RefCountedDeleteOnMessageLoop( | 243 : RefCountedDeleteOnMessageLoop( |
| 124 base::MessageLoop::current()->task_runner()), | 244 base::MessageLoop::current()->task_runner()), |
| 125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 245 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 126 header_validator_(this), | 246 header_validator_(this), |
| 127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), | 247 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
| 128 encountered_error_(false), | 248 encountered_error_(false), |
| 129 control_message_handler_(this), | 249 control_message_handler_(this), |
| 130 control_message_proxy_(&connector_), | 250 control_message_proxy_(&connector_), |
| 131 next_interface_id_value_(1), | 251 next_interface_id_value_(1), |
| 132 posted_to_process_tasks_(false), | 252 posted_to_process_tasks_(false), |
| 133 testing_mode_(false) { | 253 testing_mode_(false) { |
| 254 // Always participate in sync handle watch, because it may want to dispatch |
| 255 // messages to associated endpoints on a different thread; or it want to |
| 256 // dispatch sync requests to the master binding or associated bindings on the |
| 257 // same thread. |
| 258 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 134 connector_.set_incoming_receiver(&header_validator_); | 259 connector_.set_incoming_receiver(&header_validator_); |
| 135 connector_.set_connection_error_handler( | 260 connector_.set_connection_error_handler( |
| 136 [this]() { OnPipeConnectionError(); }); | 261 [this]() { OnPipeConnectionError(); }); |
| 137 } | 262 } |
| 138 | 263 |
| 139 MultiplexRouter::~MultiplexRouter() { | 264 MultiplexRouter::~MultiplexRouter() { |
| 140 base::AutoLock locker(lock_); | 265 base::AutoLock locker(lock_); |
| 141 | 266 |
| 267 sync_message_tasks_.clear(); |
| 142 tasks_.clear(); | 268 tasks_.clear(); |
| 143 | 269 |
| 144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 270 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 145 InterfaceEndpoint* endpoint = iter->second.get(); | 271 InterfaceEndpoint* endpoint = iter->second.get(); |
| 146 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 272 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 147 // because it may remove the corresponding value from the map. | 273 // because it may remove the corresponding value from the map. |
| 148 ++iter; | 274 ++iter; |
| 149 | 275 |
| 150 DCHECK(endpoint->closed()); | 276 DCHECK(endpoint->closed()); |
| 151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 277 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 214 | 340 |
| 215 DCHECK(ContainsKey(endpoints_, id)); | 341 DCHECK(ContainsKey(endpoints_, id)); |
| 216 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 342 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 217 DCHECK(!endpoint->client()); | 343 DCHECK(!endpoint->client()); |
| 218 DCHECK(!endpoint->closed()); | 344 DCHECK(!endpoint->closed()); |
| 219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 345 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 220 | 346 |
| 221 if (!IsMasterInterfaceId(id)) | 347 if (!IsMasterInterfaceId(id)) |
| 222 control_message_proxy_.NotifyPeerEndpointClosed(id); | 348 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 223 | 349 |
| 224 ProcessTasks(true); | 350 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| 225 } | 351 } |
| 226 | 352 |
| 227 void MultiplexRouter::AttachEndpointClient( | 353 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| 228 const ScopedInterfaceEndpointHandle& handle, | 354 const ScopedInterfaceEndpointHandle& handle, |
| 229 InterfaceEndpointClient* client) { | 355 InterfaceEndpointClient* client) { |
| 230 const InterfaceId id = handle.id(); | 356 const InterfaceId id = handle.id(); |
| 231 | 357 |
| 232 DCHECK(IsValidInterfaceId(id)); | 358 DCHECK(IsValidInterfaceId(id)); |
| 233 DCHECK(client); | 359 DCHECK(client); |
| 234 | 360 |
| 235 base::AutoLock locker(lock_); | 361 base::AutoLock locker(lock_); |
| 236 DCHECK(ContainsKey(endpoints_, id)); | 362 DCHECK(ContainsKey(endpoints_, id)); |
| 237 | 363 |
| 238 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 364 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 239 DCHECK(!endpoint->client()); | 365 DCHECK(!endpoint->client()); |
| 240 DCHECK(!endpoint->closed()); | 366 DCHECK(!endpoint->closed()); |
| 241 | 367 |
| 242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); | 368 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); |
| 243 endpoint->set_client(client); | 369 endpoint->set_client(client); |
| 244 | 370 |
| 245 if (endpoint->peer_closed()) | 371 if (endpoint->peer_closed()) |
| 246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 372 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 247 ProcessTasks(true); | 373 ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
| 374 |
| 375 return endpoint; |
| 248 } | 376 } |
| 249 | 377 |
| 250 void MultiplexRouter::DetachEndpointClient( | 378 void MultiplexRouter::DetachEndpointClient( |
| 251 const ScopedInterfaceEndpointHandle& handle) { | 379 const ScopedInterfaceEndpointHandle& handle) { |
| 252 const InterfaceId id = handle.id(); | 380 const InterfaceId id = handle.id(); |
| 253 | 381 |
| 254 DCHECK(IsValidInterfaceId(id)); | 382 DCHECK(IsValidInterfaceId(id)); |
| 255 | 383 |
| 256 base::AutoLock locker(lock_); | 384 base::AutoLock locker(lock_); |
| 257 DCHECK(ContainsKey(endpoints_, id)); | 385 DCHECK(ContainsKey(endpoints_, id)); |
| 258 | 386 |
| 259 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 387 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 260 DCHECK(endpoint->client()); | 388 DCHECK(endpoint->client()); |
| 261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 389 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 262 DCHECK(!endpoint->closed()); | 390 DCHECK(!endpoint->closed()); |
| 263 | 391 |
| 264 endpoint->set_task_runner(nullptr); | 392 endpoint->set_task_runner(nullptr); |
| 265 endpoint->set_client(nullptr); | 393 endpoint->set_client(nullptr); |
| 266 } | 394 endpoint->ClearSyncHandleWatcher(); |
| 267 | |
| 268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | |
| 269 Message* message) { | |
| 270 message->set_interface_id(handle.id()); | |
| 271 return connector_.Accept(message); | |
| 272 } | 395 } |
| 273 | 396 |
| 274 void MultiplexRouter::RaiseError() { | 397 void MultiplexRouter::RaiseError() { |
| 275 if (task_runner_->BelongsToCurrentThread()) { | 398 if (task_runner_->BelongsToCurrentThread()) { |
| 276 connector_.RaiseError(); | 399 connector_.RaiseError(); |
| 277 } else { | 400 } else { |
| 278 task_runner_->PostTask(FROM_HERE, | 401 task_runner_->PostTask(FROM_HERE, |
| 279 base::Bind(&MultiplexRouter::RaiseError, this)); | 402 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 280 } | 403 } |
| 281 } | 404 } |
| (...skipping 28 matching lines...) Expand all Loading... |
| 310 testing_mode_ = true; | 433 testing_mode_ = true; |
| 311 connector_.set_enforce_errors_from_incoming_receiver(false); | 434 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 312 } | 435 } |
| 313 | 436 |
| 314 bool MultiplexRouter::Accept(Message* message) { | 437 bool MultiplexRouter::Accept(Message* message) { |
| 315 DCHECK(thread_checker_.CalledOnValidThread()); | 438 DCHECK(thread_checker_.CalledOnValidThread()); |
| 316 | 439 |
| 317 scoped_refptr<MultiplexRouter> protector(this); | 440 scoped_refptr<MultiplexRouter> protector(this); |
| 318 base::AutoLock locker(lock_); | 441 base::AutoLock locker(lock_); |
| 319 | 442 |
| 320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); | 443 ClientCallBehavior client_call_behavior = |
| 444 connector_.during_sync_handle_watcher_callback() |
| 445 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 446 : ALLOW_DIRECT_CLIENT_CALLS; |
| 447 |
| 448 bool processed = |
| 449 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); |
| 321 | 450 |
| 322 if (!processed) { | 451 if (!processed) { |
| 323 // Either the task queue is not empty or we cannot process the message | 452 // Either the task queue is not empty or we cannot process the message |
| 324 // directly. In both cases, there is no need to call ProcessTasks(). | 453 // directly. In both cases, there is no need to call ProcessTasks(). |
| 325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | 454 tasks_.push_back(Task::CreateMessageTask(message)); |
| 455 Task* task = tasks_.back().get(); |
| 456 |
| 457 if (task->message->has_flag(kMessageIsSync)) { |
| 458 InterfaceId id = task->message->interface_id(); |
| 459 sync_message_tasks_[id].push_back(task); |
| 460 auto iter = endpoints_.find(id); |
| 461 if (iter != endpoints_.end()) |
| 462 iter->second->SignalSyncMessageReceived(); |
| 463 } |
| 326 } else if (!tasks_.empty()) { | 464 } else if (!tasks_.empty()) { |
| 327 // Processing the message may result in new tasks (for error notification) | 465 // Processing the message may result in new tasks (for error notification) |
| 328 // being added to the queue. In this case, we have to attempt to process the | 466 // being added to the queue. In this case, we have to attempt to process the |
| 329 // tasks. | 467 // tasks. |
| 330 ProcessTasks(false); | 468 ProcessTasks(client_call_behavior); |
| 331 } | 469 } |
| 332 | 470 |
| 333 // Always return true. If we see errors during message processing, we will | 471 // Always return true. If we see errors during message processing, we will |
| 334 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 472 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 335 return true; | 473 return true; |
| 336 } | 474 } |
| 337 | 475 |
| 338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 476 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 339 lock_.AssertAcquired(); | 477 lock_.AssertAcquired(); |
| 340 | 478 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 381 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 519 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 382 // because it may remove the corresponding value from the map. | 520 // because it may remove the corresponding value from the map. |
| 383 ++iter; | 521 ++iter; |
| 384 | 522 |
| 385 if (endpoint->client()) | 523 if (endpoint->client()) |
| 386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 524 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 387 | 525 |
| 388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 526 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 389 } | 527 } |
| 390 | 528 |
| 391 ProcessTasks(false); | 529 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| 530 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 531 : ALLOW_DIRECT_CLIENT_CALLS); |
| 392 } | 532 } |
| 393 | 533 |
| 394 void MultiplexRouter::ProcessTasks(bool force_async) { | 534 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
| 395 lock_.AssertAcquired(); | 535 lock_.AssertAcquired(); |
| 396 | 536 |
| 397 if (posted_to_process_tasks_) | 537 if (posted_to_process_tasks_) |
| 398 return; | 538 return; |
| 399 | 539 |
| 400 while (!tasks_.empty()) { | 540 while (!tasks_.empty()) { |
| 401 scoped_ptr<Task> task(std::move(tasks_.front())); | 541 scoped_ptr<Task> task(std::move(tasks_.front())); |
| 402 tasks_.pop_front(); | 542 tasks_.pop_front(); |
| 403 | 543 |
| 544 InterfaceId id = kInvalidInterfaceId; |
| 545 if (task->IsMessageTask() && task->message && |
| 546 task->message->has_flag(kMessageIsSync)) { |
| 547 InterfaceId id = task->message->interface_id(); |
| 548 auto& sync_message_queue = sync_message_tasks_[id]; |
| 549 DCHECK_EQ(task.get(), sync_message_queue.front()); |
| 550 sync_message_queue.pop_front(); |
| 551 } |
| 552 |
| 404 bool processed = | 553 bool processed = |
| 405 task->IsNotifyErrorTask() | 554 task->IsNotifyErrorTask() |
| 406 ? ProcessNotifyErrorTask(task.get(), force_async) | 555 ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
| 407 : ProcessIncomingMessage(task->message.get(), force_async); | 556 : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
| 408 | 557 |
| 409 if (!processed) { | 558 if (!processed) { |
| 410 tasks_.push_front(std::move(task)); | 559 tasks_.push_front(std::move(task)); |
| 560 if (IsValidInterfaceId(id)) { |
| 561 auto& sync_message_queue = sync_message_tasks_[id]; |
| 562 sync_message_queue.push_front(task.get()); |
| 563 } |
| 411 break; | 564 break; |
| 565 } else { |
| 566 if (IsValidInterfaceId(id)) { |
| 567 auto iter = sync_message_tasks_.find(id); |
| 568 if (iter != sync_message_tasks_.end() && iter->second.empty()) |
| 569 sync_message_tasks_.erase(iter); |
| 570 } |
| 412 } | 571 } |
| 413 } | 572 } |
| 414 } | 573 } |
| 415 | 574 |
| 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { | 575 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| 576 lock_.AssertAcquired(); |
| 577 |
| 578 auto iter = sync_message_tasks_.find(id); |
| 579 if (iter == sync_message_tasks_.end()) |
| 580 return false; |
| 581 |
| 582 MultiplexRouter::Task* task = iter->second.front(); |
| 583 iter->second.pop_front(); |
| 584 |
| 585 DCHECK(task->IsMessageTask()); |
| 586 scoped_ptr<Message> message(std::move(task->message)); |
| 587 |
| 588 // Note: after this call, |task|, |tasks| and |iter| may be invalidated. |
| 589 bool processed = ProcessIncomingMessage( |
| 590 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); |
| 591 DCHECK(processed); |
| 592 |
| 593 iter = sync_message_tasks_.find(id); |
| 594 return iter != sync_message_tasks_.end() && !iter->second.empty(); |
| 595 } |
| 596 |
| 597 bool MultiplexRouter::ProcessNotifyErrorTask( |
| 598 Task* task, |
| 599 ClientCallBehavior client_call_behavior) { |
| 417 lock_.AssertAcquired(); | 600 lock_.AssertAcquired(); |
| 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 601 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 419 if (!endpoint->client()) | 602 if (!endpoint->client()) |
| 420 return true; | 603 return true; |
| 421 | 604 |
| 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 605 if (!endpoint->task_runner()->BelongsToCurrentThread() || |
| 606 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { |
| 423 MaybePostToProcessTasks(endpoint->task_runner()); | 607 MaybePostToProcessTasks(endpoint->task_runner()); |
| 424 return false; | 608 return false; |
| 425 } | 609 } |
| 426 | 610 |
| 427 InterfaceEndpointClient* client = endpoint->client(); | 611 InterfaceEndpointClient* client = endpoint->client(); |
| 428 { | 612 { |
| 429 // We must unlock before calling into |client| because it may call this | 613 // We must unlock before calling into |client| because it may call this |
| 430 // object within NotifyError(). Holding the lock will lead to deadlock. | 614 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 431 // | 615 // |
| 432 // It is safe to call into |client| without the lock. Because |client| is | 616 // It is safe to call into |client| without the lock. Because |client| is |
| 433 // always accessed on the same thread, including DetachEndpointClient(). | 617 // always accessed on the same thread, including DetachEndpointClient(). |
| 434 base::AutoUnlock unlocker(lock_); | 618 base::AutoUnlock unlocker(lock_); |
| 435 client->NotifyError(); | 619 client->NotifyError(); |
| 436 } | 620 } |
| 437 return true; | 621 return true; |
| 438 } | 622 } |
| 439 | 623 |
| 440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, | 624 bool MultiplexRouter::ProcessIncomingMessage( |
| 441 bool force_async) { | 625 Message* message, |
| 626 ClientCallBehavior client_call_behavior) { |
| 442 lock_.AssertAcquired(); | 627 lock_.AssertAcquired(); |
| 628 |
| 629 if (!message) { |
| 630 // This is a sync message and has been processed during sync handle |
| 631 // watching. |
| 632 return true; |
| 633 } |
| 634 |
| 443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 635 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 444 if (!control_message_handler_.Accept(message)) | 636 if (!control_message_handler_.Accept(message)) |
| 445 RaiseErrorInNonTestingMode(); | 637 RaiseErrorInNonTestingMode(); |
| 446 return true; | 638 return true; |
| 447 } | 639 } |
| 448 | 640 |
| 449 InterfaceId id = message->interface_id(); | 641 InterfaceId id = message->interface_id(); |
| 450 DCHECK(IsValidInterfaceId(id)); | 642 DCHECK(IsValidInterfaceId(id)); |
| 451 | 643 |
| 452 bool inserted = false; | 644 bool inserted = false; |
| 453 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 645 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 454 if (inserted) { | 646 if (inserted) { |
| 455 DCHECK(!IsMasterInterfaceId(id)); | 647 DCHECK(!IsMasterInterfaceId(id)); |
| 456 | 648 |
| 457 // Currently, it is legitimate to receive messages for an endpoint | 649 // Currently, it is legitimate to receive messages for an endpoint |
| 458 // that is not registered. For example, the endpoint is transferred in | 650 // that is not registered. For example, the endpoint is transferred in |
| 459 // a message that is discarded. Once we add support to specify all | 651 // a message that is discarded. Once we add support to specify all |
| 460 // enclosing endpoints in message header, we should be able to remove | 652 // enclosing endpoints in message header, we should be able to remove |
| 461 // this. | 653 // this. |
| 462 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 654 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 463 | 655 |
| 464 control_message_proxy_.NotifyPeerEndpointClosed(id); | 656 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 465 return true; | 657 return true; |
| 466 } | 658 } |
| 467 | 659 |
| 468 if (endpoint->closed()) | 660 if (endpoint->closed()) |
| 469 return true; | 661 return true; |
| 470 | 662 |
| 471 if (!endpoint->client()) { | 663 if (!endpoint->client()) |
| 472 // We need to wait until a client is attached in order to dispatch further | |
| 473 // messages. | |
| 474 return false; | 664 return false; |
| 475 } | |
| 476 | 665 |
| 477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 666 bool can_direct_call = |
| 667 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || |
| 668 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && |
| 669 message->has_flag(kMessageIsSync)); |
| 670 |
| 671 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { |
| 478 MaybePostToProcessTasks(endpoint->task_runner()); | 672 MaybePostToProcessTasks(endpoint->task_runner()); |
| 479 return false; | 673 return false; |
| 480 } | 674 } |
| 481 | 675 |
| 482 InterfaceEndpointClient* client = endpoint->client(); | 676 InterfaceEndpointClient* client = endpoint->client(); |
| 483 bool result = false; | 677 bool result = false; |
| 484 { | 678 { |
| 485 // We must unlock before calling into |client| because it may call this | 679 // We must unlock before calling into |client| because it may call this |
| 486 // object within HandleIncomingMessage(). Holding the lock will lead to | 680 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 487 // deadlock. | 681 // deadlock. |
| (...skipping 18 matching lines...) Expand all Loading... |
| 506 posted_to_process_tasks_ = true; | 700 posted_to_process_tasks_ = true; |
| 507 task_runner->PostTask( | 701 task_runner->PostTask( |
| 508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 702 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 509 } | 703 } |
| 510 | 704 |
| 511 void MultiplexRouter::LockAndCallProcessTasks() { | 705 void MultiplexRouter::LockAndCallProcessTasks() { |
| 512 // There is no need to hold a ref to this class in this case because this is | 706 // There is no need to hold a ref to this class in this case because this is |
| 513 // always called using base::Bind(), which holds a ref. | 707 // always called using base::Bind(), which holds a ref. |
| 514 base::AutoLock locker(lock_); | 708 base::AutoLock locker(lock_); |
| 515 posted_to_process_tasks_ = false; | 709 posted_to_process_tasks_ = false; |
| 516 ProcessTasks(false); | 710 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
| 517 } | 711 } |
| 518 | 712 |
| 519 void MultiplexRouter::UpdateEndpointStateMayRemove( | 713 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 520 InterfaceEndpoint* endpoint, | 714 InterfaceEndpoint* endpoint, |
| 521 EndpointStateUpdateType type) { | 715 EndpointStateUpdateType type) { |
| 522 switch (type) { | 716 switch (type) { |
| 523 case ENDPOINT_CLOSED: | 717 case ENDPOINT_CLOSED: |
| 524 endpoint->set_closed(); | 718 endpoint->set_closed(); |
| 525 break; | 719 break; |
| 526 case PEER_ENDPOINT_CLOSED: | 720 case PEER_ENDPOINT_CLOSED: |
| 527 endpoint->set_peer_closed(); | 721 endpoint->set_peer_closed(); |
| 722 endpoint->SignalSyncMessageReceived(); |
| 528 break; | 723 break; |
| 529 } | 724 } |
| 530 if (endpoint->closed() && endpoint->peer_closed()) | 725 if (endpoint->closed() && endpoint->peer_closed()) |
| 531 endpoints_.erase(endpoint->id()); | 726 endpoints_.erase(endpoint->id()); |
| 532 } | 727 } |
| 533 | 728 |
| 534 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 729 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 535 lock_.AssertAcquired(); | 730 lock_.AssertAcquired(); |
| 536 if (!testing_mode_) | 731 if (!testing_mode_) |
| 537 RaiseError(); | 732 RaiseError(); |
| (...skipping 16 matching lines...) Expand all Loading... |
| 554 *inserted = true; | 749 *inserted = true; |
| 555 } else { | 750 } else { |
| 556 endpoint = iter->second.get(); | 751 endpoint = iter->second.get(); |
| 557 } | 752 } |
| 558 | 753 |
| 559 return endpoint; | 754 return endpoint; |
| 560 } | 755 } |
| 561 | 756 |
| 562 } // namespace internal | 757 } // namespace internal |
| 563 } // namespace mojo | 758 } // namespace mojo |
| OLD | NEW |