Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: mojo/system/raw_channel_posix.cc

Issue 463883004: Mojo: Properly cancel further read on all read failures in RawChannelPosix. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <sys/uio.h> 8 #include <sys/uio.h>
9 #include <unistd.h> 9 #include <unistd.h>
10 10
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 size_t* bytes_written) OVERRIDE; 56 size_t* bytes_written) OVERRIDE;
57 virtual IOResult ScheduleWriteNoLock() OVERRIDE; 57 virtual IOResult ScheduleWriteNoLock() OVERRIDE;
58 virtual bool OnInit() OVERRIDE; 58 virtual bool OnInit() OVERRIDE;
59 virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, 59 virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
60 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; 60 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
61 61
62 // |base::MessageLoopForIO::Watcher| implementation: 62 // |base::MessageLoopForIO::Watcher| implementation:
63 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 63 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
64 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 64 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
65 65
66 // Implements most of |Read()| (except for a bit of clean-up):
67 IOResult ReadImpl(size_t* bytes_read);
68
66 // Watches for |fd_| to become writable. Must be called on the I/O thread. 69 // Watches for |fd_| to become writable. Must be called on the I/O thread.
67 void WaitToWrite(); 70 void WaitToWrite();
68 71
69 embedder::ScopedPlatformHandle fd_; 72 embedder::ScopedPlatformHandle fd_;
70 73
71 // The following members are only used on the I/O thread: 74 // The following members are only used on the I/O thread:
72 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 75 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
73 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
74 77
75 bool pending_read_; 78 bool pending_read_;
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
166 return true; 169 return true;
167 } 170 }
168 171
169 return RawChannel::OnReadMessageForRawChannel(message_view); 172 return RawChannel::OnReadMessageForRawChannel(message_view);
170 } 173 }
171 174
172 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { 175 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
173 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 176 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
174 DCHECK(!pending_read_); 177 DCHECK(!pending_read_);
175 178
176 char* buffer = NULL; 179 IOResult rv = ReadImpl(bytes_read);
yzshen1 2014/08/12 21:21:17 optional: I think for windows code, sometimes we u
177 size_t bytes_to_read = 0; 180 if (rv != IO_SUCCEEDED && rv != IO_PENDING) {
178 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
179
180 size_t old_num_platform_handles = read_platform_handles_.size();
181 ssize_t read_result = embedder::PlatformChannelRecvmsg(
182 fd_.get(), buffer, bytes_to_read, &read_platform_handles_);
183 if (read_platform_handles_.size() > old_num_platform_handles) {
184 DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
185 embedder::kPlatformChannelMaxNumHandles);
186
187 // We should never accumulate more than |TransportData::kMaxPlatformHandles
188 // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
189 // possible because we could have accumulated all the handles for a message,
190 // then received the message data plus the first set of handles for the next
191 // message in the subsequent |recvmsg()|.)
192 if (read_platform_handles_.size() >
193 (TransportData::kMaxPlatformHandles +
194 embedder::kPlatformChannelMaxNumHandles)) {
195 LOG(ERROR) << "Received too many platform handles";
196 embedder::CloseAllPlatformHandles(&read_platform_handles_);
197 read_platform_handles_.clear();
198 return IO_FAILED_UNKNOWN;
199 }
200 }
201
202 if (read_result > 0) {
203 *bytes_read = static_cast<size_t>(read_result);
204 return IO_SUCCEEDED;
205 }
206
207 // |read_result == 0| means "end of file".
208 if (read_result == 0) {
209 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. 181 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
210 read_watcher_.reset(); 182 read_watcher_.reset();
211 return IO_FAILED_SHUTDOWN;
212 } 183 }
213
214 if (errno == EAGAIN || errno == EWOULDBLOCK)
215 return ScheduleRead();
216
217 IOResult rv = IO_FAILED_UNKNOWN;
218 if (errno == ECONNRESET)
219 rv = IO_FAILED_BROKEN;
220 else
221 PLOG(WARNING) << "recvmsg";
222
223 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
224 read_watcher_.reset();
225 return rv; 184 return rv;
226 } 185 }
227 186
228 RawChannel::IOResult RawChannelPosix::ScheduleRead() { 187 RawChannel::IOResult RawChannelPosix::ScheduleRead() {
229 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 188 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
230 DCHECK(!pending_read_); 189 DCHECK(!pending_read_);
231 190
232 pending_read_ = true; 191 pending_read_ = true;
233 192
234 return IO_PENDING; 193 return IO_PENDING;
(...skipping 203 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 DCHECK(pending_write_); 397 DCHECK(pending_write_);
439 398
440 pending_write_ = false; 399 pending_write_ = false;
441 io_result = WriteNoLock(&platform_handles_written, &bytes_written); 400 io_result = WriteNoLock(&platform_handles_written, &bytes_written);
442 } 401 }
443 402
444 if (io_result != IO_PENDING) 403 if (io_result != IO_PENDING)
445 OnWriteCompleted(io_result, platform_handles_written, bytes_written); 404 OnWriteCompleted(io_result, platform_handles_written, bytes_written);
446 } 405 }
447 406
407 RawChannel::IOResult RawChannelPosix::ReadImpl(size_t* bytes_read) {
408 char* buffer = NULL;
409 size_t bytes_to_read = 0;
410 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
411
412 size_t old_num_platform_handles = read_platform_handles_.size();
413 ssize_t read_result = embedder::PlatformChannelRecvmsg(
414 fd_.get(), buffer, bytes_to_read, &read_platform_handles_);
415 if (read_platform_handles_.size() > old_num_platform_handles) {
416 DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
417 embedder::kPlatformChannelMaxNumHandles);
418
419 // We should never accumulate more than |TransportData::kMaxPlatformHandles
420 // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
421 // possible because we could have accumulated all the handles for a message,
422 // then received the message data plus the first set of handles for the next
423 // message in the subsequent |recvmsg()|.)
424 if (read_platform_handles_.size() >
425 (TransportData::kMaxPlatformHandles +
426 embedder::kPlatformChannelMaxNumHandles)) {
427 LOG(ERROR) << "Received too many platform handles";
428 embedder::CloseAllPlatformHandles(&read_platform_handles_);
429 read_platform_handles_.clear();
430 return IO_FAILED_UNKNOWN;
431 }
432 }
433
434 if (read_result > 0) {
435 *bytes_read = static_cast<size_t>(read_result);
436 return IO_SUCCEEDED;
437 }
438
439 // |read_result == 0| means "end of file".
440 if (read_result == 0)
441 return IO_FAILED_SHUTDOWN;
442
443 if (errno == EAGAIN || errno == EWOULDBLOCK)
444 return ScheduleRead();
445
446 if (errno == ECONNRESET)
447 return IO_FAILED_BROKEN;
448
449 PLOG(WARNING) << "recvmsg";
450 return IO_FAILED_UNKNOWN;
451 }
452
448 void RawChannelPosix::WaitToWrite() { 453 void RawChannelPosix::WaitToWrite() {
449 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 454 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
450 455
451 DCHECK(write_watcher_); 456 DCHECK(write_watcher_);
452 457
453 if (!message_loop_for_io()->WatchFileDescriptor( 458 if (!message_loop_for_io()->WatchFileDescriptor(
454 fd_.get().fd, 459 fd_.get().fd,
455 false, 460 false,
456 base::MessageLoopForIO::WATCH_WRITE, 461 base::MessageLoopForIO::WATCH_WRITE,
457 write_watcher_.get(), 462 write_watcher_.get(),
(...skipping 14 matching lines...) Expand all
472 477
473 // Static factory method declared in raw_channel.h. 478 // Static factory method declared in raw_channel.h.
474 // static 479 // static
475 scoped_ptr<RawChannel> RawChannel::Create( 480 scoped_ptr<RawChannel> RawChannel::Create(
476 embedder::ScopedPlatformHandle handle) { 481 embedder::ScopedPlatformHandle handle) {
477 return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass())); 482 return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass()));
478 } 483 }
479 484
480 } // namespace system 485 } // namespace system
481 } // namespace mojo 486 } // namespace mojo
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698