Chromium Code Reviews

Unified Diff: net/url_request/url_request_job.cc

Issue 1662763002: [ON HOLD] Implement pull-based design for content decoding (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebased (created 4 years, 4 months ago)
Index: net/url_request/url_request_job.cc
diff --git a/net/url_request/url_request_job.cc b/net/url_request/url_request_job.cc
index e5e6e0e8898543f46d4d6090eb8adf2e18df6737..8bd986848e9502dcab72c9de345b1f6b547ac20b 100644
--- a/net/url_request/url_request_job.cc
+++ b/net/url_request/url_request_job.cc
@@ -7,8 +7,10 @@
#include <utility>
#include "base/bind.h"
+#include "base/callback_helpers.h"
#include "base/compiler_specific.h"
#include "base/location.h"
+#include "base/memory/ptr_util.h"
#include "base/metrics/histogram_macros.h"
#include "base/power_monitor/power_monitor.h"
#include "base/profiler/scoped_tracker.h"
@@ -25,7 +27,6 @@
#include "net/base/load_states.h"
#include "net/base/net_errors.h"
#include "net/base/network_delegate.h"
-#include "net/filter/filter.h"
#include "net/http/http_response_headers.h"
#include "net/nqe/network_quality_estimator.h"
#include "net/url_request/url_request_context.h"
@@ -35,12 +36,12 @@ namespace net {
namespace {
// Callback for TYPE_URL_REQUEST_FILTERS_SET net-internals event.
-std::unique_ptr<base::Value> FiltersSetCallback(
-    Filter* filter,
+std::unique_ptr<base::Value> SourceStreamSetCallback(
+    SourceStream* source_stream,
    NetLogCaptureMode /* capture_mode */) {
  std::unique_ptr<base::DictionaryValue> event_params(
      new base::DictionaryValue());
-  event_params->SetString("filters", filter->OrderedFilterList());
+  event_params->SetString("filters", source_stream->OrderedTypeStringList());
  return std::move(event_params);
}
@@ -115,14 +116,44 @@ URLRequest::ReferrerPolicy ProcessReferrerPolicyHeaderOnRedirect(
}  // namespace
+// Each SourceStream owns the previous SourceStream in the chain, but the
+// ultimate source is URLRequestJob, which has other ownership semantics, so
+// this class is a proxy for URLRequestJob that is owned by the first stream
+// (in dataflow order).
+class URLRequestJob::URLRequestJobSourceStream : public SourceStream {
+ public:
+  explicit URLRequestJobSourceStream(URLRequestJob* job)
+      : SourceStream(SourceStream::TYPE_NONE), job_(job) {
+    DCHECK(job_);
+  }
+
+  ~URLRequestJobSourceStream() override {}
+
+  // SourceStream implementation:
+  int Read(IOBuffer* dest_buffer,
+           size_t buffer_size,
+           const CompletionCallback& callback) override {
+    DCHECK(job_);
+    return job_->ReadRawDataHelper(dest_buffer, buffer_size, callback);
+  }
+
+  std::string OrderedTypeStringList() const override { return ""; }
+
+ private:
+  // It is safe to keep a raw pointer because |job_| owns the last stream,
+  // which indirectly owns |this|. Therefore, |job_| will not be destroyed
+  // while |this| is alive.
+  URLRequestJob* const job_;
+
+  DISALLOW_COPY_AND_ASSIGN(URLRequestJobSourceStream);
+};
+
URLRequestJob::URLRequestJob(URLRequest* request,
                             NetworkDelegate* network_delegate)
    : request_(request),
      done_(false),
      prefilter_bytes_read_(0),
      postfilter_bytes_read_(0),
-      filter_needs_more_output_space_(false),
-      filtered_read_buffer_len_(0),
      has_handled_response_(false),
      expected_content_size_(-1),
      network_delegate_(network_delegate),
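The ownership model described in the class comment above is worth seeing in isolation. Below is a minimal, self-contained sketch (ToyStream is a hypothetical stand-in, not net/ code) of the pull-based chain: each stream owns its upstream source, so the job only needs to own the outermost stream, and destroying that stream tears down the whole chain, adapter included.

#include <iostream>
#include <memory>
#include <utility>

// Hypothetical stand-in for SourceStream: pulls bytes from the upstream
// stream it owns. Destroying the outermost stream tears down the chain.
struct ToyStream {
  explicit ToyStream(std::unique_ptr<ToyStream> upstream)
      : upstream_(std::move(upstream)) {}
  virtual ~ToyStream() = default;
  virtual int Read(char* buf, int len) {
    return upstream_ ? upstream_->Read(buf, len) : 0;  // pass-through
  }
  std::unique_ptr<ToyStream> upstream_;  // previous stream, dataflow order
};

int main() {
  // job -> adapter -> decoder: the decoder owns the adapter.
  auto adapter = std::make_unique<ToyStream>(nullptr);
  auto decoder = std::make_unique<ToyStream>(std::move(adapter));
  char buf[16];
  std::cout << decoder->Read(buf, sizeof(buf)) << "\n";  // prints 0 (EOF)
}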
@@ -163,43 +194,30 @@ void URLRequestJob::Kill() {
// This function calls ReadRawData to get stream data. If a filter exists, it
// passes the data to the attached filter. It then returns the output from
// filter back to the caller.
+// This method passes reads down the filter chain, where they eventually end up
+// at URLRequestJobSourceStream::Read, which calls back into
+// URLRequestJob::ReadRawData.
bool URLRequestJob::Read(IOBuffer* buf, int buf_size, int *bytes_read) {
  DCHECK_LT(buf_size, 1000000);  // Sanity check.
  DCHECK(buf);
  DCHECK(bytes_read);
-  DCHECK(!filtered_read_buffer_);
-  DCHECK_EQ(0, filtered_read_buffer_len_);
-
-  Error error = OK;
  *bytes_read = 0;
-  // Skip Filter if not present.
-  if (!filter_) {
-    error = ReadRawDataHelper(buf, buf_size, bytes_read);
-  } else {
-    // Save the caller's buffers while we do IO
-    // in the filter's buffers.
-    filtered_read_buffer_ = buf;
-    filtered_read_buffer_len_ = buf_size;
+  pending_read_buffer_ = buf;
+  int result = source_stream_->Read(
+      buf, buf_size, base::Bind(&URLRequestJob::SourceStreamReadComplete,
+                                weak_factory_.GetWeakPtr(), false));
+  if (result == ERR_IO_PENDING) {
+    SetStatus(URLRequestStatus::FromError(ERR_IO_PENDING));
+    return false;
+  }
-    error = ReadFilteredData(bytes_read);
+  SourceStreamReadComplete(true, result);
-    // Synchronous EOF from the filter.
-    if (error == OK && *bytes_read == 0)
-      DoneReading();
-  }
+  if (result > 0)
+    *bytes_read = result;
-  if (error == OK) {
-    // If URLRequestJob read zero bytes, the job is at EOF.
-    if (*bytes_read == 0)
-      NotifyDone(URLRequestStatus());
-  } else if (error == ERR_IO_PENDING) {
-    SetStatus(URLRequestStatus::FromError(ERR_IO_PENDING));
-  } else {
-    NotifyDone(URLRequestStatus::FromError(error));
-    *bytes_read = -1;
-  }
-  return error == OK;
+  return result >= OK;
}
void URLRequestJob::StopCaching() {
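For readers new to the net stack, the rewritten Read() leans on its completion convention: a read returns a positive byte count, 0 at EOF, a negative net error, or ERR_IO_PENDING, in which case the callback runs later with the final result. A runnable toy illustration of funneling both completion paths through one handler (kErrIoPending and ToyRead are stand-ins, not real net/ APIs):

#include <algorithm>
#include <cstring>
#include <functional>
#include <iostream>

constexpr int kErrIoPending = -1;  // stand-in for net::ERR_IO_PENDING

// A stream that happens to complete synchronously; an async stream would
// return kErrIoPending and run the callback later with the result instead.
int ToyRead(char* buf, int len, const std::function<void(int)>& /*cb*/) {
  static const char kBody[] = "hello";
  int n = std::min(len, static_cast<int>(sizeof(kBody) - 1));
  std::memcpy(buf, kBody, n);
  return n;
}

void OnReadComplete(int result) {
  std::cout << "read completed: " << result << " bytes\n";
}

int main() {
  char buf[16];
  int rv = ToyRead(buf, sizeof(buf), OnReadComplete);
  if (rv != kErrIoPending)
    OnReadComplete(rv);  // a synchronous result goes through the same handler
}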
@@ -246,10 +264,6 @@ void URLRequestJob::PopulateNetErrorDetails(NetErrorDetails* details) const {
  return;
}
-std::unique_ptr<Filter> URLRequestJob::SetupFilter() const {
-  return nullptr;
-}
-
bool URLRequestJob::IsRedirectResponse(GURL* location,
                                       int* http_status_code) {
  // For non-HTTP jobs, headers will be null.
@@ -496,18 +510,27 @@ void URLRequestJob::NotifyHeadersComplete() {
  }
  has_handled_response_ = true;
-  if (request_->status().is_success())
-    filter_ = SetupFilter();
-
-  if (!filter_.get()) {
-    std::string content_length;
-    request_->GetResponseHeaderByName("content-length", &content_length);
-    if (!content_length.empty())
-      base::StringToInt64(content_length, &expected_content_size_);
-  } else {
-    request_->net_log().AddEvent(
-        NetLog::TYPE_URL_REQUEST_FILTERS_SET,
-        base::Bind(&FiltersSetCallback, base::Unretained(filter_.get())));
+  if (request_->status().is_success()) {
+    DCHECK(!source_stream_);
+    source_stream_ = SetUpSourceStream();
+
+    if (source_stream_ == nullptr) {
+      NotifyDone(URLRequestStatus(URLRequestStatus::FAILED,
+                                  ERR_CONTENT_DECODING_INIT_FAILED));
+      return;
+    }
+
+    if (source_stream_->type() == SourceStream::TYPE_NONE) {
+      std::string content_length;
+      request_->GetResponseHeaderByName("content-length", &content_length);
+      if (!content_length.empty())
+        base::StringToInt64(content_length, &expected_content_size_);
+    } else {
+      request_->net_log().AddEvent(
+          NetLog::TYPE_URL_REQUEST_FILTERS_SET,
+          base::Bind(&SourceStreamSetCallback,
+                     base::Unretained(source_stream_.get())));
+    }
  }
  request_->NotifyResponseStarted();
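The TYPE_NONE branch above encodes a subtle point: Content-Length only predicts the size the caller will observe when the chain is a bare pass-through; any decoding stream changes the body size, so expected_content_size_ is left at -1. A toy restatement of that decision (using std::strtoll, which is looser than base::StringToInt64, purely for illustration):

#include <cstdint>
#include <cstdlib>
#include <string>

enum class StreamType { kNone, kGzip };

// Returns the expected *decoded* size, or -1 when unknown.
int64_t ExpectedContentSize(StreamType type, const std::string& content_length) {
  if (type != StreamType::kNone)
    return -1;  // a decoder will change the size; the header is not predictive
  if (content_length.empty())
    return -1;
  return std::strtoll(content_length.c_str(), nullptr, 10);
}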
@@ -527,6 +550,7 @@ void URLRequestJob::ConvertResultToError(int result, Error* error, int* count) {
void URLRequestJob::ReadRawDataComplete(int result) {
  DCHECK(request_->status().is_io_pending());
+  DCHECK_NE(ERR_IO_PENDING, result);
  // TODO(cbentzel): Remove ScopedTracker below once crbug.com/475755 is fixed.
  tracked_objects::ScopedTracker tracking_profile(
@@ -541,63 +565,12 @@ void URLRequestJob::ReadRawDataComplete(int result) {
  // The headers should be complete before reads complete
  DCHECK(has_handled_response_);
-  Error error;
-  int bytes_read;
-  ConvertResultToError(result, &error, &bytes_read);
-
-  DCHECK_NE(ERR_IO_PENDING, error);
-
-  GatherRawReadStats(error, bytes_read);
-
-  if (filter_.get() && error == OK) {
-    // |bytes_read| being 0 indicates an EOF was received. ReadFilteredData
-    // can incorrectly return ERR_IO_PENDING when 0 bytes are passed to it, so
-    // just don't call into the filter in that case.
-    int filter_bytes_read = 0;
-    if (bytes_read > 0) {
-      // Tell the filter that it has more data.
-      PushInputToFilter(bytes_read);
-
-      // Filter the data.
-      error = ReadFilteredData(&filter_bytes_read);
-    }
-
-    if (error == OK && !filter_bytes_read)
-      DoneReading();
-
-    DVLOG(1) << __func__ << "() \"" << request_->url().spec() << "\""
-             << " pre bytes read = " << bytes_read
-             << " pre total = " << prefilter_bytes_read_
-             << " post total = " << postfilter_bytes_read_;
-    bytes_read = filter_bytes_read;
-  } else {
-    DVLOG(1) << __func__ << "() \"" << request_->url().spec() << "\""
-             << " pre bytes read = " << bytes_read
-             << " pre total = " << prefilter_bytes_read_
-             << " post total = " << postfilter_bytes_read_;
-  }
-
-  // Synchronize the URLRequest state machine with the URLRequestJob state
-  // machine. If this read succeeded, either the request is at EOF and the
-  // URLRequest state machine goes to 'finished', or it is not and the
-  // URLRequest state machine goes to 'success'. If the read failed, the
-  // URLRequest state machine goes directly to 'finished'. If filtered data is
-  // pending, then there's nothing to do, since the status of the request is
-  // already pending.
-  //
-  // Update the URLRequest's status first, so that NotifyReadCompleted has an
-  // accurate view of the request.
-  if (error == OK && bytes_read > 0) {
-    SetStatus(URLRequestStatus());
-  } else if (error != ERR_IO_PENDING) {
-    NotifyDone(URLRequestStatus::FromError(error));
-  }
+  GatherRawReadStats(result);
-  // NotifyReadCompleted should be called after SetStatus or NotifyDone updates
-  // the status.
-  if (error == OK)
-    request_->NotifyReadCompleted(bytes_read);
+  // Notify SourceStream.
+  DCHECK(!read_raw_callback_.is_null());
+  base::ResetAndReturn(&read_raw_callback_).Run(result);
  // |this| may be destroyed at this point.
}
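base::ResetAndReturn(&read_raw_callback_).Run(result) moves the stored callback into a temporary and clears the member before invoking it, which is what makes the "may be destroyed" comment above safe: re-entrant code never observes a stale callback. A standard-C++ sketch of the same idiom:

#include <cassert>
#include <functional>
#include <iostream>
#include <utility>

struct Job {
  std::function<void(int)> read_raw_callback_;

  void CompleteRead(int result) {
    assert(read_raw_callback_);               // DCHECK(!cb.is_null())
    auto cb = std::move(read_raw_callback_);  // take ownership first...
    read_raw_callback_ = nullptr;             // ...member is now clear...
    cb(result);                               // ...then run; re-entry safe
  }
};

int main() {
  Job job;
  job.read_raw_callback_ = [](int n) { std::cout << "done: " << n << "\n"; };
  job.CompleteRead(42);
}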
@@ -647,7 +620,7 @@ void URLRequestJob::NotifyDone(const URLRequestStatus &status) {
  if (request_->status().is_success()) {
    int response_code = GetResponseCode();
    if (400 <= response_code && response_code <= 599) {
-      bool page_has_content = (postfilter_bytes_read_ != 0);
+      bool page_has_content = (postfilter_bytes_read() != 0);
      if (request_->load_flags() & net::LOAD_MAIN_FRAME) {
        UMA_HISTOGRAM_BOOLEAN("Net.ErrorResponseHasContentMainFrame",
                              page_has_content);
@@ -713,128 +686,8 @@ void URLRequestJob::DoneReading() {
void URLRequestJob::DoneReadingRedirectResponse() {
}
-void URLRequestJob::PushInputToFilter(int bytes_read) {
-  DCHECK(filter_);
-  filter_->FlushStreamBuffer(bytes_read);
-}
-
-Error URLRequestJob::ReadFilteredData(int* bytes_read) {
-  DCHECK(filter_);
-  DCHECK(filtered_read_buffer_.get());
-  DCHECK_GT(filtered_read_buffer_len_, 0);
-  DCHECK_LT(filtered_read_buffer_len_, 1000000);  // Sanity check.
-  DCHECK(!raw_read_buffer_);
-
-  *bytes_read = 0;
-  Error error = ERR_FAILED;
-
-  for (;;) {
-    if (is_done())
-      return OK;
-
-    if (!filter_needs_more_output_space_ && !filter_->stream_data_len()) {
-      // We don't have any raw data to work with, so read from the transaction.
-      int filtered_data_read;
-      error = ReadRawDataForFilter(&filtered_data_read);
-      // If ReadRawDataForFilter returned some data, fall through to the case
-      // below; otherwise, return early.
-      if (error != OK || filtered_data_read == 0)
-        return error;
-      filter_->FlushStreamBuffer(filtered_data_read);
-    }
-
-    if ((filter_->stream_data_len() || filter_needs_more_output_space_) &&
-        !is_done()) {
-      // Get filtered data.
-      int filtered_data_len = filtered_read_buffer_len_;
-      int output_buffer_size = filtered_data_len;
-      Filter::FilterStatus status =
-          filter_->ReadData(filtered_read_buffer_->data(), &filtered_data_len);
-
-      if (filter_needs_more_output_space_ && !filtered_data_len) {
-        // filter_needs_more_output_space_ was mistaken... there are no more
-        // bytes and we should have at least tried to fill up the filter's input
-        // buffer. Correct the state, and try again.
-        filter_needs_more_output_space_ = false;
-        continue;
-      }
-      filter_needs_more_output_space_ =
-          (filtered_data_len == output_buffer_size);
-
-      switch (status) {
-        case Filter::FILTER_DONE: {
-          filter_needs_more_output_space_ = false;
-          *bytes_read = filtered_data_len;
-          postfilter_bytes_read_ += filtered_data_len;
-          error = OK;
-          break;
-        }
-        case Filter::FILTER_NEED_MORE_DATA: {
-          // We have finished filtering all data currently in the buffer.
-          // There might be some space left in the output buffer. One can
-          // consider reading more data from the stream to feed the filter
-          // and filling up the output buffer. This leads to more complicated
-          // buffer management and data notification mechanisms.
-          // We can revisit this issue if there is a real perf need.
-          if (filtered_data_len > 0) {
-            *bytes_read = filtered_data_len;
-            postfilter_bytes_read_ += filtered_data_len;
-            error = OK;
-          } else {
-            // Read again since we haven't received enough data yet (e.g., we
-            // may not have a complete gzip header yet).
-            continue;
-          }
-          break;
-        }
-        case Filter::FILTER_OK: {
-          *bytes_read = filtered_data_len;
-          postfilter_bytes_read_ += filtered_data_len;
-          error = OK;
-          break;
-        }
-        case Filter::FILTER_ERROR: {
-          DVLOG(1) << __func__ << "() \"" << request_->url().spec() << "\""
-                   << " Filter Error";
-          filter_needs_more_output_space_ = false;
-          error = ERR_CONTENT_DECODING_FAILED;
-          UMA_HISTOGRAM_ENUMERATION("Net.ContentDecodingFailed.FilterType",
-                                    filter_->type(), Filter::FILTER_TYPE_MAX);
-          break;
-        }
-        default: {
-          NOTREACHED();
-          filter_needs_more_output_space_ = false;
-          error = ERR_FAILED;
-          break;
-        }
-      }
-
-      // If logging all bytes is enabled, log the filtered bytes read.
-      if (error == OK && filtered_data_len > 0 &&
-          request()->net_log().IsCapturing()) {
-        request()->net_log().AddByteTransferEvent(
-            NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, filtered_data_len,
-            filtered_read_buffer_->data());
-      }
-    } else {
-      // we are done, or there is no data left.
-      error = OK;
-    }
-    break;
-  }
-
-  if (error == OK) {
-    // When we successfully finished a read, we no longer need to save the
-    // caller's buffers. Release our reference.
-    filtered_read_buffer_ = NULL;
-    filtered_read_buffer_len_ = 0;
-  }
-  return error;
-}
-
-void URLRequestJob::DestroyFilters() {
-  filter_.reset();
+std::unique_ptr<SourceStream> URLRequestJob::SetUpSourceStream() {
+  return base::MakeUnique<URLRequestJobSourceStream>(this);
}
const URLRequestStatus URLRequestJob::GetStatus() {
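The base implementation above returns just the pass-through adapter, so by default the job behaves as if no filter were installed. A subclass that supports content decoding would override SetUpSourceStream() to wrap the adapter. The fragment below is a hedged sketch only: MyHttpJob, ResponseIsGzipped(), and GzipToyStream::Create() are assumed names for illustration, not the real net/ API.

// Hypothetical override in an HTTP-job subclass; wraps the raw-read adapter
// in a decoder so decoding happens transparently on the pull path.
std::unique_ptr<SourceStream> MyHttpJob::SetUpSourceStream() {
  std::unique_ptr<SourceStream> upstream = URLRequestJob::SetUpSourceStream();
  if (ResponseIsGzipped())  // assumed helper: inspects Content-Encoding
    upstream = GzipToyStream::Create(std::move(upstream));  // assumed factory
  return upstream;  // the outermost stream transitively owns the whole chain
}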
@@ -855,41 +708,57 @@ void URLRequestJob::SetProxyServer(const HostPortPair& proxy_server) {
  request_->proxy_server_ = proxy_server;
}
-Error URLRequestJob::ReadRawDataForFilter(int* bytes_read) {
-  Error error = ERR_FAILED;
-  DCHECK(bytes_read);
-  DCHECK(filter_.get());
+void URLRequestJob::SourceStreamReadComplete(bool synchronous, int result) {
+  DCHECK_NE(ERR_IO_PENDING, result);
-  *bytes_read = 0;
+  if (result > 0 && request()->net_log().IsCapturing()) {
+    request()->net_log().AddByteTransferEvent(
+        NetLog::TYPE_URL_REQUEST_JOB_FILTERED_BYTES_READ, result,
+        pending_read_buffer_->data());
+  }
+  pending_read_buffer_ = nullptr;
+
+  if (result < 0) {
+    NotifyDone(URLRequestStatus::FromError(result));
+    return;
+  }
-  // Get more pre-filtered data if needed.
-  // TODO(mbelshe): is it possible that the filter needs *MORE* data
-  // when there is some data already in the buffer?
-  if (!filter_->stream_data_len() && !is_done()) {
-    IOBuffer* stream_buffer = filter_->stream_buffer();
-    int stream_buffer_size = filter_->stream_buffer_size();
-    error = ReadRawDataHelper(stream_buffer, stream_buffer_size, bytes_read);
+  if (result > 0) {
+    postfilter_bytes_read_ += result;
+    SetStatus(URLRequestStatus());
+    if (!synchronous)
+      request_->NotifyReadCompleted(result);
+    return;
  }
-  return error;
+
+  // result == 0
+  DoneReading();
+  NotifyDone(URLRequestStatus());
+  if (!synchronous)
+    request_->NotifyReadCompleted(result);
}
-Error URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
-                                       int buf_size,
-                                       int* bytes_read) {
+int URLRequestJob::ReadRawDataHelper(IOBuffer* buf,
+                                     int buf_size,
+                                     const CompletionCallback& callback) {
  DCHECK(!raw_read_buffer_);
-  // Keep a pointer to the read buffer, so we have access to it in
-  // GatherRawReadStats() in the event that the read completes asynchronously.
+  // Keep a pointer to the read buffer, so URLRequestJob::GatherRawReadStats()
+  // has access to it to log stats.
  raw_read_buffer_ = buf;
-  Error error;
-  ConvertResultToError(ReadRawData(buf, buf_size), &error, bytes_read);
-  if (error != ERR_IO_PENDING) {
+  // TODO(xunjieli): Make ReadRawData take in a callback rather than requiring
+  // subclasses to call ReadRawDataComplete upon asynchronous completion.
+  int result = ReadRawData(buf, buf_size);
+
+  if (result != ERR_IO_PENDING) {
    // If the read completes synchronously, either success or failure, invoke
    // GatherRawReadStats so we can account for the completed read.
-    GatherRawReadStats(error, *bytes_read);
+    GatherRawReadStats(result);
+  } else {
+    read_raw_callback_ = callback;
  }
-  return error;
+  return result;
}
void URLRequestJob::FollowRedirect(const RedirectInfo& redirect_info) {
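ReadRawDataHelper() stores |callback| only on the ERR_IO_PENDING path; a synchronous result is handed back through the return value, and running the callback as well would report the same read twice. A runnable toy model of that branch and of the later hand-off from ReadRawDataComplete() (all names illustrative):

#include <functional>
#include <iostream>
#include <utility>

constexpr int kErrIoPending = -1;  // stand-in for net::ERR_IO_PENDING

struct ToyJob {
  std::function<void(int)> pending_callback_;

  // Models ReadRawDataHelper(): |raw_result| is what ReadRawData() returned.
  int ReadHelper(int raw_result, std::function<void(int)> callback) {
    if (raw_result != kErrIoPending)
      return raw_result;  // sync: the caller consumes the return value
    pending_callback_ = std::move(callback);  // async: run it on completion
    return kErrIoPending;
  }

  // Models ReadRawDataComplete(): the subclass reports the async result.
  void OnRawReadComplete(int result) {
    auto cb = std::move(pending_callback_);
    pending_callback_ = nullptr;
    cb(result);
  }
};

int main() {
  ToyJob job;
  auto done = [](int n) { std::cout << "async read: " << n << "\n"; };
  if (job.ReadHelper(kErrIoPending, done) == kErrIoPending)
    job.OnRawReadComplete(128);  // later, when the raw read finishes
}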
@@ -898,23 +767,16 @@ void URLRequestJob::FollowRedirect(const RedirectInfo& redirect_info) {
    NotifyDone(URLRequestStatus(URLRequestStatus::FAILED, rv));
}
-void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
+void URLRequestJob::GatherRawReadStats(int bytes_read) {
  DCHECK(raw_read_buffer_ || bytes_read == 0);
-  DCHECK_NE(ERR_IO_PENDING, error);
-
-  if (error != OK) {
-    raw_read_buffer_ = nullptr;
-    return;
-  }
-  // If |filter_| is non-NULL, bytes will be logged after it is applied
-  // instead.
-  if (!filter_.get() && bytes_read > 0 && request()->net_log().IsCapturing()) {
-    request()->net_log().AddByteTransferEvent(
-        NetLog::TYPE_URL_REQUEST_JOB_BYTES_READ, bytes_read,
-        raw_read_buffer_->data());
-  }
+  DCHECK_NE(ERR_IO_PENDING, bytes_read);
  if (bytes_read > 0) {
+    if (request()->net_log().IsCapturing()) {
+      request()->net_log().AddByteTransferEvent(
+          NetLog::TYPE_URL_REQUEST_JOB_BYTES_READ, bytes_read,
+          raw_read_buffer_->data());
+    }
    RecordBytesRead(bytes_read);
  }
  raw_read_buffer_ = nullptr;
@@ -922,7 +784,7 @@ void URLRequestJob::GatherRawReadStats(Error error, int bytes_read) {
void URLRequestJob::RecordBytesRead(int bytes_read) {
  DCHECK_GT(bytes_read, 0);
-  prefilter_bytes_read_ += bytes_read;
+  prefilter_bytes_read_ += base::checked_cast<size_t>(bytes_read);
  // On first read, notify NetworkQualityEstimator that response headers have
  // been received.
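The size_t cast above suggests the prefilter counter is now unsigned, and base::checked_cast CHECK-fails on a lossy conversion rather than letting a negative int wrap to a huge unsigned value. A minimal standard-C++ equivalent of that guard:

#include <cassert>
#include <cstddef>

// Sketch of base::checked_cast<size_t>(int): refuse, rather than wrap,
// when the source value cannot be represented in the destination type.
inline size_t CheckedCastToSizeT(int value) {
  assert(value >= 0);  // base::checked_cast would CHECK() here
  return static_cast<size_t>(value);
}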
@@ -932,17 +794,16 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
  // first raw read of the response body. This is used as the signal that
  // response headers have been received.
  if (request_->context()->network_quality_estimator() &&
-      prefilter_bytes_read_ == bytes_read) {
+      prefilter_bytes_read() == bytes_read) {
    request_->context()->network_quality_estimator()->NotifyHeadersReceived(
        *request_);
  }
-  if (!filter_.get())
-    postfilter_bytes_read_ += bytes_read;
-  DVLOG(2) << __func__ << "() \"" << request_->url().spec() << "\""
+  DVLOG(2) << __FUNCTION__ << "() "
+           << "\"" << request_->url().spec() << "\""
           << " pre bytes read = " << bytes_read
-           << " pre total = " << prefilter_bytes_read_
-           << " post total = " << postfilter_bytes_read_;
+           << " pre total = " << prefilter_bytes_read()
+           << " post total = " << postfilter_bytes_read();
  UpdatePacketReadTimes();  // Facilitate stats recording if it is active.
  // Notify observers if any additional network usage has occurred. Note that
@@ -953,10 +814,6 @@ void URLRequestJob::RecordBytesRead(int bytes_read) {
  MaybeNotifyNetworkBytes();
}
-bool URLRequestJob::FilterHasData() {
-  return filter_.get() && filter_->stream_data_len();
-}
-
void URLRequestJob::UpdatePacketReadTimes() {
}