OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/channel.h" | 5 #include "mojo/edk/system/channel.h" |
6 | 6 |
7 #include <windows.h> | 7 #include <windows.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <deque> | 10 #include <deque> |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
70 public base::MessageLoop::DestructionObserver, | 70 public base::MessageLoop::DestructionObserver, |
71 public base::MessageLoopForIO::IOHandler { | 71 public base::MessageLoopForIO::IOHandler { |
72 public: | 72 public: |
73 ChannelWin(Delegate* delegate, | 73 ChannelWin(Delegate* delegate, |
74 ScopedPlatformHandle handle, | 74 ScopedPlatformHandle handle, |
75 scoped_refptr<base::TaskRunner> io_task_runner) | 75 scoped_refptr<base::TaskRunner> io_task_runner) |
76 : Channel(delegate), | 76 : Channel(delegate), |
77 self_(this), | 77 self_(this), |
78 handle_(std::move(handle)), | 78 handle_(std::move(handle)), |
79 io_task_runner_(io_task_runner) { | 79 io_task_runner_(io_task_runner) { |
| 80 memset(&connect_context_, 0, sizeof(connect_context_)); |
| 81 connect_context_.handler = this; |
| 82 |
80 memset(&read_context_, 0, sizeof(read_context_)); | 83 memset(&read_context_, 0, sizeof(read_context_)); |
81 read_context_.handler = this; | 84 read_context_.handler = this; |
82 | 85 |
83 memset(&write_context_, 0, sizeof(write_context_)); | 86 memset(&write_context_, 0, sizeof(write_context_)); |
84 write_context_.handler = this; | 87 write_context_.handler = this; |
85 } | 88 } |
86 | 89 |
87 void Start() override { | 90 void Start() override { |
88 io_task_runner_->PostTask( | 91 io_task_runner_->PostTask( |
89 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); | 92 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
134 | 137 |
135 private: | 138 private: |
136 // May run on any thread. | 139 // May run on any thread. |
137 ~ChannelWin() override {} | 140 ~ChannelWin() override {} |
138 | 141 |
139 void StartOnIOThread() { | 142 void StartOnIOThread() { |
140 base::MessageLoop::current()->AddDestructionObserver(this); | 143 base::MessageLoop::current()->AddDestructionObserver(this); |
141 base::MessageLoopForIO::current()->RegisterIOHandler( | 144 base::MessageLoopForIO::current()->RegisterIOHandler( |
142 handle_.get().handle, this); | 145 handle_.get().handle, this); |
143 | 146 |
144 // Now that we have registered our IOHandler, we can start writing. | 147 // We may be starting the channel with a disconnected server pipe, e.g., |
| 148 // when not using PlatformChannelPair. Make sure the pipe is connected |
| 149 // before we attempt to do any reading or writing. |
| 150 BOOL result = ConnectNamedPipe(handle_.get().handle, |
| 151 &connect_context_.overlapped); |
| 152 if (!result && (GetLastError() == ERROR_PIPE_CONNECTED || |
| 153 GetLastError() == ERROR_INVALID_FUNCTION)) { |
| 154 // If the pipe's already connected, we can start using it immediately. |
| 155 // Note that ERROR_INVALID_FUNCTION is returned if we call this on a |
| 156 // client pipe. If this is a client pipe we know it's already connected. |
| 157 OnPipeConnected(); |
| 158 } else if (result || GetLastError() == ERROR_IO_PENDING) { |
| 159 // Balanced in OnIOCompleted. |
| 160 AddRef(); |
| 161 } else { |
| 162 OnError(); |
| 163 } |
| 164 } |
| 165 |
| 166 void OnPipeConnected() { |
| 167 // Now that we are connected we can start writing and reading. |
145 { | 168 { |
146 base::AutoLock lock(write_lock_); | 169 base::AutoLock lock(write_lock_); |
147 if (delay_writes_) { | 170 if (delay_writes_) { |
148 delay_writes_ = false; | 171 delay_writes_ = false; |
149 WriteNextNoLock(); | 172 WriteNextNoLock(); |
150 } | 173 } |
151 } | 174 } |
152 | 175 |
153 // Keep this alive in case we synchronously run shutdown. | 176 // Keep this alive in case we synchronously run shutdown. |
154 scoped_refptr<ChannelWin> keep_alive(this); | 177 scoped_refptr<ChannelWin> keep_alive(this); |
(...skipping 18 matching lines...) Expand all Loading... |
173 } | 196 } |
174 | 197 |
175 // base::MessageLoop::IOHandler: | 198 // base::MessageLoop::IOHandler: |
176 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | 199 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, |
177 DWORD bytes_transfered, | 200 DWORD bytes_transfered, |
178 DWORD error) override { | 201 DWORD error) override { |
179 if (error != ERROR_SUCCESS) { | 202 if (error != ERROR_SUCCESS) { |
180 OnError(); | 203 OnError(); |
181 } else if (context == &read_context_) { | 204 } else if (context == &read_context_) { |
182 OnReadDone(static_cast<size_t>(bytes_transfered)); | 205 OnReadDone(static_cast<size_t>(bytes_transfered)); |
| 206 } else if (context == &write_context_) { |
| 207 OnWriteDone(static_cast<size_t>(bytes_transfered)); |
183 } else { | 208 } else { |
184 CHECK(context == &write_context_); | 209 CHECK(context == &connect_context_); |
185 OnWriteDone(static_cast<size_t>(bytes_transfered)); | 210 OnPipeConnected(); |
186 } | 211 } |
187 Release(); // Balancing reference taken after ReadFile / WriteFile. | 212 Release(); // Balancing reference taken after ReadFile / WriteFile. |
188 } | 213 } |
189 | 214 |
190 void OnReadDone(size_t bytes_read) { | 215 void OnReadDone(size_t bytes_read) { |
191 if (bytes_read > 0) { | 216 if (bytes_read > 0) { |
192 size_t next_read_size = 0; | 217 size_t next_read_size = 0; |
193 if (OnReadComplete(bytes_read, &next_read_size)) { | 218 if (OnReadComplete(bytes_read, &next_read_size)) { |
194 ReadMore(next_read_size); | 219 ReadMore(next_read_size); |
195 } else { | 220 } else { |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
269 return true; | 294 return true; |
270 return WriteNoLock(outgoing_messages_.front()); | 295 return WriteNoLock(outgoing_messages_.front()); |
271 } | 296 } |
272 | 297 |
273 // Keeps the Channel alive at least until explicit shutdown on the IO thread. | 298 // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
274 scoped_refptr<Channel> self_; | 299 scoped_refptr<Channel> self_; |
275 | 300 |
276 ScopedPlatformHandle handle_; | 301 ScopedPlatformHandle handle_; |
277 scoped_refptr<base::TaskRunner> io_task_runner_; | 302 scoped_refptr<base::TaskRunner> io_task_runner_; |
278 | 303 |
| 304 base::MessageLoopForIO::IOContext connect_context_; |
279 base::MessageLoopForIO::IOContext read_context_; | 305 base::MessageLoopForIO::IOContext read_context_; |
280 base::MessageLoopForIO::IOContext write_context_; | 306 base::MessageLoopForIO::IOContext write_context_; |
281 | 307 |
282 // Protects |reject_writes_| and |outgoing_messages_|. | 308 // Protects |reject_writes_| and |outgoing_messages_|. |
283 base::Lock write_lock_; | 309 base::Lock write_lock_; |
284 | 310 |
285 bool delay_writes_ = true; | 311 bool delay_writes_ = true; |
286 | 312 |
287 bool reject_writes_ = false; | 313 bool reject_writes_ = false; |
288 std::deque<MessageView> outgoing_messages_; | 314 std::deque<MessageView> outgoing_messages_; |
289 | 315 |
290 DISALLOW_COPY_AND_ASSIGN(ChannelWin); | 316 DISALLOW_COPY_AND_ASSIGN(ChannelWin); |
291 }; | 317 }; |
292 | 318 |
293 } // namespace | 319 } // namespace |
294 | 320 |
295 // static | 321 // static |
296 scoped_refptr<Channel> Channel::Create( | 322 scoped_refptr<Channel> Channel::Create( |
297 Delegate* delegate, | 323 Delegate* delegate, |
298 ScopedPlatformHandle platform_handle, | 324 ScopedPlatformHandle platform_handle, |
299 scoped_refptr<base::TaskRunner> io_task_runner) { | 325 scoped_refptr<base::TaskRunner> io_task_runner) { |
300 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); | 326 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); |
301 } | 327 } |
302 | 328 |
303 } // namespace edk | 329 } // namespace edk |
304 } // namespace mojo | 330 } // namespace mojo |
OLD | NEW |