Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(623)

Unified Diff: net/url_request/url_request_job.cc

Issue 1662763002: [ON HOLD] Implement pull-based design for content decoding (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address comments Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/url_request/url_request_job.cc
diff --git a/net/url_request/url_request_job.cc b/net/url_request/url_request_job.cc
index 71f203a4a9e49661ea0f544aa1fc06aae184efcb..e2ce27b67af29c05db6303da455589ebe7890c92 100644
--- a/net/url_request/url_request_job.cc
+++ b/net/url_request/url_request_job.cc
@@ -7,6 +7,7 @@
#include <utility>
#include "base/bind.h"
+#include "base/callback_helpers.h"
#include "base/compiler_specific.h"
#include "base/location.h"
#include "base/metrics/histogram_macros.h"
@@ -25,7 +26,7 @@
#include "net/base/net_errors.h"
#include "net/base/network_delegate.h"
#include "net/base/network_quality_estimator.h"
-#include "net/filter/filter.h"
+#include "net/filter/stream_source_util.h"
#include "net/http/http_response_headers.h"
#include "net/url_request/url_request_context.h"
@@ -34,12 +35,13 @@ namespace net {
namespace {
// Callback for TYPE_URL_REQUEST_FILTERS_SET net-internals event.
-std::unique_ptr<base::Value> FiltersSetCallback(
- Filter* filter,
+std::unique_ptr<base::Value> StreamSourceSetCallback(
+ StreamSource* stream_source,
NetLogCaptureMode /* capture_mode */) {
std::unique_ptr<base::DictionaryValue> event_params(
new base::DictionaryValue());
- event_params->SetString("filters", filter->OrderedFilterList());
+ event_params->SetString(
+ "filters", StreamSourceUtil::OrderedStreamSourceList(stream_source));
return std::move(event_params);
}
@@ -64,19 +66,51 @@ std::string ComputeMethodForRedirect(const std::string& method,
} // namespace
+// StreamSources own the previous StreamSource in the chain, but the ultimate
+// source is URLRequestJob, which has other ownership semantics, so this class
+// is a proxy for URLRequestJob that is owned by the first filter (in dataflow
+// order).
+class URLRequestJob::URLRequestJobStreamSource : public StreamSource {
+ public:
+ URLRequestJobStreamSource(URLRequestJob* job)
+ : StreamSource(StreamSource::TYPE_NONE, nullptr), job_(job) {}
+
+ ~URLRequestJobStreamSource() override {}
+
+ // StreamSource implementation:
+ Error Read(IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read,
+ const OnReadCompleteCallback& callback) override {
+ DCHECK(job_);
+
+ // If ReadRawData() returns true, the underlying data source has
+ // synchronously succeeded, which might be an EOF.
+ int bytes_read_raw = 0;
+ Error error = job_->ReadRawDataHelper(dest_buffer, buffer_size,
+ &bytes_read_raw, callback);
+ if (error == OK)
+ *bytes_read = base::checked_cast<size_t>(bytes_read_raw);
+
+ return error;
+ }
+
+ private:
+ URLRequestJob* job_;
+};
+
URLRequestJob::URLRequestJob(URLRequest* request,
NetworkDelegate* network_delegate)
: request_(request),
done_(false),
- prefilter_bytes_read_(0),
- postfilter_bytes_read_(0),
filter_needs_more_output_space_(false),
- filtered_read_buffer_len_(0),
has_handled_response_(false),
expected_content_size_(-1),
network_delegate_(network_delegate),
last_notified_total_received_bytes_(0),
last_notified_total_sent_bytes_(0),
+ raw_bytes_read_(0),
+ postfilter_bytes_read_(0),
weak_factory_(this) {
base::PowerMonitor* power_monitor = base::PowerMonitor::Get();
if (power_monitor)
@@ -112,36 +146,35 @@ void URLRequestJob::Kill() {
// This function calls ReadRawData to get stream data. If a filter exists, it
// passes the data to the attached filter. It then returns the output from
// filter back to the caller.
+// This method passes reads down the filter chain, where they eventually end up
+// at URLRequestJobStreamSource::Read, which calls back into
+// URLRequestJob::ReadRawData.
bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
DCHECK_LT(buf_size, 1000000); // Sanity check.
DCHECK(buf);
DCHECK(bytes_read);
- DCHECK(filtered_read_buffer_.get() == NULL);
- DCHECK_EQ(0, filtered_read_buffer_len_);
Error error = OK;
*bytes_read = 0;
- // Skip Filter if not present.
- if (!filter_) {
- error = ReadRawDataHelper(buf, buf_size, bytes_read);
- } else {
- // Save the caller's buffers while we do IO
- // in the filter's buffers.
- filtered_read_buffer_ = buf;
- filtered_read_buffer_len_ = buf_size;
-
- error = ReadFilteredData(bytes_read);
-
- // Synchronous EOF from the filter.
- if (error == OK && *bytes_read == 0)
- DoneReading();
- }
+ size_t bytes_read_n = 0;
+ error = source_->Read(buf, buf_size, &bytes_read_n,
+ base::Bind(&URLRequestJob::SourceReadComplete,
+ weak_factory_.GetWeakPtr()));
+ *bytes_read = bytes_read_n;
Randy Smith (Not in Mondays) 2016/04/26 21:54:02 Why not just pass bytes_read to source_->Read()?
xunjieli 2016/07/20 21:00:48 Done.
if (error == OK) {
+ postfilter_bytes_read_ += bytes_read_n;
+ if (request()->net_log().IsCapturing()) {
+ request()->net_log().AddByteTransferEvent(
+ NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, bytes_read_n,
+ buf->data());
+ }
// If URLRequestJob read zero bytes, the job is at EOF.
- if (*bytes_read == 0)
+ if (*bytes_read == 0) {
+ DoneReading();
NotifyDone(URLRequestStatus());
+ }
} else if (error == ERR_IO_PENDING) {
SetStatus(URLRequestStatus::FromError(ERR_IO_PENDING));
} else {
@@ -151,6 +184,33 @@ bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
return error == OK;
}
+void URLRequestJob::SourceReadComplete(Error error, size_t bytes_read) {
Randy Smith (Not in Mondays) 2016/04/26 21:54:02 nit, idea (i.e. not even as much force as suggesti
xunjieli 2016/07/20 21:00:48 Done.
+ DCHECK_NE(ERR_IO_PENDING, error);
+ DCHECK(error == OK || bytes_read == 0);
+
+ // Synchronize the URLRequest state machine with the URLRequestJob state
+ // machine. If this read succeeded, either the request is at EOF and the
+ // URLRequest state machine goes to 'finished', or it is not and the
+ // URLRequest state machine goes to 'success'. If the read failed, the
+ // URLRequest state machine goes directly to 'finished'. If filtered data is
+ // pending, then there's nothing to do, since the status of the request is
+ // already pending.
+ //
+ // Update the URLRequest's status first, so that NotifyReadCompleted has an
+ // accurate view of the request.
+ if (error == OK && bytes_read > 0) {
+ postfilter_bytes_read_ += bytes_read;
+ SetStatus(URLRequestStatus());
+ } else {
+ NotifyDone(URLRequestStatus::FromError(error));
+ }
+ if (error == OK) {
+ if (bytes_read == 0)
+ DoneReading();
+ request_->NotifyReadCompleted(bytes_read);
+ }
+}
+
void URLRequestJob::StopCaching() {
// Nothing to do here.
}
@@ -199,8 +259,10 @@ void URLRequestJob::PopulateNetErrorDetails(NetErrorDetails* details) const {
return;
}
-Filter* URLRequestJob::SetupFilter() const {
- return NULL;
+std::unique_ptr<StreamSource> URLRequestJob::SetupSource() {
+ std::unique_ptr<URLRequestJobStreamSource> source(
+ new URLRequestJobStreamSource(this));
+ return std::move(source);
}
bool URLRequestJob::IsRedirectResponse(GURL* location,
@@ -445,10 +507,11 @@ void URLRequestJob::NotifyHeadersComplete() {
}
has_handled_response_ = true;
- if (request_->status().is_success())
- filter_.reset(SetupFilter());
+ if (request_->status().is_success()) {
+ source_ = SetupSource();
+ }
Randy Smith (Not in Mondays) 2016/04/26 21:54:02 nit: Why the curly braces?
xunjieli 2016/07/20 21:00:48 Done.
- if (!filter_.get()) {
+ if (source_->type() == StreamSource::TYPE_NONE) {
std::string content_length;
request_->GetResponseHeaderByName("content-length", &content_length);
if (!content_length.empty())
@@ -456,7 +519,7 @@ void URLRequestJob::NotifyHeadersComplete() {
} else {
request_->net_log().AddEvent(
NetLog::TYPE_URL_REQUEST_FILTERS_SET,
- base::Bind(&FiltersSetCallback, base::Unretained(filter_.get())));
+ base::Bind(&StreamSourceSetCallback, base::Unretained(source_.get())));
}
request_->NotifyResponseStarted();
@@ -498,57 +561,12 @@ void URLRequestJob::ReadRawDataComplete(int result) {
GatherRawReadStats(error, bytes_read);
- if (filter_.get() && error == OK) {
- // |bytes_read| being 0 indicates an EOF was received. ReadFilteredData
- // can incorrectly return ERR_IO_PENDING when 0 bytes are passed to it, so
- // just don't call into the filter in that case.
- int filter_bytes_read = 0;
- if (bytes_read > 0) {
- // Tell the filter that it has more data.
- PushInputToFilter(bytes_read);
-
- // Filter the data.
- error = ReadFilteredData(&filter_bytes_read);
- }
-
- if (error == OK && !filter_bytes_read)
- DoneReading();
-
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- bytes_read = filter_bytes_read;
- } else {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- }
-
- // Synchronize the URLRequest state machine with the URLRequestJob state
- // machine. If this read succeeded, either the request is at EOF and the
- // URLRequest state machine goes to 'finished', or it is not and the
- // URLRequest state machine goes to 'success'. If the read failed, the
- // URLRequest state machine goes directly to 'finished'. If filtered data is
- // pending, then there's nothing to do, since the status of the request is
- // already pending.
- //
- // Update the URLRequest's status first, so that NotifyReadCompleted has an
- // accurate view of the request.
- if (error == OK && bytes_read > 0) {
- SetStatus(URLRequestStatus());
- } else if (error != ERR_IO_PENDING) {
- NotifyDone(URLRequestStatus::FromError(error));
+ // Notify StreamSource.
+ if (error == OK) {
+ DCHECK(!read_raw_callback_.is_null());
Randy Smith (Not in Mondays) 2016/04/26 21:54:02 Why only if error == OK? Don't we want to pass th
xunjieli 2016/07/20 21:00:48 Done.
+ base::ResetAndReturn(&read_raw_callback_).Run(OK, bytes_read);
}
- // NotifyReadCompleted should be called after SetStatus or NotifyDone updates
- // the status.
- if (error == OK)
- request_->NotifyReadCompleted(bytes_read);
-
// |this| may be destroyed at this point.
}
@@ -598,7 +616,7 @@ void URLRequestJob::NotifyDone(const URLRequestStatus &status) {
if (request_->status().is_success()) {
int response_code = GetResponseCode();
if (400 <= response_code && response_code <= 599) {
- bool page_has_content = (postfilter_bytes_read_ != 0);
+ bool page_has_content = (postfilter_bytes_read() != 0);
if (request_->load_flags() & net::LOAD_MAIN_FRAME) {
UMA_HISTOGRAM_BOOLEAN("Net.ErrorResponseHasContentMainFrame",
page_has_content);
@@ -664,131 +682,6 @@ void URLRequestJob::DoneReading() {
void URLRequestJob::DoneReadingRedirectResponse() {
}
-void URLRequestJob::PushInputToFilter(int bytes_read) {
- DCHECK(filter_);
- filter_->FlushStreamBuffer(bytes_read);
-}
-
-Error URLRequestJob::ReadFilteredData(int* bytes_read) {
- DCHECK(filter_);
- DCHECK(filtered_read_buffer_.get());
- DCHECK_GT(filtered_read_buffer_len_, 0);
- DCHECK_LT(filtered_read_buffer_len_, 1000000); // Sanity check.
- DCHECK(!raw_read_buffer_);
-
- *bytes_read = 0;
- Error error = ERR_FAILED;
-
- for (;;) {
- if (is_done())
- return OK;
-
- if (!filter_needs_more_output_space_ && !filter_->stream_data_len()) {
- // We don't have any raw data to work with, so read from the transaction.
- int filtered_data_read;
- error = ReadRawDataForFilter(&filtered_data_read);
- // If ReadRawDataForFilter returned some data, fall through to the case
- // below; otherwise, return early.
- if (error != OK || filtered_data_read == 0)
- return error;
- filter_->FlushStreamBuffer(filtered_data_read);
- }
-
- if ((filter_->stream_data_len() || filter_needs_more_output_space_) &&
- !is_done()) {
- // Get filtered data.
- int filtered_data_len = filtered_read_buffer_len_;
- int output_buffer_size = filtered_data_len;
- Filter::FilterStatus status =
- filter_->ReadData(filtered_read_buffer_->data(), &filtered_data_len);
-
- if (filter_needs_more_output_space_ && !filtered_data_len) {
- // filter_needs_more_output_space_ was mistaken... there are no more
- // bytes and we should have at least tried to fill up the filter's input
- // buffer. Correct the state, and try again.
- filter_needs_more_output_space_ = false;
- continue;
- }
- filter_needs_more_output_space_ =
- (filtered_data_len == output_buffer_size);
-
- switch (status) {
- case Filter::FILTER_DONE: {
- filter_needs_more_output_space_ = false;
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_NEED_MORE_DATA: {
- // We have finished filtering all data currently in the buffer.
- // There might be some space left in the output buffer. One can
- // consider reading more data from the stream to feed the filter
- // and filling up the output buffer. This leads to more complicated
- // buffer management and data notification mechanisms.
- // We can revisit this issue if there is a real perf need.
- if (filtered_data_len > 0) {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- } else {
- // Read again since we haven't received enough data yet (e.g., we
- // may not have a complete gzip header yet).
- continue;
- }
- break;
- }
- case Filter::FILTER_OK: {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_ERROR: {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " Filter Error";
- filter_needs_more_output_space_ = false;
- error = ERR_CONTENT_DECODING_FAILED;
- UMA_HISTOGRAM_ENUMERATION("Net.ContentDecodingFailed.FilterType",
- filter_->type(), Filter::FILTER_TYPE_MAX);
- break;
- }
- default: {
- NOTREACHED();
- filter_needs_more_output_space_ = false;
- error = ERR_FAILED;
- break;
- }
- }
-
- // If logging all bytes is enabled, log the filtered bytes read.
- if (error == OK && filtered_data_len > 0 &&
- request()->net_log().IsCapturing()) {
- request()->net_log().AddByteTransferEvent(
- NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, filtered_data_len,
- filtered_read_buffer_->data());
- }
- } else {
- // we are done, or there is no data left.
- error = OK;
- }
- break;
- }
-
- if (error == OK) {
- // When we successfully finished a read, we no longer need to save the
- // caller's buffers. Release our reference.
- filtered_read_buffer_ = NULL;
- filtered_read_buffer_len_ = 0;
- }
- return error;
-}
-
-void URLRequestJob::DestroyFilters() {
- filter_.reset();
-}
-
const URLRequestStatus URLRequestJob::GetStatus() {
return request_->status();
}
@@ -807,31 +700,23 @@ void URLRequestJob::SetProxyServer(const HostPortPair& proxy_server) {
request_->proxy_server_ = proxy_server;
}
-Error URLRequestJob::ReadRawDataForFilter(int* bytes_read) {
- Error error = ERR_FAILED;
- DCHECK(bytes_read);
- DCHECK(filter_.get());
-
- *bytes_read = 0;
+int64_t URLRequestJob::prefilter_bytes_read() const {
+ return base::checked_cast<int64_t>(raw_bytes_read_);
+}
- // Get more pre-filtered data if needed.
- // TODO(mbelshe): is it possible that the filter needs *MORE* data
- // when there is some data already in the buffer?
- if (!filter_->stream_data_len() && !is_done()) {
- IOBuffer* stream_buffer = filter_->stream_buffer();
- int stream_buffer_size = filter_->stream_buffer_size();
- error = ReadRawDataHelper(stream_buffer, stream_buffer_size, bytes_read);
- }
- return error;
+int64_t URLRequestJob::postfilter_bytes_read() const {
+ return base::checked_cast<int64_t>(postfilter_bytes_read_);
}
-Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
- int buf_size,
- int* bytes_read) {
+Error URLRequestJob::ReadRawDataHelper(
+ IOBuffer* buf,
+ int buf_size,
+ int* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) {
DCHECK(!raw_read_buffer_);
- // Keep a pointer to the read buffer, so we have access to it in
- // GatherRawReadStats() in the event that the read completes asynchronously.
+ // Keep a pointer to the read buffer, so URLRequestJob::GatherRawReadStats()
+ // has access to it to log stats.
raw_read_buffer_ = buf;
Error error;
ConvertResultToError(ReadRawData(buf, buf_size), &error, bytes_read);
@@ -840,6 +725,8 @@ Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
// If the read completes synchronously, either success or failure, invoke
// GatherRawReadStats so we can account for the completed read.
GatherRawReadStats(error, *bytes_read);
+ } else {
+ read_raw_callback_ = callback;
}
return error;
}
@@ -858,9 +745,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
raw_read_buffer_ = nullptr;
return;
}
- // If |filter_| is non-NULL, bytes will be logged after it is applied
- // instead.
- if (!filter_.get() && bytes_read > 0 && request()->net_log().IsCapturing()) {
+ if (bytes_read > 0 && request()->net_log().IsCapturing()) {
request()->net_log().AddByteTransferEvent(
NetLog::TYPE_URL_REQUEST_JOB_BYTES_READ, bytes_read,
raw_read_buffer_->data());
@@ -874,7 +759,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
void URLRequestJob::RecordBytesRead(int bytes_read) {
DCHECK_GT(bytes_read, 0);
- prefilter_bytes_read_ += bytes_read;
+ raw_bytes_read_ += base::checked_cast<size_t>(bytes_read);
// On first read, notify NetworkQualityEstimator that response headers have
// been received.
@@ -884,18 +769,16 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
// first raw read of the response body. This is used as the signal that
// response headers have been received.
if (request_->context()->network_quality_estimator() &&
- prefilter_bytes_read_ == bytes_read) {
+ prefilter_bytes_read() == bytes_read) {
request_->context()->network_quality_estimator()->NotifyHeadersReceived(
*request_);
}
- if (!filter_.get())
- postfilter_bytes_read_ += bytes_read;
DVLOG(2) << __FUNCTION__ << "() "
<< "\"" << request_->url().spec() << "\""
<< " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
+ << " pre total = " << prefilter_bytes_read()
+ << " post total = " << postfilter_bytes_read();
UpdatePacketReadTimes(); // Facilitate stats recording if it is active.
// Notify observers if any additional network usage has occurred. Note that
@@ -906,10 +789,6 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
MaybeNotifyNetworkBytes();
}
-bool URLRequestJob::FilterHasData() {
- return filter_.get() && filter_->stream_data_len();
-}
-
void URLRequestJob::UpdatePacketReadTimes() {
}

Powered by Google App Engine
This is Rietveld 408576698