Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(476)

Side by Side Diff: mojo/edk/system/channel_win.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <windows.h>
8
9 #include <algorithm>
10 #include <deque>
11
12 #include "base/bind.h"
13 #include "base/location.h"
14 #include "base/macros.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/memory/scoped_ptr.h"
17 #include "base/message_loop/message_loop.h"
18 #include "base/synchronization/lock.h"
19 #include "base/task_runner.h"
20 #include "mojo/edk/embedder/platform_handle_vector.h"
21
22 namespace mojo {
23 namespace edk {
24
25 namespace {
26
27 const size_t kMaxBatchReadCapacity = 256 * 1024;
28
29 // A view over a Channel::Message object. The write queue uses these since
30 // large messages may need to be sent in chunks.
31 class MessageView {
32 public:
33 // Owns |message|. |offset| indexes the first unsent byte in the message.
34 MessageView(Channel::MessagePtr message, size_t offset)
35 : message_(std::move(message)),
36 offset_(offset) {
37 DCHECK_GT(message_->data_num_bytes(), offset_);
38 }
39
40 MessageView(MessageView&& other) { *this = std::move(other); }
41
42 MessageView& operator=(MessageView&& other) {
43 message_ = std::move(other.message_);
44 offset_ = other.offset_;
45 return *this;
46 }
47
48 ~MessageView() {}
49
50 const void* data() const {
51 return static_cast<const char*>(message_->data()) + offset_;
52 }
53
54 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
55
56 size_t data_offset() const { return offset_; }
57 void advance_data_offset(size_t num_bytes) {
58 DCHECK_GE(message_->data_num_bytes(), offset_ + num_bytes);
59 offset_ += num_bytes;
60 }
61
62 Channel::MessagePtr TakeChannelMessage() { return std::move(message_); }
63
64 private:
65 Channel::MessagePtr message_;
66 size_t offset_;
67
68 DISALLOW_COPY_AND_ASSIGN(MessageView);
69 };
70
71 class ChannelWin : public Channel,
72 public base::MessageLoop::DestructionObserver,
73 public base::MessageLoopForIO::IOHandler {
74 public:
75 ChannelWin(Delegate* delegate,
76 ScopedPlatformHandle handle,
77 scoped_refptr<base::TaskRunner> io_task_runner)
78 : Channel(delegate),
79 self_(this),
80 handle_(std::move(handle)),
81 io_task_runner_(io_task_runner) {
82 memset(&read_context_, 0, sizeof(read_context_));
83 read_context_.handler = this;
84
85 memset(&write_context_, 0, sizeof(write_context_));
86 write_context_.handler = this;
87 }
88
89 void Start() override {
90 io_task_runner_->PostTask(
91 FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this));
92 }
93
94 void ShutDownImpl() override {
95 // Always shut down asynchronously when called through the public interface.
96 io_task_runner_->PostTask(
97 FROM_HERE, base::Bind(&ChannelWin::ShutDownOnIOThread, this));
98 }
99
100 void Write(MessagePtr message) override {
101 bool write_error = false;
102 {
103 base::AutoLock lock(write_lock_);
104 if (reject_writes_)
105 return;
106
107 bool write_now = !delay_writes_ && outgoing_messages_.empty();
108 outgoing_messages_.emplace_back(std::move(message), 0);
109
110 if (write_now && !WriteNoLock(outgoing_messages_.front()))
111 reject_writes_ = write_error = true;
112 }
113 if (write_error) {
114 // Do not synchronously invoke OnError(). Write() may have been called by
115 // the delegate and we don't want to re-enter it.
116 io_task_runner_->PostTask(FROM_HERE,
117 base::Bind(&ChannelWin::OnError, this));
118 }
119 }
120
121 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
122 size_t num_handles,
123 void** payload,
124 size_t* payload_size) override {
125 size_t handles_size = sizeof(PlatformHandle) * num_handles;
126 if (handles_size > *payload_size)
127 return nullptr;
128
129 *payload_size -= handles_size;
130 ScopedPlatformHandleVectorPtr handles(
131 new PlatformHandleVector(num_handles));
132 memcpy(handles->data(),
133 static_cast<const char*>(*payload) + *payload_size, handles_size);
134 return handles;
135 }
136
137 private:
138 // May run on any thread.
139 ~ChannelWin() override {}
140
141 void StartOnIOThread() {
142 base::MessageLoop::current()->AddDestructionObserver(this);
143 base::MessageLoopForIO::current()->RegisterIOHandler(
144 handle_.get().handle, this);
145
146 // Now that we have registered our IOHandler, we can start writing.
147 {
148 base::AutoLock lock(write_lock_);
149 if (delay_writes_) {
150 delay_writes_ = false;
151 WriteNextNoLock();
152 }
153 }
154
155 // Keep this alive in case we synchronously run shutdown.
156 scoped_refptr<ChannelWin> keep_alive(this);
157 ReadMore(0);
158 }
159
160 void ShutDownOnIOThread() {
161 base::MessageLoop::current()->RemoveDestructionObserver(this);
162
163 CancelIo(handle_.get().handle);
164 handle_.reset();
165
166 // May destroy the |this| if it was the last reference.
167 self_ = nullptr;
168 }
169
170 // base::MessageLoop::DestructionObserver:
171 void WillDestroyCurrentMessageLoop() override {
172 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
173 if (self_)
174 ShutDownOnIOThread();
175 }
176
177 // base::MessageLoop::IOHandler:
178 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
179 DWORD bytes_transfered,
180 DWORD error) override {
181 if (error != ERROR_SUCCESS) {
182 OnError();
183 } else if (context == &read_context_) {
184 OnReadDone(static_cast<size_t>(bytes_transfered));
185 } else {
186 CHECK(context == &write_context_);
187 OnWriteDone(static_cast<size_t>(bytes_transfered));
188 }
189 Release(); // Balancing reference taken after ReadFile / WriteFile.
190 }
191
192 void OnReadDone(size_t bytes_read) {
193 if (bytes_read > 0) {
194 size_t next_read_size = 0;
195 if (OnReadComplete(bytes_read, &next_read_size)) {
196 ReadMore(next_read_size);
197 } else {
198 OnError();
199 }
200 } else if (bytes_read == 0) {
201 OnError();
202 }
203 }
204
205 void OnWriteDone(size_t bytes_written) {
206 if (bytes_written == 0)
207 return;
208
209 bool write_error = false;
210 {
211 base::AutoLock lock(write_lock_);
212
213 DCHECK(!outgoing_messages_.empty());
214
215 MessageView& message_view = outgoing_messages_.front();
216 message_view.advance_data_offset(bytes_written);
217 if (message_view.data_num_bytes() == 0) {
218 Channel::MessagePtr message = message_view.TakeChannelMessage();
219 outgoing_messages_.pop_front();
220
221 // Clear any handles so they don't get closed on destruction.
222 ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
223 if (handles)
224 handles->clear();
225 }
226
227 if (!WriteNextNoLock())
228 reject_writes_ = write_error = true;
229 }
230 if (write_error)
231 OnError();
232 }
233
234 void ReadMore(size_t next_read_size_hint) {
235 size_t buffer_capacity = next_read_size_hint;
236 char* buffer = GetReadBuffer(&buffer_capacity);
237 DCHECK_GT(buffer_capacity, 0u);
238
239 BOOL ok = ReadFile(handle_.get().handle,
240 buffer,
241 static_cast<DWORD>(buffer_capacity),
242 NULL,
243 &read_context_.overlapped);
244
245 if (ok || GetLastError() == ERROR_IO_PENDING) {
246 AddRef(); // Will be balanced in OnIOCompleted
247 } else {
248 OnError();
249 }
250 }
251
252 // Attempts to write a message directly to the channel. If the full message
253 // cannot be written, it's queued and a wait is initiated to write the message
254 // ASAP on the I/O thread.
255 bool WriteNoLock(const MessageView& message_view) {
256 BOOL ok = WriteFile(handle_.get().handle,
257 message_view.data(),
258 static_cast<DWORD>(message_view.data_num_bytes()),
259 NULL,
260 &write_context_.overlapped);
261
262 if (ok || GetLastError() == ERROR_IO_PENDING) {
263 AddRef(); // Will be balanced in OnIOCompleted.
264 return true;
265 }
266 return false;
267 }
268
269 bool WriteNextNoLock() {
270 if (outgoing_messages_.empty())
271 return true;
272 return WriteNoLock(outgoing_messages_.front());
273 }
274
275 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
276 scoped_refptr<Channel> self_;
277
278 ScopedPlatformHandle handle_;
279 scoped_refptr<base::TaskRunner> io_task_runner_;
280
281 base::MessageLoopForIO::IOContext read_context_;
282 base::MessageLoopForIO::IOContext write_context_;
283
284 // Protects |reject_writes_| and |outgoing_messages_|.
285 base::Lock write_lock_;
286
287 bool delay_writes_ = true;
288
289 bool reject_writes_ = false;
290 std::deque<MessageView> outgoing_messages_;
291
292 DISALLOW_COPY_AND_ASSIGN(ChannelWin);
293 };
294
295 } // namespace
296
297 // static
298 scoped_refptr<Channel> Channel::Create(
299 Delegate* delegate,
300 ScopedPlatformHandle platform_handle,
301 scoped_refptr<base::TaskRunner> io_task_runner) {
302 return new ChannelWin(delegate, std::move(platform_handle), io_task_runner);
303 }
304
305 } // namespace edk
306 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698