Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(272)

Side by Side Diff: chromeos/process_proxy/process_output_watcher.cc

Issue 1258193002: User MessageLoopForIO::WatchFileDescriptor in proces_output_watcher (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chromeos/process_proxy/process_output_watcher.h" 5 #include "chromeos/process_proxy/process_output_watcher.h"
6 6
7 #include <sys/ioctl.h>
8 #include <sys/select.h>
9 #include <unistd.h>
10
11 #include <algorithm> 7 #include <algorithm>
12 #include <cstdio> 8 #include <cstdio>
13 #include <cstring> 9 #include <cstring>
14 10 #include "base/bind.h"
11 #include "base/location.h"
15 #include "base/logging.h" 12 #include "base/logging.h"
16 #include "base/posix/eintr_wrapper.h" 13 #include "base/posix/eintr_wrapper.h"
14 #include "base/single_thread_task_runner.h"
17 #include "base/third_party/icu/icu_utf.h" 15 #include "base/third_party/icu/icu_utf.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/time/time.h"
18 18
19 namespace { 19 namespace {
20 20
21 void InitReadFdSet(int out_fd, int stop_fd, fd_set* set) {
22 FD_ZERO(set);
23 if (out_fd != -1)
24 FD_SET(out_fd, set);
25 FD_SET(stop_fd, set);
26 }
27
28 void CloseFd(int* fd) {
29 if (*fd >= 0) {
30 if (IGNORE_EINTR(close(*fd)) != 0)
31 DPLOG(WARNING) << "close fd " << *fd << " failed.";
32 }
33 *fd = -1;
34 }
35
36 // Gets byte size for a UTF8 character given it's leading byte. The character 21 // Gets byte size for a UTF8 character given it's leading byte. The character
37 // size is encoded as number of leading '1' bits in the character's leading 22 // size is encoded as number of leading '1' bits in the character's leading
38 // byte. If the most significant bit is '0', the character is a valid ASCII 23 // byte. If the most significant bit is '0', the character is a valid ASCII
39 // and it's byte size is 1. 24 // and it's byte size is 1.
40 // The method returns 1 if the provided byte is invalid leading byte. 25 // The method returns 1 if the provided byte is invalid leading byte.
41 size_t UTF8SizeFromLeadingByte(uint8 leading_byte) { 26 size_t UTF8SizeFromLeadingByte(uint8 leading_byte) {
42 size_t byte_count = 0; 27 size_t byte_count = 0;
43 uint8 mask = 1 << 7; 28 uint8 mask = 1 << 7;
44 uint8 error_mask = 1 << (7 - CBU8_MAX_LENGTH); 29 uint8 error_mask = 1 << (7 - CBU8_MAX_LENGTH);
45 while (leading_byte & mask) { 30 while (leading_byte & mask) {
46 if (mask & error_mask) 31 if (mask & error_mask)
47 return 1; 32 return 1;
48 mask >>= 1; 33 mask >>= 1;
49 ++byte_count; 34 ++byte_count;
50 } 35 }
51 return byte_count ? byte_count : 1; 36 return byte_count ? byte_count : 1;
52 } 37 }
53 38
54 } // namespace 39 } // namespace
55 40
56 namespace chromeos { 41 namespace chromeos {
57 42
58 // static
59 bool ProcessOutputWatcher::VerifyFileDescriptor(int fd) {
60 return (fd >= 0) && (fd < FD_SETSIZE);
61 }
62
63 ProcessOutputWatcher::ProcessOutputWatcher( 43 ProcessOutputWatcher::ProcessOutputWatcher(
64 int out_fd, 44 int out_fd,
65 int stop_fd,
66 const ProcessOutputCallback& callback) 45 const ProcessOutputCallback& callback)
67 : read_buffer_size_(0), 46 : read_buffer_size_(0),
68 out_fd_(out_fd), 47 process_output_file_(out_fd),
69 stop_fd_(stop_fd), 48 on_read_callback_(callback),
70 on_read_callback_(callback) { 49 weak_factory_(this) {
71 CHECK(VerifyFileDescriptor(out_fd_)); 50 CHECK_GE(out_fd, 0);
72 CHECK(VerifyFileDescriptor(stop_fd_));
73 max_fd_ = std::max(out_fd_, stop_fd_);
74 // We want to be sure we will be able to add 0 at the end of the input, so -1. 51 // We want to be sure we will be able to add 0 at the end of the input, so -1.
75 read_buffer_capacity_ = arraysize(read_buffer_) - 1; 52 read_buffer_capacity_ = arraysize(read_buffer_) - 1;
76 } 53 }
77 54
55 ProcessOutputWatcher::~ProcessOutputWatcher() {}
56
78 void ProcessOutputWatcher::Start() { 57 void ProcessOutputWatcher::Start() {
79 WatchProcessOutput(); 58 WatchProcessOutput();
80 OnStop();
81 } 59 }
82 60
83 ProcessOutputWatcher::~ProcessOutputWatcher() { 61 void ProcessOutputWatcher::OnFileCanReadWithoutBlocking(int fd) {
84 CloseFd(&out_fd_); 62 DCHECK_EQ(process_output_file_.GetPlatformFile(), fd);
85 CloseFd(&stop_fd_); 63
64 output_file_watcher_.StopWatchingFileDescriptor();
65 ReadFromFd(fd);
66 }
67
68 void ProcessOutputWatcher::OnFileCanWriteWithoutBlocking(int fd) {
69 NOTREACHED();
86 } 70 }
87 71
88 void ProcessOutputWatcher::WatchProcessOutput() { 72 void ProcessOutputWatcher::WatchProcessOutput() {
89 while (true) { 73 base::MessageLoopForIO::current()->WatchFileDescriptor(
90 // This has to be reset with every watch cycle. 74 process_output_file_.GetPlatformFile(), false,
91 fd_set rfds; 75 base::MessageLoopForIO::WATCH_READ, &output_file_watcher_, this);
92 DCHECK_GE(stop_fd_, 0);
93 InitReadFdSet(out_fd_, stop_fd_, &rfds);
94
95 int select_result =
96 HANDLE_EINTR(select(max_fd_ + 1, &rfds, NULL, NULL, NULL));
97
98 if (select_result < 0) {
99 DPLOG(WARNING) << "select failed";
100 return;
101 }
102
103 // Check if we were stopped.
104 if (FD_ISSET(stop_fd_, &rfds)) {
105 return;
106 }
107
108 if (out_fd_ != -1 && FD_ISSET(out_fd_, &rfds)) {
109 ReadFromFd(PROCESS_OUTPUT_TYPE_OUT, &out_fd_);
110 }
111 }
112 } 76 }
113 77
114 void ProcessOutputWatcher::ReadFromFd(ProcessOutputType type, int* fd) { 78 void ProcessOutputWatcher::ReadFromFd(int fd) {
115 // We don't want to necessary read pipe until it is empty so we don't starve 79 // We don't want to necessary read pipe until it is empty so we don't starve
116 // other streams in case data is written faster than we read it. If there is 80 // other streams in case data is written faster than we read it. If there is
117 // more than read_buffer_size_ bytes in pipe, it will be read in the next 81 // more than read_buffer_size_ bytes in pipe, it will be read in the next
118 // iteration. 82 // iteration.
119 DCHECK_GT(read_buffer_capacity_, read_buffer_size_); 83 DCHECK_GT(read_buffer_capacity_, read_buffer_size_);
120 ssize_t bytes_read = 84 ssize_t bytes_read =
121 HANDLE_EINTR(read(*fd, 85 HANDLE_EINTR(read(fd, &read_buffer_[read_buffer_size_],
122 &read_buffer_[read_buffer_size_],
123 read_buffer_capacity_ - read_buffer_size_)); 86 read_buffer_capacity_ - read_buffer_size_));
87
88 if (bytes_read > 0) {
89 ReportOutput(PROCESS_OUTPUT_TYPE_OUT, bytes_read);
90
91 // Delay next read to make the process less likely to flood IPC channel
92 // when output is reported to terminal extension via terminalPrivate API
93 // (which is the only client of this code).
94 // TODO(tbarzic): Properly fix this!! Provide a mechanism for clients to
95 // ack reported output and continue watching the process when ack is
96 // received. https://crbug.com/398901
97 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
98 FROM_HERE, base::Bind(&ProcessOutputWatcher::WatchProcessOutput,
99 weak_factory_.GetWeakPtr()),
100 base::TimeDelta::FromMilliseconds(10));
101
102 return;
103 }
104
124 if (bytes_read < 0) 105 if (bytes_read < 0)
125 DPLOG(WARNING) << "read from buffer failed"; 106 DPLOG(WARNING) << "read from buffer failed";
126 107
127 if (bytes_read > 0)
128 ReportOutput(type, bytes_read);
129
130 // If there is nothing on the output the watched process has exited (slave end 108 // If there is nothing on the output the watched process has exited (slave end
131 // of pty is closed). 109 // of pty is closed).
132 if (bytes_read <= 0) { 110 on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, "");
133 // Slave pseudo terminal has been closed, we won't need master fd anymore.
134 CloseFd(fd);
135 111
136 // We have lost contact with the process, so report it. 112 // Cancel pending |WatchProcessOutput| calls.
137 on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, ""); 113 weak_factory_.InvalidateWeakPtrs();
138 }
139 } 114 }
140 115
141 size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() { 116 size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() {
142 // Find the last non-trailing character byte. This byte should be used to 117 // Find the last non-trailing character byte. This byte should be used to
143 // infer the last UTF8 character length. 118 // infer the last UTF8 character length.
144 int last_lead_byte = read_buffer_size_ - 1; 119 int last_lead_byte = read_buffer_size_ - 1;
145 while (true) { 120 while (true) {
146 // If the series of trailing bytes is too long, something's not right. 121 // If the series of trailing bytes is too long, something's not right.
147 // Report the whole output, without waiting for further character bytes. 122 // Report the whole output, without waiting for further character bytes.
148 if (read_buffer_size_ - last_lead_byte > CBU8_MAX_LENGTH) 123 if (read_buffer_size_ - last_lead_byte > CBU8_MAX_LENGTH)
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 // Move the bytes that were left behind to the beginning of the buffer and 159 // Move the bytes that were left behind to the beginning of the buffer and
185 // update the buffer size accordingly. 160 // update the buffer size accordingly.
186 if (output_to_report < read_buffer_size_) { 161 if (output_to_report < read_buffer_size_) {
187 for (size_t i = output_to_report; i < read_buffer_size_; ++i) { 162 for (size_t i = output_to_report; i < read_buffer_size_; ++i) {
188 read_buffer_[i - output_to_report] = read_buffer_[i]; 163 read_buffer_[i - output_to_report] = read_buffer_[i];
189 } 164 }
190 } 165 }
191 read_buffer_size_ -= output_to_report; 166 read_buffer_size_ -= output_to_report;
192 } 167 }
193 168
194 void ProcessOutputWatcher::OnStop() {
195 delete this;
196 }
197
198 } // namespace chromeos 169 } // namespace chromeos
OLDNEW
« no previous file with comments | « chromeos/process_proxy/process_output_watcher.h ('k') | chromeos/process_proxy/process_output_watcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698