Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(208)

Unified Diff: content/child/blob_storage/blob_transport_controller.cc

Issue 1414123002: [BlobAsync] Renderer support for blob file writing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@blob-hookup
Patch Set: rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/blob_storage/blob_transport_controller.cc
diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc
index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..c0157bd017b055443d7da8c4a01041333fb25504 100644
--- a/content/child/blob_storage/blob_transport_controller.cc
+++ b/content/child/blob_storage/blob_transport_controller.cc
@@ -4,14 +4,21 @@
#include "content/child/blob_storage/blob_transport_controller.h"
+#include <limits>
#include <utility>
#include <vector>
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/files/file.h"
#include "base/lazy_instance.h"
+#include "base/location.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/shared_memory.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
+#include "base/task_runner.h"
#include "content/child/blob_storage/blob_consolidation.h"
#include "content/child/child_process.h"
#include "content/child/thread_safe_sender.h"
@@ -22,6 +29,7 @@
#include "storage/common/data_element.h"
#include "third_party/WebKit/public/platform/Platform.h"
+using base::File;
using base::SharedMemory;
using base::SharedMemoryHandle;
using storage::BlobItemBytesRequest;
@@ -30,10 +38,12 @@ using storage::IPCBlobItemRequestStrategy;
using storage::DataElement;
using storage::kBlobStorageIPCThresholdBytes;
-namespace content {
-
+using storage::BlobItemBytesResponse;
+using storage::BlobItemBytesRequest;
using storage::IPCBlobCreationCancelCode;
+namespace content {
+
using ConsolidatedItem = BlobConsolidation::ConsolidatedItem;
using ReadStatus = BlobConsolidation::ReadStatus;
@@ -52,6 +62,73 @@ void DecChildProcessRefCount() {
blink::Platform::current()->suddenTerminationChanged(true);
ChildProcess::current()->ReleaseProcess();
}
+
+bool WriteSingleChunk(base::File* file,
+ size_t total_read,
+ const char* memory,
+ size_t size) {
+ size_t written = 0;
+ size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max());
+ while (written < size) {
+ size_t writing_size = std::min(max_int, size - written);
+ int actual_written =
+ file->WriteAtCurrentPos(memory, static_cast<int>(writing_size));
+ if (actual_written < 0) {
+ LOG(ERROR) << "Error writing to file " << actual_written;
+ return false;
+ }
+ written += writing_size;
+ }
+ return true;
+}
+
+bool WriteSingleRequestToDisk(const BlobConsolidation* consolidation,
+ const BlobItemBytesRequest& request,
+ const IPC::PlatformFileForTransit& transit_file) {
+ File file = IPC::PlatformFileForTransitToFile(transit_file);
+ if (!file.IsValid())
+ return false;
+ CHECK_LE(request.handle_offset,
+ static_cast<uint64_t>(std::numeric_limits<int64_t>::max()));
+ int64_t seek_distance =
+ file.Seek(File::FROM_BEGIN, static_cast<int64_t>(request.handle_offset));
+ if (seek_distance < 0) {
+ LOG(ERROR) << "Error seeking " << request.handle_offset << " in file, got "
+ << seek_distance
+ << ". File error: " << File::ErrorToString(file.error_details());
+ return false;
+ }
+ BlobConsolidation::ReadStatus status = consolidation->VisitMemory(
+ request.renderer_item_index, request.renderer_item_offset, request.size,
+ base::Bind(&WriteSingleChunk, &file));
+ // We need to release the file so we don't automatically close the file.
+ file.TakePlatformFile();
+ return status == ReadStatus::OK;
+}
+
+void WriteDiskRequests(
+ scoped_refptr<BlobConsolidation> consolidation,
+ std::vector<BlobItemBytesRequest>* requests,
+ const std::vector<IPC::PlatformFileForTransit>& file_handles,
+ const BlobTransportController::ResponseCallback& response_callback,
+ const BlobTransportController::CancelCallback& error_callback,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ for (const auto& request : *requests) {
+ bool success = WriteSingleRequestToDisk(consolidation.get(), request,
+ file_handles[request.handle_index]);
+ if (!success) {
+ io_task_runner->PostTask(
+ FROM_HERE, base::Bind(error_callback,
+ IPCBlobCreationCancelCode::FILE_WRITE_FAILED));
+ return;
+ }
+ io_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(response_callback,
+ std::vector<BlobItemBytesResponse>{
+ BlobItemBytesResponse(request.request_number)}));
+ }
+}
} // namespace
BlobTransportController* BlobTransportController::GetInstance() {
@@ -62,7 +139,7 @@ BlobTransportController* BlobTransportController::GetInstance() {
void BlobTransportController::InitiateBlobTransfer(
const std::string& uuid,
const std::string& content_type,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<ThreadSafeSender> sender,
base::SingleThreadTaskRunner* io_runner,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
@@ -95,10 +172,17 @@ void BlobTransportController::OnMemoryRequest(
const std::vector<storage::BlobItemBytesRequest>& requests,
std::vector<base::SharedMemoryHandle>* memory_handles,
const std::vector<IPC::PlatformFileForTransit>& file_handles,
- IPC::Sender* sender) {
+ IPC::Sender* sender,
+ scoped_refptr<base::TaskRunner> io_runner,
+ base::TaskRunner* file_runner,
+ const ResponseCallback& async_response_callback,
+ const CancelCallback& async_cancel_callback) {
std::vector<storage::BlobItemBytesResponse> responses;
- ResponsesStatus status =
- GetResponses(uuid, requests, memory_handles, file_handles, &responses);
+ ResponsesStatus status = GetResponses(
+ uuid, requests, memory_handles, file_handles, &responses,
+ std::move(io_runner), file_runner, async_response_callback,
+ base::Bind(&BlobTransportController::CancelAsyncBlobTransfer,
+ base::Unretained(this), async_cancel_callback, uuid));
switch (status) {
case ResponsesStatus::BLOB_NOT_FOUND:
@@ -114,10 +198,12 @@ void BlobTransportController::OnMemoryRequest(
<< ".";
break;
case ResponsesStatus::SUCCESS:
+ case ResponsesStatus::PENDING_IO:
break;
}
- sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses));
+ if (!responses.empty())
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses));
}
void BlobTransportController::OnCancel(
@@ -198,9 +284,17 @@ void BlobTransportController::ClearForTesting() {
blob_storage_.clear();
}
+void BlobTransportController::CancelAsyncBlobTransfer(
+ const CancelCallback& async_cancel_callback,
+ const std::string& uuid,
+ storage::IPCBlobCreationCancelCode code) {
+ blob_storage_.erase(uuid);
+ async_cancel_callback.Run(code);
+}
+
void BlobTransportController::StoreBlobDataForRequests(
const std::string& uuid,
- std::unique_ptr<BlobConsolidation> consolidation,
+ scoped_refptr<BlobConsolidation> consolidation,
scoped_refptr<base::SingleThreadTaskRunner> main_runner) {
if (!main_thread_runner_.get()) {
main_thread_runner_ = std::move(main_runner);
@@ -213,7 +307,11 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
const std::vector<BlobItemBytesRequest>& requests,
std::vector<SharedMemoryHandle>* memory_handles,
const std::vector<IPC::PlatformFileForTransit>& file_handles,
- std::vector<BlobItemBytesResponse>* out) {
+ std::vector<BlobItemBytesResponse>* out,
+ scoped_refptr<base::TaskRunner> io_runner,
+ base::TaskRunner* file_runner,
+ const ResponseCallback& response_callback,
+ const CancelCallback& error_callback) {
DCHECK(out->empty());
auto it = blob_storage_.find(uuid);
if (it == blob_storage_.end())
@@ -222,6 +320,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
BlobConsolidation* consolidation = it->second.get();
const auto& consolidated_items = consolidation->consolidated_items();
+ scoped_ptr<std::vector<BlobItemBytesRequest>> file_requests(
+ new std::vector<BlobItemBytesRequest>());
+
// Since we can be writing to the same shared memory handle from multiple
// requests, we keep them in a vector and lazily create them.
ScopedVector<SharedMemory> opened_memory;
@@ -236,9 +337,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
<< "Invalid data range";
DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type";
- out->push_back(BlobItemBytesResponse(request.request_number));
switch (request.transport_strategy) {
case IPCBlobItemRequestStrategy::IPC: {
+ out->push_back(BlobItemBytesResponse(request.request_number));
BlobItemBytesResponse& response = out->back();
ReadStatus status = consolidation->ReadMemory(
request.renderer_item_index, request.renderer_item_offset,
@@ -249,6 +350,7 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
break;
}
case IPCBlobItemRequestStrategy::SHARED_MEMORY: {
+ out->push_back(BlobItemBytesResponse(request.request_number));
DCHECK_LT(request.handle_index, memory_handles->size())
<< "Invalid handle index.";
SharedMemory* memory = opened_memory[request.handle_index];
@@ -273,14 +375,24 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses(
break;
}
case IPCBlobItemRequestStrategy::FILE:
- NOTREACHED() << "TODO(dmurph): Not implemented.";
+ DCHECK_LT(request.handle_index, file_handles.size())
+ << "Invalid handle index.";
+ file_requests->push_back(request);
break;
case IPCBlobItemRequestStrategy::UNKNOWN:
NOTREACHED();
break;
}
}
- return ResponsesStatus::SUCCESS;
+ if (file_requests->empty()) {
+ return ResponsesStatus::SUCCESS;
+ }
+ file_runner->PostTask(
+ FROM_HERE, base::Bind(&WriteDiskRequests, consolidation,
+ base::Owned(file_requests.release()), file_handles,
kinuko 2016/04/15 15:02:32 I think base::Passed(&file_requests) would be more
dmurph 2016/04/20 21:15:34 done.
+ response_callback, error_callback,
+ base::Passed(std::move(io_runner))));
+ return ResponsesStatus::PENDING_IO;
}
void BlobTransportController::ReleaseBlobConsolidation(

Powered by Google App Engine
This is Rietveld 408576698