Index: content/browser/service_worker/service_worker_url_request_job.cc |
diff --git a/content/browser/service_worker/service_worker_url_request_job.cc b/content/browser/service_worker/service_worker_url_request_job.cc |
index a60c41600af8f872ec32d63111eb9e10a38366fa..98a23aa85539c8655820c2a0a7e5fc61e351b5c2 100644 |
--- a/content/browser/service_worker/service_worker_url_request_job.cc |
+++ b/content/browser/service_worker/service_worker_url_request_job.cc |
@@ -41,6 +41,7 @@ |
#include "content/public/browser/resource_request_info.h" |
#include "content/public/browser/service_worker_context.h" |
#include "content/public/common/referrer.h" |
+#include "mojo/public/cpp/system/simple_watcher.h" |
#include "net/base/net_errors.h" |
#include "net/http/http_request_headers.h" |
#include "net/http/http_response_headers.h" |
@@ -126,6 +127,107 @@ std::vector<int64_t> GetFileSizesOnBlockingPool( |
} // namespace |
+class ServiceWorkerDataPipeReader { |
+ public: |
+ ServiceWorkerDataPipeReader( |
+ ServiceWorkerURLRequestJob* owner, |
+ scoped_refptr<ServiceWorkerVersion> streaming_version, |
+ mojo::ScopedDataPipeConsumerHandle stream) |
+ : owner_(owner), |
+ streaming_version_(streaming_version), |
+ handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL), |
+ stream_(std::move(stream)) { |
+ TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", |
+ this, "Url", owner->request()->url().spec()); |
+ streaming_version_->AddStreamingURLRequestJob(owner_); |
+ } |
+ ~ServiceWorkerDataPipeReader() { |
+ // LOG(ERROR) << "~ServiceWorkerDataPipeReader"; |
+ if (streaming_version_) { |
+ streaming_version_->RemoveStreamingURLRequestJob(owner_); |
+ streaming_version_ = nullptr; |
+ } |
+ TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", |
+ this); |
+ } |
+ void Start() { |
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
+ this, "Start"); |
+ // LOG(ERROR) << "ServiceWorkerDataPipeReader::Start"; |
+ handle_watcher_.Watch( |
+ stream_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotReadable, |
+ base::Unretained(this))); |
+ handle_watcher_.ArmOrNotify(); |
+ owner_->OnResponseStarted(); |
+ } |
+ void OnHandleGotReadable(MojoResult) { |
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
+ this, "OnHandleGotReadable"); |
+ // LOG(ERROR) << "OnHandleGotReadable"; |
+ if (!stream_pending_buffer_) |
+ return; |
+ uint32_t size_to_pass = stream_pending_buffer_size_; |
+ MojoResult mojo_result = |
+ mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(), |
+ &size_to_pass, MOJO_READ_DATA_FLAG_NONE); |
+ if (mojo_result == MOJO_RESULT_SHOULD_WAIT) { |
+ handle_watcher_.ArmOrNotify(); |
+ return; |
+ } |
+ stream_pending_buffer_ = nullptr; |
+ stream_pending_buffer_size_ = 0; |
+ |
+ switch (mojo_result) { |
+ case MOJO_RESULT_OK: |
+ owner_->OnReadRawDataComplete(size_to_pass); |
+ return; |
+ case MOJO_RESULT_FAILED_PRECONDITION: |
+ handle_watcher_.Cancel(); |
+ owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE); |
+ owner_->OnReadRawDataComplete(0); |
+ return; |
+ case MOJO_RESULT_BUSY: |
+ case MOJO_RESULT_RESOURCE_EXHAUSTED: |
+ owner_->OnReadRawDataComplete(net::ERR_FAILED); |
+ return; |
+ } |
+ } |
+ int ReadRawData(net::IOBuffer* buf, int buf_size) { |
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
+ this, "ReadRawData"); |
+ uint32_t size_to_pass = buf_size; |
+ MojoResult mojo_result = mojo::ReadDataRaw( |
+ stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE); |
+ switch (mojo_result) { |
+ case MOJO_RESULT_OK: |
+ return size_to_pass; |
+ case MOJO_RESULT_FAILED_PRECONDITION: |
+ handle_watcher_.Cancel(); |
+ owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE); |
+ return 0; |
+ case MOJO_RESULT_BUSY: |
+ return net::ERR_FAILED; |
+ case MOJO_RESULT_SHOULD_WAIT: |
+ stream_pending_buffer_ = buf; |
+ stream_pending_buffer_size_ = buf_size; |
+ handle_watcher_.ArmOrNotify(); |
+ return net::ERR_IO_PENDING; |
+ case MOJO_RESULT_RESOURCE_EXHAUSTED: |
+ return net::ERR_FAILED; |
+ } |
+ return net::ERR_FAILED; |
+ } |
+ |
+ private: |
+ ServiceWorkerURLRequestJob* owner_; |
+ scoped_refptr<ServiceWorkerVersion> streaming_version_; |
+ scoped_refptr<net::IOBuffer> stream_pending_buffer_; |
+ int stream_pending_buffer_size_ = 0; |
+ mojo::SimpleWatcher handle_watcher_; |
+ mojo::ScopedDataPipeConsumerHandle stream_; |
+}; |
+ |
// Sets the size on each DataElement in the request body that is a file with |
// unknown size. This ensures ServiceWorkerURLRequestJob::CreateRequestBodyBlob |
// can successfuly create a blob from the data elements, as files with unknown |
@@ -261,6 +363,7 @@ ServiceWorkerURLRequestJob::ServiceWorkerURLRequestJob( |
ServiceWorkerURLRequestJob::~ServiceWorkerURLRequestJob() { |
stream_reader_.reset(); |
+ data_pipe_reader_.reset(); |
file_size_resolver_.reset(); |
if (!ShouldRecordResult()) |
@@ -312,6 +415,7 @@ void ServiceWorkerURLRequestJob::Start() { |
void ServiceWorkerURLRequestJob::Kill() { |
net::URLRequestJob::Kill(); |
stream_reader_.reset(); |
+ data_pipe_reader_.reset(); |
fetch_dispatcher_.reset(); |
blob_reader_.reset(); |
weak_factory_.InvalidateWeakPtrs(); |
@@ -325,6 +429,7 @@ net::LoadState ServiceWorkerURLRequestJob::GetLoadState() const { |
bool ServiceWorkerURLRequestJob::GetCharset(std::string* charset) { |
if (!http_info()) |
return false; |
+ // LOG(ERROR) << "http_info()->headers " << http_info()->headers; |
return http_info()->headers->GetCharset(charset); |
} |
@@ -368,6 +473,8 @@ int ServiceWorkerURLRequestJob::ReadRawData(net::IOBuffer* buf, int buf_size) { |
if (stream_reader_) |
return stream_reader_->ReadRawData(buf, buf_size); |
+ if (data_pipe_reader_) |
+ return data_pipe_reader_->ReadRawData(buf, buf_size); |
if (blob_reader_) |
return blob_reader_->ReadRawData(buf, buf_size); |
@@ -563,6 +670,7 @@ void ServiceWorkerURLRequestJob::DidDispatchFetchEvent( |
ServiceWorkerStatusCode status, |
ServiceWorkerFetchEventResult fetch_result, |
const ServiceWorkerResponse& response, |
+ mojo::ScopedDataPipeConsumerHandle stream, |
const scoped_refptr<ServiceWorkerVersion>& version) { |
// Do not clear |fetch_dispatcher_| if it has dispatched a navigation preload |
// request to keep the mojom::URLLoader related objects in it, because the |
@@ -638,6 +746,17 @@ void ServiceWorkerURLRequestJob::DidDispatchFetchEvent( |
return; |
} |
+ if (stream.is_valid()) { |
+ // LOG(ERROR) << "response.status_code " << response.status_code; |
+ // LOG(ERROR) << "stream.is_valid()"; |
+ SetResponseBodyType(STREAM); |
+ SetResponse(response); |
+ data_pipe_reader_.reset( |
+ new ServiceWorkerDataPipeReader(this, version, std::move(stream))); |
+ data_pipe_reader_->Start(); |
+ return; |
+ } |
+ |
// Set up a request for reading the blob. |
if (!response.blob_uuid.empty() && blob_storage_context_) { |
SetResponseBodyType(BLOB); |