Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Side by Side Diff: mojo/edk/system/channel_nacl.cc

Issue 2025763002: Use ChannelMojo in Pepper and NaCl processes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-utility-channel-mojo
Patch Set: iujbhirtughfbnjrthiubj Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/BUILD.gn ('k') | ppapi/nacl_irt/irt_start.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <errno.h>
8
9 #include <algorithm>
10 #include <deque>
11 #include <limits>
12 #include <memory>
13
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/macros.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/synchronization/lock.h"
20 #include "base/task_runner.h"
21 #include "mojo/edk/embedder/platform_handle_vector.h"
22
23 #include "base/threading/simple_thread.h"
24 #include "native_client/src/public/imc_syscalls.h"
25 #include "native_client/src/public/imc_types.h"
26
27 namespace mojo {
28 namespace edk {
29
30 namespace {
31
32 const size_t kReadBufferSize = 4 * 1024;
33
34 // Very simple message structure. Intentionally optimised for simplicity over
35 // performance.
36 struct SimpleMessage {
37 std::string data;
38 std::vector<int> fds;
39 };
40
41 class ChannelNacl : public Channel {
42 public:
43 ChannelNacl(Delegate* delegate,
44 ScopedPlatformHandle handle,
45 scoped_refptr<base::TaskRunner> io_task_runner)
46 : Channel(delegate),
47 self_(this),
48 handle_(std::move(handle)),
49 io_task_runner_(io_task_runner) {
50 reader_thread_.reset(
51 new ReaderThread(this, handle.get().handle, io_task_runner_));
52 }
53
54 void Start() override {
55 reader_thread_->Start();
56 }
57
58 void ShutDownImpl() override {
59 // Always shut down asynchronously when called through the public interface.
60 io_task_runner_->PostTask(
61 FROM_HERE, base::Bind(&ChannelNacl::ShutDownOnIOThread, this));
62 }
63
64 void Write(MessagePtr message) override {
65 bool write_error = false;
66 {
67 base::AutoLock lock(write_lock_);
68 if (reject_writes_)
69 return;
70
71 // For simplicity, do blocking sends.
72 ScopedPlatformHandleVectorPtr handles =
73 message->TakeHandlesForTransport();
74 std::unique_ptr<int[]> fds(new int[handles->size()]);
75 for (size_t i = 0; i < handles->size(); i++)
76 fds[i] = handles->at(i).handle;
77
78 NaClAbiNaClImcMsgIoVec iov = {
79 const_cast<void*>(message->data()), message->data_num_bytes()
80 };
81 NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds.get(), handles->size() };
82 ssize_t bytes_written = imc_sendmsg(handle_.get().handle, &msgh, 0);
83 DCHECK(bytes_written);
84
85 if (bytes_written < 0 ||
86 static_cast<size_t>(bytes_written) != message->data_num_bytes()) {
87 write_error = true;
88 }
89 }
90 if (write_error) {
91 // Do not synchronously invoke OnError(). Write() may have been called by
92 // the delegate and we don't want to re-enter it.
93 io_task_runner_->PostTask(FROM_HERE,
94 base::Bind(&ChannelNacl::OnError, this));
95 }
96 }
97
98 bool GetReadPlatformHandles(
99 size_t num_handles,
100 const void* extra_header,
101 size_t extra_header_size,
102 ScopedPlatformHandleVectorPtr* handles) override {
103 if (num_handles > std::numeric_limits<uint16_t>::max())
104 return false;
105 if (incoming_platform_handles_.size() < num_handles) {
106 handles->reset();
107 return true;
108 }
109
110 handles->reset(new PlatformHandleVector(num_handles));
111 for (size_t i = 0; i < num_handles; ++i) {
112 (*handles)->at(i) = incoming_platform_handles_.front();
113 incoming_platform_handles_.pop_front();
114 }
115
116 return true;
117 }
118
119 private:
120 class ReaderThread : public base::SimpleThread {
121 public:
122 ReaderThread(scoped_refptr<ChannelNacl> channel_nacl, int channel_fd,
123 scoped_refptr<base::TaskRunner> io_task_runner)
124 : base::SimpleThread("Mojo_ChannelNacl_ReaderThread"),
125 channel_nacl_(channel_nacl),
126 channel_fd_(channel_fd),
127 io_task_runner_(io_task_runner) {
128 DCHECK(channel_fd_ != -1);
129 }
130
131 ~ReaderThread() override {
132 Stop();
133 }
134
135 void Run() override {
136 while (true) {
137 {
138 base::AutoLock l(lock_);
139 if (shutdown_)
140 return;
141 }
142
143 SimpleMessage message;
144 message.data.resize(kReadBufferSize);
145 message.fds.resize(NACL_ABI_IMC_DESC_MAX);
146
147 NaClAbiNaClImcMsgIoVec iov = {
148 &message.data[0], message.data.size()
149 };
150 NaClAbiNaClImcMsgHdr msg = {
151 &iov, 1, message.fds.data(), message.fds.size()
152 };
153
154 int bytes_read = imc_recvmsg(channel_fd_, &msg, 0);
155 if (bytes_read < 0) {
156 io_task_runner_->PostTask(FROM_HERE,
157 base::Bind(&ChannelNacl::OnError,
158 channel_nacl_));
159 return;
160 }
161
162 message.data.resize(bytes_read);
163 message.fds.resize(msg.desc_length);
164
165 io_task_runner_->PostTask(FROM_HERE,
166 base::Bind(&ChannelNacl::OnDataReceived,
167 channel_nacl_,
168 base::Passed(&message)));
169 }
170 }
171
172 void Stop() {
173 if (HasBeenJoined())
174 return;
175
176 {
177 base::AutoLock l(lock_);
178 shutdown_ = true;
179 }
180
181 // Signals the thread to wake up.
182 close(channel_fd_);
183 Join();
184
185 channel_fd_ = -1;
186 }
187
188 private:
189 scoped_refptr<ChannelNacl> channel_nacl_;
190 int channel_fd_;
191 scoped_refptr<base::TaskRunner> io_task_runner_;
192
193 base::Lock lock_;
194 bool shutdown_ = false;
195 };
196
197 ~ChannelNacl() override {
198 for (auto handle : incoming_platform_handles_)
199 handle.CloseIfNecessary();
200 }
201
202 void ShutDownOnIOThread() {
203 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
204 reader_thread_->Stop();
205
206 // May destroy the |this| if it was the last reference.
207 self_ = nullptr;
208 }
209
210 void OnDataReceived(SimpleMessage message) {
211 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
212
213 for (int fd : message.fds) {
214 incoming_platform_handles_.push_back(PlatformHandle(fd));
215 }
216
217 bool read_error = false;
218 size_t next_read_size = 0;
219 size_t buffer_capacity = 0;
220 size_t total_bytes_read = 0;
221 do {
222 buffer_capacity = next_read_size;
223 char* buffer = GetReadBuffer(&buffer_capacity);
224 DCHECK_GT(buffer_capacity, 0u);
225
226 size_t bytes_copied = std::min(message.data.size() - total_bytes_read,
227 buffer_capacity);
228 memcpy(buffer, message.data.data() + total_bytes_read, bytes_copied);
229
230 total_bytes_read += bytes_copied;
231 if (!OnReadComplete(bytes_copied, &next_read_size)) {
232 read_error = true;
233 break;
234 }
235 } while (total_bytes_read < message.data.size() && next_read_size > 0);
236 if (read_error) {
237 // Stop reading. Note, there may be a pending read in the message loop.
238 reader_thread_->Stop();
239
240 OnError();
241 }
242 }
243
244 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
245 scoped_refptr<Channel> self_;
246
247 ScopedPlatformHandle handle_;
248 scoped_refptr<base::TaskRunner> io_task_runner_;
249
250 std::unique_ptr<ReaderThread> reader_thread_;
251
252 std::deque<PlatformHandle> incoming_platform_handles_;
253
254 // Protects |pending_write_| and |outgoing_messages_|.
255 base::Lock write_lock_;
256 bool reject_writes_ = false;
257
258 DISALLOW_COPY_AND_ASSIGN(ChannelNacl);
259 };
260
261 } // namespace
262
263 // static
264 scoped_refptr<Channel> Channel::Create(
265 Delegate* delegate,
266 ScopedPlatformHandle platform_handle,
267 scoped_refptr<base::TaskRunner> io_task_runner) {
268 return new ChannelNacl(delegate, std::move(platform_handle), io_task_runner);
269 }
270
271 } // namespace edk
272 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/BUILD.gn ('k') | ppapi/nacl_irt/irt_start.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698