Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: mojo/edk/system/channel_posix.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <errno.h>
8 #include <sys/uio.h>
9
10 #include <algorithm>
11 #include <deque>
12
13 #include "base/bind.h"
14 #include "base/location.h"
15 #include "base/macros.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_ptr.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/synchronization/lock.h"
20 #include "base/task_runner.h"
21 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
22 #include "mojo/edk/embedder/platform_handle_vector.h"
23
24 namespace mojo {
25 namespace edk {
26
27 namespace {
28
29 const size_t kMaxBatchReadCapacity = 256 * 1024;
30
31 // A view over a Channel::Message object. The write queue uses these since
32 // large messages may need to be sent in chunks.
33 class MessageView {
34 public:
35 // Owns |message|. |offset| indexes the first unsent byte in the message.
36 MessageView(Channel::MessagePtr message, size_t offset)
37 : message_(std::move(message)),
38 offset_(offset),
39 handles_(message_->TakeHandles()) {
40 DCHECK_GT(message_->data_num_bytes(), offset_);
41 }
42
43 MessageView(MessageView&& other) { *this = std::move(other); }
44
45 MessageView& operator=(MessageView&& other) {
46 message_ = std::move(other.message_);
47 offset_ = other.offset_;
48 handles_ = std::move(other.handles_);
49 return *this;
50 }
51
52 ~MessageView() {}
53
54 const void* data() const {
55 return static_cast<const char*>(message_->data()) + offset_;
56 }
57
58 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
59
60 size_t data_offset() const { return offset_; }
61 void advance_data_offset(size_t num_bytes) {
62 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
63 offset_ += num_bytes;
64 }
65
66 ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
67 Channel::MessagePtr TakeMessage() { return std::move(message_); }
68
69 private:
70 Channel::MessagePtr message_;
71 size_t offset_;
72 ScopedPlatformHandleVectorPtr handles_;
73
74 DISALLOW_COPY_AND_ASSIGN(MessageView);
75 };
76
77 class ChannelPosix : public Channel,
78 public base::MessageLoop::DestructionObserver,
79 public base::MessageLoopForIO::Watcher {
80 public:
81 ChannelPosix(Delegate* delegate,
82 ScopedPlatformHandle handle,
83 scoped_refptr<base::TaskRunner> io_task_runner)
84 : Channel(delegate),
85 self_(this),
86 handle_(std::move(handle)),
87 io_task_runner_(io_task_runner) {
88 }
89
90 void Start() override {
91 if (io_task_runner_->RunsTasksOnCurrentThread()) {
92 StartOnIOThread();
93 } else {
94 io_task_runner_->PostTask(
95 FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
96 }
97 }
98
99 void ShutDownImpl() override {
100 // Always shut down asynchronously when called through the public interface.
101 io_task_runner_->PostTask(
102 FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
103 }
104
105 void Write(MessagePtr message) override {
106 bool write_error = false;
107 {
108 base::AutoLock lock(write_lock_);
109 if (reject_writes_)
110 return;
111 if (outgoing_messages_.empty()) {
112 if (!WriteNoLock(MessageView(std::move(message), 0)))
113 reject_writes_ = write_error = true;
114 } else {
115 outgoing_messages_.emplace_back(std::move(message), 0);
116 }
117 }
118 if (write_error) {
119 // Do not synchronously invoke OnError(). Write() may have been called by
120 // the delegate and we don't want to re-enter it.
121 io_task_runner_->PostTask(FROM_HERE,
122 base::Bind(&ChannelPosix::OnError, this));
123 }
124 }
125
126 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
127 size_t num_handles,
128 void** payload,
129 size_t* payload_size) override {
130 if (incoming_platform_handles_.size() < num_handles)
131 return nullptr;
132 ScopedPlatformHandleVectorPtr handles(
133 new PlatformHandleVector(num_handles));
134 for (size_t i = 0; i < num_handles; ++i) {
135 (*handles)[i] = incoming_platform_handles_.front();
136 incoming_platform_handles_.pop_front();
137 }
138 return handles;
139 }
140
141 private:
142 ~ChannelPosix() override {
143 DCHECK(!read_watcher_);
144 DCHECK(!write_watcher_);
145 for (auto handle : incoming_platform_handles_)
146 handle.CloseIfNecessary();
147 }
148
149 void StartOnIOThread() {
150 DCHECK(!read_watcher_);
151 DCHECK(!write_watcher_);
152 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
153 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
154 base::MessageLoopForIO::current()->WatchFileDescriptor(
155 handle_.get().handle, true /* persistent */,
156 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
157 base::MessageLoop::current()->AddDestructionObserver(this);
158 }
159
160 void WaitForWriteOnIOThread() {
161 base::AutoLock lock(write_lock_);
162 WaitForWriteOnIOThreadNoLock();
163 }
164
165 void WaitForWriteOnIOThreadNoLock() {
166 if (pending_write_)
167 return;
168 if (!write_watcher_)
169 return;
170 if (io_task_runner_->RunsTasksOnCurrentThread()) {
171 pending_write_ = true;
172 base::MessageLoopForIO::current()->WatchFileDescriptor(
173 handle_.get().handle, false /* persistent */,
174 base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
175 } else {
176 io_task_runner_->PostTask(
177 FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
178 }
179 }
180
181 void ShutDownOnIOThread() {
182 base::MessageLoop::current()->RemoveDestructionObserver(this);
183
184 read_watcher_.reset();
185 write_watcher_.reset();
186 handle_.reset();
187
188 // May destroy the |this| if it was the last reference.
189 self_ = nullptr;
190 }
191
192 // base::MessageLoop::DestructionObserver:
193 void WillDestroyCurrentMessageLoop() override {
194 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
195 if (self_)
196 ShutDownOnIOThread();
197 }
198
199 // base::MessageLoopForIO::Watcher:
200 void OnFileCanReadWithoutBlocking(int fd) override {
201 CHECK_EQ(fd, handle_.get().handle);
202
203 bool read_error = false;
204 size_t next_read_size = 0;
205 size_t buffer_capacity = 0;
206 size_t total_bytes_read = 0;
207 size_t bytes_read = 0;
208 do {
209 buffer_capacity = next_read_size;
210 char* buffer = GetReadBuffer(&buffer_capacity);
211 DCHECK_GT(buffer_capacity, 0u);
212
213 ssize_t read_result = PlatformChannelRecvmsg(
214 handle_.get(),
215 buffer,
216 buffer_capacity,
217 &incoming_platform_handles_);
218
219 if (read_result > 0) {
220 bytes_read = static_cast<size_t>(read_result);
221 total_bytes_read += bytes_read;
222 if (!OnReadComplete(bytes_read, &next_read_size)) {
223 read_error = true;
224 break;
225 }
226 } else if (read_result == 0 ||
227 (errno != EAGAIN && errno != EWOULDBLOCK)) {
228 read_error = true;
229 break;
230 }
231 } while (bytes_read == buffer_capacity &&
232 total_bytes_read < kMaxBatchReadCapacity &&
233 next_read_size > 0);
234 if (read_error) {
235 // Stop receiving read notifications.
236 read_watcher_.reset();
237
238 OnError();
239 }
240 }
241
242 void OnFileCanWriteWithoutBlocking(int fd) override {
243 bool write_error = false;
244 {
245 base::AutoLock lock(write_lock_);
246 pending_write_ = false;
247 if (!FlushOutgoingMessagesNoLock())
248 reject_writes_ = write_error = true;
249 }
250 if (write_error)
251 OnError();
252 }
253
254 // Attempts to write a message directly to the channel. If the full message
255 // cannot be written, it's queued and a wait is initiated to write the message
256 // ASAP on the I/O thread.
257 bool WriteNoLock(MessageView message_view) {
258 size_t bytes_written = 0;
259 do {
260 message_view.advance_data_offset(bytes_written);
261
262 ssize_t result;
263 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
264 if (handles && handles->size()) {
265 iovec iov = {
266 const_cast<void*>(message_view.data()),
267 message_view.data_num_bytes()
268 };
269 // TODO: Handle lots of handles.
270 result = PlatformChannelSendmsgWithHandles(
271 handle_.get(), &iov, 1, handles->data(), handles->size());
272 handles->clear();
273 } else {
274 result = PlatformChannelWrite(handle_.get(), message_view.data(),
275 message_view.data_num_bytes());
276 }
277
278 if (result < 0) {
279 if (errno != EAGAIN && errno != EWOULDBLOCK)
280 return false;
281 outgoing_messages_.emplace_back(std::move(message_view));
282 WaitForWriteOnIOThreadNoLock();
283 return true;
284 }
285
286 bytes_written = static_cast<size_t>(result);
287 } while (bytes_written < message_view.data_num_bytes());
288
289 return true;
290 }
291
292 bool FlushOutgoingMessagesNoLock() {
293 std::deque<MessageView> messages;
294 std::swap(outgoing_messages_, messages);
295
296 while (!messages.empty()) {
297 if (!WriteNoLock(std::move(messages.front())))
298 return false;
299
300 messages.pop_front();
301 if (!outgoing_messages_.empty()) {
302 // The message was requeued by WriteNoLock(), so we have to wait for
303 // pipe to become writable again. Repopulate the message queue and exit.
304 DCHECK_EQ(outgoing_messages_.size(), 1u);
305 MessageView message_view = std::move(outgoing_messages_.front());
306 std::swap(messages, outgoing_messages_);
307 outgoing_messages_.push_front(std::move(message_view));
308 return true;
309 }
310 }
311
312 return true;
313 }
314
315 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
316 scoped_refptr<Channel> self_;
317
318 ScopedPlatformHandle handle_;
319 scoped_refptr<base::TaskRunner> io_task_runner_;
320
321 // These watchers must only be accessed on the IO thread.
322 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
323 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
324
325 std::deque<PlatformHandle> incoming_platform_handles_;
326
327 // Protects |pending_write_| and |outgoing_messages_|.
328 base::Lock write_lock_;
329 bool pending_write_ = false;
330 bool reject_writes_ = false;
331 std::deque<MessageView> outgoing_messages_;
332
333 DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
334 };
335
336 } // namespace
337
338 // static
339 scoped_refptr<Channel> Channel::Create(
340 Delegate* delegate,
341 ScopedPlatformHandle platform_handle,
342 scoped_refptr<base::TaskRunner> io_task_runner) {
343 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
344 }
345
346 } // namespace edk
347 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698