OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <string.h> | 8 #include <string.h> |
9 #include <unistd.h> | 9 #include <unistd.h> |
10 | 10 |
11 #include <algorithm> | 11 #include <algorithm> |
12 #include <deque> | 12 #include <deque> |
13 #include <vector> | 13 #include <vector> |
14 | 14 |
15 #include "base/basictypes.h" | 15 #include "base/basictypes.h" |
16 #include "base/bind.h" | 16 #include "base/bind.h" |
17 #include "base/compiler_specific.h" | 17 #include "base/compiler_specific.h" |
18 #include "base/location.h" | 18 #include "base/location.h" |
19 #include "base/logging.h" | 19 #include "base/logging.h" |
20 #include "base/memory/scoped_ptr.h" | 20 #include "base/memory/scoped_ptr.h" |
21 #include "base/memory/weak_ptr.h" | 21 #include "base/memory/weak_ptr.h" |
22 #include "base/message_loop/message_loop.h" | 22 #include "base/message_loop/message_loop.h" |
23 #include "base/posix/eintr_wrapper.h" | 23 #include "base/posix/eintr_wrapper.h" |
24 #include "base/synchronization/lock.h" | 24 #include "base/synchronization/lock.h" |
25 #include "mojo/system/message_in_transit.h" | 25 #include "mojo/system/message_in_transit.h" |
26 #include "mojo/system/platform_channel_handle.h" | 26 #include "mojo/system/platform_handle.h" |
27 | 27 |
28 namespace mojo { | 28 namespace mojo { |
29 namespace system { | 29 namespace system { |
30 | 30 |
31 namespace { | 31 namespace { |
32 | 32 |
33 const size_t kReadSize = 4096; | 33 const size_t kReadSize = 4096; |
34 | 34 |
35 class RawChannelPosix : public RawChannel, | 35 class RawChannelPosix : public RawChannel, |
36 public base::MessageLoopForIO::Watcher { | 36 public base::MessageLoopForIO::Watcher { |
37 public: | 37 public: |
38 RawChannelPosix(const PlatformChannelHandle& handle, | 38 RawChannelPosix(ScopedPlatformHandle handle, |
39 Delegate* delegate, | 39 Delegate* delegate, |
40 base::MessageLoop* message_loop); | 40 base::MessageLoop* message_loop); |
41 virtual ~RawChannelPosix(); | 41 virtual ~RawChannelPosix(); |
42 | 42 |
43 // |RawChannel| implementation: | 43 // |RawChannel| implementation: |
44 virtual bool Init() OVERRIDE; | 44 virtual bool Init() OVERRIDE; |
45 virtual void Shutdown() OVERRIDE; | 45 virtual void Shutdown() OVERRIDE; |
46 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; | 46 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; |
47 | 47 |
48 private: | 48 private: |
(...skipping 16 matching lines...) Expand all Loading... |
65 | 65 |
66 // Cancels all pending writes and destroys the contents of | 66 // Cancels all pending writes and destroys the contents of |
67 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets | 67 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets |
68 // |is_dead_| to true. Must be called under |write_lock_|. | 68 // |is_dead_| to true. Must be called under |write_lock_|. |
69 void CancelPendingWritesNoLock(); | 69 void CancelPendingWritesNoLock(); |
70 | 70 |
71 base::MessageLoopForIO* message_loop_for_io() { | 71 base::MessageLoopForIO* message_loop_for_io() { |
72 return static_cast<base::MessageLoopForIO*>(message_loop()); | 72 return static_cast<base::MessageLoopForIO*>(message_loop()); |
73 } | 73 } |
74 | 74 |
75 int fd_; | 75 ScopedPlatformHandle fd_; |
76 | 76 |
77 // Only used on the I/O thread: | 77 // Only used on the I/O thread: |
78 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; | 78 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
79 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; | 79 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
80 | 80 |
81 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| | 81 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| |
82 // is always aligned with a message boundary (we will copy memory to ensure | 82 // is always aligned with a message boundary (we will copy memory to ensure |
83 // this), but |read_buffer_| may be larger than the actual number of bytes we | 83 // this), but |read_buffer_| may be larger than the actual number of bytes we |
84 // have. | 84 // have. |
85 std::vector<char> read_buffer_; | 85 std::vector<char> read_buffer_; |
86 size_t read_buffer_num_valid_bytes_; | 86 size_t read_buffer_num_valid_bytes_; |
87 | 87 |
88 base::Lock write_lock_; // Protects the following members. | 88 base::Lock write_lock_; // Protects the following members. |
89 bool is_dead_; | 89 bool is_dead_; |
90 std::deque<MessageInTransit*> write_message_queue_; | 90 std::deque<MessageInTransit*> write_message_queue_; |
91 size_t write_message_offset_; | 91 size_t write_message_offset_; |
92 // This is used for posting tasks from write threads to the I/O thread. It | 92 // This is used for posting tasks from write threads to the I/O thread. It |
93 // must only be accessed under |write_lock_|. The weak pointers it produces | 93 // must only be accessed under |write_lock_|. The weak pointers it produces |
94 // are only used/invalidated on the I/O thread. | 94 // are only used/invalidated on the I/O thread. |
95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; | 95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; |
96 | 96 |
97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); | 97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); |
98 }; | 98 }; |
99 | 99 |
100 RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, | 100 RawChannelPosix::RawChannelPosix(ScopedPlatformHandle handle, |
101 Delegate* delegate, | 101 Delegate* delegate, |
102 base::MessageLoop* message_loop) | 102 base::MessageLoop* message_loop) |
103 : RawChannel(delegate, message_loop), | 103 : RawChannel(delegate, message_loop), |
104 fd_(handle.fd), | 104 fd_(handle.Pass()), |
105 read_buffer_num_valid_bytes_(0), | 105 read_buffer_num_valid_bytes_(0), |
106 is_dead_(false), | 106 is_dead_(false), |
107 write_message_offset_(0), | 107 write_message_offset_(0), |
108 weak_ptr_factory_(this) { | 108 weak_ptr_factory_(this) { |
109 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); | 109 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); |
110 DCHECK_NE(fd_, -1); | 110 DCHECK(fd_.is_valid()); |
111 } | 111 } |
112 | 112 |
113 RawChannelPosix::~RawChannelPosix() { | 113 RawChannelPosix::~RawChannelPosix() { |
114 DCHECK(is_dead_); | 114 DCHECK(is_dead_); |
115 DCHECK_EQ(fd_, -1); | 115 DCHECK(!fd_.is_valid()); |
116 | 116 |
117 // No need to take the |write_lock_| here -- if there are still weak pointers | 117 // No need to take the |write_lock_| here -- if there are still weak pointers |
118 // outstanding, then we're hosed anyway (since we wouldn't be able to | 118 // outstanding, then we're hosed anyway (since we wouldn't be able to |
119 // invalidate them cleanly, since we might not be on the I/O thread). | 119 // invalidate them cleanly, since we might not be on the I/O thread). |
120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
121 | 121 |
122 // These must have been shut down/destroyed on the I/O thread. | 122 // These must have been shut down/destroyed on the I/O thread. |
123 DCHECK(!read_watcher_.get()); | 123 DCHECK(!read_watcher_.get()); |
124 DCHECK(!write_watcher_.get()); | 124 DCHECK(!write_watcher_.get()); |
125 } | 125 } |
126 | 126 |
127 bool RawChannelPosix::Init() { | 127 bool RawChannelPosix::Init() { |
128 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 128 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
129 | 129 |
130 DCHECK(!read_watcher_.get()); | 130 DCHECK(!read_watcher_.get()); |
131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
132 DCHECK(!write_watcher_.get()); | 132 DCHECK(!write_watcher_.get()); |
133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); | 133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); |
134 | 134 |
135 // No need to take the lock. No one should be using us yet. | 135 // No need to take the lock. No one should be using us yet. |
136 DCHECK(write_message_queue_.empty()); | 136 DCHECK(write_message_queue_.empty()); |
137 | 137 |
138 if (!message_loop_for_io()->WatchFileDescriptor(fd_, true, | 138 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, |
139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { | 139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { |
140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly | 140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly |
141 // (in the sense of returning the message loop's state to what it was before | 141 // (in the sense of returning the message loop's state to what it was before |
142 // it was called). | 142 // it was called). |
143 read_watcher_.reset(); | 143 read_watcher_.reset(); |
144 write_watcher_.reset(); | 144 write_watcher_.reset(); |
145 return false; | 145 return false; |
146 } | 146 } |
147 | 147 |
148 return true; | 148 return true; |
149 } | 149 } |
150 | 150 |
151 void RawChannelPosix::Shutdown() { | 151 void RawChannelPosix::Shutdown() { |
152 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 152 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
153 | 153 |
154 base::AutoLock locker(write_lock_); | 154 base::AutoLock locker(write_lock_); |
155 if (!is_dead_) | 155 if (!is_dead_) |
156 CancelPendingWritesNoLock(); | 156 CancelPendingWritesNoLock(); |
157 | 157 |
158 DCHECK_NE(fd_, -1); | 158 DCHECK(fd_.is_valid()); |
159 if (close(fd_) != 0) | 159 fd_.reset(); |
160 PLOG(ERROR) << "close"; | |
161 fd_ = -1; | |
162 | 160 |
163 weak_ptr_factory_.InvalidateWeakPtrs(); | 161 weak_ptr_factory_.InvalidateWeakPtrs(); |
164 | 162 |
165 read_watcher_.reset(); // This will stop watching (if necessary). | 163 read_watcher_.reset(); // This will stop watching (if necessary). |
166 write_watcher_.reset(); // This will stop watching (if necessary). | 164 write_watcher_.reset(); // This will stop watching (if necessary). |
167 } | 165 } |
168 | 166 |
169 // Reminder: This must be thread-safe, and takes ownership of |message| on | 167 // Reminder: This must be thread-safe, and takes ownership of |message| on |
170 // success. | 168 // success. |
171 bool RawChannelPosix::WriteMessage(MessageInTransit* message) { | 169 bool RawChannelPosix::WriteMessage(MessageInTransit* message) { |
(...skipping 29 matching lines...) Expand all Loading... |
201 message_loop()->PostTask(FROM_HERE, | 199 message_loop()->PostTask(FROM_HERE, |
202 base::Bind(&RawChannelPosix::WaitToWrite, | 200 base::Bind(&RawChannelPosix::WaitToWrite, |
203 weak_ptr_factory_.GetWeakPtr())); | 201 weak_ptr_factory_.GetWeakPtr())); |
204 } | 202 } |
205 } | 203 } |
206 | 204 |
207 return result; | 205 return result; |
208 } | 206 } |
209 | 207 |
210 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { | 208 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { |
211 DCHECK_EQ(fd, fd_); | 209 DCHECK_EQ(fd, fd_.get().fd); |
212 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 210 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
213 | 211 |
214 bool did_dispatch_message = false; | 212 bool did_dispatch_message = false; |
215 // Tracks the offset of the first undispatched message in |read_buffer_|. | 213 // Tracks the offset of the first undispatched message in |read_buffer_|. |
216 // Currently, we copy data to ensure that this is zero at the beginning. | 214 // Currently, we copy data to ensure that this is zero at the beginning. |
217 size_t read_buffer_start = 0; | 215 size_t read_buffer_start = 0; |
218 for (;;) { | 216 for (;;) { |
219 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) | 217 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) |
220 < kReadSize) { | 218 < kReadSize) { |
221 // Use power-of-2 buffer sizes. | 219 // Use power-of-2 buffer sizes. |
222 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | 220 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
223 // maximum message size to whatever extent necessary). | 221 // maximum message size to whatever extent necessary). |
224 // TODO(vtl): We may often be able to peek at the header and get the real | 222 // TODO(vtl): We may often be able to peek at the header and get the real |
225 // required extra space (which may be much bigger than |kReadSize|). | 223 // required extra space (which may be much bigger than |kReadSize|). |
226 size_t new_size = std::max(read_buffer_.size(), kReadSize); | 224 size_t new_size = std::max(read_buffer_.size(), kReadSize); |
227 while (new_size < | 225 while (new_size < |
228 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) | 226 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) |
229 new_size *= 2; | 227 new_size *= 2; |
230 | 228 |
231 // TODO(vtl): It's suboptimal to zero out the fresh memory. | 229 // TODO(vtl): It's suboptimal to zero out the fresh memory. |
232 read_buffer_.resize(new_size, 0); | 230 read_buffer_.resize(new_size, 0); |
233 } | 231 } |
234 | 232 |
235 ssize_t bytes_read = HANDLE_EINTR( | 233 ssize_t bytes_read = HANDLE_EINTR( |
236 read(fd_, | 234 read(fd_.get().fd, |
237 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], | 235 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], |
238 kReadSize)); | 236 kReadSize)); |
239 if (bytes_read < 0) { | 237 if (bytes_read < 0) { |
240 if (errno != EAGAIN && errno != EWOULDBLOCK) { | 238 if (errno != EAGAIN && errno != EWOULDBLOCK) { |
241 PLOG(ERROR) << "read"; | 239 PLOG(ERROR) << "read"; |
242 { | 240 { |
243 base::AutoLock locker(write_lock_); | 241 base::AutoLock locker(write_lock_); |
244 CancelPendingWritesNoLock(); | 242 CancelPendingWritesNoLock(); |
245 } | 243 } |
246 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); | 244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
298 if (read_buffer_start > 0) { | 296 if (read_buffer_start > 0) { |
299 if (read_buffer_num_valid_bytes_ > 0) { | 297 if (read_buffer_num_valid_bytes_ > 0) { |
300 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], | 298 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
301 read_buffer_num_valid_bytes_); | 299 read_buffer_num_valid_bytes_); |
302 } | 300 } |
303 read_buffer_start = 0; | 301 read_buffer_start = 0; |
304 } | 302 } |
305 } | 303 } |
306 | 304 |
307 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { | 305 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { |
308 DCHECK_EQ(fd, fd_); | 306 DCHECK_EQ(fd, fd_.get().fd); |
309 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 307 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
310 | 308 |
311 bool did_fail = false; | 309 bool did_fail = false; |
312 { | 310 { |
313 base::AutoLock locker(write_lock_); | 311 base::AutoLock locker(write_lock_); |
314 DCHECK(!is_dead_); | 312 DCHECK(!is_dead_); |
315 DCHECK(!write_message_queue_.empty()); | 313 DCHECK(!write_message_queue_.empty()); |
316 | 314 |
317 bool result = WriteFrontMessageNoLock(); | 315 bool result = WriteFrontMessageNoLock(); |
318 DCHECK(result || write_message_queue_.empty()); | 316 DCHECK(result || write_message_queue_.empty()); |
319 | 317 |
320 if (!result) | 318 if (!result) |
321 did_fail = true; | 319 did_fail = true; |
322 else if (!write_message_queue_.empty()) | 320 else if (!write_message_queue_.empty()) |
323 WaitToWrite(); | 321 WaitToWrite(); |
324 } | 322 } |
325 if (did_fail) | 323 if (did_fail) |
326 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | 324 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
327 } | 325 } |
328 | 326 |
329 void RawChannelPosix::WaitToWrite() { | 327 void RawChannelPosix::WaitToWrite() { |
330 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 328 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
331 | 329 |
332 DCHECK(write_watcher_.get()); | 330 DCHECK(write_watcher_.get()); |
333 bool result = message_loop_for_io()->WatchFileDescriptor( | 331 bool result = message_loop_for_io()->WatchFileDescriptor( |
334 fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), | 332 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, |
335 this); | 333 write_watcher_.get(), this); |
336 DCHECK(result); | 334 DCHECK(result); |
337 } | 335 } |
338 | 336 |
339 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { | 337 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { |
340 DCHECK_EQ(base::MessageLoop::current(), message_loop()); | 338 DCHECK_EQ(base::MessageLoop::current(), message_loop()); |
341 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 339 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
342 delegate()->OnFatalError(fatal_error); | 340 delegate()->OnFatalError(fatal_error); |
343 } | 341 } |
344 | 342 |
345 bool RawChannelPosix::WriteFrontMessageNoLock() { | 343 bool RawChannelPosix::WriteFrontMessageNoLock() { |
346 write_lock_.AssertAcquired(); | 344 write_lock_.AssertAcquired(); |
347 | 345 |
348 DCHECK(!is_dead_); | 346 DCHECK(!is_dead_); |
349 DCHECK(!write_message_queue_.empty()); | 347 DCHECK(!write_message_queue_.empty()); |
350 | 348 |
351 MessageInTransit* message = write_message_queue_.front(); | 349 MessageInTransit* message = write_message_queue_.front(); |
352 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); | 350 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); |
353 size_t bytes_to_write = | 351 size_t bytes_to_write = |
354 message->size_with_header_and_padding() - write_message_offset_; | 352 message->size_with_header_and_padding() - write_message_offset_; |
355 ssize_t bytes_written = HANDLE_EINTR( | 353 ssize_t bytes_written = HANDLE_EINTR( |
356 write(fd_, | 354 write(fd_.get().fd, |
357 reinterpret_cast<char*>(message) + write_message_offset_, | 355 reinterpret_cast<char*>(message) + write_message_offset_, |
358 bytes_to_write)); | 356 bytes_to_write)); |
359 if (bytes_written < 0) { | 357 if (bytes_written < 0) { |
360 if (errno != EAGAIN && errno != EWOULDBLOCK) { | 358 if (errno != EAGAIN && errno != EWOULDBLOCK) { |
361 PLOG(ERROR) << "write of size " << bytes_to_write; | 359 PLOG(ERROR) << "write of size " << bytes_to_write; |
362 CancelPendingWritesNoLock(); | 360 CancelPendingWritesNoLock(); |
363 return false; | 361 return false; |
364 } | 362 } |
365 | 363 |
366 // We simply failed to write since we'd block. The logic is the same as if | 364 // We simply failed to write since we'd block. The logic is the same as if |
(...skipping 28 matching lines...) Expand all Loading... |
395 } | 393 } |
396 write_message_queue_.clear(); | 394 write_message_queue_.clear(); |
397 } | 395 } |
398 | 396 |
399 } // namespace | 397 } // namespace |
400 | 398 |
401 // ----------------------------------------------------------------------------- | 399 // ----------------------------------------------------------------------------- |
402 | 400 |
403 // Static factory method declared in raw_channel.h. | 401 // Static factory method declared in raw_channel.h. |
404 // static | 402 // static |
405 RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, | 403 RawChannel* RawChannel::Create(ScopedPlatformHandle handle, |
406 Delegate* delegate, | 404 Delegate* delegate, |
407 base::MessageLoop* message_loop) { | 405 base::MessageLoop* message_loop) { |
408 return new RawChannelPosix(handle, delegate, message_loop); | 406 return new RawChannelPosix(handle.Pass(), delegate, message_loop); |
409 } | 407 } |
410 | 408 |
411 } // namespace system | 409 } // namespace system |
412 } // namespace mojo | 410 } // namespace mojo |
OLD | NEW |