Chromium Code Reviews
| Index: content/child/blob_storage/blob_transport_controller.cc |
| diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc |
| index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..653df4c8b871ebfe03933d5ba44fd2d33debed0a 100644 |
| --- a/content/child/blob_storage/blob_transport_controller.cc |
| +++ b/content/child/blob_storage/blob_transport_controller.cc |
| @@ -4,24 +4,40 @@ |
| #include "content/child/blob_storage/blob_transport_controller.h" |
| +#include <limits> |
| +#include <memory> |
| #include <utility> |
| #include <vector> |
| +#include "base/bind.h" |
| +#include "base/bind_helpers.h" |
| +#include "base/callback.h" |
| +#include "base/files/file.h" |
| #include "base/lazy_instance.h" |
| +#include "base/location.h" |
| +#include "base/memory/ptr_util.h" |
| #include "base/memory/scoped_vector.h" |
| #include "base/memory/shared_memory.h" |
| +#include "base/metrics/histogram_macros.h" |
| +#include "base/optional.h" |
| #include "base/single_thread_task_runner.h" |
| #include "base/stl_util.h" |
| +#include "base/task_runner.h" |
| +#include "base/task_runner_util.h" |
| +#include "base/thread_task_runner_handle.h" |
| +#include "base/time/time.h" |
| #include "content/child/blob_storage/blob_consolidation.h" |
| #include "content/child/child_process.h" |
| #include "content/child/thread_safe_sender.h" |
| #include "content/common/fileapi/webblob_messages.h" |
| +#include "ipc/ipc_message.h" |
| #include "ipc/ipc_sender.h" |
| #include "storage/common/blob_storage/blob_item_bytes_request.h" |
| #include "storage/common/blob_storage/blob_item_bytes_response.h" |
| #include "storage/common/data_element.h" |
| #include "third_party/WebKit/public/platform/Platform.h" |
| +using base::File; |
| using base::SharedMemory; |
| using base::SharedMemoryHandle; |
| using storage::BlobItemBytesRequest; |
| @@ -30,10 +46,11 @@ using storage::IPCBlobItemRequestStrategy; |
| using storage::DataElement; |
| using storage::kBlobStorageIPCThresholdBytes; |
| -namespace content { |
| - |
| +using storage::BlobItemBytesResponse; |
| +using storage::BlobItemBytesRequest; |
| using storage::IPCBlobCreationCancelCode; |
| +namespace content { |
| using ConsolidatedItem = BlobConsolidation::ConsolidatedItem; |
| using ReadStatus = BlobConsolidation::ReadStatus; |
| @@ -52,6 +69,84 @@ void DecChildProcessRefCount() { |
| blink::Platform::current()->suddenTerminationChanged(true); |
| ChildProcess::current()->ReleaseProcess(); |
| } |
| + |
| +void DecChildProcessRefCountTimes(size_t times) { |
| + for (size_t i = 0; i < times; i++) { |
| + blink::Platform::current()->suddenTerminationChanged(true); |
| + ChildProcess::current()->ReleaseProcess(); |
| + } |
| +} |
| + |
| +bool WriteSingleChunk(base::File* file, const char* memory, size_t size) { |
| + size_t written = 0; |
| + size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max()); |
| + while (written < size) { |
| + size_t writing_size = std::min(max_int, size - written); |
| + int actual_written = |
| + file->WriteAtCurrentPos(memory, static_cast<int>(writing_size)); |
| + if (actual_written < 0) { |
| + UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileWriteFailed", true); |
| + return false; |
| + } |
| + written += writing_size; |
|
michaeln (2016/04/28 00:16:56): should this be written += actual_written? and wha…

dmurph (2016/05/09 19:55:47): how would that happen? We wouldn't be calling file…

michaeln (2016/05/09 22:24:52): right, let me be more clear. What if (actual_writt…

michaeln (2016/05/09 23:01:08): nm, i see you changed it to += actual_written :)
|
| + } |
| + return true; |
| +} |
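The thread above concerns short writes. As a point of reference only, here is a minimal sketch of the loop with the change michaeln confirms landed later ("+= actual_written"), which also offsets the source pointer by the running total; the helper name is hypothetical and this is not code from any patchset:

#include <stddef.h>

#include <algorithm>
#include <limits>

#include "base/files/file.h"

// Chunked write that tolerates short writes: advance both the running total
// and the source pointer by the number of bytes the OS actually wrote.
bool WriteSingleChunkSketch(base::File* file, const char* memory, size_t size) {
  size_t written = 0;
  const size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max());
  while (written < size) {
    size_t writing_size = std::min(max_int, size - written);
    int actual_written = file->WriteAtCurrentPos(
        memory + written, static_cast<int>(writing_size));
    if (actual_written < 0)
      return false;  // Hard error; the caller records the failure.
    // A short (but successful) write simply loops again from the new offset.
    written += static_cast<size_t>(actual_written);
  }
  return true;
}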
| + |
| +base::Optional<base::Time> WriteSingleRequestToDisk( |
| + const BlobConsolidation* consolidation, |
| + const BlobItemBytesRequest& request, |
| + const IPC::PlatformFileForTransit& transit_file) { |
| + File file = IPC::PlatformFileForTransitToFile(transit_file); |
| + if (!file.IsValid()) |
| + return base::nullopt; |
| + CHECK_LE(request.handle_offset, |
| + static_cast<uint64_t>(std::numeric_limits<int64_t>::max())); |
| + int64_t seek_distance = |
| + file.Seek(File::FROM_BEGIN, static_cast<int64_t>(request.handle_offset)); |
| + if (seek_distance < 0) { |
| + UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileSeekFailed", true); |
| + return base::nullopt; |
| + } |
| + BlobConsolidation::ReadStatus status = consolidation->VisitMemory( |
| + request.renderer_item_index, request.renderer_item_offset, request.size, |
| + base::Bind(&WriteSingleChunk, &file)); |
| + if (status != BlobConsolidation::ReadStatus::OK) |
| + return base::nullopt; |
| + File::Info info; |
| + file.GetInfo(&info); |
| + // We need to release the file so we don't automatically close the file. |
|
michaeln (2016/04/28 00:16:57): i'm curious, when do the file handles get closed?

dmurph (2016/05/09 19:55:47): Good point. I changed this to accept a file, and I…
|
| + file.TakePlatformFile(); |
| + return base::make_optional(info.last_modified); |
| +} |
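For the handle-lifetime question above: base::File is an RAII wrapper, so the OS handle is closed when the File object is destroyed unless TakePlatformFile() releases ownership first, which is what WriteSingleRequestToDisk does so it does not close the handle the browser sent over IPC. An illustrative, standalone sketch of that behavior (the function name is hypothetical, not part of the patch):

#include "base/files/file.h"

// Writes through a base::File and then releases ownership of the handle so
// the destructor does not close it; the original owner must close it later.
void WriteAndKeepOpen(base::File file) {
  const char kData[] = "payload";
  file.WriteAtCurrentPos(kData, static_cast<int>(sizeof(kData) - 1));
  file.TakePlatformFile();  // Detach; |file| no longer owns the handle.
}  // Without TakePlatformFile(), |file| would close the handle here.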
| + |
| +// This returns either the responses, or if they're empty, an error code. |
| +std::pair<std::vector<storage::BlobItemBytesResponse>, |
| + IPCBlobCreationCancelCode> |
| +WriteDiskRequests( |
| + scoped_refptr<BlobConsolidation> consolidation, |
| + std::unique_ptr<std::vector<BlobItemBytesRequest>> requests, |
| + const std::vector<IPC::PlatformFileForTransit>& file_handles) { |
| + std::vector<BlobItemBytesResponse> responses; |
| + std::vector<base::Time> last_modified_times; |
| + last_modified_times.resize(file_handles.size()); |
| + for (const auto& request : *requests) { |
| + base::Optional<base::Time> last_modified = WriteSingleRequestToDisk( |
| + consolidation.get(), request, file_handles[request.handle_index]); |
| + if (!last_modified) { |
| + return std::make_pair(std::vector<storage::BlobItemBytesResponse>(), |
| + IPCBlobCreationCancelCode::FILE_WRITE_FAILED); |
| + } |
| + last_modified_times[request.handle_index] = last_modified.value(); |
| + } |
| + for (const auto& request : *requests) { |
| + responses.push_back(BlobItemBytesResponse(request.request_number)); |
| + responses.back().time_file_modified = |
| + last_modified_times[request.handle_index]; |
| + } |
| + return std::make_pair(responses, IPCBlobCreationCancelCode::UNKNOWN); |
| +} |
| + |
| } // namespace |
| BlobTransportController* BlobTransportController::GetInstance() { |
| @@ -62,7 +157,7 @@ BlobTransportController* BlobTransportController::GetInstance() { |
| void BlobTransportController::InitiateBlobTransfer( |
| const std::string& uuid, |
| const std::string& content_type, |
| - std::unique_ptr<BlobConsolidation> consolidation, |
| + scoped_refptr<BlobConsolidation> consolidation, |
| scoped_refptr<ThreadSafeSender> sender, |
| base::SingleThreadTaskRunner* io_runner, |
| scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
| @@ -82,8 +177,7 @@ void BlobTransportController::InitiateBlobTransfer( |
| FROM_HERE, |
| base::Bind(&BlobTransportController::StoreBlobDataForRequests, |
| base::Unretained(BlobTransportController::GetInstance()), uuid, |
| - base::Passed(std::move(consolidation)), |
| - base::Passed(std::move(main_runner)))); |
| + base::Passed(&consolidation), base::Passed(&main_runner))); |
| // TODO(dmurph): Merge register and start messages. |
| sender->Send(new BlobStorageMsg_RegisterBlobUUID(uuid, content_type, "", |
| referenced_blobs)); |
| @@ -95,29 +189,102 @@ void BlobTransportController::OnMemoryRequest( |
| const std::vector<storage::BlobItemBytesRequest>& requests, |
| std::vector<base::SharedMemoryHandle>* memory_handles, |
| const std::vector<IPC::PlatformFileForTransit>& file_handles, |
| + base::TaskRunner* file_runner, |
| IPC::Sender* sender) { |
| - std::vector<storage::BlobItemBytesResponse> responses; |
| - ResponsesStatus status = |
| - GetResponses(uuid, requests, memory_handles, file_handles, &responses); |
| - |
| - switch (status) { |
| - case ResponsesStatus::BLOB_NOT_FOUND: |
| - // sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid, |
| - // IPCBlobCreationCancelCode::UNKNOWN)); |
| - return; |
| - case ResponsesStatus::SHARED_MEMORY_MAP_FAILED: |
| - // This would happen if the renderer process doesn't have enough memory |
| - // to map the shared memory, which is possible if we don't have much |
| - // memory. If this scenario happens often, we could delay the response |
| - // until we have enough memory. For now we just fail. |
| - CHECK(false) << "Unable to map shared memory to send blob " << uuid |
| - << "."; |
| - break; |
| - case ResponsesStatus::SUCCESS: |
| - break; |
| + std::vector<BlobItemBytesResponse> responses; |
| + auto it = blob_storage_.find(uuid); |
| + // Ignore invalid messages. |
| + if (it == blob_storage_.end()) |
| + return; |
| + |
| + BlobConsolidation* consolidation = it->second.get(); |
| + const auto& consolidated_items = consolidation->consolidated_items(); |
| + |
| + std::unique_ptr<std::vector<BlobItemBytesRequest>> file_requests( |
| + new std::vector<BlobItemBytesRequest>()); |
| + |
| + // Since we can be writing to the same shared memory handle from multiple |
| + // requests, we keep them in a vector and lazily create them. |
| + ScopedVector<SharedMemory> opened_memory; |
| + opened_memory.resize(memory_handles->size()); |
| + for (const BlobItemBytesRequest& request : requests) { |
| + DCHECK_LT(request.renderer_item_index, consolidated_items.size()) |
| + << "Invalid item index"; |
| + |
| + const ConsolidatedItem& item = |
| + consolidated_items[request.renderer_item_index]; |
| + DCHECK_LE(request.renderer_item_offset + request.size, item.length) |
| + << "Invalid data range"; |
| + DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
| + |
| + switch (request.transport_strategy) { |
| + case IPCBlobItemRequestStrategy::IPC: { |
| + responses.push_back(BlobItemBytesResponse(request.request_number)); |
| + BlobItemBytesResponse& response = responses.back(); |
| + ReadStatus status = consolidation->ReadMemory( |
| + request.renderer_item_index, request.renderer_item_offset, |
| + request.size, response.allocate_mutable_data(request.size)); |
| + DCHECK(status == ReadStatus::OK) |
| + << "Error reading from consolidated blob: " |
| + << static_cast<int>(status); |
| + break; |
| + } |
| + case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
| + responses.push_back(BlobItemBytesResponse(request.request_number)); |
| + DCHECK_LT(request.handle_index, memory_handles->size()) |
| + << "Invalid handle index."; |
| + SharedMemory* memory = opened_memory[request.handle_index]; |
| + if (!memory) { |
| + SharedMemoryHandle& handle = (*memory_handles)[request.handle_index]; |
| + DCHECK(SharedMemory::IsHandleValid(handle)); |
| + std::unique_ptr<SharedMemory> shared_memory( |
| + new SharedMemory(handle, false)); |
| + |
| + if (!shared_memory->Map(request.size)) { |
| + // This would happen if the renderer process doesn't have enough |
| + // memory |
|
michaeln (2016/04/28 00:16:56): nit: line wraps are arbitrary

dmurph (2016/05/09 19:55:47): Done.
|
| + // to map the shared memory, which is possible if we don't have much |
| + // memory. If this scenario happens often, we could delay the |
| + // response |
| + // until we have enough memory. For now we just fail. |
| + CHECK(false) << "Unable to map shared memory to send blob " << uuid |
| + << "."; |
| + return; |
| + } |
| + memory = shared_memory.get(); |
| + opened_memory[request.handle_index] = shared_memory.release(); |
| + } |
| + CHECK(memory->memory()) << "Couldn't map memory for blob transfer."; |
| + ReadStatus status = consolidation->ReadMemory( |
| + request.renderer_item_index, request.renderer_item_offset, |
| + request.size, |
| + static_cast<char*>(memory->memory()) + request.handle_offset); |
| + DCHECK(status == ReadStatus::OK) |
| + << "Error reading from consolidated blob: " |
| + << static_cast<int>(status); |
| + break; |
| + } |
| + case IPCBlobItemRequestStrategy::FILE: |
| + DCHECK_LT(request.handle_index, file_handles.size()) |
| + << "Invalid handle index."; |
| + file_requests->push_back(request); |
| + break; |
| + case IPCBlobItemRequestStrategy::UNKNOWN: |
| + NOTREACHED(); |
| + break; |
| + } |
| + } |
| + if (!file_requests->empty()) { |
| + base::PostTaskAndReplyWithResult( |
| + file_runner, FROM_HERE, |
| + base::Bind(&WriteDiskRequests, make_scoped_refptr(consolidation), |
| + base::Passed(&file_requests), file_handles), |
| + base::Bind(&BlobTransportController::OnFileWriteComplete, |
| + weak_factory_.GetWeakPtr(), sender, uuid)); |
| } |
| - sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
| + if (!responses.empty()) |
| + sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
| } |
| void BlobTransportController::OnCancel( |
| @@ -132,6 +299,17 @@ void BlobTransportController::OnDone(const std::string& uuid) { |
| ReleaseBlobConsolidation(uuid); |
| } |
| +void BlobTransportController::CancelAllBlobTransfers() { |
| + weak_factory_.InvalidateWeakPtrs(); |
| + if (!blob_storage_.empty() && main_thread_runner_) { |
| + main_thread_runner_->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DecChildProcessRefCountTimes, blob_storage_.size())); |
| + } |
| + main_thread_runner_ = nullptr; |
| + blob_storage_.clear(); |
| +} |
| + |
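CancelAllBlobTransfers() relies on InvalidateWeakPtrs() to drop any in-flight OnFileWriteComplete reply that was bound through weak_factory_.GetWeakPtr() in OnMemoryRequest. A minimal, standalone illustration of that base::WeakPtrFactory behavior (class and method names hypothetical, not from this patch):

#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/weak_ptr.h"

class ReplyTarget {
 public:
  ReplyTarget() : weak_factory_(this) {}
  void OnReply(int result) { /* Skipped entirely after invalidation. */ }
  base::WeakPtr<ReplyTarget> GetWeakPtr() { return weak_factory_.GetWeakPtr(); }
  void Cancel() { weak_factory_.InvalidateWeakPtrs(); }

 private:
  base::WeakPtrFactory<ReplyTarget> weak_factory_;
};

// Usage: a callback bound to an invalidated WeakPtr becomes a no-op.
//   ReplyTarget target;
//   base::Callback<void(int)> reply =
//       base::Bind(&ReplyTarget::OnReply, target.GetWeakPtr());
//   target.Cancel();
//   reply.Run(42);  // Does nothing: the weak pointer was invalidated.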
| // static |
| void BlobTransportController::GetDescriptions( |
| BlobConsolidation* consolidation, |
| @@ -186,21 +364,28 @@ void BlobTransportController::GetDescriptions( |
| } |
| } |
| -BlobTransportController::BlobTransportController() {} |
| +BlobTransportController::BlobTransportController() : weak_factory_(this) {} |
| BlobTransportController::~BlobTransportController() {} |
| -void BlobTransportController::ClearForTesting() { |
| - if (!blob_storage_.empty() && main_thread_runner_) { |
| - main_thread_runner_->PostTask(FROM_HERE, |
| - base::Bind(&DecChildProcessRefCount)); |
| +void BlobTransportController::OnFileWriteComplete( |
| + IPC::Sender* sender, |
| + const std::string& uuid, |
| + const std::pair<std::vector<BlobItemBytesResponse>, |
| + IPCBlobCreationCancelCode>& result) { |
| + if (blob_storage_.find(uuid) == blob_storage_.end()) |
| + return; |
| + if (!result.first.empty()) { |
| + sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, result.first)); |
| + return; |
| } |
| - blob_storage_.clear(); |
| + sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid, result.second)); |
| + ReleaseBlobConsolidation(uuid); |
| } |
| void BlobTransportController::StoreBlobDataForRequests( |
| const std::string& uuid, |
| - std::unique_ptr<BlobConsolidation> consolidation, |
| + scoped_refptr<BlobConsolidation> consolidation, |
| scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
| if (!main_thread_runner_.get()) { |
| main_thread_runner_ = std::move(main_runner); |
| @@ -208,81 +393,6 @@ void BlobTransportController::StoreBlobDataForRequests( |
| blob_storage_[uuid] = std::move(consolidation); |
| } |
| -BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
| - const std::string& uuid, |
| - const std::vector<BlobItemBytesRequest>& requests, |
| - std::vector<SharedMemoryHandle>* memory_handles, |
| - const std::vector<IPC::PlatformFileForTransit>& file_handles, |
| - std::vector<BlobItemBytesResponse>* out) { |
| - DCHECK(out->empty()); |
| - auto it = blob_storage_.find(uuid); |
| - if (it == blob_storage_.end()) |
| - return ResponsesStatus::BLOB_NOT_FOUND; |
| - |
| - BlobConsolidation* consolidation = it->second.get(); |
| - const auto& consolidated_items = consolidation->consolidated_items(); |
| - |
| - // Since we can be writing to the same shared memory handle from multiple |
| - // requests, we keep them in a vector and lazily create them. |
| - ScopedVector<SharedMemory> opened_memory; |
| - opened_memory.resize(memory_handles->size()); |
| - for (const BlobItemBytesRequest& request : requests) { |
| - DCHECK_LT(request.renderer_item_index, consolidated_items.size()) |
| - << "Invalid item index"; |
| - |
| - const ConsolidatedItem& item = |
| - consolidated_items[request.renderer_item_index]; |
| - DCHECK_LE(request.renderer_item_offset + request.size, item.length) |
| - << "Invalid data range"; |
| - DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
| - |
| - out->push_back(BlobItemBytesResponse(request.request_number)); |
| - switch (request.transport_strategy) { |
| - case IPCBlobItemRequestStrategy::IPC: { |
| - BlobItemBytesResponse& response = out->back(); |
| - ReadStatus status = consolidation->ReadMemory( |
| - request.renderer_item_index, request.renderer_item_offset, |
| - request.size, response.allocate_mutable_data(request.size)); |
| - DCHECK(status == ReadStatus::OK) |
| - << "Error reading from consolidated blob: " |
| - << static_cast<int>(status); |
| - break; |
| - } |
| - case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
| - DCHECK_LT(request.handle_index, memory_handles->size()) |
| - << "Invalid handle index."; |
| - SharedMemory* memory = opened_memory[request.handle_index]; |
| - if (!memory) { |
| - SharedMemoryHandle& handle = (*memory_handles)[request.handle_index]; |
| - DCHECK(SharedMemory::IsHandleValid(handle)); |
| - std::unique_ptr<SharedMemory> shared_memory( |
| - new SharedMemory(handle, false)); |
| - if (!shared_memory->Map(request.size)) |
| - return ResponsesStatus::SHARED_MEMORY_MAP_FAILED; |
| - memory = shared_memory.get(); |
| - opened_memory[request.handle_index] = shared_memory.release(); |
| - } |
| - CHECK(memory->memory()) << "Couldn't map memory for blob transfer."; |
| - ReadStatus status = consolidation->ReadMemory( |
| - request.renderer_item_index, request.renderer_item_offset, |
| - request.size, |
| - static_cast<char*>(memory->memory()) + request.handle_offset); |
| - DCHECK(status == ReadStatus::OK) |
| - << "Error reading from consolidated blob: " |
| - << static_cast<int>(status); |
| - break; |
| - } |
| - case IPCBlobItemRequestStrategy::FILE: |
| - NOTREACHED() << "TODO(dmurph): Not implemented."; |
| - break; |
| - case IPCBlobItemRequestStrategy::UNKNOWN: |
| - NOTREACHED(); |
| - break; |
| - } |
| - } |
| - return ResponsesStatus::SUCCESS; |
| -} |
| - |
| void BlobTransportController::ReleaseBlobConsolidation( |
| const std::string& uuid) { |
| if (blob_storage_.erase(uuid)) { |