Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(144)

Side by Side Diff: mojo/system/raw_channel_posix.cc

Issue 137273003: Mojo: Rename PlatformChannelHandle to PlatformHandle, etc. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/raw_channel.h ('k') | mojo/system/raw_channel_posix_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <string.h> 8 #include <string.h>
9 #include <unistd.h> 9 #include <unistd.h>
10 10
11 #include <algorithm> 11 #include <algorithm>
12 #include <deque> 12 #include <deque>
13 #include <vector> 13 #include <vector>
14 14
15 #include "base/basictypes.h" 15 #include "base/basictypes.h"
16 #include "base/bind.h" 16 #include "base/bind.h"
17 #include "base/compiler_specific.h" 17 #include "base/compiler_specific.h"
18 #include "base/location.h" 18 #include "base/location.h"
19 #include "base/logging.h" 19 #include "base/logging.h"
20 #include "base/memory/scoped_ptr.h" 20 #include "base/memory/scoped_ptr.h"
21 #include "base/memory/weak_ptr.h" 21 #include "base/memory/weak_ptr.h"
22 #include "base/message_loop/message_loop.h" 22 #include "base/message_loop/message_loop.h"
23 #include "base/posix/eintr_wrapper.h" 23 #include "base/posix/eintr_wrapper.h"
24 #include "base/synchronization/lock.h" 24 #include "base/synchronization/lock.h"
25 #include "mojo/system/message_in_transit.h" 25 #include "mojo/system/message_in_transit.h"
26 #include "mojo/system/platform_channel_handle.h" 26 #include "mojo/system/platform_handle.h"
27 27
28 namespace mojo { 28 namespace mojo {
29 namespace system { 29 namespace system {
30 30
31 namespace { 31 namespace {
32 32
33 const size_t kReadSize = 4096; 33 const size_t kReadSize = 4096;
34 34
35 class RawChannelPosix : public RawChannel, 35 class RawChannelPosix : public RawChannel,
36 public base::MessageLoopForIO::Watcher { 36 public base::MessageLoopForIO::Watcher {
37 public: 37 public:
38 RawChannelPosix(const PlatformChannelHandle& handle, 38 RawChannelPosix(ScopedPlatformHandle handle,
39 Delegate* delegate, 39 Delegate* delegate,
40 base::MessageLoop* message_loop); 40 base::MessageLoop* message_loop);
41 virtual ~RawChannelPosix(); 41 virtual ~RawChannelPosix();
42 42
43 // |RawChannel| implementation: 43 // |RawChannel| implementation:
44 virtual bool Init() OVERRIDE; 44 virtual bool Init() OVERRIDE;
45 virtual void Shutdown() OVERRIDE; 45 virtual void Shutdown() OVERRIDE;
46 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; 46 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
47 47
48 private: 48 private:
(...skipping 16 matching lines...) Expand all
65 65
66 // Cancels all pending writes and destroys the contents of 66 // Cancels all pending writes and destroys the contents of
67 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets 67 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
68 // |is_dead_| to true. Must be called under |write_lock_|. 68 // |is_dead_| to true. Must be called under |write_lock_|.
69 void CancelPendingWritesNoLock(); 69 void CancelPendingWritesNoLock();
70 70
71 base::MessageLoopForIO* message_loop_for_io() { 71 base::MessageLoopForIO* message_loop_for_io() {
72 return static_cast<base::MessageLoopForIO*>(message_loop()); 72 return static_cast<base::MessageLoopForIO*>(message_loop());
73 } 73 }
74 74
75 int fd_; 75 ScopedPlatformHandle fd_;
76 76
77 // Only used on the I/O thread: 77 // Only used on the I/O thread:
78 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 78 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
79 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 79 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
80 80
81 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 81 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
82 // is always aligned with a message boundary (we will copy memory to ensure 82 // is always aligned with a message boundary (we will copy memory to ensure
83 // this), but |read_buffer_| may be larger than the actual number of bytes we 83 // this), but |read_buffer_| may be larger than the actual number of bytes we
84 // have. 84 // have.
85 std::vector<char> read_buffer_; 85 std::vector<char> read_buffer_;
86 size_t read_buffer_num_valid_bytes_; 86 size_t read_buffer_num_valid_bytes_;
87 87
88 base::Lock write_lock_; // Protects the following members. 88 base::Lock write_lock_; // Protects the following members.
89 bool is_dead_; 89 bool is_dead_;
90 std::deque<MessageInTransit*> write_message_queue_; 90 std::deque<MessageInTransit*> write_message_queue_;
91 size_t write_message_offset_; 91 size_t write_message_offset_;
92 // This is used for posting tasks from write threads to the I/O thread. It 92 // This is used for posting tasks from write threads to the I/O thread. It
93 // must only be accessed under |write_lock_|. The weak pointers it produces 93 // must only be accessed under |write_lock_|. The weak pointers it produces
94 // are only used/invalidated on the I/O thread. 94 // are only used/invalidated on the I/O thread.
95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
96 96
97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
98 }; 98 };
99 99
100 RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, 100 RawChannelPosix::RawChannelPosix(ScopedPlatformHandle handle,
101 Delegate* delegate, 101 Delegate* delegate,
102 base::MessageLoop* message_loop) 102 base::MessageLoop* message_loop)
103 : RawChannel(delegate, message_loop), 103 : RawChannel(delegate, message_loop),
104 fd_(handle.fd), 104 fd_(handle.Pass()),
105 read_buffer_num_valid_bytes_(0), 105 read_buffer_num_valid_bytes_(0),
106 is_dead_(false), 106 is_dead_(false),
107 write_message_offset_(0), 107 write_message_offset_(0),
108 weak_ptr_factory_(this) { 108 weak_ptr_factory_(this) {
109 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); 109 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
110 DCHECK_NE(fd_, -1); 110 DCHECK(fd_.is_valid());
111 } 111 }
112 112
113 RawChannelPosix::~RawChannelPosix() { 113 RawChannelPosix::~RawChannelPosix() {
114 DCHECK(is_dead_); 114 DCHECK(is_dead_);
115 DCHECK_EQ(fd_, -1); 115 DCHECK(!fd_.is_valid());
116 116
117 // No need to take the |write_lock_| here -- if there are still weak pointers 117 // No need to take the |write_lock_| here -- if there are still weak pointers
118 // outstanding, then we're hosed anyway (since we wouldn't be able to 118 // outstanding, then we're hosed anyway (since we wouldn't be able to
119 // invalidate them cleanly, since we might not be on the I/O thread). 119 // invalidate them cleanly, since we might not be on the I/O thread).
120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 120 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
121 121
122 // These must have been shut down/destroyed on the I/O thread. 122 // These must have been shut down/destroyed on the I/O thread.
123 DCHECK(!read_watcher_.get()); 123 DCHECK(!read_watcher_.get());
124 DCHECK(!write_watcher_.get()); 124 DCHECK(!write_watcher_.get());
125 } 125 }
126 126
127 bool RawChannelPosix::Init() { 127 bool RawChannelPosix::Init() {
128 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 128 DCHECK_EQ(base::MessageLoop::current(), message_loop());
129 129
130 DCHECK(!read_watcher_.get()); 130 DCHECK(!read_watcher_.get());
131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
132 DCHECK(!write_watcher_.get()); 132 DCHECK(!write_watcher_.get());
133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
134 134
135 // No need to take the lock. No one should be using us yet. 135 // No need to take the lock. No one should be using us yet.
136 DCHECK(write_message_queue_.empty()); 136 DCHECK(write_message_queue_.empty());
137 137
138 if (!message_loop_for_io()->WatchFileDescriptor(fd_, true, 138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
141 // (in the sense of returning the message loop's state to what it was before 141 // (in the sense of returning the message loop's state to what it was before
142 // it was called). 142 // it was called).
143 read_watcher_.reset(); 143 read_watcher_.reset();
144 write_watcher_.reset(); 144 write_watcher_.reset();
145 return false; 145 return false;
146 } 146 }
147 147
148 return true; 148 return true;
149 } 149 }
150 150
151 void RawChannelPosix::Shutdown() { 151 void RawChannelPosix::Shutdown() {
152 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 152 DCHECK_EQ(base::MessageLoop::current(), message_loop());
153 153
154 base::AutoLock locker(write_lock_); 154 base::AutoLock locker(write_lock_);
155 if (!is_dead_) 155 if (!is_dead_)
156 CancelPendingWritesNoLock(); 156 CancelPendingWritesNoLock();
157 157
158 DCHECK_NE(fd_, -1); 158 DCHECK(fd_.is_valid());
159 if (close(fd_) != 0) 159 fd_.reset();
160 PLOG(ERROR) << "close";
161 fd_ = -1;
162 160
163 weak_ptr_factory_.InvalidateWeakPtrs(); 161 weak_ptr_factory_.InvalidateWeakPtrs();
164 162
165 read_watcher_.reset(); // This will stop watching (if necessary). 163 read_watcher_.reset(); // This will stop watching (if necessary).
166 write_watcher_.reset(); // This will stop watching (if necessary). 164 write_watcher_.reset(); // This will stop watching (if necessary).
167 } 165 }
168 166
169 // Reminder: This must be thread-safe, and takes ownership of |message| on 167 // Reminder: This must be thread-safe, and takes ownership of |message| on
170 // success. 168 // success.
171 bool RawChannelPosix::WriteMessage(MessageInTransit* message) { 169 bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
(...skipping 29 matching lines...) Expand all
201 message_loop()->PostTask(FROM_HERE, 199 message_loop()->PostTask(FROM_HERE,
202 base::Bind(&RawChannelPosix::WaitToWrite, 200 base::Bind(&RawChannelPosix::WaitToWrite,
203 weak_ptr_factory_.GetWeakPtr())); 201 weak_ptr_factory_.GetWeakPtr()));
204 } 202 }
205 } 203 }
206 204
207 return result; 205 return result;
208 } 206 }
209 207
210 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 208 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
211 DCHECK_EQ(fd, fd_); 209 DCHECK_EQ(fd, fd_.get().fd);
212 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 210 DCHECK_EQ(base::MessageLoop::current(), message_loop());
213 211
214 bool did_dispatch_message = false; 212 bool did_dispatch_message = false;
215 // Tracks the offset of the first undispatched message in |read_buffer_|. 213 // Tracks the offset of the first undispatched message in |read_buffer_|.
216 // Currently, we copy data to ensure that this is zero at the beginning. 214 // Currently, we copy data to ensure that this is zero at the beginning.
217 size_t read_buffer_start = 0; 215 size_t read_buffer_start = 0;
218 for (;;) { 216 for (;;) {
219 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 217 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
220 < kReadSize) { 218 < kReadSize) {
221 // Use power-of-2 buffer sizes. 219 // Use power-of-2 buffer sizes.
222 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 220 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
223 // maximum message size to whatever extent necessary). 221 // maximum message size to whatever extent necessary).
224 // TODO(vtl): We may often be able to peek at the header and get the real 222 // TODO(vtl): We may often be able to peek at the header and get the real
225 // required extra space (which may be much bigger than |kReadSize|). 223 // required extra space (which may be much bigger than |kReadSize|).
226 size_t new_size = std::max(read_buffer_.size(), kReadSize); 224 size_t new_size = std::max(read_buffer_.size(), kReadSize);
227 while (new_size < 225 while (new_size <
228 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 226 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
229 new_size *= 2; 227 new_size *= 2;
230 228
231 // TODO(vtl): It's suboptimal to zero out the fresh memory. 229 // TODO(vtl): It's suboptimal to zero out the fresh memory.
232 read_buffer_.resize(new_size, 0); 230 read_buffer_.resize(new_size, 0);
233 } 231 }
234 232
235 ssize_t bytes_read = HANDLE_EINTR( 233 ssize_t bytes_read = HANDLE_EINTR(
236 read(fd_, 234 read(fd_.get().fd,
237 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], 235 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
238 kReadSize)); 236 kReadSize));
239 if (bytes_read < 0) { 237 if (bytes_read < 0) {
240 if (errno != EAGAIN && errno != EWOULDBLOCK) { 238 if (errno != EAGAIN && errno != EWOULDBLOCK) {
241 PLOG(ERROR) << "read"; 239 PLOG(ERROR) << "read";
242 { 240 {
243 base::AutoLock locker(write_lock_); 241 base::AutoLock locker(write_lock_);
244 CancelPendingWritesNoLock(); 242 CancelPendingWritesNoLock();
245 } 243 }
246 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 if (read_buffer_start > 0) { 296 if (read_buffer_start > 0) {
299 if (read_buffer_num_valid_bytes_ > 0) { 297 if (read_buffer_num_valid_bytes_ > 0) {
300 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], 298 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
301 read_buffer_num_valid_bytes_); 299 read_buffer_num_valid_bytes_);
302 } 300 }
303 read_buffer_start = 0; 301 read_buffer_start = 0;
304 } 302 }
305 } 303 }
306 304
307 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 305 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
308 DCHECK_EQ(fd, fd_); 306 DCHECK_EQ(fd, fd_.get().fd);
309 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 307 DCHECK_EQ(base::MessageLoop::current(), message_loop());
310 308
311 bool did_fail = false; 309 bool did_fail = false;
312 { 310 {
313 base::AutoLock locker(write_lock_); 311 base::AutoLock locker(write_lock_);
314 DCHECK(!is_dead_); 312 DCHECK(!is_dead_);
315 DCHECK(!write_message_queue_.empty()); 313 DCHECK(!write_message_queue_.empty());
316 314
317 bool result = WriteFrontMessageNoLock(); 315 bool result = WriteFrontMessageNoLock();
318 DCHECK(result || write_message_queue_.empty()); 316 DCHECK(result || write_message_queue_.empty());
319 317
320 if (!result) 318 if (!result)
321 did_fail = true; 319 did_fail = true;
322 else if (!write_message_queue_.empty()) 320 else if (!write_message_queue_.empty())
323 WaitToWrite(); 321 WaitToWrite();
324 } 322 }
325 if (did_fail) 323 if (did_fail)
326 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 324 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
327 } 325 }
328 326
329 void RawChannelPosix::WaitToWrite() { 327 void RawChannelPosix::WaitToWrite() {
330 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 328 DCHECK_EQ(base::MessageLoop::current(), message_loop());
331 329
332 DCHECK(write_watcher_.get()); 330 DCHECK(write_watcher_.get());
333 bool result = message_loop_for_io()->WatchFileDescriptor( 331 bool result = message_loop_for_io()->WatchFileDescriptor(
334 fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), 332 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
335 this); 333 write_watcher_.get(), this);
336 DCHECK(result); 334 DCHECK(result);
337 } 335 }
338 336
339 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { 337 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
340 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 338 DCHECK_EQ(base::MessageLoop::current(), message_loop());
341 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 339 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
342 delegate()->OnFatalError(fatal_error); 340 delegate()->OnFatalError(fatal_error);
343 } 341 }
344 342
345 bool RawChannelPosix::WriteFrontMessageNoLock() { 343 bool RawChannelPosix::WriteFrontMessageNoLock() {
346 write_lock_.AssertAcquired(); 344 write_lock_.AssertAcquired();
347 345
348 DCHECK(!is_dead_); 346 DCHECK(!is_dead_);
349 DCHECK(!write_message_queue_.empty()); 347 DCHECK(!write_message_queue_.empty());
350 348
351 MessageInTransit* message = write_message_queue_.front(); 349 MessageInTransit* message = write_message_queue_.front();
352 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); 350 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
353 size_t bytes_to_write = 351 size_t bytes_to_write =
354 message->size_with_header_and_padding() - write_message_offset_; 352 message->size_with_header_and_padding() - write_message_offset_;
355 ssize_t bytes_written = HANDLE_EINTR( 353 ssize_t bytes_written = HANDLE_EINTR(
356 write(fd_, 354 write(fd_.get().fd,
357 reinterpret_cast<char*>(message) + write_message_offset_, 355 reinterpret_cast<char*>(message) + write_message_offset_,
358 bytes_to_write)); 356 bytes_to_write));
359 if (bytes_written < 0) { 357 if (bytes_written < 0) {
360 if (errno != EAGAIN && errno != EWOULDBLOCK) { 358 if (errno != EAGAIN && errno != EWOULDBLOCK) {
361 PLOG(ERROR) << "write of size " << bytes_to_write; 359 PLOG(ERROR) << "write of size " << bytes_to_write;
362 CancelPendingWritesNoLock(); 360 CancelPendingWritesNoLock();
363 return false; 361 return false;
364 } 362 }
365 363
366 // We simply failed to write since we'd block. The logic is the same as if 364 // We simply failed to write since we'd block. The logic is the same as if
(...skipping 28 matching lines...) Expand all
395 } 393 }
396 write_message_queue_.clear(); 394 write_message_queue_.clear();
397 } 395 }
398 396
399 } // namespace 397 } // namespace
400 398
401 // ----------------------------------------------------------------------------- 399 // -----------------------------------------------------------------------------
402 400
403 // Static factory method declared in raw_channel.h. 401 // Static factory method declared in raw_channel.h.
404 // static 402 // static
405 RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, 403 RawChannel* RawChannel::Create(ScopedPlatformHandle handle,
406 Delegate* delegate, 404 Delegate* delegate,
407 base::MessageLoop* message_loop) { 405 base::MessageLoop* message_loop) {
408 return new RawChannelPosix(handle, delegate, message_loop); 406 return new RawChannelPosix(handle.Pass(), delegate, message_loop);
409 } 407 }
410 408
411 } // namespace system 409 } // namespace system
412 } // namespace mojo 410 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/raw_channel.h ('k') | mojo/system/raw_channel_posix_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698