Index: content/child/blob_storage/blob_transport_controller.cc |
diff --git a/content/child/blob_storage/blob_transport_controller.cc b/content/child/blob_storage/blob_transport_controller.cc |
index 0bcadbc85d30e80ac8d13d4cbaad0b5864627a04..c0157bd017b055443d7da8c4a01041333fb25504 100644 |
--- a/content/child/blob_storage/blob_transport_controller.cc |
+++ b/content/child/blob_storage/blob_transport_controller.cc |
@@ -4,14 +4,21 @@ |
#include "content/child/blob_storage/blob_transport_controller.h" |
+#include <limits> |
#include <utility> |
#include <vector> |
+#include "base/bind.h" |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "base/files/file.h" |
#include "base/lazy_instance.h" |
+#include "base/location.h" |
#include "base/memory/scoped_vector.h" |
#include "base/memory/shared_memory.h" |
#include "base/single_thread_task_runner.h" |
#include "base/stl_util.h" |
+#include "base/task_runner.h" |
#include "content/child/blob_storage/blob_consolidation.h" |
#include "content/child/child_process.h" |
#include "content/child/thread_safe_sender.h" |
@@ -22,6 +29,7 @@ |
#include "storage/common/data_element.h" |
#include "third_party/WebKit/public/platform/Platform.h" |
+using base::File; |
using base::SharedMemory; |
using base::SharedMemoryHandle; |
using storage::BlobItemBytesRequest; |
@@ -30,10 +38,12 @@ using storage::IPCBlobItemRequestStrategy; |
using storage::DataElement; |
using storage::kBlobStorageIPCThresholdBytes; |
-namespace content { |
- |
+using storage::BlobItemBytesResponse; |
+using storage::BlobItemBytesRequest; |
using storage::IPCBlobCreationCancelCode; |
+namespace content { |
+ |
using ConsolidatedItem = BlobConsolidation::ConsolidatedItem; |
using ReadStatus = BlobConsolidation::ReadStatus; |
@@ -52,6 +62,73 @@ void DecChildProcessRefCount() { |
blink::Platform::current()->suddenTerminationChanged(true); |
ChildProcess::current()->ReleaseProcess(); |
} |
+ |
+bool WriteSingleChunk(base::File* file, |
+ size_t total_read, |
+ const char* memory, |
+ size_t size) { |
+ size_t written = 0; |
+ size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max()); |
+ while (written < size) { |
+ size_t writing_size = std::min(max_int, size - written); |
+ int actual_written = |
+ file->WriteAtCurrentPos(memory, static_cast<int>(writing_size)); |
+ if (actual_written < 0) { |
+ LOG(ERROR) << "Error writing to file " << actual_written; |
+ return false; |
+ } |
+ written += writing_size; |
+ } |
+ return true; |
+} |
+ |
+bool WriteSingleRequestToDisk(const BlobConsolidation* consolidation, |
+ const BlobItemBytesRequest& request, |
+ const IPC::PlatformFileForTransit& transit_file) { |
+ File file = IPC::PlatformFileForTransitToFile(transit_file); |
+ if (!file.IsValid()) |
+ return false; |
+ CHECK_LE(request.handle_offset, |
+ static_cast<uint64_t>(std::numeric_limits<int64_t>::max())); |
+ int64_t seek_distance = |
+ file.Seek(File::FROM_BEGIN, static_cast<int64_t>(request.handle_offset)); |
+ if (seek_distance < 0) { |
+ LOG(ERROR) << "Error seeking " << request.handle_offset << " in file, got " |
+ << seek_distance |
+ << ". File error: " << File::ErrorToString(file.error_details()); |
+ return false; |
+ } |
+ BlobConsolidation::ReadStatus status = consolidation->VisitMemory( |
+ request.renderer_item_index, request.renderer_item_offset, request.size, |
+ base::Bind(&WriteSingleChunk, &file)); |
+ // We need to release the file so we don't automatically close the file. |
+ file.TakePlatformFile(); |
+ return status == ReadStatus::OK; |
+} |
+ |
+void WriteDiskRequests( |
+ scoped_refptr<BlobConsolidation> consolidation, |
+ std::vector<BlobItemBytesRequest>* requests, |
+ const std::vector<IPC::PlatformFileForTransit>& file_handles, |
+ const BlobTransportController::ResponseCallback& response_callback, |
+ const BlobTransportController::CancelCallback& error_callback, |
+ scoped_refptr<base::TaskRunner> io_task_runner) { |
+ for (const auto& request : *requests) { |
+ bool success = WriteSingleRequestToDisk(consolidation.get(), request, |
+ file_handles[request.handle_index]); |
+ if (!success) { |
+ io_task_runner->PostTask( |
+ FROM_HERE, base::Bind(error_callback, |
+ IPCBlobCreationCancelCode::FILE_WRITE_FAILED)); |
+ return; |
+ } |
+ io_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(response_callback, |
+ std::vector<BlobItemBytesResponse>{ |
+ BlobItemBytesResponse(request.request_number)})); |
+ } |
+} |
} // namespace |
BlobTransportController* BlobTransportController::GetInstance() { |
@@ -62,7 +139,7 @@ BlobTransportController* BlobTransportController::GetInstance() { |
void BlobTransportController::InitiateBlobTransfer( |
const std::string& uuid, |
const std::string& content_type, |
- std::unique_ptr<BlobConsolidation> consolidation, |
+ scoped_refptr<BlobConsolidation> consolidation, |
scoped_refptr<ThreadSafeSender> sender, |
base::SingleThreadTaskRunner* io_runner, |
scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
@@ -95,10 +172,17 @@ void BlobTransportController::OnMemoryRequest( |
const std::vector<storage::BlobItemBytesRequest>& requests, |
std::vector<base::SharedMemoryHandle>* memory_handles, |
const std::vector<IPC::PlatformFileForTransit>& file_handles, |
- IPC::Sender* sender) { |
+ IPC::Sender* sender, |
+ scoped_refptr<base::TaskRunner> io_runner, |
+ base::TaskRunner* file_runner, |
+ const ResponseCallback& async_response_callback, |
+ const CancelCallback& async_cancel_callback) { |
std::vector<storage::BlobItemBytesResponse> responses; |
- ResponsesStatus status = |
- GetResponses(uuid, requests, memory_handles, file_handles, &responses); |
+ ResponsesStatus status = GetResponses( |
+ uuid, requests, memory_handles, file_handles, &responses, |
+ std::move(io_runner), file_runner, async_response_callback, |
+ base::Bind(&BlobTransportController::CancelAsyncBlobTransfer, |
+ base::Unretained(this), async_cancel_callback, uuid)); |
switch (status) { |
case ResponsesStatus::BLOB_NOT_FOUND: |
@@ -114,10 +198,12 @@ void BlobTransportController::OnMemoryRequest( |
<< "."; |
break; |
case ResponsesStatus::SUCCESS: |
+ case ResponsesStatus::PENDING_IO: |
break; |
} |
- sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
+ if (!responses.empty()) |
+ sender->Send(new BlobStorageMsg_MemoryItemResponse(uuid, responses)); |
} |
void BlobTransportController::OnCancel( |
@@ -198,9 +284,17 @@ void BlobTransportController::ClearForTesting() { |
blob_storage_.clear(); |
} |
+void BlobTransportController::CancelAsyncBlobTransfer( |
+ const CancelCallback& async_cancel_callback, |
+ const std::string& uuid, |
+ storage::IPCBlobCreationCancelCode code) { |
+ blob_storage_.erase(uuid); |
+ async_cancel_callback.Run(code); |
+} |
+ |
void BlobTransportController::StoreBlobDataForRequests( |
const std::string& uuid, |
- std::unique_ptr<BlobConsolidation> consolidation, |
+ scoped_refptr<BlobConsolidation> consolidation, |
scoped_refptr<base::SingleThreadTaskRunner> main_runner) { |
if (!main_thread_runner_.get()) { |
main_thread_runner_ = std::move(main_runner); |
@@ -213,7 +307,11 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
const std::vector<BlobItemBytesRequest>& requests, |
std::vector<SharedMemoryHandle>* memory_handles, |
const std::vector<IPC::PlatformFileForTransit>& file_handles, |
- std::vector<BlobItemBytesResponse>* out) { |
+ std::vector<BlobItemBytesResponse>* out, |
+ scoped_refptr<base::TaskRunner> io_runner, |
+ base::TaskRunner* file_runner, |
+ const ResponseCallback& response_callback, |
+ const CancelCallback& error_callback) { |
DCHECK(out->empty()); |
auto it = blob_storage_.find(uuid); |
if (it == blob_storage_.end()) |
@@ -222,6 +320,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
BlobConsolidation* consolidation = it->second.get(); |
const auto& consolidated_items = consolidation->consolidated_items(); |
+ scoped_ptr<std::vector<BlobItemBytesRequest>> file_requests( |
+ new std::vector<BlobItemBytesRequest>()); |
+ |
// Since we can be writing to the same shared memory handle from multiple |
// requests, we keep them in a vector and lazily create them. |
ScopedVector<SharedMemory> opened_memory; |
@@ -236,9 +337,9 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
<< "Invalid data range"; |
DCHECK_EQ(item.type, DataElement::TYPE_BYTES) << "Invalid element type"; |
- out->push_back(BlobItemBytesResponse(request.request_number)); |
switch (request.transport_strategy) { |
case IPCBlobItemRequestStrategy::IPC: { |
+ out->push_back(BlobItemBytesResponse(request.request_number)); |
BlobItemBytesResponse& response = out->back(); |
ReadStatus status = consolidation->ReadMemory( |
request.renderer_item_index, request.renderer_item_offset, |
@@ -249,6 +350,7 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
break; |
} |
case IPCBlobItemRequestStrategy::SHARED_MEMORY: { |
+ out->push_back(BlobItemBytesResponse(request.request_number)); |
DCHECK_LT(request.handle_index, memory_handles->size()) |
<< "Invalid handle index."; |
SharedMemory* memory = opened_memory[request.handle_index]; |
@@ -273,14 +375,24 @@ BlobTransportController::ResponsesStatus BlobTransportController::GetResponses( |
break; |
} |
case IPCBlobItemRequestStrategy::FILE: |
- NOTREACHED() << "TODO(dmurph): Not implemented."; |
+ DCHECK_LT(request.handle_index, file_handles.size()) |
+ << "Invalid handle index."; |
+ file_requests->push_back(request); |
break; |
case IPCBlobItemRequestStrategy::UNKNOWN: |
NOTREACHED(); |
break; |
} |
} |
- return ResponsesStatus::SUCCESS; |
+ if (file_requests->empty()) { |
+ return ResponsesStatus::SUCCESS; |
+ } |
+ file_runner->PostTask( |
+ FROM_HERE, base::Bind(&WriteDiskRequests, consolidation, |
+ base::Owned(file_requests.release()), file_handles, |
kinuko
2016/04/15 15:02:32
I think base::Passed(&file_requests) would be more
dmurph
2016/04/20 21:15:34
done.
|
+ response_callback, error_callback, |
+ base::Passed(std::move(io_runner)))); |
+ return ResponsesStatus::PENDING_IO; |
} |
void BlobTransportController::ReleaseBlobConsolidation( |