Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(295)

Unified Diff: content/child/blob_storage/blob_transport_controller.cc

Issue 1414123002: [BlobAsync] Renderer support for blob file writing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@blob-hookup
Patch Set: and one more rebase error :/ Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/blob_storage/blob_transport_controller.cc
diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc
index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..1c817aa52f9c17549a8f4bd54fc95638abde5e89 100644
--- a/content/child/blob_storage/blob_transport_controller.cc
+++ b/content/child/blob_storage/blob_transport_controller.cc
@@ -4,24 +4,41 @@
#include "content/child/blob_storage/blob_transport_controller.h"
+#include <limits>
+#include <memory>
#include <utility>
#include <vector>
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/files/file.h"
#include "base/lazy_instance.h"
+#include "base/location.h"
+#include "base/memory/ptr_util.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/shared_memory.h"
+#include "base/metrics/histogram_macros.h"
+#include "base/numerics/safe_conversions.h"
+#include "base/optional.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
+#include "base/task_runner.h"
+#include "base/task_runner_util.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "base/time/time.h"
#include "content/child/blob_storage/blob_consolidation.h"
#include "content/child/child_process.h"
#include "content/child/thread_safe_sender.h"
#include "content/common/fileapi/webblob_messages.h"
+#include "ipc/ipc_message.h"
#include "ipc/ipc_sender.h"
#include "storage/common/blob_storage/blob_item_bytes_request.h"
#include "storage/common/blob_storage/blob_item_bytes_response.h"
#include "storage/common/data_element.h"
#include "third_party/WebKit/public/platform/Platform.h"
+using base::File;
using base::SharedMemory;
using base::SharedMemoryHandle;
using storage::BlobItemBytesRequest;
@@ -30,10 +47,11 @@ using storage::IPCBlobItemRequestStrategy;
using storage::DataElement;
using storage::kBlobStorageIPCThresholdBytes;
-namespace content {
-
+using storage::BlobItemBytesResponse;
+using storage::BlobItemBytesRequest;
using storage::IPCBlobCreationCancelCode;
+namespace content {
using ConsolidatedItem = BlobConsolidation::ConsolidatedItem;
using ReadStatus = BlobConsolidation::ReadStatus;
@@ -52,6 +70,86 @@ void DecChildProcessRefCount() {
blink::Platform::current()->suddenTerminationChanged(true);
ChildProcess::current()->ReleaseProcess();
}
+
+void DecChildProcessRefCountTimes(size_t times) {
+ for (size_t i = 0; i < times; i++) {
+ DecChildProcessRefCount();
+ }
+}
+
+bool WriteSingleChunk(base::File* file, const char* memory, size_t size) {
+ size_t written = 0;
+ while (written < size) {
+ size_t writing_size = base::saturated_cast<int>(size - written);
+ int actual_written =
+ file->WriteAtCurrentPos(memory, static_cast<int>(writing_size));
+ bool write_failed = actual_written < 0;
+ UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileWriteFailed", write_failed);
+ if (write_failed)
+ return false;
+ written += actual_written;
+ }
+ return true;
+}
+
+base::Optional<base::Time> WriteSingleRequestToDisk(
+ const BlobConsolidation* consolidation,
+ const BlobItemBytesRequest& request,
+ File* file) {
+ if (!file->IsValid())
+ return base::nullopt;
+ int64_t seek_distance = file->Seek(
+ File::FROM_BEGIN, base::checked_cast<int64_t>(request.handle_offset));
+ bool seek_failed = seek_distance < 0;
+ UMA_HISTOGRAM_BOOLEAN("Storage.Blob.RendererFileSeekFailed", seek_failed);
+ if (seek_failed) {
+ return base::nullopt;
+ }
+ BlobConsolidation::ReadStatus status = consolidation->VisitMemory(
+ request.renderer_item_index, request.renderer_item_offset, request.size,
+ base::Bind(&WriteSingleChunk, file));
+ if (status != BlobConsolidation::ReadStatus::OK)
+ return base::nullopt;
+ File::Info info;
+ file->GetInfo(&info);
+ return base::make_optional(info.last_modified);
+}
+
+// This returns either the responses, or if they're empty, an error code.
+std::pair<std::vector<storage::BlobItemBytesResponse>,
+ IPCBlobCreationCancelCode>
+WriteDiskRequests(
+ scoped_refptr<BlobConsolidation> consolidation,
+ std::unique_ptr<std::vector<BlobItemBytesRequest>> requests,
+ const std::vector<IPC::PlatformFileForTransit>& file_handles) {
+ std::vector<BlobItemBytesResponse> responses;
+ std::vector<base::Time> last_modified_times;
+ last_modified_times.resize(file_handles.size());
+ // We grab ownership of the file handles here. When this vector is destroyed
+ // it will close the files.
+ std::vector<File> files;
+ files.reserve(file_handles.size());
+ for (const auto& file_handle : file_handles) {
+ files.emplace_back(IPC::PlatformFileForTransitToFile(file_handle));
+ }
+ for (const auto& request : *requests) {
+ base::Optional<base::Time> last_modified = WriteSingleRequestToDisk(
+ consolidation.get(), request, &files[request.handle_index]);
+ if (!last_modified) {
+ return std::make_pair(std::vector<storage::BlobItemBytesResponse>(),
+ IPCBlobCreationCancelCode::FILE_WRITE_FAILED);
+ }
+ last_modified_times[request.handle_index] = last_modified.value();
+ }
+ for (const auto& request : *requests) {
+ responses.push_back(BlobItemBytesResponse(request.request_number));
+ responses.back().time_file_modified =
+ last_modified_times[request.handle_index];
+ }
+
+ return std::make_pair(responses, IPCBlobCreationCancelCode::UNKNOWN);
+}
+
} // namespace
BlobTransportController* BlobTransportController::GetInstance() {
@@ -62,7 +160,7 @@ BlobTransportController* BlobTransportController::GetInstance() {
void BlobTransportController::InitiateBlobTransfer(
const std::string& uuid,
const std::string& content_type,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<ThreadSafeSender> sender,
base::SingleThreadTaskRunner* io_runner,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
@@ -95,29 +193,100 @@ void BlobTransportController::OnMemoryRequest(
const std::vector<storage::BlobItemBytesRequest>& requests,
std::vector<base::SharedMemoryHandle>* memory_handles,
const std::vector<IPC::PlatformFileForTransit>& file_handles,
+ base::TaskRunner* file_runner,
IPC::Sender* sender) {
- std::vector<storage::BlobItemBytesResponse> responses;
- ResponsesStatus status =
- GetResponses(uuid, requests, memory_handles, file_handles, &responses);
-
- switch (status) {
- case ResponsesStatus::BLOB_NOT_FOUND:
- // sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid,
- // IPCBlobCreationCancelCode::UNKNOWN));
- return;
- case ResponsesStatus::SHARED_MEMORY_MAP_FAILED:
- // This would happen if the renderer process doesn't have enough memory
- // to map the shared memory, which is possible if we don't have much
- // memory. If this scenario happens often, we could delay the response
- // until we have enough memory. For now we just fail.
- CHECK(false) << "Unable to map shared memory to send blob " << uuid
- << ".";
- break;
- case ResponsesStatus::SUCCESS:
- break;
+ std::vector<BlobItemBytesResponse> responses;
+ auto it = blob_storage_.find(uuid);
+ // Ignore invalid messages.
+ if (it == blob_storage_.end())
+ return;
+
+ BlobConsolidation* consolidation = it->second.get();
+ const auto& consolidated_items = consolidation->consolidated_items();
+
+ std::unique_ptr<std::vector<BlobItemBytesRequest>> file_requests(
+ new std::vector<BlobItemBytesRequest>());
+
+ // Since we can be writing to the same shared memory handle from multiple
+ // requests, we keep them in a vector and lazily create them.
+ ScopedVector<SharedMemory> opened_memory;
+ opened_memory.resize(memory_handles->size());
+ for (const BlobItemBytesRequest& request : requests) {
+ DCHECK_LT(request.renderer_item_index, consolidated_items.size())
+ << "Invalid item index";
+
+ const ConsolidatedItem& item =
+ consolidated_items[request.renderer_item_index];
+ DCHECK_LE(request.renderer_item_offset + request.size, item.length)
+ << "Invalid data range";
+ DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type";
+
+ switch (request.transport_strategy) {
+ case IPCBlobItemRequestStrategy::IPC: {
+ responses.push_back(BlobItemBytesResponse(request.request_number));
+ BlobItemBytesResponse& response = responses.back();
+ ReadStatus status = consolidation->ReadMemory(
+ request.renderer_item_index, request.renderer_item_offset,
+ request.size, response.allocate_mutable_data(request.size));
+ DCHECK(status == ReadStatus::OK)
+ << "Error reading from consolidated blob: "
+ << static_cast<int>(status);
+ break;
+ }
+ case IPCBlobItemRequestStrategy::SHARED_MEMORY: {
+ responses.push_back(BlobItemBytesResponse(request.request_number));
+ DCHECK_LT(request.handle_index, memory_handles->size())
+ << "Invalid handle index.";
+ SharedMemory* memory = opened_memory[request.handle_index];
+ if (!memory) {
+ SharedMemoryHandle& handle = (*memory_handles)[request.handle_index];
+ DCHECK(SharedMemory::IsHandleValid(handle));
+ std::unique_ptr<SharedMemory> shared_memory(
+ new SharedMemory(handle, false));
+
+ if (!shared_memory->Map(request.size)) {
+ // This would happen if the renderer process doesn't have enough
+ // memory to map the shared memory, which is possible if we don't
+ // have much memory. If this scenario happens often, we could delay
+ // the response until we have enough memory. For now we just fail.
+ CHECK(false) << "Unable to map shared memory to send blob " << uuid
+ << ".";
+ return;
+ }
+ memory = shared_memory.get();
+ opened_memory[request.handle_index] = shared_memory.release();
+ }
+ CHECK(memory->memory()) << "Couldn't map memory for blob transfer.";
+ ReadStatus status = consolidation->ReadMemory(
+ request.renderer_item_index, request.renderer_item_offset,
+ request.size,
+ static_cast<char*>(memory->memory()) + request.handle_offset);
+ DCHECK(status == ReadStatus::OK)
+ << "Error reading from consolidated blob: "
+ << static_cast<int>(status);
+ break;
+ }
+ case IPCBlobItemRequestStrategy::FILE:
+ DCHECK_LT(request.handle_index, file_handles.size())
+ << "Invalid handle index.";
+ file_requests->push_back(request);
+ break;
+ case IPCBlobItemRequestStrategy::UNKNOWN:
+ NOTREACHED();
+ break;
+ }
+ }
+ if (!file_requests->empty()) {
+ base::PostTaskAndReplyWithResult(
+ file_runner, FROM_HERE,
+ base::Bind(&WriteDiskRequests, make_scoped_refptr(consolidation),
+ base::Passed(&file_requests), file_handles),
+ base::Bind(&BlobTransportController::OnFileWriteComplete,
+ weak_factory_.GetWeakPtr(), sender, uuid));
}
- sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses));
+ if (!responses.empty())
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses));
}
void BlobTransportController::OnCancel(
@@ -132,6 +301,17 @@ void BlobTransportController::OnDone(const std::string& uuid) {
ReleaseBlobConsolidation(uuid);
}
+void BlobTransportController::CancelAllBlobTransfers() {
+ weak_factory_.InvalidateWeakPtrs();
+ if (!blob_storage_.empty() && main_thread_runner_) {
+ main_thread_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&DecChildProcessRefCountTimes, blob_storage_.size()));
+ }
+ main_thread_runner_ = nullptr;
+ blob_storage_.clear();
+}
+
// static
void BlobTransportController::GetDescriptions(
BlobConsolidation* consolidation,
@@ -186,21 +366,28 @@ void BlobTransportController::GetDescriptions(
}
}
-BlobTransportController::BlobTransportController() {}
+BlobTransportController::BlobTransportController() : weak_factory_(this) {}
BlobTransportController::~BlobTransportController() {}
-void BlobTransportController::ClearForTesting() {
- if (!blob_storage_.empty() && main_thread_runner_) {
- main_thread_runner_->PostTask(FROM_HERE,
- base::Bind(&DecChildProcessRefCount));
+void BlobTransportController::OnFileWriteComplete(
+ IPC::Sender* sender,
+ const std::string& uuid,
+ const std::pair<std::vector<BlobItemBytesResponse>,
+ IPCBlobCreationCancelCode>& result) {
+ if (blob_storage_.find(uuid) == blob_storage_.end())
+ return;
+ if (!result.first.empty()) {
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, result.first));
+ return;
}
- blob_storage_.clear();
+ sender->Send(new BlobStorageMsg_CancelBuildingBlob(uuid, result.second));
+ ReleaseBlobConsolidation(uuid);
}
void BlobTransportController::StoreBlobDataForRequests(
const std::string& uuid,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
if (!main_thread_runner_.get()) {
main_thread_runner_ = std::move(main_runner);
@@ -208,81 +395,6 @@ void BlobTransportController::StoreBlobDataForRequests(
blob_storage_[uuid] = std::move(consolidation);
}
-BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
- const std::string& uuid,
- const std::vector<BlobItemBytesRequest>& requests,
- std::vector<SharedMemoryHandle>* memory_handles,
- const std::vector<IPC::PlatformFileForTransit>& file_handles,
- std::vector<BlobItemBytesResponse>* out) {
- DCHECK(out->empty());
- auto it = blob_storage_.find(uuid);
- if (it == blob_storage_.end())
- return ResponsesStatus::BLOB_NOT_FOUND;
-
- BlobConsolidation* consolidation = it->second.get();
- const auto& consolidated_items = consolidation->consolidated_items();
-
- // Since we can be writing to the same shared memory handle from multiple
- // requests, we keep them in a vector and lazily create them.
- ScopedVector<SharedMemory> opened_memory;
- opened_memory.resize(memory_handles->size());
- for (const BlobItemBytesRequest& request : requests) {
- DCHECK_LT(request.renderer_item_index, consolidated_items.size())
- << "Invalid item index";
-
- const ConsolidatedItem& item =
- consolidated_items[request.renderer_item_index];
- DCHECK_LE(request.renderer_item_offset + request.size, item.length)
- << "Invalid data range";
- DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type";
-
- out->push_back(BlobItemBytesResponse(request.request_number));
- switch (request.transport_strategy) {
- case IPCBlobItemRequestStrategy::IPC: {
- BlobItemBytesResponse& response = out->back();
- ReadStatus status = consolidation->ReadMemory(
- request.renderer_item_index, request.renderer_item_offset,
- request.size, response.allocate_mutable_data(request.size));
- DCHECK(status == ReadStatus::OK)
- << "Error reading from consolidated blob: "
- << static_cast<int>(status);
- break;
- }
- case IPCBlobItemRequestStrategy::SHARED_MEMORY: {
- DCHECK_LT(request.handle_index, memory_handles->size())
- << "Invalid handle index.";
- SharedMemory* memory = opened_memory[request.handle_index];
- if (!memory) {
- SharedMemoryHandle& handle = (*memory_handles)[request.handle_index];
- DCHECK(SharedMemory::IsHandleValid(handle));
- std::unique_ptr<SharedMemory> shared_memory(
- new SharedMemory(handle, false));
- if (!shared_memory->Map(request.size))
- return ResponsesStatus::SHARED_MEMORY_MAP_FAILED;
- memory = shared_memory.get();
- opened_memory[request.handle_index] = shared_memory.release();
- }
- CHECK(memory->memory()) << "Couldn't map memory for blob transfer.";
- ReadStatus status = consolidation->ReadMemory(
- request.renderer_item_index, request.renderer_item_offset,
- request.size,
- static_cast<char*>(memory->memory()) + request.handle_offset);
- DCHECK(status == ReadStatus::OK)
- << "Error reading from consolidated blob: "
- << static_cast<int>(status);
- break;
- }
- case IPCBlobItemRequestStrategy::FILE:
- NOTREACHED() << "TODO(dmurph): Not implemented.";
- break;
- case IPCBlobItemRequestStrategy::UNKNOWN:
- NOTREACHED();
- break;
- }
- }
- return ResponsesStatus::SUCCESS;
-}
-
void BlobTransportController::ReleaseBlobConsolidation(
const std::string& uuid) {
if (blob_storage_.erase(uuid)) {

Powered by Google App Engine
This is Rietveld 408576698