Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: mojo/edk/system/channel_posix.cc

Issue 2282413004: Support creating mojo peer connections from named pipes. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/channel.h" 5 #include "mojo/edk/system/channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <sys/socket.h> 8 #include <sys/socket.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after
204 DCHECK(!read_watcher_); 204 DCHECK(!read_watcher_);
205 DCHECK(!write_watcher_); 205 DCHECK(!write_watcher_);
206 for (auto handle : incoming_platform_handles_) 206 for (auto handle : incoming_platform_handles_)
207 handle.CloseIfNecessary(); 207 handle.CloseIfNecessary();
208 } 208 }
209 209
210 void StartOnIOThread() { 210 void StartOnIOThread() {
211 DCHECK(!read_watcher_); 211 DCHECK(!read_watcher_);
212 DCHECK(!write_watcher_); 212 DCHECK(!write_watcher_);
213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); 213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
215 base::MessageLoopForIO::current()->WatchFileDescriptor(
216 handle_.get().handle, true /* persistent */,
217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
218 base::MessageLoop::current()->AddDestructionObserver(this); 214 base::MessageLoop::current()->AddDestructionObserver(this);
215 if (handle_.get().needs_connection) {
216 base::MessageLoopForIO::current()->WatchFileDescriptor(
217 handle_.get().handle, false /* persistent */,
218 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
219 } else {
220 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
221 base::MessageLoopForIO::current()->WatchFileDescriptor(
222 handle_.get().handle, true /* persistent */,
223 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
224 base::AutoLock lock(write_lock_);
225 FlushOutgoingMessagesNoLock();
226 }
219 } 227 }
220 228
221 void WaitForWriteOnIOThread() { 229 void WaitForWriteOnIOThread() {
222 base::AutoLock lock(write_lock_); 230 base::AutoLock lock(write_lock_);
223 WaitForWriteOnIOThreadNoLock(); 231 WaitForWriteOnIOThreadNoLock();
224 } 232 }
225 233
226 void WaitForWriteOnIOThreadNoLock() { 234 void WaitForWriteOnIOThreadNoLock() {
227 if (pending_write_) 235 if (pending_write_)
228 return; 236 return;
(...skipping 29 matching lines...) Expand all
258 // base::MessageLoop::DestructionObserver: 266 // base::MessageLoop::DestructionObserver:
259 void WillDestroyCurrentMessageLoop() override { 267 void WillDestroyCurrentMessageLoop() override {
260 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 268 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
261 if (self_) 269 if (self_)
262 ShutDownOnIOThread(); 270 ShutDownOnIOThread();
263 } 271 }
264 272
265 // base::MessageLoopForIO::Watcher: 273 // base::MessageLoopForIO::Watcher:
266 void OnFileCanReadWithoutBlocking(int fd) override { 274 void OnFileCanReadWithoutBlocking(int fd) override {
267 CHECK_EQ(fd, handle_.get().handle); 275 CHECK_EQ(fd, handle_.get().handle);
276 if (handle_.get().needs_connection) {
277 #if !defined(OS_NACL)
278 read_watcher_.reset();
279 base::MessageLoop::current()->RemoveDestructionObserver(this);
280
281 ScopedPlatformHandle accept_fd;
282 ServerAcceptConnection(handle_.get(), &accept_fd);
283 if (!accept_fd.is_valid()) {
284 OnError();
285 return;
286 }
287 handle_ = std::move(accept_fd);
288 StartOnIOThread();
289 #else
290 NOTREACHED();
291 #endif
292 return;
293 }
268 294
269 bool read_error = false; 295 bool read_error = false;
270 size_t next_read_size = 0; 296 size_t next_read_size = 0;
271 size_t buffer_capacity = 0; 297 size_t buffer_capacity = 0;
272 size_t total_bytes_read = 0; 298 size_t total_bytes_read = 0;
273 size_t bytes_read = 0; 299 size_t bytes_read = 0;
274 do { 300 do {
275 buffer_capacity = next_read_size; 301 buffer_capacity = next_read_size;
276 char* buffer = GetReadBuffer(&buffer_capacity); 302 char* buffer = GetReadBuffer(&buffer_capacity);
277 DCHECK_GT(buffer_capacity, 0u); 303 DCHECK_GT(buffer_capacity, 0u);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
314 reject_writes_ = write_error = true; 340 reject_writes_ = write_error = true;
315 } 341 }
316 if (write_error) 342 if (write_error)
317 OnError(); 343 OnError();
318 } 344 }
319 345
320 // Attempts to write a message directly to the channel. If the full message 346 // Attempts to write a message directly to the channel. If the full message
321 // cannot be written, it's queued and a wait is initiated to write the message 347 // cannot be written, it's queued and a wait is initiated to write the message
322 // ASAP on the I/O thread. 348 // ASAP on the I/O thread.
323 bool WriteNoLock(MessageView message_view) { 349 bool WriteNoLock(MessageView message_view) {
350 if (handle_.get().needs_connection) {
351 outgoing_messages_.emplace_front(std::move(message_view));
352 return true;
353 }
324 size_t bytes_written = 0; 354 size_t bytes_written = 0;
325 do { 355 do {
326 message_view.advance_data_offset(bytes_written); 356 message_view.advance_data_offset(bytes_written);
327 357
328 ssize_t result; 358 ssize_t result;
329 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); 359 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
330 if (handles && handles->size()) { 360 if (handles && handles->size()) {
331 iovec iov = { 361 iovec iov = {
332 const_cast<void*>(message_view.data()), 362 const_cast<void*>(message_view.data()),
333 message_view.data_num_bytes() 363 message_view.data_num_bytes()
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after
511 // static 541 // static
512 scoped_refptr<Channel> Channel::Create( 542 scoped_refptr<Channel> Channel::Create(
513 Delegate* delegate, 543 Delegate* delegate,
514 ScopedPlatformHandle platform_handle, 544 ScopedPlatformHandle platform_handle,
515 scoped_refptr<base::TaskRunner> io_task_runner) { 545 scoped_refptr<base::TaskRunner> io_task_runner) {
516 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); 546 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
517 } 547 }
518 548
519 } // namespace edk 549 } // namespace edk
520 } // namespace mojo 550 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/embedder/platform_handle_utils_posix.cc ('k') | mojo/edk/system/multiprocess_message_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698