OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/channel.h" |
| 6 |
| 7 #include <errno.h> |
| 8 |
| 9 #include <algorithm> |
| 10 #include <deque> |
| 11 #include <limits> |
| 12 #include <memory> |
| 13 |
| 14 #include "base/bind.h" |
| 15 #include "base/location.h" |
| 16 #include "base/macros.h" |
| 17 #include "base/memory/ref_counted.h" |
| 18 #include "base/message_loop/message_loop.h" |
| 19 #include "base/synchronization/lock.h" |
| 20 #include "base/task_runner.h" |
| 21 #include "mojo/edk/embedder/platform_handle_vector.h" |
| 22 |
| 23 #include "base/threading/simple_thread.h" |
| 24 #include "native_client/src/public/imc_syscalls.h" |
| 25 #include "native_client/src/public/imc_types.h" |
| 26 |
| 27 namespace mojo { |
| 28 namespace edk { |
| 29 |
| 30 namespace { |
| 31 |
| 32 const size_t kReadBufferSize = 4 * 1024; |
| 33 |
| 34 // Very simple message structure. Intentionally optimised for simplicity over |
| 35 // performance. |
| 36 struct SimpleMessage { |
| 37 std::string data; |
| 38 std::vector<int> fds; |
| 39 }; |
| 40 |
| 41 class ChannelNacl : public Channel { |
| 42 public: |
| 43 ChannelNacl(Delegate* delegate, |
| 44 ScopedPlatformHandle handle, |
| 45 scoped_refptr<base::TaskRunner> io_task_runner) |
| 46 : Channel(delegate), |
| 47 self_(this), |
| 48 handle_(std::move(handle)), |
| 49 io_task_runner_(io_task_runner) { |
| 50 reader_thread_.reset( |
| 51 new ReaderThread(this, handle.get().handle, io_task_runner_)); |
| 52 } |
| 53 |
| 54 void Start() override { |
| 55 reader_thread_->Start(); |
| 56 } |
| 57 |
| 58 void ShutDownImpl() override { |
| 59 // Always shut down asynchronously when called through the public interface. |
| 60 io_task_runner_->PostTask( |
| 61 FROM_HERE, base::Bind(&ChannelNacl::ShutDownOnIOThread, this)); |
| 62 } |
| 63 |
| 64 void Write(MessagePtr message) override { |
| 65 bool write_error = false; |
| 66 { |
| 67 base::AutoLock lock(write_lock_); |
| 68 if (reject_writes_) |
| 69 return; |
| 70 |
| 71 // For simplicity, do blocking sends. |
| 72 ScopedPlatformHandleVectorPtr handles = |
| 73 message->TakeHandlesForTransport(); |
| 74 std::unique_ptr<int[]> fds(new int[handles->size()]); |
| 75 for (size_t i = 0; i < handles->size(); i++) |
| 76 fds[i] = handles->at(i).handle; |
| 77 |
| 78 NaClAbiNaClImcMsgIoVec iov = { |
| 79 const_cast<void*>(message->data()), message->data_num_bytes() |
| 80 }; |
| 81 NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds.get(), handles->size() }; |
| 82 ssize_t bytes_written = imc_sendmsg(handle_.get().handle, &msgh, 0); |
| 83 DCHECK(bytes_written); |
| 84 |
| 85 if (bytes_written < 0 || |
| 86 static_cast<size_t>(bytes_written) != message->data_num_bytes()) { |
| 87 write_error = true; |
| 88 } |
| 89 } |
| 90 if (write_error) { |
| 91 // Do not synchronously invoke OnError(). Write() may have been called by |
| 92 // the delegate and we don't want to re-enter it. |
| 93 io_task_runner_->PostTask(FROM_HERE, |
| 94 base::Bind(&ChannelNacl::OnError, this)); |
| 95 } |
| 96 } |
| 97 |
| 98 bool GetReadPlatformHandles( |
| 99 size_t num_handles, |
| 100 const void* extra_header, |
| 101 size_t extra_header_size, |
| 102 ScopedPlatformHandleVectorPtr* handles) override { |
| 103 if (num_handles > std::numeric_limits<uint16_t>::max()) |
| 104 return false; |
| 105 if (incoming_platform_handles_.size() < num_handles) { |
| 106 handles->reset(); |
| 107 return true; |
| 108 } |
| 109 |
| 110 handles->reset(new PlatformHandleVector(num_handles)); |
| 111 for (size_t i = 0; i < num_handles; ++i) { |
| 112 (*handles)->at(i) = incoming_platform_handles_.front(); |
| 113 incoming_platform_handles_.pop_front(); |
| 114 } |
| 115 |
| 116 return true; |
| 117 } |
| 118 |
| 119 private: |
| 120 class ReaderThread : public base::SimpleThread { |
| 121 public: |
| 122 ReaderThread(scoped_refptr<ChannelNacl> channel_nacl, int channel_fd, |
| 123 scoped_refptr<base::TaskRunner> io_task_runner) |
| 124 : base::SimpleThread("Mojo_ChannelNacl_ReaderThread"), |
| 125 channel_nacl_(channel_nacl), |
| 126 channel_fd_(channel_fd), |
| 127 io_task_runner_(io_task_runner) { |
| 128 DCHECK(channel_fd_ != -1); |
| 129 } |
| 130 |
| 131 ~ReaderThread() override { |
| 132 Stop(); |
| 133 } |
| 134 |
| 135 void Run() override { |
| 136 while (true) { |
| 137 { |
| 138 base::AutoLock l(lock_); |
| 139 if (shutdown_) |
| 140 return; |
| 141 } |
| 142 |
| 143 SimpleMessage message; |
| 144 message.data.resize(kReadBufferSize); |
| 145 message.fds.resize(NACL_ABI_IMC_DESC_MAX); |
| 146 |
| 147 NaClAbiNaClImcMsgIoVec iov = { |
| 148 &message.data[0], message.data.size() |
| 149 }; |
| 150 NaClAbiNaClImcMsgHdr msg = { |
| 151 &iov, 1, message.fds.data(), message.fds.size() |
| 152 }; |
| 153 |
| 154 int bytes_read = imc_recvmsg(channel_fd_, &msg, 0); |
| 155 if (bytes_read < 0) { |
| 156 io_task_runner_->PostTask(FROM_HERE, |
| 157 base::Bind(&ChannelNacl::OnError, |
| 158 channel_nacl_)); |
| 159 return; |
| 160 } |
| 161 |
| 162 message.data.resize(bytes_read); |
| 163 message.fds.resize(msg.desc_length); |
| 164 |
| 165 io_task_runner_->PostTask(FROM_HERE, |
| 166 base::Bind(&ChannelNacl::OnDataReceived, |
| 167 channel_nacl_, |
| 168 base::Passed(&message))); |
| 169 } |
| 170 } |
| 171 |
| 172 void Stop() { |
| 173 if (HasBeenJoined()) |
| 174 return; |
| 175 |
| 176 { |
| 177 base::AutoLock l(lock_); |
| 178 shutdown_ = true; |
| 179 } |
| 180 |
| 181 // Signals the thread to wake up. |
| 182 close(channel_fd_); |
| 183 Join(); |
| 184 |
| 185 channel_fd_ = -1; |
| 186 } |
| 187 |
| 188 private: |
| 189 scoped_refptr<ChannelNacl> channel_nacl_; |
| 190 int channel_fd_; |
| 191 scoped_refptr<base::TaskRunner> io_task_runner_; |
| 192 |
| 193 base::Lock lock_; |
| 194 bool shutdown_ = false; |
| 195 }; |
| 196 |
| 197 ~ChannelNacl() override { |
| 198 for (auto handle : incoming_platform_handles_) |
| 199 handle.CloseIfNecessary(); |
| 200 } |
| 201 |
| 202 void ShutDownOnIOThread() { |
| 203 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 204 reader_thread_->Stop(); |
| 205 |
| 206 // May destroy the |this| if it was the last reference. |
| 207 self_ = nullptr; |
| 208 } |
| 209 |
| 210 void OnDataReceived(SimpleMessage message) { |
| 211 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 212 |
| 213 for (int fd : message.fds) { |
| 214 incoming_platform_handles_.push_back(PlatformHandle(fd)); |
| 215 } |
| 216 |
| 217 bool read_error = false; |
| 218 size_t next_read_size = 0; |
| 219 size_t buffer_capacity = 0; |
| 220 size_t total_bytes_read = 0; |
| 221 do { |
| 222 buffer_capacity = next_read_size; |
| 223 char* buffer = GetReadBuffer(&buffer_capacity); |
| 224 DCHECK_GT(buffer_capacity, 0u); |
| 225 |
| 226 size_t bytes_copied = std::min(message.data.size() - total_bytes_read, |
| 227 buffer_capacity); |
| 228 memcpy(buffer, message.data.data() + total_bytes_read, bytes_copied); |
| 229 |
| 230 total_bytes_read += bytes_copied; |
| 231 if (!OnReadComplete(bytes_copied, &next_read_size)) { |
| 232 read_error = true; |
| 233 break; |
| 234 } |
| 235 } while (total_bytes_read < message.data.size() && next_read_size > 0); |
| 236 if (read_error) { |
| 237 // Stop reading. Note, there may be a pending read in the message loop. |
| 238 reader_thread_->Stop(); |
| 239 |
| 240 OnError(); |
| 241 } |
| 242 } |
| 243 |
| 244 // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
| 245 scoped_refptr<Channel> self_; |
| 246 |
| 247 ScopedPlatformHandle handle_; |
| 248 scoped_refptr<base::TaskRunner> io_task_runner_; |
| 249 |
| 250 std::unique_ptr<ReaderThread> reader_thread_; |
| 251 |
| 252 std::deque<PlatformHandle> incoming_platform_handles_; |
| 253 |
| 254 // Protects |pending_write_| and |outgoing_messages_|. |
| 255 base::Lock write_lock_; |
| 256 bool reject_writes_ = false; |
| 257 |
| 258 DISALLOW_COPY_AND_ASSIGN(ChannelNacl); |
| 259 }; |
| 260 |
| 261 } // namespace |
| 262 |
| 263 // static |
| 264 scoped_refptr<Channel> Channel::Create( |
| 265 Delegate* delegate, |
| 266 ScopedPlatformHandle platform_handle, |
| 267 scoped_refptr<base::TaskRunner> io_task_runner) { |
| 268 return new ChannelNacl(delegate, std::move(platform_handle), io_task_runner); |
| 269 } |
| 270 |
| 271 } // namespace edk |
| 272 } // namespace mojo |
OLD | NEW |