Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Side by Side Diff: mojo/edk/system/channel_win.cc

Issue 1685183004: Bootstrap Mojo IPC independent of Chrome IPC (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: and fix posix Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/channel.h" 5 #include "mojo/edk/system/channel.h"
6 6
7 #include <windows.h> 7 #include <windows.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <deque> 10 #include <deque>
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 public base::MessageLoop::DestructionObserver, 70 public base::MessageLoop::DestructionObserver,
71 public base::MessageLoopForIO::IOHandler { 71 public base::MessageLoopForIO::IOHandler {
72 public: 72 public:
73 ChannelWin(Delegate* delegate, 73 ChannelWin(Delegate* delegate,
74 ScopedPlatformHandle handle, 74 ScopedPlatformHandle handle,
75 scoped_refptr<base::TaskRunner> io_task_runner) 75 scoped_refptr<base::TaskRunner> io_task_runner)
76 : Channel(delegate), 76 : Channel(delegate),
77 self_(this), 77 self_(this),
78 handle_(std::move(handle)), 78 handle_(std::move(handle)),
79 io_task_runner_(io_task_runner) { 79 io_task_runner_(io_task_runner) {
80 memset(&connect_context_, 0, sizeof(connect_context_));
81 connect_context_.handler = this;
82
80 memset(&read_context_, 0, sizeof(read_context_)); 83 memset(&read_context_, 0, sizeof(read_context_));
81 read_context_.handler = this; 84 read_context_.handler = this;
82 85
83 memset(&write_context_, 0, sizeof(write_context_)); 86 memset(&write_context_, 0, sizeof(write_context_));
84 write_context_.handler = this; 87 write_context_.handler = this;
85 } 88 }
86 89
87 void Start() override { 90 void Start() override {
88 io_task_runner_->PostTask( 91 io_task_runner_->PostTask(
89 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); 92 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this));
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 137
135 private: 138 private:
136 // May run on any thread. 139 // May run on any thread.
137 ~ChannelWin() override {} 140 ~ChannelWin() override {}
138 141
139 void StartOnIOThread() { 142 void StartOnIOThread() {
140 base::MessageLoop::current()->AddDestructionObserver(this); 143 base::MessageLoop::current()->AddDestructionObserver(this);
141 base::MessageLoopForIO::current()->RegisterIOHandler( 144 base::MessageLoopForIO::current()->RegisterIOHandler(
142 handle_.get().handle, this); 145 handle_.get().handle, this);
143 146
144 // Now that we have registered our IOHandler, we can start writing. 147 // We may be starting the channel with a disconnected server pipe, e.g.,
148 // when not using PlatformChannelPair. Make sure the pipe is connected
149 // before we attempt to do any reading or writing.
150 BOOL result = ConnectNamedPipe(handle_.get().handle,
151 &connect_context_.overlapped);
152 if (!result && (GetLastError() == ERROR_PIPE_CONNECTED ||
153 GetLastError() == ERROR_INVALID_FUNCTION)) {
154 // If the pipe's already connected, we can start using it immediately.
155 // Note that ERROR_INVALID_FUNCTION is returned if we call this on a
156 // client pipe. If this is a client pipe we know it's already connected.
157 OnPipeConnected();
158 } else if (result || GetLastError() == ERROR_IO_PENDING) {
159 // Balanced in OnIOCompleted.
160 AddRef();
161 } else {
162 OnError();
163 }
164 }
165
166 void OnPipeConnected() {
167 // Now that we are connected we can start writing and reading.
145 { 168 {
146 base::AutoLock lock(write_lock_); 169 base::AutoLock lock(write_lock_);
147 if (delay_writes_) { 170 if (delay_writes_) {
148 delay_writes_ = false; 171 delay_writes_ = false;
149 WriteNextNoLock(); 172 WriteNextNoLock();
150 } 173 }
151 } 174 }
152 175
153 // Keep this alive in case we synchronously run shutdown. 176 // Keep this alive in case we synchronously run shutdown.
154 scoped_refptr<ChannelWin> keep_alive(this); 177 scoped_refptr<ChannelWin> keep_alive(this);
(...skipping 18 matching lines...) Expand all
173 } 196 }
174 197
175 // base::MessageLoop::IOHandler: 198 // base::MessageLoop::IOHandler:
176 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, 199 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
177 DWORD bytes_transfered, 200 DWORD bytes_transfered,
178 DWORD error) override { 201 DWORD error) override {
179 if (error != ERROR_SUCCESS) { 202 if (error != ERROR_SUCCESS) {
180 OnError(); 203 OnError();
181 } else if (context == &read_context_) { 204 } else if (context == &read_context_) {
182 OnReadDone(static_cast<size_t>(bytes_transfered)); 205 OnReadDone(static_cast<size_t>(bytes_transfered));
206 } else if (context == &write_context_) {
207 OnWriteDone(static_cast<size_t>(bytes_transfered));
183 } else { 208 } else {
184 CHECK(context == &write_context_); 209 CHECK(context == &connect_context_);
185 OnWriteDone(static_cast<size_t>(bytes_transfered)); 210 OnPipeConnected();
186 } 211 }
187 Release(); // Balancing reference taken after ReadFile / WriteFile. 212 Release(); // Balancing reference taken after ReadFile / WriteFile.
188 } 213 }
189 214
190 void OnReadDone(size_t bytes_read) { 215 void OnReadDone(size_t bytes_read) {
191 if (bytes_read > 0) { 216 if (bytes_read > 0) {
192 size_t next_read_size = 0; 217 size_t next_read_size = 0;
193 if (OnReadComplete(bytes_read, &next_read_size)) { 218 if (OnReadComplete(bytes_read, &next_read_size)) {
194 ReadMore(next_read_size); 219 ReadMore(next_read_size);
195 } else { 220 } else {
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
269 return true; 294 return true;
270 return WriteNoLock(outgoing_messages_.front()); 295 return WriteNoLock(outgoing_messages_.front());
271 } 296 }
272 297
273 // Keeps the Channel alive at least until explicit shutdown on the IO thread. 298 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
274 scoped_refptr<Channel> self_; 299 scoped_refptr<Channel> self_;
275 300
276 ScopedPlatformHandle handle_; 301 ScopedPlatformHandle handle_;
277 scoped_refptr<base::TaskRunner> io_task_runner_; 302 scoped_refptr<base::TaskRunner> io_task_runner_;
278 303
304 base::MessageLoopForIO::IOContext connect_context_;
279 base::MessageLoopForIO::IOContext read_context_; 305 base::MessageLoopForIO::IOContext read_context_;
280 base::MessageLoopForIO::IOContext write_context_; 306 base::MessageLoopForIO::IOContext write_context_;
281 307
282 // Protects |reject_writes_| and |outgoing_messages_|. 308 // Protects |reject_writes_| and |outgoing_messages_|.
283 base::Lock write_lock_; 309 base::Lock write_lock_;
284 310
285 bool delay_writes_ = true; 311 bool delay_writes_ = true;
286 312
287 bool reject_writes_ = false; 313 bool reject_writes_ = false;
288 std::deque<MessageView> outgoing_messages_; 314 std::deque<MessageView> outgoing_messages_;
289 315
290 DISALLOW_COPY_AND_ASSIGN(ChannelWin); 316 DISALLOW_COPY_AND_ASSIGN(ChannelWin);
291 }; 317 };
292 318
293 } // namespace 319 } // namespace
294 320
295 // static 321 // static
296 scoped_refptr<Channel> Channel::Create( 322 scoped_refptr<Channel> Channel::Create(
297 Delegate* delegate, 323 Delegate* delegate,
298 ScopedPlatformHandle platform_handle, 324 ScopedPlatformHandle platform_handle,
299 scoped_refptr<base::TaskRunner> io_task_runner) { 325 scoped_refptr<base::TaskRunner> io_task_runner) {
300 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); 326 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner);
301 } 327 }
302 328
303 } // namespace edk 329 } // namespace edk
304 } // namespace mojo 330 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698