Index: content/child/blob_storage/blob_transport_controller.cc |
diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc |
index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..54006347c8de3fb48422c0db579e049eb665c4a8 100644 |
--- a/content/child/blob_storage/blob_transport_controller.cc |
+++ b/content/child/blob_storage/blob_transport_controller.cc |
@@ -4,24 +4,41 @@ |
#include "content/child/blob_storage/blob_transport_controller.h" |
+#include <limits> |
+#include <memory> |
#include <utility> |
#include <vector> |
+#include "base/bind.h" |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "base/files/file.h" |
#include "base/lazy_instance.h" |
+#include "base/location.h" |
+#include "base/memory/ptr_util.h" |
#include "base/memory/scoped_vector.h" |
#include "base/memory/shared_memory.h" |
+#include "base/metrics/histogram_macros.h" |
+#include "base/numerics/safe_conversions.h" |
+#include "base/optional.h" |
#include "base/single_thread_task_runner.h" |
#include "base/stl_util.h" |
+#include "base/task_runner.h" |
+#include "base/task_runner_util.h" |
+#include "base/thread_task_runner_handle.h" |
+#include "base/time/time.h" |
#include "content/child/blob_storage/blob_consolidation.h" |
#include "content/child/child_process.h" |
#include "content/child/thread_safe_sender.h" |
#include "content/common/fileapi/webblob_messages.h" |
+#include "ipc/ipc_message.h" |
#include "ipc/ipc_sender.h" |
#include "storage/common/blob_storage/blob_item_bytes_request.h" |
#include "storage/common/blob_storage/blob_item_bytes_response.h" |
#include "storage/common/data_element.h" |
#include "third_party/WebKit/public/platform/Platform.h" |
+using base::File; |
using base::SharedMemory; |
using base::SharedMemoryHandle; |
using storage::BlobItemBytesRequest; |
@@ -30,10 +47,11 @@ using storage::IPCBlobItemRequestStrategy; |
using storage::DataElement; |
using storage::kBlobStorageIPCThresholdBytes; |
-namespace content { |
- |
+using storage::BlobItemBytesResponse; |
+using storage::BlobItemBytesRequest; |
using storage::IPCBlobCreationCancelCode; |
+namespace content { |
using ConsolidatedItem = BlobConsolidation::ConsolidatedItem; |
using ReadStatus = BlobConsolidation::ReadStatus; |
@@ -52,6 +70,86 @@ void DecChildProcessRefCount() { |
blink::Platform::current()->suddenTerminationChanged(true); |
ChildProcess::current()->ReleaseProcess(); |
} |
+ |
+void DecChildProcessRefCountTimes(size_t times) { |
+ for (size_t i = 0; i < times; i++) { |
+ DecChildProcessRefCount(); |
+ } |
+} |
+ |
+bool WriteSingleChunk(base::File* file, const char* memory, size_t size) { |
+ size_t written = 0; |
+ while (written < size) { |
+ size_t writing_size = base::saturated_cast<int>(size - written); |
+ int actual_written = |
+ file->WriteAtCurrentPos(memory, static_cast<int>(writing_size)); |
+ bool write_failed = actual_written < 0; |
+ UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileWriteFailed", write_failed); |
+ if (write_failed) |
+ return false; |
+ written += actual_written; |
+ } |
+ return true; |
+} |
+ |
+base::Optional<base::Time> WriteSingleRequestToDisk( |
+ const BlobConsolidation* consolidation, |
+ const BlobItemBytesRequest& request, |
+ File* file) { |
+ if (!file->IsValid()) |
+ return base::nullopt; |
+ int64_t seek_distance = file->Seek( |
+ File::FROM_BEGIN, base::checked_cast<int64_t>(request.handle_offset)); |
+ bool seek_failed = seek_distance < 0; |
+ UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileSeekFailed", seek_failed); |
+ if (seek_failed) { |
+ return base::nullopt; |
+ } |
+ BlobConsolidation::ReadStatus status = consolidation->VisitMemory( |
+ request.renderer_item_index, request.renderer_item_offset, request.size, |
+ base::Bind(&WriteSingleChunk, file)); |
+ if (status != BlobConsolidation::ReadStatus::OK) |
+ return base::nullopt; |
+ File::Info info; |
+ file->GetInfo(&info); |
+ return base::make_optional(info.last_modified); |
+} |
+ |
+// This returns either the responses, or if they're empty, an error code. |
+std::pair<std::vector<storage::BlobItemBytesResponse>, |
+ IPCBlobCreationCancelCode> |
+WriteDiskRequests( |
+ scoped_refptr<BlobConsolidation> consolidation, |
+ std::unique_ptr<std::vector<BlobItemBytesRequest>> requests, |
+ const std::vector<IPC::PlatformFileForTransit>& file_handles) { |
+ std::vector<BlobItemBytesResponse> responses; |
+ std::vector<base::Time> last_modified_times; |
+ last_modified_times.resize(file_handles.size()); |
+ // We grab ownership of the file handles here. When this vector is destroyed |
+ // it will close the files. |
+ std::vector<File> files; |
+ files.reserve(file_handles.size()); |
+ for (const auto& file_handle : file_handles) { |
+ files.emplace_back(IPC::PlatformFileForTransitToFile(file_handle)); |
+ } |
+ for (const auto& request : *requests) { |
+ base::Optional<base::Time> last_modified = WriteSingleRequestToDisk( |
+ consolidation.get(), request, &files[request.handle_index]); |
+ if (!last_modified) { |
+ return std::make_pair(std::vector<storage::BlobItemBytesResponse>(), |
+ IPCBlobCreationCancelCode::FILE_WRITE_FAILED); |
+ } |
+ last_modified_times[request.handle_index] = last_modified.value(); |
+ } |
+ for (const auto& request : *requests) { |
+ responses.push_back(BlobItemBytesResponse(request.request_number)); |
+ responses.back().time_file_modified = |
+ last_modified_times[request.handle_index]; |
+ } |
+ |
+ return std::make_pair(responses, IPCBlobCreationCancelCode::UNKNOWN); |
+} |
+ |
} // namespace |
BlobTransportController* BlobTransportController::GetInstance() { |
@@ -62,7 +160,7 @@ BlobTransportController* BlobTransportController::GetInstance() { |
void BlobTransportController::InitiateBlobTransfer( |
const std::string& uuid, |
const std::string& content_type, |
- std::unique_ptr<BlobConsolidation> consolidation, |
+ scoped_refptr<BlobConsolidation> consolidation, |
scoped_refptr<ThreadSafeSender> sender, |
base::SingleThreadTaskRunner* io_runner, |
scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
@@ -82,8 +180,7 @@ void BlobTransportController::InitiateBlobTransfer( |
FROM_HERE, |
base::Bind(&BlobTransportController::StoreBlobDataForRequests, |
base::Unretained(BlobTransportController::GetInstance()), uuid, |
- base::Passed(std::move(consolidation)), |
- base::Passed(std::move(main_runner)))); |
+ base::Passed(&consolidation), base::Passed(&main_runner))); |
// TODO(dmurph): Merge register and start messages. |
sender->Send(new BlobStorageMsg_RegisterBlobUUID(uuid, content_type, "", |
referenced_blobs)); |
@@ -95,29 +192,100 @@ void BlobTransportController::OnMemoryRequest( |
const std::vector<storage::BlobItemBytesRequest>& requests, |
std::vector<base::SharedMemoryHandle>* memory_handles, |
const std::vector<IPC::PlatformFileForTransit>& file_handles, |
+ base::TaskRunner* file_runner, |
IPC::Sender* sender) { |
- std::vector<storage::BlobItemBytesResponse> responses; |
- ResponsesStatus status = |
- GetResponses(uuid, requests, memory_handles, file_handles, &responses); |
- |
- switch (status) { |
- case ResponsesStatus::BLOB_NOT_FOUND: |
- // sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid, |
- // IPCBlobCreationCancelCode::UNKNOWN)); |
- return; |
- case ResponsesStatus::SHARED_MEMORY_MAP_FAILED: |
- // This would happen if the renderer process doesn't have enough memory |
- // to map the shared memory, which is possible if we don't have much |
- // memory. If this scenario happens often, we could delay the response |
- // until we have enough memory. For now we just fail. |
- CHECK(false) << "Unable to map shared memory to send blob " << uuid |
- << "."; |
- break; |
- case ResponsesStatus::SUCCESS: |
- break; |
+ std::vector<BlobItemBytesResponse> responses; |
+ auto it = blob_storage_.find(uuid); |
+ // Ignore invalid messages. |
+ if (it == blob_storage_.end()) |
+ return; |
+ |
+ BlobConsolidation* consolidation = it->second.get(); |
+ const auto& consolidated_items = consolidation->consolidated_items(); |
+ |
+ std::unique_ptr<std::vector<BlobItemBytesRequest>> file_requests( |
+ new std::vector<BlobItemBytesRequest>()); |
+ |
+ // Since we can be writing to the same shared memory handle from multiple |
+ // requests, we keep them in a vector and lazily create them. |
+ ScopedVector<SharedMemory> opened_memory; |
+ opened_memory.resize(memory_handles->size()); |
+ for (const BlobItemBytesRequest& request : requests) { |
+ DCHECK_LT(request.renderer_item_index, consolidated_items.size()) |
+ << "Invalid item index"; |
+ |
+ const ConsolidatedItem& item = |
+ consolidated_items[request.renderer_item_index]; |
+ DCHECK_LE(request.renderer_item_offset + request.size, item.length) |
+ << "Invalid data range"; |
+ DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
+ |
+ switch (request.transport_strategy) { |
+ case IPCBlobItemRequestStrategy::IPC: { |
+ responses.push_back(BlobItemBytesResponse(request.request_number)); |
+ BlobItemBytesResponse& response = responses.back(); |
+ ReadStatus status = consolidation->ReadMemory( |
+ request.renderer_item_index, request.renderer_item_offset, |
+ request.size, response.allocate_mutable_data(request.size)); |
+ DCHECK(status == ReadStatus::OK) |
+ << "Error reading from consolidated blob: " |
+ << static_cast<int>(status); |
+ break; |
+ } |
+ case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
+ responses.push_back(BlobItemBytesResponse(request.request_number)); |
+ DCHECK_LT(request.handle_index, memory_handles->size()) |
+ << "Invalid handle index."; |
+ SharedMemory* memory = opened_memory[request.handle_index]; |
+ if (!memory) { |
+ SharedMemoryHandle& handle = (*memory_handles)[request.handle_index]; |
+ DCHECK(SharedMemory::IsHandleValid(handle)); |
+ std::unique_ptr<SharedMemory> shared_memory( |
+ new SharedMemory(handle, false)); |
+ |
+ if (!shared_memory->Map(request.size)) { |
+ // This would happen if the renderer process doesn't have enough |
+ // memory to map the shared memory, which is possible if we don't |
+ // have much memory. If this scenario happens often, we could delay |
+ // the response until we have enough memory. For now we just fail. |
+ CHECK(false) << "Unable to map shared memory to send blob " << uuid |
+ << "."; |
+ return; |
+ } |
+ memory = shared_memory.get(); |
+ opened_memory[request.handle_index] = shared_memory.release(); |
+ } |
+ CHECK(memory->memory()) << "Couldn't map memory for blob transfer."; |
+ ReadStatus status = consolidation->ReadMemory( |
+ request.renderer_item_index, request.renderer_item_offset, |
+ request.size, |
+ static_cast<char*>(memory->memory()) + request.handle_offset); |
+ DCHECK(status == ReadStatus::OK) |
+ << "Error reading from consolidated blob: " |
+ << static_cast<int>(status); |
+ break; |
+ } |
+ case IPCBlobItemRequestStrategy::FILE: |
+ DCHECK_LT(request.handle_index, file_handles.size()) |
+ << "Invalid handle index."; |
+ file_requests->push_back(request); |
+ break; |
+ case IPCBlobItemRequestStrategy::UNKNOWN: |
+ NOTREACHED(); |
+ break; |
+ } |
+ } |
+ if (!file_requests->empty()) { |
+ base::PostTaskAndReplyWithResult( |
+ file_runner, FROM_HERE, |
+ base::Bind(&WriteDiskRequests, make_scoped_refptr(consolidation), |
+ base::Passed(&file_requests), file_handles), |
+ base::Bind(&BlobTransportController::OnFileWriteComplete, |
+ weak_factory_.GetWeakPtr(), sender, uuid)); |
} |
- sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
+ if (!responses.empty()) |
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
} |
void BlobTransportController::OnCancel( |
@@ -132,6 +300,17 @@ void BlobTransportController::OnDone(const std::string& uuid) { |
ReleaseBlobConsolidation(uuid); |
} |
+void BlobTransportController::CancelAllBlobTransfers() { |
+ weak_factory_.InvalidateWeakPtrs(); |
+ if (!blob_storage_.empty() && main_thread_runner_) { |
+ main_thread_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&DecChildProcessRefCountTimes, blob_storage_.size())); |
+ } |
+ main_thread_runner_ = nullptr; |
+ blob_storage_.clear(); |
+} |
+ |
// static |
void BlobTransportController::GetDescriptions( |
BlobConsolidation* consolidation, |
@@ -186,21 +365,28 @@ void BlobTransportController::GetDescriptions( |
} |
} |
-BlobTransportController::BlobTransportController() {} |
+BlobTransportController::BlobTransportController() : weak_factory_(this) {} |
BlobTransportController::~BlobTransportController() {} |
-void BlobTransportController::ClearForTesting() { |
- if (!blob_storage_.empty() && main_thread_runner_) { |
- main_thread_runner_->PostTask(FROM_HERE, |
- base::Bind(&DecChildProcessRefCount)); |
+void BlobTransportController::OnFileWriteComplete( |
+ IPC::Sender* sender, |
+ const std::string& uuid, |
+ const std::pair<std::vector<BlobItemBytesResponse>, |
+ IPCBlobCreationCancelCode>& result) { |
+ if (blob_storage_.find(uuid) == blob_storage_.end()) |
+ return; |
+ if (!result.first.empty()) { |
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, result.first)); |
+ return; |
} |
- blob_storage_.clear(); |
+ sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid, result.second)); |
+ ReleaseBlobConsolidation(uuid); |
} |
void BlobTransportController::StoreBlobDataForRequests( |
const std::string& uuid, |
- std::unique_ptr<BlobConsolidation> consolidation, |
+ scoped_refptr<BlobConsolidation> consolidation, |
scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
if (!main_thread_runner_.get()) { |
main_thread_runner_ = std::move(main_runner); |
@@ -208,81 +394,6 @@ void BlobTransportController::StoreBlobDataForRequests( |
blob_storage_[uuid] = std::move(consolidation); |
} |
-BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
- const std::string& uuid, |
- const std::vector<BlobItemBytesRequest>& requests, |
- std::vector<SharedMemoryHandle>* memory_handles, |
- const std::vector<IPC::PlatformFileForTransit>& file_handles, |
- std::vector<BlobItemBytesResponse>* out) { |
- DCHECK(out->empty()); |
- auto it = blob_storage_.find(uuid); |
- if (it == blob_storage_.end()) |
- return ResponsesStatus::BLOB_NOT_FOUND; |
- |
- BlobConsolidation* consolidation = it->second.get(); |
- const auto& consolidated_items = consolidation->consolidated_items(); |
- |
- // Since we can be writing to the same shared memory handle from multiple |
- // requests, we keep them in a vector and lazily create them. |
- ScopedVector<SharedMemory> opened_memory; |
- opened_memory.resize(memory_handles->size()); |
- for (const BlobItemBytesRequest& request : requests) { |
- DCHECK_LT(request.renderer_item_index, consolidated_items.size()) |
- << "Invalid item index"; |
- |
- const ConsolidatedItem& item = |
- consolidated_items[request.renderer_item_index]; |
- DCHECK_LE(request.renderer_item_offset + request.size, item.length) |
- << "Invalid data range"; |
- DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
- |
- out->push_back(BlobItemBytesResponse(request.request_number)); |
- switch (request.transport_strategy) { |
- case IPCBlobItemRequestStrategy::IPC: { |
- BlobItemBytesResponse& response = out->back(); |
- ReadStatus status = consolidation->ReadMemory( |
- request.renderer_item_index, request.renderer_item_offset, |
- request.size, response.allocate_mutable_data(request.size)); |
- DCHECK(status == ReadStatus::OK) |
- << "Error reading from consolidated blob: " |
- << static_cast<int>(status); |
- break; |
- } |
- case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
- DCHECK_LT(request.handle_index, memory_handles->size()) |
- << "Invalid handle index."; |
- SharedMemory* memory = opened_memory[request.handle_index]; |
- if (!memory) { |
- SharedMemoryHandle& handle = (*memory_handles)[request.handle_index]; |
- DCHECK(SharedMemory::IsHandleValid(handle)); |
- std::unique_ptr<SharedMemory> shared_memory( |
- new SharedMemory(handle, false)); |
- if (!shared_memory->Map(request.size)) |
- return ResponsesStatus::SHARED_MEMORY_MAP_FAILED; |
- memory = shared_memory.get(); |
- opened_memory[request.handle_index] = shared_memory.release(); |
- } |
- CHECK(memory->memory()) << "Couldn't map memory for blob transfer."; |
- ReadStatus status = consolidation->ReadMemory( |
- request.renderer_item_index, request.renderer_item_offset, |
- request.size, |
- static_cast<char*>(memory->memory()) + request.handle_offset); |
- DCHECK(status == ReadStatus::OK) |
- << "Error reading from consolidated blob: " |
- << static_cast<int>(status); |
- break; |
- } |
- case IPCBlobItemRequestStrategy::FILE: |
- NOTREACHED() << "TODO(dmurph): Not implemented."; |
- break; |
- case IPCBlobItemRequestStrategy::UNKNOWN: |
- NOTREACHED(); |
- break; |
- } |
- } |
- return ResponsesStatus::SUCCESS; |
-} |
- |
void BlobTransportController::ReleaseBlobConsolidation( |
const std::string& uuid) { |
if (blob_storage_.erase(uuid)) { |