Chromium Code Reviews

Unified Diff: net/url_request/url_request_job.cc

Issue 1662763002: [ON HOLD] Implement pull-based design for content decoding (Closed)
Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Refactor common logic (created 4 years, 10 months ago)
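
This CL replaces the push-based Filter chain with a pull-based StreamSource chain. StreamSource itself is defined elsewhere in the CL and does not appear in this file's diff; for orientation, here is a minimal sketch of its interface as inferred from the call sites below (every name and signature in this sketch is a reconstruction, not the actual header):

// Sketch only: reconstructed from usage in url_request_job.cc.
class StreamSource {
 public:
  enum SourceType { SOURCE_NONE /* , SOURCE_GZIP, ... */ };
  typedef base::Callback<void(Error, size_t)> OnReadCompleteCallback;

  // Each source owns the source upstream of it, in dataflow order.
  StreamSource(SourceType type, scoped_ptr<StreamSource> upstream);
  virtual ~StreamSource();

  // Pull-based entry point. Returns OK on synchronous success (zero bytes
  // read means EOF), ERR_IO_PENDING if |callback| will be invoked later,
  // or a net error.
  Error Read(IOBuffer* dest_buffer,
             size_t buffer_size,
             size_t* bytes_read,
             const OnReadCompleteCallback& callback);

  SourceType type() const;
  std::string OrderedStreamSourceList() const;

 protected:
  // Subclasses implement the actual read.
  virtual Error ReadInternal(IOBuffer* dest_buffer,
                             size_t buffer_size,
                             size_t* bytes_read) = 0;
};
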
Index: net/url_request/url_request_job.cc
diff --git a/net/url_request/url_request_job.cc b/net/url_request/url_request_job.cc
index cbefc9e3f06845da0fd14d2364426f4978dc93a8..01c899d54dbd454871f19bebc81a66ee4336fb3b 100644
--- a/net/url_request/url_request_job.cc
+++ b/net/url_request/url_request_job.cc
@@ -7,6 +7,7 @@
#include <utility>
#include "base/bind.h"
+#include "base/callback_helpers.h"
#include "base/compiler_specific.h"
#include "base/location.h"
#include "base/metrics/histogram_macros.h"
@@ -34,11 +35,11 @@ namespace net {
namespace {
// Callback for TYPE_URL_REQUEST_FILTERS_SET net-internals event.
-scoped_ptr<base::Value> FiltersSetCallback(
- Filter* filter,
+scoped_ptr<base::Value> StreamSourceSetCallback(
+ StreamSource* stream_source,
NetLogCaptureMode /* capture_mode */) {
scoped_ptr<base::DictionaryValue> event_params(new base::DictionaryValue());
- event_params->SetString("filters", filter->OrderedFilterList());
+ event_params->SetString("filters", stream_source->OrderedStreamSourceList());
return std::move(event_params);
}
@@ -63,12 +64,58 @@ std::string ComputeMethodForRedirect(const std::string& method,
} // namespace
+// StreamSources own the previous StreamSource in the chain, but the ultimate
+// source is URLRequestJob, which has other ownership semantics, so this class
+// is a proxy for URLRequestJob that is owned by the first filter (in dataflow
+// order).
+class URLRequestJob::URLRequestJobStreamSource : public StreamSource {
+ public:
+ URLRequestJobStreamSource(const base::WeakPtr<URLRequestJob>& job)
+ : StreamSource(StreamSource::SOURCE_NONE, nullptr), job_(job) {}
+
+ ~URLRequestJobStreamSource() override {}
+
+ void OnReadComplete(IOBuffer* dest_buffer,
+ size_t dest_buffer_size,
+ Error error,
+ size_t bytes_read) {
+ DCHECK_NE(ERR_IO_PENDING, error);
+ DCHECK_EQ(dest_buffer, pending_read_buffer_.get());
+ DCHECK(!callback_.is_null());
+
+ pending_read_buffer_ = nullptr;
+ base::ResetAndReturn(&callback_).Run(error, bytes_read);
+ }
+
+ // StreamSource implementation:
+ Error ReadInternal(IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read) override {
+ URLRequestJob* job = job_.get();
+ DCHECK(job);
mmenke 2016/03/04 21:15:57 If we can DCHECK on it, I don't think it needs to be a WeakPtr.
xunjieli 2016/04/20 19:16:11 Done.
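The point above: the DCHECK asserts the job is alive whenever ReadInternal() runs, so the weak pointer buys nothing. A sketch of the simplified proxy under that invariant (assumption: URLRequestJob is guaranteed to outlive its URLRequestJobStreamSource):

// Sketch of the suggested simplification: a raw pointer documents the
// lifetime invariant the DCHECK was already relying on.
class URLRequestJob::URLRequestJobStreamSource : public StreamSource {
 public:
  explicit URLRequestJobStreamSource(URLRequestJob* job)
      : StreamSource(StreamSource::SOURCE_NONE, nullptr), job_(job) {}

 private:
  URLRequestJob* const job_;  // Not owned; must outlive this proxy.
};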
+
+ // If ReadRawDataHelper() returns OK, the underlying data source has
+ // synchronously succeeded, which might be an EOF (zero bytes read).
+ int bytes_read_raw = 0;
+ Error error = job->ReadRawDataHelper(
+ dest_buffer, buffer_size, &bytes_read_raw,
+ base::Bind(&URLRequestJobStreamSource::OnReadComplete,
+ base::Unretained(this), base::Unretained(dest_buffer),
+ buffer_size));
+ if (error == OK)
+ *bytes_read = base::checked_cast<size_t>(bytes_read_raw);
+
+ return error;
+ }
+
+ private:
+ const base::WeakPtr<URLRequestJob> job_;
+};
+
URLRequestJob::URLRequestJob(URLRequest* request,
NetworkDelegate* network_delegate)
: request_(request),
done_(false),
- prefilter_bytes_read_(0),
- postfilter_bytes_read_(0),
filter_needs_more_output_space_(false),
filtered_read_buffer_len_(0),
has_handled_response_(false),
@@ -76,6 +123,8 @@ URLRequestJob::URLRequestJob(URLRequest* request,
network_delegate_(network_delegate),
last_notified_total_received_bytes_(0),
last_notified_total_sent_bytes_(0),
+ raw_bytes_read_(0),
+ postfilter_bytes_read_(0),
weak_factory_(this) {
base::PowerMonitor* power_monitor = base::PowerMonitor::Get();
if (power_monitor)
@@ -111,6 +160,9 @@ void URLRequestJob::Kill() {
// This function calls ReadRawData to get stream data. If a filter exists, it
// passes the data to the attached filter. It then returns the output from
// filter back to the caller.
+// This method passes reads down the filter chain, where they eventually end up
+// at URLRequestJobStreamSource::Read, which calls back into
+// URLRequestJob::ReadRawData.
bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
DCHECK_LT(buf_size, 1000000); // Sanity check.
DCHECK(buf);
@@ -121,26 +173,19 @@ bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
Error error = OK;
*bytes_read = 0;
- // Skip Filter if not present.
- if (!filter_) {
- error = ReadRawDataHelper(buf, buf_size, bytes_read);
- } else {
- // Save the caller's buffers while we do IO
- // in the filter's buffers.
- filtered_read_buffer_ = buf;
- filtered_read_buffer_len_ = buf_size;
-
- error = ReadFilteredData(bytes_read);
-
- // Synchronous EOF from the filter.
- if (error == OK && *bytes_read == 0)
- DoneReading();
- }
+ size_t bytes_read_n = 0;
+ error = source_->Read(buf, buf_size, &bytes_read_n,
+ base::Bind(&URLRequestJob::SourceReadComplete,
+ weak_factory_.GetWeakPtr()));
+ *bytes_read = bytes_read_n;
if (error == OK) {
+ postfilter_bytes_read_ += bytes_read_n;
// If URLRequestJob read zero bytes, the job is at EOF.
- if (*bytes_read == 0)
+ if (*bytes_read == 0) {
+ DoneReading();
NotifyDone(URLRequestStatus());
+ }
} else if (error == ERR_IO_PENDING) {
SetStatus(URLRequestStatus::FromError(ERR_IO_PENDING));
} else {
@@ -150,6 +195,33 @@ bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
return error == OK;
}
+void URLRequestJob::SourceReadComplete(Error error, size_t bytes_read) {
+ DCHECK_NE(ERR_IO_PENDING, error);
+ DCHECK(error == OK || bytes_read == 0);
+
+ // Synchronize the URLRequest state machine with the URLRequestJob state
+ // machine. If this read succeeded, either the request is at EOF and the
+ // URLRequest state machine goes to 'finished', or it is not and the
+ // URLRequest state machine goes to 'success'. If the read failed, the
+ // URLRequest state machine goes directly to 'finished'. A pending read
+ // cannot reach this point: the DCHECK above rules out ERR_IO_PENDING.
+ //
+ // Update the URLRequest's status first, so that NotifyReadCompleted has an
+ // accurate view of the request.
+ if (error == OK && bytes_read > 0) {
+ postfilter_bytes_read_ += bytes_read;
+ SetStatus(URLRequestStatus());
+ } else {
+ NotifyDone(URLRequestStatus::FromError(error));
+ }
+ if (error == OK) {
+ if (bytes_read == 0)
+ DoneReading();
+ request_->NotifyReadCompleted(bytes_read);
+ }
+}
+
void URLRequestJob::StopCaching() {
// Nothing to do here.
}
@@ -198,8 +270,10 @@ void URLRequestJob::PopulateNetErrorDetails(NetErrorDetails* details) const {
return;
}
-Filter* URLRequestJob::SetupFilter() const {
- return NULL;
+scoped_ptr<StreamSource> URLRequestJob::SetupSource() {
+ scoped_ptr<URLRequestJobStreamSource> source(
+ new URLRequestJobStreamSource(weak_factory_.GetWeakPtr()));
+ return std::move(source);
}
bool URLRequestJob::IsRedirectResponse(GURL* location,
@@ -444,10 +518,12 @@ void URLRequestJob::NotifyHeadersComplete() {
}
has_handled_response_ = true;
- if (request_->status().is_success())
- filter_.reset(SetupFilter());
+ if (request_->status().is_success()) {
+ // filter_.reset(SetupFilter());
+ source_ = SetupSource();
+ }
- if (!filter_.get()) {
+ if (source_->type() == StreamSource::SOURCE_NONE) {
std::string content_length;
request_->GetResponseHeaderByName("content-length", &content_length);
if (!content_length.empty())
@@ -455,7 +531,7 @@ void URLRequestJob::NotifyHeadersComplete() {
} else {
request_->net_log().AddEvent(
NetLog::TYPE_URL_REQUEST_FILTERS_SET,
- base::Bind(&FiltersSetCallback, base::Unretained(filter_.get())));
+ base::Bind(&StreamSourceSetCallback, base::Unretained(source_.get())));
}
request_->NotifyResponseStarted();
@@ -497,60 +573,28 @@ void URLRequestJob::ReadRawDataComplete(int result) {
GatherRawReadStats(error, bytes_read);
- if (filter_.get() && error == OK) {
- // |bytes_read| being 0 indicates an EOF was received. ReadFilteredData
- // can incorrectly return ERR_IO_PENDING when 0 bytes are passed to it, so
- // just don't call into the filter in that case.
- int filter_bytes_read = 0;
- if (bytes_read > 0) {
- // Tell the filter that it has more data.
- PushInputToFilter(bytes_read);
-
- // Filter the data.
- error = ReadFilteredData(&filter_bytes_read);
- }
-
- if (error == OK && !filter_bytes_read)
- DoneReading();
-
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- bytes_read = filter_bytes_read;
- } else {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
- }
-
- // Synchronize the URLRequest state machine with the URLRequestJob state
- // machine. If this read succeeded, either the request is at EOF and the
- // URLRequest state machine goes to 'finished', or it is not and the
- // URLRequest state machine goes to 'success'. If the read failed, the
- // URLRequest state machine goes directly to 'finished'. If filtered data is
- // pending, then there's nothing to do, since the status of the request is
- // already pending.
- //
- // Update the URLRequest's status first, so that NotifyReadCompleted has an
- // accurate view of the request.
- if (error == OK && bytes_read > 0) {
- SetStatus(URLRequestStatus());
- } else if (error != ERR_IO_PENDING) {
- NotifyDone(URLRequestStatus::FromError(error));
+ // Notify StreamSource.
+ if (error == OK) {
+ DCHECK(!read_raw_callback_.is_null());
+ StreamSource::OnReadCompleteCallback cb = read_raw_callback_;
+ read_raw_callback_.Reset();
+ cb.Run(OK, bytes_read);
mmenke 2016/03/04 21:15:57 Can just use: base::ResetAndReturn(&read_raw_callback_).Run(OK, bytes_read);
xunjieli 2016/04/20 19:16:10 Done.
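For reference, base/callback_helpers.h (already added to the includes in this patch) provides base::ResetAndReturn(), which moves the callback out of the member before running it, so a reentrant callback cannot observe or clobber stale state. The suggested form collapses the copy/Reset/Run dance above into one expression:

// After the suggestion: reset-then-run in a single expression.
if (error == OK) {
  DCHECK(!read_raw_callback_.is_null());
  base::ResetAndReturn(&read_raw_callback_).Run(OK, bytes_read);
}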
}
- // NotifyReadCompleted should be called after SetStatus or NotifyDone updates
- // the status.
- if (error == OK)
- request_->NotifyReadCompleted(bytes_read);
-
// |this| may be destroyed at this point.
}
+#if 0
+void URLRequestJob::OnRawReadComplete(int bytes_read) {
+ if (bytes_read > 0) {
+ }
+ DCHECK(!read_raw_callback_.is_null());
+ StreamSource::OnReadCompleteCallback cb = read_raw_callback_;
+ read_raw_callback_.Reset();
+ cb.Run(OK, bytes_read);
+}
+#endif
mmenke 2016/03/04 21:15:57 Should remove all the commented out code.
xunjieli 2016/04/20 19:16:11 Done.
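Removing the dead code means deleting the #if 0 OnRawReadComplete() block above outright and dropping the commented-out SetupFilter() call, which would leave the response-started hunk in NotifyHeadersComplete() as just (a sketch of the cleaned-up result):

has_handled_response_ = true;
if (request_->status().is_success())
  source_ = SetupSource();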
+
void URLRequestJob::NotifyStartError(const URLRequestStatus &status) {
DCHECK(!has_handled_response_);
DCHECK(request_->status().is_io_pending());
@@ -597,7 +641,7 @@ void URLRequestJob::NotifyDone(const URLRequestStatus &status) {
if (request_->status().is_success()) {
int response_code = GetResponseCode();
if (400 <= response_code && response_code <= 599) {
- bool page_has_content = (postfilter_bytes_read_ != 0);
+ bool page_has_content = (postfilter_bytes_read() != 0);
if (request_->load_flags() & net::LOAD_MAIN_FRAME) {
UMA_HISTOGRAM_BOOLEAN("Net.ErrorResponseHasContentMainFrame",
page_has_content);
@@ -663,131 +707,6 @@ void URLRequestJob::DoneReading() {
void URLRequestJob::DoneReadingRedirectResponse() {
}
-void URLRequestJob::PushInputToFilter(int bytes_read) {
- DCHECK(filter_);
- filter_->FlushStreamBuffer(bytes_read);
-}
-
-Error URLRequestJob::ReadFilteredData(int* bytes_read) {
- DCHECK(filter_);
- DCHECK(filtered_read_buffer_.get());
- DCHECK_GT(filtered_read_buffer_len_, 0);
- DCHECK_LT(filtered_read_buffer_len_, 1000000); // Sanity check.
- DCHECK(!raw_read_buffer_);
-
- *bytes_read = 0;
- Error error = ERR_FAILED;
-
- for (;;) {
- if (is_done())
- return OK;
-
- if (!filter_needs_more_output_space_ && !filter_->stream_data_len()) {
- // We don't have any raw data to work with, so read from the transaction.
- int filtered_data_read;
- error = ReadRawDataForFilter(&filtered_data_read);
- // If ReadRawDataForFilter returned some data, fall through to the case
- // below; otherwise, return early.
- if (error != OK || filtered_data_read == 0)
- return error;
- filter_->FlushStreamBuffer(filtered_data_read);
- }
-
- if ((filter_->stream_data_len() || filter_needs_more_output_space_) &&
- !is_done()) {
- // Get filtered data.
- int filtered_data_len = filtered_read_buffer_len_;
- int output_buffer_size = filtered_data_len;
- Filter::FilterStatus status =
- filter_->ReadData(filtered_read_buffer_->data(), &filtered_data_len);
-
- if (filter_needs_more_output_space_ && !filtered_data_len) {
- // filter_needs_more_output_space_ was mistaken... there are no more
- // bytes and we should have at least tried to fill up the filter's input
- // buffer. Correct the state, and try again.
- filter_needs_more_output_space_ = false;
- continue;
- }
- filter_needs_more_output_space_ =
- (filtered_data_len == output_buffer_size);
-
- switch (status) {
- case Filter::FILTER_DONE: {
- filter_needs_more_output_space_ = false;
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_NEED_MORE_DATA: {
- // We have finished filtering all data currently in the buffer.
- // There might be some space left in the output buffer. One can
- // consider reading more data from the stream to feed the filter
- // and filling up the output buffer. This leads to more complicated
- // buffer management and data notification mechanisms.
- // We can revisit this issue if there is a real perf need.
- if (filtered_data_len > 0) {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- } else {
- // Read again since we haven't received enough data yet (e.g., we
- // may not have a complete gzip header yet).
- continue;
- }
- break;
- }
- case Filter::FILTER_OK: {
- *bytes_read = filtered_data_len;
- postfilter_bytes_read_ += filtered_data_len;
- error = OK;
- break;
- }
- case Filter::FILTER_ERROR: {
- DVLOG(1) << __FUNCTION__ << "() "
- << "\"" << request_->url().spec() << "\""
- << " Filter Error";
- filter_needs_more_output_space_ = false;
- error = ERR_CONTENT_DECODING_FAILED;
- UMA_HISTOGRAM_ENUMERATION("Net.ContentDecodingFailed.FilterType",
- filter_->type(), Filter::FILTER_TYPE_MAX);
mmenke 2016/03/04 21:15:57 Are we just getting rid of this histogram?
Randy Smith (Not in Mondays) 2016/03/09 23:03:56 I'd like to keep this histogram; it's a pain when
xunjieli 2016/04/20 19:16:11 Done. Moved to stream_source.cc so we can log the filter type.
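A sketch of what the relocated histogram could look like in stream_source.cc; the enum bound and exact call site are assumptions, only the histogram name and the intent to log the type come from this file:

// Hypothetical call site in the StreamSource error path.
UMA_HISTOGRAM_ENUMERATION("Net.ContentDecodingFailed.FilterType",
                          type(), SOURCE_TYPE_MAX);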
- break;
- }
- default: {
- NOTREACHED();
- filter_needs_more_output_space_ = false;
- error = ERR_FAILED;
- break;
- }
- }
-
- // If logging all bytes is enabled, log the filtered bytes read.
- if (error == OK && filtered_data_len > 0 &&
- request()->net_log().IsCapturing()) {
- request()->net_log().AddByteTransferEvent(
- NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, filtered_data_len,
- filtered_read_buffer_->data());
mmenke 2016/03/04 21:15:57 Think we want to keep this log event.
xunjieli 2016/04/20 19:16:11 Done.
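In the pull-based design, the natural home for this event is wherever the filtered bytes and their destination buffer are both in hand, e.g. the StreamSource completion path. A sketch (plumbing net_log() into StreamSource is an assumption):

// Hypothetical: log post-filter bytes where |dest_buffer| is available.
if (error == OK && bytes_read > 0 && net_log().IsCapturing()) {
  net_log().AddByteTransferEvent(
      NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ,
      base::checked_cast<int>(bytes_read), dest_buffer->data());
}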
- }
- } else {
- // we are done, or there is no data left.
- error = OK;
- }
- break;
- }
-
- if (error == OK) {
- // When we successfully finished a read, we no longer need to save the
- // caller's buffers. Release our reference.
- filtered_read_buffer_ = NULL;
- filtered_read_buffer_len_ = 0;
- }
- return error;
-}
-
-void URLRequestJob::DestroyFilters() {
- filter_.reset();
-}
-
const URLRequestStatus URLRequestJob::GetStatus() {
return request_->status();
}
@@ -806,31 +725,23 @@ void URLRequestJob::SetProxyServer(const HostPortPair& proxy_server) {
request_->proxy_server_ = proxy_server;
}
-Error URLRequestJob::ReadRawDataForFilter(int* bytes_read) {
- Error error = ERR_FAILED;
- DCHECK(bytes_read);
- DCHECK(filter_.get());
-
- *bytes_read = 0;
+int64_t URLRequestJob::prefilter_bytes_read() const {
+ return base::checked_cast<int64_t>(raw_bytes_read_);
+}
- // Get more pre-filtered data if needed.
- // TODO(mbelshe): is it possible that the filter needs *MORE* data
- // when there is some data already in the buffer?
- if (!filter_->stream_data_len() && !is_done()) {
- IOBuffer* stream_buffer = filter_->stream_buffer();
- int stream_buffer_size = filter_->stream_buffer_size();
- error = ReadRawDataHelper(stream_buffer, stream_buffer_size, bytes_read);
- }
- return error;
+int64_t URLRequestJob::postfilter_bytes_read() const {
+ return postfilter_bytes_read_;
mmenke 2016/03/04 21:15:57 Making prefilter_bytes_read use a checked_cast, and postfilter_bytes_read not, seems inconsistent.
xunjieli 2016/04/20 19:16:10 Done.
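One way to make the two accessors symmetric, per the comment above (a sketch; it assumes postfilter_bytes_read_ is also stored as a size_t, mirroring raw_bytes_read_):

int64_t URLRequestJob::prefilter_bytes_read() const {
  return base::checked_cast<int64_t>(raw_bytes_read_);
}

int64_t URLRequestJob::postfilter_bytes_read() const {
  // checked_cast at the accessor boundary, same as prefilter.
  return base::checked_cast<int64_t>(postfilter_bytes_read_);
}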
}
-Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
- int buf_size,
- int* bytes_read) {
+Error URLRequestJob::ReadRawDataHelper(
+ IOBuffer* buf,
+ int buf_size,
+ int* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) {
DCHECK(!raw_read_buffer_);
- // Keep a pointer to the read buffer, so we have access to it in
- // GatherRawReadStats() in the event that the read completes asynchronously.
+ // Keep a pointer to the read buffer, so URLRequestJob::GatherRawReadStats()
+ // has access to it to log stats.
raw_read_buffer_ = buf;
Error error;
ConvertResultToError(ReadRawData(buf, buf_size), &error, bytes_read);
@@ -839,6 +750,8 @@ Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
// If the read completes synchronously, either success or failure, invoke
// GatherRawReadStats so we can account for the completed read.
GatherRawReadStats(error, *bytes_read);
+ } else {
+ read_raw_callback_ = callback;
}
return error;
}
@@ -857,9 +770,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
raw_read_buffer_ = nullptr;
return;
}
- // If |filter_| is non-NULL, bytes will be logged after it is applied
- // instead.
- if (!filter_.get() && bytes_read > 0 && request()->net_log().IsCapturing()) {
+ if (bytes_read > 0 && request()->net_log().IsCapturing()) {
request()->net_log().AddByteTransferEvent(
NetLog::TYPE_URL_REQUEST_JOB_BYTES_READ, bytes_read,
raw_read_buffer_->data());
@@ -873,7 +784,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
void URLRequestJob::RecordBytesRead(int bytes_read) {
DCHECK_GT(bytes_read, 0);
- prefilter_bytes_read_ += bytes_read;
+ raw_bytes_read_ += base::checked_cast<size_t>(bytes_read);
// On first read, notify NetworkQualityEstimator that response headers have
// been received.
@@ -883,18 +794,16 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
// first raw read of the response body. This is used as the signal that
// response headers have been received.
if (request_->context()->network_quality_estimator() &&
- prefilter_bytes_read_ == bytes_read) {
+ prefilter_bytes_read() == bytes_read) {
request_->context()->network_quality_estimator()->NotifyHeadersReceived(
*request_);
}
- if (!filter_.get())
- postfilter_bytes_read_ += bytes_read;
DVLOG(2) << __FUNCTION__ << "() "
<< "\"" << request_->url().spec() << "\""
<< " pre bytes read = " << bytes_read
- << " pre total = " << prefilter_bytes_read_
- << " post total = " << postfilter_bytes_read_;
+ << " pre total = " << prefilter_bytes_read()
+ << " post total = " << postfilter_bytes_read();
UpdatePacketReadTimes(); // Facilitate stats recording if it is active.
// Notify observers if any additional network usage has occurred. Note that
@@ -905,10 +814,6 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
MaybeNotifyNetworkBytes();
}
-bool URLRequestJob::FilterHasData() {
- return filter_.get() && filter_->stream_data_len();
-}
-
void URLRequestJob::UpdatePacketReadTimes() {
}
