Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: mojo/edk/system/master_connection_manager.cc

Issue 1526063003: EDK: Add TaskRunner and PlatformHandleWatcher to RawChannel. (Closed) Base URL: https://github.com/domokit/mojo.git@channel_watcher
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/master_connection_manager.h ('k') | mojo/edk/system/raw_channel.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/master_connection_manager.h" 5 #include "mojo/edk/system/master_connection_manager.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <unordered_map> 8 #include <unordered_map>
9 #include <utility> 9 #include <utility>
10 #include <vector> 10 #include <vector>
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
75 // |MasterConnectionManager::Helper| is not thread-safe, and must only be used 75 // |MasterConnectionManager::Helper| is not thread-safe, and must only be used
76 // on its |owner_|'s private thread. 76 // on its |owner_|'s private thread.
77 class MasterConnectionManager::Helper final : public RawChannel::Delegate { 77 class MasterConnectionManager::Helper final : public RawChannel::Delegate {
78 public: 78 public:
79 Helper(MasterConnectionManager* owner, 79 Helper(MasterConnectionManager* owner,
80 ProcessIdentifier process_identifier, 80 ProcessIdentifier process_identifier,
81 embedder::SlaveInfo slave_info, 81 embedder::SlaveInfo slave_info,
82 ScopedPlatformHandle platform_handle); 82 ScopedPlatformHandle platform_handle);
83 ~Helper() override; 83 ~Helper() override;
84 84
85 void Init(); 85 void Init(RefPtr<TaskRunner>&& task_runner,
86 PlatformHandleWatcher* platform_handle_watcher);
86 embedder::SlaveInfo Shutdown(); 87 embedder::SlaveInfo Shutdown();
87 88
88 private: 89 private:
89 // |RawChannel::Delegate| methods: 90 // |RawChannel::Delegate| methods:
90 void OnReadMessage(const MessageInTransit::View& message_view, 91 void OnReadMessage(const MessageInTransit::View& message_view,
91 std::unique_ptr<std::vector<ScopedPlatformHandle>> 92 std::unique_ptr<std::vector<ScopedPlatformHandle>>
92 platform_handles) override; 93 platform_handles) override;
93 void OnError(Error error) override; 94 void OnError(Error error) override;
94 95
95 // Handles an error that's fatal to this object. Note that this probably 96 // Handles an error that's fatal to this object. Note that this probably
(...skipping 15 matching lines...) Expand all
111 ScopedPlatformHandle platform_handle) 112 ScopedPlatformHandle platform_handle)
112 : owner_(owner), 113 : owner_(owner),
113 process_identifier_(process_identifier), 114 process_identifier_(process_identifier),
114 slave_info_(slave_info), 115 slave_info_(slave_info),
115 raw_channel_(RawChannel::Create(platform_handle.Pass())) {} 116 raw_channel_(RawChannel::Create(platform_handle.Pass())) {}
116 117
117 MasterConnectionManager::Helper::~Helper() { 118 MasterConnectionManager::Helper::~Helper() {
118 DCHECK(!raw_channel_); 119 DCHECK(!raw_channel_);
119 } 120 }
120 121
121 void MasterConnectionManager::Helper::Init() { 122 void MasterConnectionManager::Helper::Init(
122 raw_channel_->Init(this); 123 RefPtr<TaskRunner>&& task_runner,
124 PlatformHandleWatcher* platform_handle_watcher) {
125 raw_channel_->Init(std::move(task_runner), platform_handle_watcher, this);
123 } 126 }
124 127
125 embedder::SlaveInfo MasterConnectionManager::Helper::Shutdown() { 128 embedder::SlaveInfo MasterConnectionManager::Helper::Shutdown() {
126 raw_channel_->Shutdown(); 129 raw_channel_->Shutdown();
127 raw_channel_.reset(); 130 raw_channel_.reset();
128 return slave_info_; 131 return slave_info_;
129 } 132 }
130 133
131 void MasterConnectionManager::Helper::OnReadMessage( 134 void MasterConnectionManager::Helper::OnReadMessage(
132 const MessageInTransit::View& message_view, 135 const MessageInTransit::View& message_view,
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
322 process_connections_; // "Owns" any valid platform handles. 325 process_connections_; // "Owns" any valid platform handles.
323 326
324 MOJO_DISALLOW_COPY_AND_ASSIGN(ProcessConnections); 327 MOJO_DISALLOW_COPY_AND_ASSIGN(ProcessConnections);
325 }; 328 };
326 329
327 // MasterConnectionManager ----------------------------------------------------- 330 // MasterConnectionManager -----------------------------------------------------
328 331
329 MasterConnectionManager::MasterConnectionManager( 332 MasterConnectionManager::MasterConnectionManager(
330 embedder::PlatformSupport* platform_support) 333 embedder::PlatformSupport* platform_support)
331 : ConnectionManager(platform_support), 334 : ConnectionManager(platform_support),
332 master_process_delegate_(), 335 master_process_delegate_(nullptr),
336 private_thread_platform_handle_watcher_(nullptr),
333 next_process_identifier_(kFirstSlaveProcessIdentifier) { 337 next_process_identifier_(kFirstSlaveProcessIdentifier) {
334 connections_[kMasterProcessIdentifier] = new ProcessConnections(); 338 connections_[kMasterProcessIdentifier] = new ProcessConnections();
335 } 339 }
336 340
337 MasterConnectionManager::~MasterConnectionManager() { 341 MasterConnectionManager::~MasterConnectionManager() {
338 DCHECK(!delegate_thread_task_runner_); 342 DCHECK(!delegate_thread_task_runner_);
339 DCHECK(!master_process_delegate_); 343 DCHECK(!master_process_delegate_);
340 DCHECK(!private_thread_); 344 DCHECK(!private_thread_);
341 DCHECK(helpers_.empty()); 345 DCHECK(helpers_.empty());
342 DCHECK(pending_connects_.empty()); 346 DCHECK(pending_connects_.empty());
343 } 347 }
344 348
345 void MasterConnectionManager::Init( 349 void MasterConnectionManager::Init(
346 RefPtr<TaskRunner>&& delegate_thread_task_runner, 350 RefPtr<TaskRunner>&& delegate_thread_task_runner,
347 embedder::MasterProcessDelegate* master_process_delegate) { 351 embedder::MasterProcessDelegate* master_process_delegate) {
348 DCHECK(delegate_thread_task_runner); 352 DCHECK(delegate_thread_task_runner);
349 DCHECK(master_process_delegate); 353 DCHECK(master_process_delegate);
350 DCHECK(!delegate_thread_task_runner_); 354 DCHECK(!delegate_thread_task_runner_);
351 DCHECK(!master_process_delegate_); 355 DCHECK(!master_process_delegate_);
352 DCHECK(!private_thread_); 356 DCHECK(!private_thread_);
353 357
354 delegate_thread_task_runner_ = std::move(delegate_thread_task_runner); 358 delegate_thread_task_runner_ = std::move(delegate_thread_task_runner);
355 master_process_delegate_ = master_process_delegate; 359 master_process_delegate_ = master_process_delegate;
356 // TODO(vtl): We'll need to plumb this in further.
357 PlatformHandleWatcher* platform_handle_watcher = nullptr;
358 private_thread_ = platform::CreateAndStartIOThread( 360 private_thread_ = platform::CreateAndStartIOThread(
359 &private_thread_task_runner_, &platform_handle_watcher); 361 &private_thread_task_runner_, &private_thread_platform_handle_watcher_);
360 } 362 }
361 363
362 ProcessIdentifier MasterConnectionManager::AddSlave( 364 ProcessIdentifier MasterConnectionManager::AddSlave(
363 embedder::SlaveInfo slave_info, 365 embedder::SlaveInfo slave_info,
364 ScopedPlatformHandle platform_handle) { 366 ScopedPlatformHandle platform_handle) {
365 // We don't really care if |slave_info| is non-null or not. 367 // We don't really care if |slave_info| is non-null or not.
366 DCHECK(platform_handle.is_valid()); 368 DCHECK(platform_handle.is_valid());
367 AssertNotOnPrivateThread(); 369 AssertNotOnPrivateThread();
368 370
369 ProcessIdentifier slave_process_identifier; 371 ProcessIdentifier slave_process_identifier;
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
414 AssertNotOnPrivateThread(); 416 AssertNotOnPrivateThread();
415 DCHECK(master_process_delegate_); 417 DCHECK(master_process_delegate_);
416 DCHECK(private_thread_); 418 DCHECK(private_thread_);
417 419
418 // The |Stop()| will actually finish all posted tasks. 420 // The |Stop()| will actually finish all posted tasks.
419 private_thread_task_runner_->PostTask( 421 private_thread_task_runner_->PostTask(
420 [this]() { ShutdownOnPrivateThread(); }); 422 [this]() { ShutdownOnPrivateThread(); });
421 private_thread_->Stop(); 423 private_thread_->Stop();
422 private_thread_.reset(); 424 private_thread_.reset();
423 private_thread_task_runner_ = nullptr; 425 private_thread_task_runner_ = nullptr;
426 private_thread_platform_handle_watcher_ = nullptr;
424 DCHECK(helpers_.empty()); 427 DCHECK(helpers_.empty());
425 DCHECK(pending_connects_.empty()); 428 DCHECK(pending_connects_.empty());
426 master_process_delegate_ = nullptr; 429 master_process_delegate_ = nullptr;
427 delegate_thread_task_runner_ = nullptr; 430 delegate_thread_task_runner_ = nullptr;
428 } 431 }
429 432
430 bool MasterConnectionManager::AllowConnect( 433 bool MasterConnectionManager::AllowConnect(
431 const ConnectionIdentifier& connection_id) { 434 const ConnectionIdentifier& connection_id) {
432 AssertNotOnPrivateThread(); 435 AssertNotOnPrivateThread();
433 return AllowConnectImpl(kMasterProcessIdentifier, connection_id); 436 return AllowConnectImpl(kMasterProcessIdentifier, connection_id);
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
682 embedder::SlaveInfo slave_info, 685 embedder::SlaveInfo slave_info,
683 ScopedPlatformHandle platform_handle, 686 ScopedPlatformHandle platform_handle,
684 ProcessIdentifier slave_process_identifier, 687 ProcessIdentifier slave_process_identifier,
685 AutoResetWaitableEvent* event) { 688 AutoResetWaitableEvent* event) {
686 DCHECK(platform_handle.is_valid()); 689 DCHECK(platform_handle.is_valid());
687 DCHECK(event); 690 DCHECK(event);
688 AssertOnPrivateThread(); 691 AssertOnPrivateThread();
689 692
690 std::unique_ptr<Helper> helper(new Helper( 693 std::unique_ptr<Helper> helper(new Helper(
691 this, slave_process_identifier, slave_info, platform_handle.Pass())); 694 this, slave_process_identifier, slave_info, platform_handle.Pass()));
692 helper->Init(); 695 helper->Init(private_thread_task_runner_.Clone(),
696 private_thread_platform_handle_watcher_);
693 697
694 DCHECK(helpers_.find(slave_process_identifier) == helpers_.end()); 698 DCHECK(helpers_.find(slave_process_identifier) == helpers_.end());
695 helpers_[slave_process_identifier] = helper.release(); 699 helpers_[slave_process_identifier] = helper.release();
696 700
697 DVLOG(1) << "Added slave process identifier " << slave_process_identifier; 701 DVLOG(1) << "Added slave process identifier " << slave_process_identifier;
698 event->Signal(); 702 event->Signal();
699 } 703 }
700 704
701 void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) { 705 void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) {
702 DCHECK_NE(process_identifier, kInvalidProcessIdentifier); 706 DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
746 DCHECK(!private_thread_task_runner_->RunsTasksOnCurrentThread()); 750 DCHECK(!private_thread_task_runner_->RunsTasksOnCurrentThread());
747 } 751 }
748 752
749 void MasterConnectionManager::AssertOnPrivateThread() const { 753 void MasterConnectionManager::AssertOnPrivateThread() const {
750 // This should only be called after |Init()| and before |Shutdown()|. 754 // This should only be called after |Init()| and before |Shutdown()|.
751 DCHECK(private_thread_task_runner_->RunsTasksOnCurrentThread()); 755 DCHECK(private_thread_task_runner_->RunsTasksOnCurrentThread());
752 } 756 }
753 757
754 } // namespace system 758 } // namespace system
755 } // namespace mojo 759 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/master_connection_manager.h ('k') | mojo/edk/system/raw_channel.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698