Chromium Code Reviews| Index: content/child/blob_storage/blob_transport_controller.cc |
| diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc |
| index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..c0157bd017b055443d7da8c4a01041333fb25504 100644 |
| --- a/content/child/blob_storage/blob_transport_controller.cc |
| +++ b/content/child/blob_storage/blob_transport_controller.cc |
| @@ -4,14 +4,21 @@ |
| #include "content/child/blob_storage/blob_transport_controller.h" |
| +#include <limits> |
| #include <utility> |
| #include <vector> |
| +#include "base/bind.h" |
| +#include "base/bind_helpers.h" |
| +#include "base/callback.h" |
| +#include "base/files/file.h" |
| #include "base/lazy_instance.h" |
| +#include "base/location.h" |
| #include "base/memory/scoped_vector.h" |
| #include "base/memory/shared_memory.h" |
| #include "base/single_thread_task_runner.h" |
| #include "base/stl_util.h" |
| +#include "base/task_runner.h" |
| #include "content/child/blob_storage/blob_consolidation.h" |
| #include "content/child/child_process.h" |
| #include "content/child/thread_safe_sender.h" |
| @@ -22,6 +29,7 @@ |
| #include "storage/common/data_element.h" |
| #include "third_party/WebKit/public/platform/Platform.h" |
| +using base::File; |
| using base::SharedMemory; |
| using base::SharedMemoryHandle; |
| using storage::BlobItemBytesRequest; |
| @@ -30,10 +38,12 @@ using storage::IPCBlobItemRequestStrategy; |
| using storage::DataElement; |
| using storage::kBlobStorageIPCThresholdBytes; |
| -namespace content { |
| - |
| +using storage::BlobItemBytesResponse; |
| +using storage::BlobItemBytesRequest; |
| using storage::IPCBlobCreationCancelCode; |
| +namespace content { |
| + |
| using ConsolidatedItem = BlobConsolidation::ConsolidatedItem; |
| using ReadStatus = BlobConsolidation::ReadStatus; |
| @@ -52,6 +62,73 @@ void DecChildProcessRefCount() { |
| blink::Platform::current()->suddenTerminationChanged(true); |
| ChildProcess::current()->ReleaseProcess(); |
| } |
| + |
| +bool WriteSingleChunk(base::File* file, |
| + size_t total_read, |
| + const char* memory, |
| + size_t size) { |
| + size_t written = 0; |
| + size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max()); |
| + while (written < size) { |
| + size_t writing_size = std::min(max_int, size - written); |
| + int actual_written = |
| + file->WriteAtCurrentPos(memory, static_cast<int>(writing_size)); |
| + if (actual_written < 0) { |
| + LOG(ERROR) << "Error writing to file " << actual_written; |
| + return false; |
| + } |
| + written += writing_size; |
| + } |
| + return true; |
| +} |
| + |
| +bool WriteSingleRequestToDisk(const BlobConsolidation* consolidation, |
| + const BlobItemBytesRequest& request, |
| + const IPC::PlatformFileForTransit& transit_file) { |
| + File file = IPC::PlatformFileForTransitToFile(transit_file); |
| + if (!file.IsValid()) |
| + return false; |
| + CHECK_LE(request.handle_offset, |
| + static_cast<uint64_t>(std::numeric_limits<int64_t>::max())); |
| + int64_t seek_distance = |
| + file.Seek(File::FROM_BEGIN, static_cast<int64_t>(request.handle_offset)); |
| + if (seek_distance < 0) { |
| + LOG(ERROR) << "Error seeking " << request.handle_offset << " in file, got " |
| + << seek_distance |
| + << ". File error: " << File::ErrorToString(file.error_details()); |
| + return false; |
| + } |
| + BlobConsolidation::ReadStatus status = consolidation->VisitMemory( |
| + request.renderer_item_index, request.renderer_item_offset, request.size, |
| + base::Bind(&WriteSingleChunk, &file)); |
| + // We need to release the file so we don't automatically close the file. |
| + file.TakePlatformFile(); |
| + return status == ReadStatus::OK; |
| +} |
| + |
| +void WriteDiskRequests( |
| + scoped_refptr<BlobConsolidation> consolidation, |
| + std::vector<BlobItemBytesRequest>* requests, |
| + const std::vector<IPC::PlatformFileForTransit>& file_handles, |
| + const BlobTransportController::ResponseCallback& response_callback, |
| + const BlobTransportController::CancelCallback& error_callback, |
| + scoped_refptr<base::TaskRunner> io_task_runner) { |
| + for (const auto& request : *requests) { |
| + bool success = WriteSingleRequestToDisk(consolidation.get(), request, |
| + file_handles[request.handle_index]); |
| + if (!success) { |
| + io_task_runner->PostTask( |
| + FROM_HERE, base::Bind(error_callback, |
| + IPCBlobCreationCancelCode::FILE_WRITE_FAILED)); |
| + return; |
| + } |
| + io_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(response_callback, |
| + std::vector<BlobItemBytesResponse>{ |
| + BlobItemBytesResponse(request.request_number)})); |
| + } |
| +} |
| } // namespace |
| BlobTransportController* BlobTransportController::GetInstance() { |
| @@ -62,7 +139,7 @@ BlobTransportController* BlobTransportController::GetInstance() { |
| void BlobTransportController::InitiateBlobTransfer( |
| const std::string& uuid, |
| const std::string& content_type, |
| - std::unique_ptr<BlobConsolidation> consolidation, |
| + scoped_refptr<BlobConsolidation> consolidation, |
| scoped_refptr<ThreadSafeSender> sender, |
| base::SingleThreadTaskRunner* io_runner, |
| scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
| @@ -95,10 +172,17 @@ void BlobTransportController::OnMemoryRequest( |
| const std::vector<storage::BlobItemBytesRequest>& requests, |
| std::vector<base::SharedMemoryHandle>* memory_handles, |
| const std::vector<IPC::PlatformFileForTransit>& file_handles, |
| - IPC::Sender* sender) { |
| + IPC::Sender* sender, |
| + scoped_refptr<base::TaskRunner> io_runner, |
| + base::TaskRunner* file_runner, |
| + const ResponseCallback& async_response_callback, |
| + const CancelCallback& async_cancel_callback) { |
| std::vector<storage::BlobItemBytesResponse> responses; |
| - ResponsesStatus status = |
| - GetResponses(uuid, requests, memory_handles, file_handles, &responses); |
| + ResponsesStatus status = GetResponses( |
| + uuid, requests, memory_handles, file_handles, &responses, |
| + std::move(io_runner), file_runner, async_response_callback, |
| + base::Bind(&BlobTransportController::CancelAsyncBlobTransfer, |
| + base::Unretained(this), async_cancel_callback, uuid)); |
| switch (status) { |
| case ResponsesStatus::BLOB_NOT_FOUND: |
| @@ -114,10 +198,12 @@ void BlobTransportController::OnMemoryRequest( |
| << "."; |
| break; |
| case ResponsesStatus::SUCCESS: |
| + case ResponsesStatus::PENDING_IO: |
| break; |
| } |
| - sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
| + if (!responses.empty()) |
| + sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
| } |
| void BlobTransportController::OnCancel( |
| @@ -198,9 +284,17 @@ void BlobTransportController::ClearForTesting() { |
| blob_storage_.clear(); |
| } |
| +void BlobTransportController::CancelAsyncBlobTransfer( |
| + const CancelCallback& async_cancel_callback, |
| + const std::string& uuid, |
| + storage::IPCBlobCreationCancelCode code) { |
| + blob_storage_.erase(uuid); |
| + async_cancel_callback.Run(code); |
| +} |
| + |
| void BlobTransportController::StoreBlobDataForRequests( |
| const std::string& uuid, |
| - std::unique_ptr<BlobConsolidation> consolidation, |
| + scoped_refptr<BlobConsolidation> consolidation, |
| scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
| if (!main_thread_runner_.get()) { |
| main_thread_runner_ = std::move(main_runner); |
| @@ -213,7 +307,11 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| const std::vector<BlobItemBytesRequest>& requests, |
| std::vector<SharedMemoryHandle>* memory_handles, |
| const std::vector<IPC::PlatformFileForTransit>& file_handles, |
| - std::vector<BlobItemBytesResponse>* out) { |
| + std::vector<BlobItemBytesResponse>* out, |
| + scoped_refptr<base::TaskRunner> io_runner, |
| + base::TaskRunner* file_runner, |
| + const ResponseCallback& response_callback, |
| + const CancelCallback& error_callback) { |
| DCHECK(out->empty()); |
| auto it = blob_storage_.find(uuid); |
| if (it == blob_storage_.end()) |
| @@ -222,6 +320,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| BlobConsolidation* consolidation = it->second.get(); |
| const auto& consolidated_items = consolidation->consolidated_items(); |
| + scoped_ptr<std::vector<BlobItemBytesRequest>> file_requests( |
| + new std::vector<BlobItemBytesRequest>()); |
| + |
| // Since we can be writing to the same shared memory handle from multiple |
| // requests, we keep them in a vector and lazily create them. |
| ScopedVector<SharedMemory> opened_memory; |
| @@ -236,9 +337,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| << "Invalid data range"; |
| DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
| - out->push_back(BlobItemBytesResponse(request.request_number)); |
| switch (request.transport_strategy) { |
| case IPCBlobItemRequestStrategy::IPC: { |
| + out->push_back(BlobItemBytesResponse(request.request_number)); |
| BlobItemBytesResponse& response = out->back(); |
| ReadStatus status = consolidation->ReadMemory( |
| request.renderer_item_index, request.renderer_item_offset, |
| @@ -249,6 +350,7 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| break; |
| } |
| case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
| + out->push_back(BlobItemBytesResponse(request.request_number)); |
| DCHECK_LT(request.handle_index, memory_handles->size()) |
| << "Invalid handle index."; |
| SharedMemory* memory = opened_memory[request.handle_index]; |
| @@ -273,14 +375,24 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| break; |
| } |
| case IPCBlobItemRequestStrategy::FILE: |
| - NOTREACHED() << "TODO(dmurph): Not implemented."; |
| + DCHECK_LT(request.handle_index, file_handles.size()) |
| + << "Invalid handle index."; |
| + file_requests->push_back(request); |
| break; |
| case IPCBlobItemRequestStrategy::UNKNOWN: |
| NOTREACHED(); |
| break; |
| } |
| } |
| - return ResponsesStatus::SUCCESS; |
| + if (file_requests->empty()) { |
| + return ResponsesStatus::SUCCESS; |
| + } |
| + file_runner->PostTask( |
| + FROM_HERE, base::Bind(&WriteDiskRequests, consolidation, |
| + base::Owned(file_requests.release()), file_handles, |
|
kinuko
2016/04/15 15:02:32
I think base::Passed(&file_requests) would be more
dmurph
2016/04/20 21:15:34
done.
|
| + response_callback, error_callback, |
| + base::Passed(std::move(io_runner)))); |
| + return ResponsesStatus::PENDING_IO; |
| } |
| void BlobTransportController::ReleaseBlobConsolidation( |