OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/channel.h" | 5 #include "mojo/edk/system/channel.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <windows.h> | 8 #include <windows.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
73 public base::MessageLoopForIO::IOHandler { | 73 public base::MessageLoopForIO::IOHandler { |
74 public: | 74 public: |
75 ChannelWin(Delegate* delegate, | 75 ChannelWin(Delegate* delegate, |
76 ScopedPlatformHandle handle, | 76 ScopedPlatformHandle handle, |
77 scoped_refptr<base::TaskRunner> io_task_runner) | 77 scoped_refptr<base::TaskRunner> io_task_runner) |
78 : Channel(delegate), | 78 : Channel(delegate), |
79 self_(this), | 79 self_(this), |
80 handle_(std::move(handle)), | 80 handle_(std::move(handle)), |
81 io_task_runner_(io_task_runner) { | 81 io_task_runner_(io_task_runner) { |
82 CHECK(handle_.is_valid()); | 82 CHECK(handle_.is_valid()); |
83 | |
84 wait_for_connect_ = handle_.get().needs_connection; | |
85 } | 83 } |
86 | 84 |
87 void Start() override { | 85 void Start() override { |
88 io_task_runner_->PostTask( | 86 io_task_runner_->PostTask( |
89 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); | 87 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); |
90 } | 88 } |
91 | 89 |
92 void ShutDownImpl() override { | 90 void ShutDownImpl() override { |
93 // Always shut down asynchronously when called through the public interface. | 91 // Always shut down asynchronously when called through the public interface. |
94 io_task_runner_->PostTask( | 92 io_task_runner_->PostTask( |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
134 | 132 |
135 private: | 133 private: |
136 // May run on any thread. | 134 // May run on any thread. |
137 ~ChannelWin() override {} | 135 ~ChannelWin() override {} |
138 | 136 |
139 void StartOnIOThread() { | 137 void StartOnIOThread() { |
140 base::MessageLoop::current()->AddDestructionObserver(this); | 138 base::MessageLoop::current()->AddDestructionObserver(this); |
141 base::MessageLoopForIO::current()->RegisterIOHandler( | 139 base::MessageLoopForIO::current()->RegisterIOHandler( |
142 handle_.get().handle, this); | 140 handle_.get().handle, this); |
143 | 141 |
144 if (wait_for_connect_) { | |
145 BOOL ok = ConnectNamedPipe(handle_.get().handle, | |
146 &connect_context_.overlapped); | |
147 if (ok) { | |
148 PLOG(ERROR) << "Unexpected success while waiting for pipe connection"; | |
149 OnError(); | |
150 return; | |
151 } | |
152 | |
153 const DWORD err = GetLastError(); | |
154 switch (err) { | |
155 case ERROR_PIPE_CONNECTED: | |
156 wait_for_connect_ = false; | |
157 break; | |
158 case ERROR_IO_PENDING: | |
159 AddRef(); | |
160 return; | |
161 case ERROR_NO_DATA: | |
162 OnError(); | |
163 return; | |
164 } | |
165 } | |
166 | |
167 // Now that we have registered our IOHandler, we can start writing. | 142 // Now that we have registered our IOHandler, we can start writing. |
168 { | 143 { |
169 base::AutoLock lock(write_lock_); | 144 base::AutoLock lock(write_lock_); |
170 if (delay_writes_) { | 145 if (delay_writes_) { |
171 delay_writes_ = false; | 146 delay_writes_ = false; |
172 WriteNextNoLock(); | 147 WriteNextNoLock(); |
173 } | 148 } |
174 } | 149 } |
175 | 150 |
176 // Keep this alive in case we synchronously run shutdown. | 151 // Keep this alive in case we synchronously run shutdown. |
(...skipping 20 matching lines...) Expand all Loading... |
197 if (self_) | 172 if (self_) |
198 ShutDownOnIOThread(); | 173 ShutDownOnIOThread(); |
199 } | 174 } |
200 | 175 |
201 // base::MessageLoop::IOHandler: | 176 // base::MessageLoop::IOHandler: |
202 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | 177 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, |
203 DWORD bytes_transfered, | 178 DWORD bytes_transfered, |
204 DWORD error) override { | 179 DWORD error) override { |
205 if (error != ERROR_SUCCESS) { | 180 if (error != ERROR_SUCCESS) { |
206 OnError(); | 181 OnError(); |
207 } else if (context == &connect_context_) { | |
208 DCHECK(wait_for_connect_); | |
209 wait_for_connect_ = false; | |
210 ReadMore(0); | |
211 | |
212 base::AutoLock lock(write_lock_); | |
213 if (delay_writes_) { | |
214 delay_writes_ = false; | |
215 WriteNextNoLock(); | |
216 } | |
217 } else if (context == &read_context_) { | 182 } else if (context == &read_context_) { |
218 OnReadDone(static_cast<size_t>(bytes_transfered)); | 183 OnReadDone(static_cast<size_t>(bytes_transfered)); |
219 } else { | 184 } else { |
220 CHECK(context == &write_context_); | 185 CHECK(context == &write_context_); |
221 OnWriteDone(static_cast<size_t>(bytes_transfered)); | 186 OnWriteDone(static_cast<size_t>(bytes_transfered)); |
222 } | 187 } |
223 Release(); // Balancing reference taken after ReadFile / WriteFile. | 188 Release(); // Balancing reference taken after ReadFile / WriteFile. |
224 } | 189 } |
225 | 190 |
226 void OnReadDone(size_t bytes_read) { | 191 void OnReadDone(size_t bytes_read) { |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
305 return true; | 270 return true; |
306 return WriteNoLock(outgoing_messages_.front()); | 271 return WriteNoLock(outgoing_messages_.front()); |
307 } | 272 } |
308 | 273 |
309 // Keeps the Channel alive at least until explicit shutdown on the IO thread. | 274 // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
310 scoped_refptr<Channel> self_; | 275 scoped_refptr<Channel> self_; |
311 | 276 |
312 ScopedPlatformHandle handle_; | 277 ScopedPlatformHandle handle_; |
313 scoped_refptr<base::TaskRunner> io_task_runner_; | 278 scoped_refptr<base::TaskRunner> io_task_runner_; |
314 | 279 |
315 base::MessageLoopForIO::IOContext connect_context_; | |
316 base::MessageLoopForIO::IOContext read_context_; | 280 base::MessageLoopForIO::IOContext read_context_; |
317 base::MessageLoopForIO::IOContext write_context_; | 281 base::MessageLoopForIO::IOContext write_context_; |
318 | 282 |
319 // Protects |reject_writes_| and |outgoing_messages_|. | 283 // Protects |reject_writes_| and |outgoing_messages_|. |
320 base::Lock write_lock_; | 284 base::Lock write_lock_; |
321 | 285 |
322 bool delay_writes_ = true; | 286 bool delay_writes_ = true; |
323 | 287 |
324 bool reject_writes_ = false; | 288 bool reject_writes_ = false; |
325 std::deque<MessageView> outgoing_messages_; | 289 std::deque<MessageView> outgoing_messages_; |
326 | 290 |
327 bool wait_for_connect_; | |
328 | |
329 DISALLOW_COPY_AND_ASSIGN(ChannelWin); | 291 DISALLOW_COPY_AND_ASSIGN(ChannelWin); |
330 }; | 292 }; |
331 | 293 |
332 } // namespace | 294 } // namespace |
333 | 295 |
334 // static | 296 // static |
335 scoped_refptr<Channel> Channel::Create( | 297 scoped_refptr<Channel> Channel::Create( |
336 Delegate* delegate, | 298 Delegate* delegate, |
337 ScopedPlatformHandle platform_handle, | 299 ScopedPlatformHandle platform_handle, |
338 scoped_refptr<base::TaskRunner> io_task_runner) { | 300 scoped_refptr<base::TaskRunner> io_task_runner) { |
339 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); | 301 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); |
340 } | 302 } |
341 | 303 |
342 } // namespace edk | 304 } // namespace edk |
343 } // namespace mojo | 305 } // namespace mojo |
OLD | NEW |