OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/channel.h" | 5 #include "mojo/edk/system/channel.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <windows.h> | 8 #include <windows.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
73 public base::MessageLoopForIO::IOHandler { | 73 public base::MessageLoopForIO::IOHandler { |
74 public: | 74 public: |
75 ChannelWin(Delegate* delegate, | 75 ChannelWin(Delegate* delegate, |
76 ScopedPlatformHandle handle, | 76 ScopedPlatformHandle handle, |
77 scoped_refptr<base::TaskRunner> io_task_runner) | 77 scoped_refptr<base::TaskRunner> io_task_runner) |
78 : Channel(delegate), | 78 : Channel(delegate), |
79 self_(this), | 79 self_(this), |
80 handle_(std::move(handle)), | 80 handle_(std::move(handle)), |
81 io_task_runner_(io_task_runner) { | 81 io_task_runner_(io_task_runner) { |
82 CHECK(handle_.is_valid()); | 82 CHECK(handle_.is_valid()); |
| 83 |
| 84 wait_for_connect_ = handle_.get().needs_connection; |
83 } | 85 } |
84 | 86 |
85 void Start() override { | 87 void Start() override { |
86 io_task_runner_->PostTask( | 88 io_task_runner_->PostTask( |
87 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); | 89 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); |
88 } | 90 } |
89 | 91 |
90 void ShutDownImpl() override { | 92 void ShutDownImpl() override { |
91 // Always shut down asynchronously when called through the public interface. | 93 // Always shut down asynchronously when called through the public interface. |
92 io_task_runner_->PostTask( | 94 io_task_runner_->PostTask( |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
132 | 134 |
133 private: | 135 private: |
134 // May run on any thread. | 136 // May run on any thread. |
135 ~ChannelWin() override {} | 137 ~ChannelWin() override {} |
136 | 138 |
137 void StartOnIOThread() { | 139 void StartOnIOThread() { |
138 base::MessageLoop::current()->AddDestructionObserver(this); | 140 base::MessageLoop::current()->AddDestructionObserver(this); |
139 base::MessageLoopForIO::current()->RegisterIOHandler( | 141 base::MessageLoopForIO::current()->RegisterIOHandler( |
140 handle_.get().handle, this); | 142 handle_.get().handle, this); |
141 | 143 |
| 144 if (wait_for_connect_) { |
| 145 BOOL ok = ConnectNamedPipe(handle_.get().handle, |
| 146 &connect_context_.overlapped); |
| 147 if (ok) { |
| 148 PLOG(ERROR) << "Unexpected success while waiting for pipe connection"; |
| 149 OnError(); |
| 150 return; |
| 151 } |
| 152 |
| 153 const DWORD err = GetLastError(); |
| 154 switch (err) { |
| 155 case ERROR_PIPE_CONNECTED: |
| 156 wait_for_connect_ = false; |
| 157 break; |
| 158 case ERROR_IO_PENDING: |
| 159 AddRef(); |
| 160 return; |
| 161 case ERROR_NO_DATA: |
| 162 OnError(); |
| 163 return; |
| 164 } |
| 165 } |
| 166 |
142 // Now that we have registered our IOHandler, we can start writing. | 167 // Now that we have registered our IOHandler, we can start writing. |
143 { | 168 { |
144 base::AutoLock lock(write_lock_); | 169 base::AutoLock lock(write_lock_); |
145 if (delay_writes_) { | 170 if (delay_writes_) { |
146 delay_writes_ = false; | 171 delay_writes_ = false; |
147 WriteNextNoLock(); | 172 WriteNextNoLock(); |
148 } | 173 } |
149 } | 174 } |
150 | 175 |
151 // Keep this alive in case we synchronously run shutdown. | 176 // Keep this alive in case we synchronously run shutdown. |
(...skipping 20 matching lines...) Expand all Loading... |
172 if (self_) | 197 if (self_) |
173 ShutDownOnIOThread(); | 198 ShutDownOnIOThread(); |
174 } | 199 } |
175 | 200 |
176 // base::MessageLoop::IOHandler: | 201 // base::MessageLoop::IOHandler: |
177 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | 202 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, |
178 DWORD bytes_transfered, | 203 DWORD bytes_transfered, |
179 DWORD error) override { | 204 DWORD error) override { |
180 if (error != ERROR_SUCCESS) { | 205 if (error != ERROR_SUCCESS) { |
181 OnError(); | 206 OnError(); |
| 207 } else if (context == &connect_context_) { |
| 208 DCHECK(wait_for_connect_); |
| 209 wait_for_connect_ = false; |
| 210 ReadMore(0); |
| 211 |
| 212 base::AutoLock lock(write_lock_); |
| 213 if (delay_writes_) { |
| 214 delay_writes_ = false; |
| 215 WriteNextNoLock(); |
| 216 } |
182 } else if (context == &read_context_) { | 217 } else if (context == &read_context_) { |
183 OnReadDone(static_cast<size_t>(bytes_transfered)); | 218 OnReadDone(static_cast<size_t>(bytes_transfered)); |
184 } else { | 219 } else { |
185 CHECK(context == &write_context_); | 220 CHECK(context == &write_context_); |
186 OnWriteDone(static_cast<size_t>(bytes_transfered)); | 221 OnWriteDone(static_cast<size_t>(bytes_transfered)); |
187 } | 222 } |
188 Release(); // Balancing reference taken after ReadFile / WriteFile. | 223 Release(); // Balancing reference taken after ReadFile / WriteFile. |
189 } | 224 } |
190 | 225 |
191 void OnReadDone(size_t bytes_read) { | 226 void OnReadDone(size_t bytes_read) { |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
270 return true; | 305 return true; |
271 return WriteNoLock(outgoing_messages_.front()); | 306 return WriteNoLock(outgoing_messages_.front()); |
272 } | 307 } |
273 | 308 |
274 // Keeps the Channel alive at least until explicit shutdown on the IO thread. | 309 // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
275 scoped_refptr<Channel> self_; | 310 scoped_refptr<Channel> self_; |
276 | 311 |
277 ScopedPlatformHandle handle_; | 312 ScopedPlatformHandle handle_; |
278 scoped_refptr<base::TaskRunner> io_task_runner_; | 313 scoped_refptr<base::TaskRunner> io_task_runner_; |
279 | 314 |
| 315 base::MessageLoopForIO::IOContext connect_context_; |
280 base::MessageLoopForIO::IOContext read_context_; | 316 base::MessageLoopForIO::IOContext read_context_; |
281 base::MessageLoopForIO::IOContext write_context_; | 317 base::MessageLoopForIO::IOContext write_context_; |
282 | 318 |
283 // Protects |reject_writes_| and |outgoing_messages_|. | 319 // Protects |reject_writes_| and |outgoing_messages_|. |
284 base::Lock write_lock_; | 320 base::Lock write_lock_; |
285 | 321 |
286 bool delay_writes_ = true; | 322 bool delay_writes_ = true; |
287 | 323 |
288 bool reject_writes_ = false; | 324 bool reject_writes_ = false; |
289 std::deque<MessageView> outgoing_messages_; | 325 std::deque<MessageView> outgoing_messages_; |
290 | 326 |
| 327 bool wait_for_connect_; |
| 328 |
291 DISALLOW_COPY_AND_ASSIGN(ChannelWin); | 329 DISALLOW_COPY_AND_ASSIGN(ChannelWin); |
292 }; | 330 }; |
293 | 331 |
294 } // namespace | 332 } // namespace |
295 | 333 |
296 // static | 334 // static |
297 scoped_refptr<Channel> Channel::Create( | 335 scoped_refptr<Channel> Channel::Create( |
298 Delegate* delegate, | 336 Delegate* delegate, |
299 ScopedPlatformHandle platform_handle, | 337 ScopedPlatformHandle platform_handle, |
300 scoped_refptr<base::TaskRunner> io_task_runner) { | 338 scoped_refptr<base::TaskRunner> io_task_runner) { |
301 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); | 339 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); |
302 } | 340 } |
303 | 341 |
304 } // namespace edk | 342 } // namespace edk |
305 } // namespace mojo | 343 } // namespace mojo |
OLD | NEW |