Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(580)

Unified Diff: content/child/blob_storage/blob_transport_controller.cc

Issue 1414123002: [BlobAsync] Renderer support for blob file writing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@blob-hookup
Patch Set: Added test & simplified IPC callback[ Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/blob_storage/blob_transport_controller.cc
diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc
index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..7e0adce768bd212b865bb2aaa56ba49555ec19b3 100644
--- a/content/child/blob_storage/blob_transport_controller.cc
+++ b/content/child/blob_storage/blob_transport_controller.cc
@@ -4,24 +4,34 @@
#include "content/child/blob_storage/blob_transport_controller.h"
+#include <limits>
#include <utility>
#include <vector>
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/files/file.h"
#include "base/lazy_instance.h"
+#include "base/location.h"
+#include "base/memory/ptr_util.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/shared_memory.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
+#include "base/task_runner.h"
#include "content/child/blob_storage/blob_consolidation.h"
#include "content/child/child_process.h"
#include "content/child/thread_safe_sender.h"
#include "content/common/fileapi/webblob_messages.h"
+#include "ipc/ipc_message.h"
#include "ipc/ipc_sender.h"
#include "storage/common/blob_storage/blob_item_bytes_request.h"
#include "storage/common/blob_storage/blob_item_bytes_response.h"
#include "storage/common/data_element.h"
#include "third_party/WebKit/public/platform/Platform.h"
+using base::File;
using base::SharedMemory;
using base::SharedMemoryHandle;
using storage::BlobItemBytesRequest;
@@ -30,10 +40,16 @@ using storage::IPCBlobItemRequestStrategy;
using storage::DataElement;
using storage::kBlobStorageIPCThresholdBytes;
-namespace content {
-
+using storage::BlobItemBytesResponse;
+using storage::BlobItemBytesRequest;
using storage::IPCBlobCreationCancelCode;
+namespace content {
+using ResponseCallback = base::Callback<void(
+ const std::vector<storage::BlobItemBytesResponse>& /* responses */)>;
+using CancelCallback =
+ base::Callback<void(storage::IPCBlobCreationCancelCode /* reason */)>;
+
using ConsolidatedItem = BlobConsolidation::ConsolidatedItem;
using ReadStatus = BlobConsolidation::ReadStatus;
@@ -52,6 +68,73 @@ void DecChildProcessRefCount() {
blink::Platform::current()->suddenTerminationChanged(true);
ChildProcess::current()->ReleaseProcess();
}
+
+bool WriteSingleChunk(base::File* file,
+ size_t total_read,
+ const char* memory,
+ size_t size) {
+ size_t written = 0;
+ size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max());
+ while (written < size) {
+ size_t writing_size = std::min(max_int, size - written);
+ int actual_written =
+ file->WriteAtCurrentPos(memory, static_cast<int>(writing_size));
+ if (actual_written < 0) {
+ LOG(ERROR) << "Error writing to file " << actual_written;
+ return false;
+ }
+ written += writing_size;
+ }
+ return true;
+}
+
+bool WriteSingleRequestToDisk(const BlobConsolidation* consolidation,
+ const BlobItemBytesRequest& request,
+ const IPC::PlatformFileForTransit& transit_file) {
+ File file = IPC::PlatformFileForTransitToFile(transit_file);
+ if (!file.IsValid())
+ return false;
+ CHECK_LE(request.handle_offset,
+ static_cast<uint64_t>(std::numeric_limits<int64_t>::max()));
+ int64_t seek_distance =
+ file.Seek(File::FROM_BEGIN, static_cast<int64_t>(request.handle_offset));
+ if (seek_distance < 0) {
+ LOG(ERROR) << "Error seeking " << request.handle_offset << " in file, got "
+ << seek_distance
+ << ". File error: " << File::ErrorToString(file.error_details());
+ return false;
+ }
+ BlobConsolidation::ReadStatus status = consolidation->VisitMemory(
+ request.renderer_item_index, request.renderer_item_offset, request.size,
+ base::Bind(&WriteSingleChunk, &file));
+ // We need to release the file so we don't automatically close the file.
+ file.TakePlatformFile();
+ return status == ReadStatus::OK;
+}
+
+void WriteDiskRequests(
+ scoped_refptr<BlobConsolidation> consolidation,
+ std::unique_ptr<std::vector<BlobItemBytesRequest>> requests,
+ const std::vector<IPC::PlatformFileForTransit>& file_handles,
+ const ResponseCallback& response_callback,
+ const CancelCallback& error_callback,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ for (const auto& request : *requests) {
+ bool success = WriteSingleRequestToDisk(consolidation.get(), request,
+ file_handles[request.handle_index]);
+ if (!success) {
+ io_task_runner->PostTask(
+ FROM_HERE, base::Bind(error_callback,
+ IPCBlobCreationCancelCode::FILE_WRITE_FAILED));
+ return;
+ }
+ io_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(response_callback,
+ std::vector<BlobItemBytesResponse>{
+ BlobItemBytesResponse(request.request_number)}));
michaeln 2016/04/21 01:58:41 is BlobItemBytesResponse.time_file_modified not ne
dmurph 2016/04/22 22:37:09 Oh right, I need to add that.
+ }
+}
} // namespace
BlobTransportController* BlobTransportController::GetInstance() {
@@ -62,7 +145,7 @@ BlobTransportController* BlobTransportController::GetInstance() {
void BlobTransportController::InitiateBlobTransfer(
const std::string& uuid,
const std::string& content_type,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<ThreadSafeSender> sender,
base::SingleThreadTaskRunner* io_runner,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
@@ -82,8 +165,7 @@ void BlobTransportController::InitiateBlobTransfer(
FROM_HERE,
base::Bind(&BlobTransportController::StoreBlobDataForRequests,
base::Unretained(BlobTransportController::GetInstance()), uuid,
- base::Passed(std::move(consolidation)),
- base::Passed(std::move(main_runner))));
+ base::Passed(&consolidation), base::Passed(&main_runner)));
// TODO(dmurph): Merge register and start messages.
sender->Send(new BlobStorageMsg_RegisterBlobUUID(uuid, content_type, "",
referenced_blobs));
@@ -95,10 +177,17 @@ void BlobTransportController::OnMemoryRequest(
const std::vector<storage::BlobItemBytesRequest>& requests,
std::vector<base::SharedMemoryHandle>* memory_handles,
const std::vector<IPC::PlatformFileForTransit>& file_handles,
- IPC::Sender* sender) {
+ scoped_refptr<base::TaskRunner> io_runner,
+ base::TaskRunner* file_runner,
+ const IPCSender& ipc_sender) {
std::vector<storage::BlobItemBytesResponse> responses;
- ResponsesStatus status =
- GetResponses(uuid, requests, memory_handles, file_handles, &responses);
+ ResponsesStatus status = GetResponses(
+ uuid, requests, memory_handles, file_handles, &responses,
+ std::move(io_runner), file_runner,
+ base::Bind(&BlobTransportController::SendResponses,
+ base::Unretained(this), ipc_sender, uuid),
+ base::Bind(&BlobTransportController::SendCancelAndReleaseConsolidation,
+ base::Unretained(this), ipc_sender, uuid));
switch (status) {
case ResponsesStatus::BLOB_NOT_FOUND:
@@ -114,10 +203,13 @@ void BlobTransportController::OnMemoryRequest(
<< ".";
break;
case ResponsesStatus::SUCCESS:
+ case ResponsesStatus::PENDING_IO:
break;
}
- sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses));
+ if (!responses.empty())
+ ipc_sender.Run(base::WrapUnique(
michaeln 2016/04/21 18:51:43 As coded, the GetResponses helper is responsible f
dmurph 2016/04/22 22:37:09 Ok, done, check it out.
dmurph 2016/04/22 22:37:09 sgtm.
+ new BlobStorageMsg_MemoryItemResponse(uuid, responses)));
}
void BlobTransportController::OnCancel(
@@ -198,9 +290,26 @@ void BlobTransportController::ClearForTesting() {
blob_storage_.clear();
}
+void BlobTransportController::SendResponses(
+ const IPCSender& ipc_sender,
+ const std::string& uuid,
+ const std::vector<storage::BlobItemBytesResponse>& responses) {
+ ipc_sender.Run(
+ base::WrapUnique(new BlobStorageMsg_MemoryItemResponse(uuid, responses)));
+}
+
+void BlobTransportController::SendCancelAndReleaseConsolidation(
+ const IPCSender& ipc_sender,
+ const std::string& uuid,
+ storage::IPCBlobCreationCancelCode reason) {
+ ipc_sender.Run(
+ base::WrapUnique(new BlobStorageMsg_CancelBuildingBlob(uuid, reason)));
+ ReleaseBlobConsolidation(uuid);
+}
+
void BlobTransportController::StoreBlobDataForRequests(
const std::string& uuid,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
if (!main_thread_runner_.get()) {
main_thread_runner_ = std::move(main_runner);
@@ -213,7 +322,11 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
const std::vector<BlobItemBytesRequest>& requests,
std::vector<SharedMemoryHandle>* memory_handles,
const std::vector<IPC::PlatformFileForTransit>& file_handles,
- std::vector<BlobItemBytesResponse>* out) {
+ std::vector<BlobItemBytesResponse>* out,
+ scoped_refptr<base::TaskRunner> io_runner,
+ base::TaskRunner* file_runner,
+ const ResponseCallback& response_callback,
+ const CancelCallback& error_callback) {
DCHECK(out->empty());
auto it = blob_storage_.find(uuid);
if (it == blob_storage_.end())
@@ -222,6 +335,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
BlobConsolidation* consolidation = it->second.get();
const auto& consolidated_items = consolidation->consolidated_items();
+ scoped_ptr<std::vector<BlobItemBytesRequest>> file_requests(
+ new std::vector<BlobItemBytesRequest>());
+
// Since we can be writing to the same shared memory handle from multiple
// requests, we keep them in a vector and lazily create them.
ScopedVector<SharedMemory> opened_memory;
@@ -236,9 +352,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
<< "Invalid data range";
DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type";
- out->push_back(BlobItemBytesResponse(request.request_number));
switch (request.transport_strategy) {
case IPCBlobItemRequestStrategy::IPC: {
+ out->push_back(BlobItemBytesResponse(request.request_number));
BlobItemBytesResponse& response = out->back();
ReadStatus status = consolidation->ReadMemory(
request.renderer_item_index, request.renderer_item_offset,
@@ -249,6 +365,7 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
break;
}
case IPCBlobItemRequestStrategy::SHARED_MEMORY: {
+ out->push_back(BlobItemBytesResponse(request.request_number));
DCHECK_LT(request.handle_index, memory_handles->size())
<< "Invalid handle index.";
SharedMemory* memory = opened_memory[request.handle_index];
@@ -273,14 +390,24 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
break;
}
case IPCBlobItemRequestStrategy::FILE:
- NOTREACHED() << "TODO(dmurph): Not implemented.";
+ DCHECK_LT(request.handle_index, file_handles.size())
+ << "Invalid handle index.";
+ file_requests->push_back(request);
break;
case IPCBlobItemRequestStrategy::UNKNOWN:
NOTREACHED();
break;
}
}
- return ResponsesStatus::SUCCESS;
+ if (file_requests->empty()) {
+ return ResponsesStatus::SUCCESS;
+ }
+ file_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&WriteDiskRequests, consolidation,
+ base::Passed(&file_requests), file_handles, response_callback,
+ error_callback, base::Passed(&io_runner)));
michaeln 2016/04/21 01:58:41 Passing in a reply task runner seems odd. I'd expe
dmurph 2016/04/22 22:37:09 Done.
+ return ResponsesStatus::PENDING_IO;
}
void BlobTransportController::ReleaseBlobConsolidation(

Powered by Google App Engine
This is Rietveld 408576698