Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1573)

Unified Diff: net/url_request/url_request_job.cc

Issue 1662763002: [ON HOLD] Implement pull-based design for content decoding (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/url_request/url_request_job.cc
diff --git a/net/url_request/url_request_job.cc b/net/url_request/url_request_job.cc
index e46d3f13eb9a2f9c222505f2d14585582a64b9c3..bef3d51fc2cb23a2a2a7b6fc56548400c1fc3e83 100644
--- a/net/url_request/url_request_job.cc
+++ b/net/url_request/url_request_job.cc
@@ -63,12 +63,91 @@ std::string ComputeMethodForRedirect(const std::string& method,
} // namespace
+// StreamSources own the previous StreamSource in the chain, but the ultimate
+// source is URLRequestJob, which has other ownership semantics, so this class
+// is a proxy for URLRequestJob that is owned by the first filter (in dataflow
+// order).
+// last filter in the chain.
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 nit: Extra comment line.
xunjieli 2016/03/03 23:00:09 Done. Removed. Used that last comment to remind my
+class URLRequestJobStreamSource
+ : public StreamSource,
+ public base::SupportsWeakPtr<URLRequestJobStreamSource> {
mmenke 2016/02/18 22:58:28 I don't think you ever vend any weak pointers, so
xunjieli 2016/03/03 23:00:09 Done.
+ public:
+ URLRequestJobStreamSource(const base::WeakPtr<URLRequestJob>& job)
+ : total_bytes_output_(0), job_(job) {}
+
+ ~URLRequestJobStreamSource() override {}
+ // StreamSource implementation:
+ Error Read(IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) override;
+ size_t GetBytesOutput() const override;
+
+ private:
+ friend class base::RefCounted<URLRequestJobStreamSource>;
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 I'm confused--the pattern I'm used to is that if a
xunjieli 2016/03/03 23:00:09 Done.
+
+ void OnRawReadComplete(const StreamSource::OnReadCompleteCallback& callback,
+ Error error,
+ size_t bytes_read);
+
+ size_t total_bytes_output_;
+ const base::WeakPtr<URLRequestJob> job_;
+};
+
+// The documentation for URLRequestJob::ReadRawData() is helpful for
+// understanding this, since that method has a complex contract.
+Error URLRequestJobStreamSource::Read(
+ IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) {
+ URLRequestJob* job = job_.get();
+ DCHECK(job);
+
+ // This is a bit of a hack. This should just be callback, but URLRequestJob
+ // needs to maintain counts of prefilter and postfilter bytes for use by
+ // subclasses, and this class (URLRequestJobStreamSource) is the only one that
+ // knows how many prefilter bytes were returned by a synchronous read, so the
+ // prefilter byte count is stored in this class.
+ OnReadCompleteCallback wrapped_callback = base::Bind(
+ &URLRequestJobStreamSource::OnRawReadComplete,
+ // This object is solely owned by the next filter in the chain. The filter
+ // chain cannot be destroyed while there is a pending read, which means
+ // this object also cannot be destroyed while there is a pending read.
+ base::Unretained(this), callback);
+ // If ReadRawData() returns true, the underlying data source has synchronously
+ // succeeded, which might be an EOF.
+ int bytes_read_raw;
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 nit: I'd init this to zero.
xunjieli 2016/03/03 23:00:09 Done.
+ Error error = job->ReadRawDataHelper(dest_buffer, buffer_size,
+ &bytes_read_raw, wrapped_callback);
+ // LOG(ERROR) << "job->ReadRawDataHelper error:" << error <<
+ // "*******************";
+ if (error == OK) {
+ *bytes_read = base::checked_cast<size_t>(bytes_read_raw);
+ total_bytes_output_ += base::checked_cast<size_t>(bytes_read_raw);
+ return OK;
+ }
+
+ return error;
+}
+
+void URLRequestJobStreamSource::OnRawReadComplete(
+ const StreamSource::OnReadCompleteCallback& callback,
+ Error error,
+ size_t bytes_read) {
+ if (error == OK)
+ total_bytes_output_ += bytes_read;
+ callback.Run(error, bytes_read);
+}
+
+size_t URLRequestJobStreamSource::GetBytesOutput() const {
+ return total_bytes_output_;
+}
+
URLRequestJob::URLRequestJob(URLRequest* request,
NetworkDelegate* network_delegate)
: request_(request),
done_(false),
- prefilter_bytes_read_(0),
- postfilter_bytes_read_(0),
filter_needs_more_output_space_(false),
filtered_read_buffer_len_(0),
has_handled_response_(false),
@@ -76,6 +155,7 @@ URLRequestJob::URLRequestJob(URLRequest* request,
network_delegate_(network_delegate),
last_notified_total_received_bytes_(0),
last_notified_total_sent_bytes_(0),
+ raw_bytes_read_(0),
weak_factory_(this) {
base::PowerMonitor* power_monitor = base::PowerMonitor::Get();
if (power_monitor)
@@ -111,6 +191,9 @@ void URLRequestJob::Kill() {
// This function calls ReadRawData to get stream data. If a filter exists, it
// passes the data to the attached filter. It then returns the output from
// filter back to the caller.
+// This method passes reads down the filter chain, where they eventually end up
+// at URLRequestJobStreamSource::Read, which calls back into
+// URLRequestJob::ReadRawData.
bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
DCHECK_LT(buf_size, 1000000); // Sanity check.
DCHECK(buf);
@@ -121,26 +204,18 @@ bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
Error error = OK;
*bytes_read = 0;
- // Skip Filter if not present.
- if (!filter_) {
- error = ReadRawDataHelper(buf, buf_size, bytes_read);
- } else {
- // Save the caller's buffers while we do IO
- // in the filter's buffers.
- filtered_read_buffer_ = buf;
- filtered_read_buffer_len_ = buf_size;
-
- error = ReadFilteredData(bytes_read);
-
- // Synchronous EOF from the filter.
- if (error == OK && *bytes_read == 0)
- DoneReading();
- }
+ size_t bytes_read_n = 0;
+ error = source_->Read(buf, buf_size, &bytes_read_n,
+ base::Bind(&URLRequestJob::SourceReadComplete,
+ weak_factory_.GetWeakPtr()));
+ *bytes_read = bytes_read_n;
if (error == OK) {
// If URLRequestJob read zero bytes, the job is at EOF.
- if (*bytes_read == 0)
+ if (*bytes_read == 0) {
+ DoneReading();
NotifyDone(URLRequestStatus());
+ }
} else if (error == ERR_IO_PENDING) {
SetStatus(URLRequestStatus::FromError(ERR_IO_PENDING));
} else {
@@ -150,6 +225,34 @@ bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
return error == OK;
}
+// Callback for asynchronous reads from |source_|. See the documentation for
+// |Read| above for the contract of this method.
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 nit: I think the documentation for |Read| is in th
xunjieli 2016/03/03 23:00:09 Done.
+void URLRequestJob::SourceReadComplete(Error error, size_t bytes_read) {
+ DCHECK_NE(ERR_IO_PENDING, error);
+ DCHECK(error == OK || bytes_read == 0);
+
+ // Synchronize the URLRequest state machine with the URLRequestJob state
+ // machine. If this read succeeded, either the request is at EOF and the
+ // URLRequest state machine goes to 'finished', or it is not and the
+ // URLRequest state machine goes to 'success'. If the read failed, the
+ // URLRequest state machine goes directly to 'finished'. If filtered data is
+ // pending, then there's nothing to do, since the status of the request is
+ // already pending.
+ //
+ // Update the URLRequest's status first, so that NotifyReadCompleted has an
+ // accurate view of the request.
+ if (error == OK && bytes_read > 0) {
+ SetStatus(URLRequestStatus());
+ } else {
+ NotifyDone(URLRequestStatus::FromError(error));
+ }
+ if (error == OK) {
+ if (bytes_read == 0)
+ DoneReading();
+ request_->NotifyReadCompleted(bytes_read);
+ }
+}
+
void URLRequestJob::StopCaching() {
// Nothing to do here.
}
@@ -198,9 +301,17 @@ void URLRequestJob::PopulateNetErrorDetails(NetErrorDetails* details) const {
return;
}
+#if 0
Filter* URLRequestJob::SetupFilter() const {
return NULL;
}
+#endif
+
+scoped_ptr<StreamSource> URLRequestJob::SetupSource() {
+ scoped_ptr<URLRequestJobStreamSource> source(
+ new URLRequestJobStreamSource(weak_factory_.GetWeakPtr()));
+ return std::move(source);
+}
bool URLRequestJob::IsRedirectResponse(GURL* location,
int* http_status_code) {
@@ -445,8 +556,10 @@ void URLRequestJob::NotifyHeadersComplete() {
}
has_handled_response_ = true;
- if (request_->status().is_success())
- filter_.reset(SetupFilter());
+ if (request_->status().is_success()) {
+ // filter_.reset(SetupFilter());
+ source_ = SetupSource();
+ }
if (!filter_.get()) {
std::string content_length;
@@ -498,60 +611,28 @@ void URLRequestJob::ReadRawDataComplete(int result) {
GatherRawReadStats(error, bytes_read);
- if (filter_.get() && error == OK) {
- // |bytes_read| being 0 indicates an EOF was received. ReadFilteredData
- // can incorrectly return ERR_IO_PENDING when 0 bytes are passed to it, so
- // just don't call into the filter in that case.
- int filter_bytes_read = 0;
- if (bytes_read > 0) {
- // Tell the filter that it has more data.
- PushInputToFilter(bytes_read);
-
- // Filter the data.
- error = ReadFilteredData(&filter_bytes_read);
- }
-
- if (error == OK && !filter_bytes_read)
- DoneReading();
-
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- bytes_read = filter_bytes_read;
- } else {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- }
-
- // Synchronize the URLRequest state machine with the URLRequestJob state
- // machine. If this read succeeded, either the request is at EOF and the
- // URLRequest state machine goes to 'finished', or it is not and the
- // URLRequest state machine goes to 'success'. If the read failed, the
- // URLRequest state machine goes directly to 'finished'. If filtered data is
- // pending, then there's nothing to do, since the status of the request is
- // already pending.
- //
- // Update the URLRequest's status first, so that NotifyReadCompleted has an
- // accurate view of the request.
- if (error == OK && bytes_read > 0) {
- SetStatus(URLRequestStatus());
- } else if (error != ERR_IO_PENDING) {
- NotifyDone(URLRequestStatus::FromError(error));
+ // Notify StreamSource.
+ if (error == OK) {
+ DCHECK(!read_raw_callback_.is_null());
+ StreamSource::OnReadCompleteCallback cb = read_raw_callback_;
+ read_raw_callback_.Reset();
+ cb.Run(OK, bytes_read);
}
- // NotifyReadCompleted should be called after SetStatus or NotifyDone updates
- // the status.
- if (error == OK)
- request_->NotifyReadCompleted(bytes_read);
-
// |this| may be destroyed at this point.
}
+#if 0
+void URLRequestJob::OnRawReadComplete(int bytes_read) {
+ if (bytes_read > 0) {
+ }
+ DCHECK(!read_raw_callback_.is_null());
+ StreamSource::OnReadCompleteCallback cb = read_raw_callback_;
+ read_raw_callback_.Reset();
+ cb.Run(OK, bytes_read);
+}
+#endif
+
void URLRequestJob::NotifyStartError(const URLRequestStatus &status) {
DCHECK(!has_handled_response_);
DCHECK(request_->status().is_io_pending());
@@ -598,7 +679,7 @@ void URLRequestJob::NotifyDone(const URLRequestStatus &status) {
if (request_->status().is_success()) {
int response_code = GetResponseCode();
if (400 <= response_code && response_code <= 599) {
- bool page_has_content = (postfilter_bytes_read_ != 0);
+ bool page_has_content = (postfilter_bytes_read() != 0);
if (request_->load_flags() & net::LOAD_MAIN_FRAME) {
UMA_HISTOGRAM_BOOLEAN("Net.ErrorResponseHasContentMainFrame",
page_has_content);
@@ -669,120 +750,6 @@ void URLRequestJob::PushInputToFilter(int bytes_read) {
filter_->FlushStreamBuffer(bytes_read);
}
-Error URLRequestJob::ReadFilteredData(int* bytes_read) {
- DCHECK(filter_);
- DCHECK(filtered_read_buffer_.get());
- DCHECK_GT(filtered_read_buffer_len_, 0);
- DCHECK_LT(filtered_read_buffer_len_, 1000000); // Sanity check.
- DCHECK(!raw_read_buffer_);
-
- *bytes_read = 0;
- Error error = ERR_FAILED;
-
- for (;;) {
- if (is_done())
- return OK;
-
- if (!filter_needs_more_output_space_ && !filter_->stream_data_len()) {
- // We don't have any raw data to work with, so read from the transaction.
- int filtered_data_read;
- error = ReadRawDataForFilter(&filtered_data_read);
- // If ReadRawDataForFilter returned some data, fall through to the case
- // below; otherwise, return early.
- if (error != OK || filtered_data_read == 0)
- return error;
- filter_->FlushStreamBuffer(filtered_data_read);
- }
-
- if ((filter_->stream_data_len() || filter_needs_more_output_space_) &&
- !is_done()) {
- // Get filtered data.
- int filtered_data_len = filtered_read_buffer_len_;
- int output_buffer_size = filtered_data_len;
- Filter::FilterStatus status =
- filter_->ReadData(filtered_read_buffer_->data(), &filtered_data_len);
-
- if (filter_needs_more_output_space_ && !filtered_data_len) {
- // filter_needs_more_output_space_ was mistaken... there are no more
- // bytes and we should have at least tried to fill up the filter's input
- // buffer. Correct the state, and try again.
- filter_needs_more_output_space_ = false;
- continue;
- }
- filter_needs_more_output_space_ =
- (filtered_data_len == output_buffer_size);
-
- switch (status) {
- case Filter::FILTER_DONE: {
- filter_needs_more_output_space_ = false;
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_NEED_MORE_DATA: {
- // We have finished filtering all data currently in the buffer.
- // There might be some space left in the output buffer. One can
- // consider reading more data from the stream to feed the filter
- // and filling up the output buffer. This leads to more complicated
- // buffer management and data notification mechanisms.
- // We can revisit this issue if there is a real perf need.
- if (filtered_data_len > 0) {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- } else {
- // Read again since we haven't received enough data yet (e.g., we
- // may not have a complete gzip header yet).
- continue;
- }
- break;
- }
- case Filter::FILTER_OK: {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_ERROR: {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " Filter Error";
- filter_needs_more_output_space_ = false;
- error = ERR_CONTENT_DECODING_FAILED;
- break;
- }
- default: {
- NOTREACHED();
- filter_needs_more_output_space_ = false;
- error = ERR_FAILED;
- break;
- }
- }
-
- // If logging all bytes is enabled, log the filtered bytes read.
- if (error == OK && filtered_data_len > 0 &&
- request()->net_log().IsCapturing()) {
- request()->net_log().AddByteTransferEvent(
- NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, filtered_data_len,
- filtered_read_buffer_->data());
- }
- } else {
- // we are done, or there is no data left.
- error = OK;
- }
- break;
- }
-
- if (error == OK) {
- // When we successfully finished a read, we no longer need to save the
- // caller's buffers. Release our reference.
- filtered_read_buffer_ = NULL;
- filtered_read_buffer_len_ = 0;
- }
- return error;
-}
-
void URLRequestJob::DestroyFilters() {
filter_.reset();
}
@@ -805,27 +772,19 @@ void URLRequestJob::SetProxyServer(const HostPortPair& proxy_server) {
request_->proxy_server_ = proxy_server;
}
-Error URLRequestJob::ReadRawDataForFilter(int* bytes_read) {
- Error error = ERR_FAILED;
- DCHECK(bytes_read);
- DCHECK(filter_.get());
-
- *bytes_read = 0;
+int64_t URLRequestJob::prefilter_bytes_read() const {
+ return base::checked_cast<int64_t>(raw_bytes_read_);
+}
- // Get more pre-filtered data if needed.
- // TODO(mbelshe): is it possible that the filter needs *MORE* data
- // when there is some data already in the buffer?
- if (!filter_->stream_data_len() && !is_done()) {
- IOBuffer* stream_buffer = filter_->stream_buffer();
- int stream_buffer_size = filter_->stream_buffer_size();
- error = ReadRawDataHelper(stream_buffer, stream_buffer_size, bytes_read);
- }
- return error;
+int64_t URLRequestJob::postfilter_bytes_read() const {
+ return source_ ? source_->GetBytesOutput() : 0;
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 nit: Comment line indicating when source will be n
xunjieli 2016/03/03 23:00:09 Done. Changed so that we are tracking postfilter_b
}
-Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
- int buf_size,
- int* bytes_read) {
+Error URLRequestJob::ReadRawDataHelper(
+ IOBuffer* buf,
+ int buf_size,
+ int* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) {
DCHECK(!raw_read_buffer_);
// Keep a pointer to the read buffer, so we have access to it in
@@ -838,6 +797,12 @@ Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
// If the read completes synchronously, either success or failure, invoke
// GatherRawReadStats so we can account for the completed read.
GatherRawReadStats(error, *bytes_read);
+ } else {
+ // Keep a pointer to the read buffer, so we have access to it in the
+ // OnRawReadComplete() callback in the event that the read completes
Randy Smith (Not in Mondays) 2016/02/08 23:28:42 nit: I think this comment is actually referring to
xunjieli 2016/03/03 23:00:09 Done. Good catch!
+ // asynchronously.
+ // raw_read_buffer_ = buf;
+ read_raw_callback_ = callback;
}
return error;
}
@@ -872,7 +837,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
void URLRequestJob::RecordBytesRead(int bytes_read) {
DCHECK_GT(bytes_read, 0);
- prefilter_bytes_read_ += bytes_read;
+ raw_bytes_read_ += base::checked_cast<size_t>(bytes_read);
// On first read, notify NetworkQualityEstimator that response headers have
// been received.
@@ -882,18 +847,16 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
// first raw read of the response body. This is used as the signal that
// response headers have been received.
if (request_->context()->network_quality_estimator() &&
- prefilter_bytes_read_ == bytes_read) {
+ prefilter_bytes_read() == bytes_read) {
request_->context()->network_quality_estimator()->NotifyHeadersReceived(
*request_);
}
- if (!filter_.get())
- postfilter_bytes_read_ += bytes_read;
DVLOG(2) << __FUNCTION__ << "() "
<< "\"" << request_->url().spec() << "\""
<< " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
+ << " pre total = " << prefilter_bytes_read()
+ << " post total = " << postfilter_bytes_read();
UpdatePacketReadTimes(); // Facilitate stats recording if it is active.
// Notify observers if any additional network usage has occurred. Note that

Powered by Google App Engine
This is Rietveld 408576698