Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: storage/browser/blob/blob_registry_impl.cc

Issue 2892953006: WIP POC blob transport over mojo
Patch Set: pass mojo blobs over ipc Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « storage/browser/blob/blob_registry_impl.h ('k') | third_party/WebKit/Source/platform/BUILD.gn » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "storage/browser/blob/blob_registry_impl.h"
6
7 #include "storage/browser/blob/blob_data_builder.h"
8 #include "storage/browser/blob/blob_impl.h"
9 #include "storage/browser/blob/blob_storage_context.h"
10
11 namespace storage {
12
13 namespace {
14
15 using MemoryStrategy = BlobMemoryController::Strategy;
16
17 bool CalculateBlobMemorySize(const std::vector<mojom::DataElementPtr>& elements,
18 size_t* shortcut_bytes,
19 uint64_t* total_bytes) {
20 DCHECK(shortcut_bytes);
21 DCHECK(total_bytes);
22
23 base::CheckedNumeric<uint64_t> total_size_checked = 0;
24 base::CheckedNumeric<size_t> shortcut_size_checked = 0;
25 for (const auto& e : elements) {
26 if (e->is_bytes()) {
27 total_size_checked += e->get_bytes().size();
28 shortcut_size_checked += e->get_bytes().size();
29 } else if (e->is_large_bytes()) {
30 total_size_checked += e->get_large_bytes()->length;
31 } else {
32 continue;
33 }
34 if (!total_size_checked.IsValid() || !shortcut_size_checked.IsValid())
35 return false;
36 }
37 *shortcut_bytes = shortcut_size_checked.ValueOrDie();
38 *total_bytes = total_size_checked.ValueOrDie();
39 return true;
40 }
41
42 class ByteStreamReceiver {
43 public:
44 ByteStreamReceiver(mojo::ScopedDataPipeConsumerHandle pipe,
45 BlobDataBuilder* builder,
46 size_t item_index,
47 size_t expected_length,
48 base::OnceCallback<void(bool success)> done_callback)
49 : builder_(builder),
50 item_index_(item_index),
51 expected_length_(expected_length),
52 pipe_(std::move(pipe)),
53 watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
54 done_callback_(std::move(done_callback)) {
55 watcher_.Watch(
56 pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
57 base::Bind(&ByteStreamReceiver::OnReadable, base::Unretained(this)));
58 }
59
60 ~ByteStreamReceiver() {
61 std::move(done_callback_).Run(offset_ == expected_length_);
62 LOG(INFO) << "Read " << offset_ << " out of " << expected_length_
63 << " bytes";
64 }
65
66 void OnReadable(MojoResult result) {
67 if (result == MOJO_RESULT_CANCELLED ||
68 result == MOJO_RESULT_FAILED_PRECONDITION) {
69 LOG(INFO) << "Done reading, closing connection: " << result << " for "
70 << builder_->uuid();
71 delete this;
72 return;
73 }
74 DCHECK_EQ(result, MOJO_RESULT_OK);
75
76 // TODO(mek): Use two-phase reads to directly read into BlobDataBuilder
77 while (true) {
78 uint32_t num_bytes = 0;
79 MojoResult query_result = mojo::ReadDataRaw(
80 pipe_.get(), nullptr, &num_bytes, MOJO_READ_DATA_FLAG_QUERY);
81 if (query_result == MOJO_RESULT_SHOULD_WAIT)
82 break;
83 DCHECK_EQ(query_result, MOJO_RESULT_OK);
84
85 LOG(INFO) << "Have " << num_bytes << " to read";
86 std::vector<char> data(num_bytes);
87 query_result = mojo::ReadDataRaw(pipe_.get(), data.data(), &num_bytes,
88 MOJO_READ_DATA_FLAG_ALL_OR_NONE);
89 if (query_result == MOJO_RESULT_SHOULD_WAIT)
90 break;
91 DCHECK_EQ(query_result, MOJO_RESULT_OK);
92 bool result = builder_->PopulateFutureData(item_index_, data.data(),
93 offset_, num_bytes);
94 offset_ += num_bytes;
95 DCHECK(result);
96 }
97 }
98
99 private:
100 BlobDataBuilder* builder_;
101 size_t item_index_;
102 size_t expected_length_;
103 mojo::ScopedDataPipeConsumerHandle pipe_;
104 mojo::SimpleWatcher watcher_;
105 size_t offset_ = 0;
106 base::OnceCallback<void(bool success)> done_callback_;
107 };
108
109 } // namespace
110
111 BlobRegistryImpl::BlobRegistryImpl(BlobStorageContext* context)
112 : context_(context), weak_ptr_factory_(this) {}
113
114 BlobRegistryImpl::~BlobRegistryImpl() {
115 LOG(INFO) << "Destroying blob registry";
116 }
117
118 void BlobRegistryImpl::Bind(const service_manager::BindSourceInfo& source_info,
119 storage::mojom::BlobRegistryRequest request) {
120 bindings_.AddBinding(this, std::move(request));
121 }
122
123 void BlobRegistryImpl::Register(mojom::BlobRequest blob,
124 const std::string& uuid,
125 const std::string& content_type,
126 const std::string& content_disposition,
127 std::vector<mojom::DataElementPtr> elements,
128 RegisterCallback callback) {
129 RegisterWithBlobUUIDs(std::move(blob), uuid, content_type,
130 content_disposition, std::move(elements),
131 std::vector<std::string>(), std::move(callback), "");
132 }
133
134 void BlobRegistryImpl::RegisterWithBlobUUIDs(
135 mojom::BlobRequest blob,
136 const std::string& uuid,
137 const std::string& content_type,
138 const std::string& content_disposition,
139 std::vector<mojom::DataElementPtr> elements,
140 std::vector<std::string> blob_uuids,
141 RegisterCallback callback,
142 const std::string& next_blob) {
143 if (!next_blob.empty())
144 blob_uuids.push_back(next_blob);
145 while (blob_uuids.size() < elements.size() &&
146 !elements[blob_uuids.size()]->is_blob())
147 blob_uuids.push_back("");
148 if (blob_uuids.size() < elements.size()) {
149 // next element is a blob, request its UUID
150 elements[blob_uuids.size()]->get_blob()->blob->InternalGetUUID(
151 base::BindOnce(&BlobRegistryImpl::RegisterWithBlobUUIDs,
152 weak_ptr_factory_.GetWeakPtr(), std::move(blob), uuid,
153 content_type, content_disposition, std::move(elements),
154 std::move(blob_uuids), std::move(callback)));
155 return;
156 }
157
158 LOG(INFO) << "Register " << uuid << " contains " << elements.size()
159 << " elements";
160 if (uuid.empty() || context_->registry().HasEntry(uuid)) {
161 LOG(INFO) << "Invalid UUID";
162 std::move(callback).Run();
163 return;
164 }
165
166 // TODO(mek): can read files/filesystem checks
167
168 // TODO(mek): Validate that our referenced blobs aren't us
169
170 uint64_t transport_memory_size = 0;
171 size_t shortcut_size = 0;
172 if (!CalculateBlobMemorySize(elements, &shortcut_size,
173 &transport_memory_size)) {
174 LOG(INFO) << "Can't calculate size";
175 std::unique_ptr<BlobDataHandle> handle =
176 context_->AddBrokenBlob(uuid, content_type, content_disposition,
177 BlobStatus::ERR_INVALID_CONSTRUCTION_ARGUMENTS);
178 new BlobImpl(std::move(handle), std::move(blob));
179 std::move(callback).Run();
180 return;
181 }
182
183 const BlobMemoryController& memory_controller = context_->memory_controller();
184 MemoryStrategy memory_strategy =
185 memory_controller.DetermineStrategy(shortcut_size, transport_memory_size);
186
187 auto builderPtr = base::MakeUnique<BlobDataBuilder>(uuid);
188 BlobDataBuilder& builder = *builderPtr;
189 builder.set_content_type(content_type);
190 builder.set_content_disposition(content_disposition);
191 int idx = 0;
192 std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data;
193 std::list<size_t> file_sizes;
194 for (const auto& e : elements) {
195 if (e->is_bytes()) {
196 LOG(INFO) << " Bytes: " << e->get_bytes().size();
197 builder.AppendData(reinterpret_cast<const char*>(e->get_bytes().data()),
198 e->get_bytes().size());
199 } else if (e->is_large_bytes()) {
200 LOG(INFO) << " Future Bytes: " << e->get_large_bytes()->length;
201 if (memory_strategy == MemoryStrategy::FILE) {
202 future_data.push_back(std::make_pair(
203 builder.AppendFutureFile(0, e->get_large_bytes()->length,
204 future_data.size()),
205 std::move(e->get_large_bytes()->data)));
206 } else {
207 future_data.push_back(std::make_pair(
208 builder.AppendFutureData(e->get_large_bytes()->length),
209 std::move(e->get_large_bytes()->data)));
210 }
211 file_sizes.push_back(e->get_large_bytes()->length);
212 } else if (e->is_file()) {
213 LOG(INFO) << " File: " << e->get_file()->length << " "
214 << e->get_file()->path;
215 builder.AppendFile(base::FilePath::FromUTF8Unsafe(e->get_file()->path),
216 e->get_file()->offset, e->get_file()->length,
217 e->get_file()->expected_modification_time);
218 } else if (e->is_file_filesystem()) {
219 LOG(INFO) << " Filesystem: " << e->get_file_filesystem()->length;
220 builder.AppendFileSystemFile(
221 e->get_file_filesystem()->url, e->get_file_filesystem()->offset,
222 e->get_file_filesystem()->length,
223 e->get_file_filesystem()->expected_modification_time);
224 } else if (e->is_blob()) {
225 LOG(INFO) << " Blob: " << e->get_blob()->length;
226 std::string ref_uuid = blob_uuids[idx];
227 builder.AppendBlob(ref_uuid, e->get_blob()->offset,
228 e->get_blob()->length);
229 } else {
230 NOTREACHED();
231 }
232 idx++;
233 }
234
235 switch (memory_strategy) {
236 case MemoryStrategy::TOO_LARGE: {
237 LOG(INFO) << "Blob too large";
238 std::unique_ptr<BlobDataHandle> handle =
239 context_->AddBrokenBlob(uuid, content_type, content_disposition,
240 BlobStatus::ERR_OUT_OF_MEMORY);
241 new BlobImpl(std::move(handle), std::move(blob));
242 std::move(callback).Run();
243 return;
244 }
245 case MemoryStrategy::NONE_NEEDED: {
246 DCHECK(future_data.empty());
247 std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
248 builder, BlobStorageContext::TransportAllowedCallback());
249 new BlobImpl(std::move(handle), std::move(blob));
250 std::move(callback).Run();
251 return;
252 }
253 case MemoryStrategy::IPC:
254 // No separate IPC strategy in mojo for now.
255 case MemoryStrategy::SHARED_MEMORY: {
256 // DCHECK(!future_data.empty());
257 LOG(INFO) << " Future as stream";
258 std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
259 builder, base::Bind(&BlobRegistryImpl::OnReadyForDataStreams,
260 weak_ptr_factory_.GetWeakPtr(), uuid,
261 base::Passed(std::move(builderPtr)),
262 base::Passed(std::move(future_data)),
263 base::Passed(std::move(file_sizes))));
264 new BlobImpl(std::move(handle), std::move(blob));
265 std::move(callback).Run();
266 return;
267 }
268 case MemoryStrategy::FILE: {
269 // DCHECK(!future_data.empty());
270 LOG(INFO) << " Future as files";
271 std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
272 builder, base::Bind(&BlobRegistryImpl::OnReadyForFileTransport,
273 weak_ptr_factory_.GetWeakPtr(), uuid,
274 base::Passed(std::move(builderPtr)),
275 base::Passed(std::move(future_data)),
276 base::Passed(std::move(file_sizes))));
277 new BlobImpl(std::move(handle), std::move(blob));
278 std::move(callback).Run();
279 return;
280 }
281 }
282 NOTREACHED();
283 }
284
285 void BlobRegistryImpl::DeprecatedGetBlob(const std::string& uuid,
286 mojom::BlobRequest blob) {
287 LOG(INFO) << "Get " << uuid;
288 new BlobImpl(context_->GetBlobDataFromUUID(uuid), std::move(blob));
289 }
290
291 void BlobRegistryImpl::OnReadyForDataStreams(
292 const std::string& uuid,
293 std::unique_ptr<BlobDataBuilder> builder,
294 std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data,
295 std::list<size_t> file_sizes,
296 BlobStatus status,
297 std::vector<BlobMemoryController::FileCreationInfo> file_info) {
298 if (future_data.empty()) {
299 OnDataStreamDone(uuid);
300 return;
301 }
302
303 LOG(INFO) << "Ready for datastreams: " << int(status) << " for " << uuid;
304 size_t index = future_data.begin()->first;
305 size_t size = *file_sizes.begin();
306 mojom::BytesProviderPtr ptr = std::move(future_data.begin()->second);
307 future_data.pop_front();
308 file_sizes.pop_front();
309 mojo::DataPipe pipe;
310 ptr->RequestAsStream(std::move(pipe.producer_handle));
311 BlobDataBuilder* b = builder.get();
312 new ByteStreamReceiver(
313 std::move(pipe.consumer_handle), b, index, size,
314 base::BindOnce(
315 &BlobRegistryImpl::OnReadDatastream, weak_ptr_factory_.GetWeakPtr(),
316 uuid,
317 base::BindOnce(&BlobRegistryImpl::OnReadyForDataStreams,
318 weak_ptr_factory_.GetWeakPtr(), uuid,
319 std::move(builder), std::move(future_data),
320 std::move(file_sizes), status, std::move(file_info))));
321 }
322
323 void BlobRegistryImpl::OnReadDatastream(const std::string& uuid,
324 base::OnceClosure next_read_callback,
325 bool success) {
326 if (!success) {
327 context_->CancelBuildingBlob(uuid, BlobStatus::ERR_SOURCE_DIED_IN_TRANSIT);
328 return;
329 }
330 std::move(next_read_callback).Run();
331 }
332
333 void BlobRegistryImpl::OnReadyForFileTransport(
334 const std::string& uuid,
335 std::unique_ptr<BlobDataBuilder> builder,
336 std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data,
337 std::list<size_t> file_sizes,
338 BlobStatus status,
339 std::vector<BlobMemoryController::FileCreationInfo> file_info) {
340 LOG(INFO) << "Ready for files for " << uuid << " (" << int(status) << ")";
341 LOG(INFO) << " Files: " << file_info.size()
342 << ", data blocks: " << future_data.size();
343 BlobDataBuilder* b = builder.release();
344 auto size_it = file_sizes.begin();
345 auto info_it = file_info.begin();
346 for (auto& p : future_data) {
347 LOG(INFO) << " Requesting data as file";
348 mojom::BytesProviderPtr ptr = std::move(p.second);
349 ptr->RequestAsFile(
350 0, *size_it, std::move(info_it->file), 0,
351 base::BindOnce(&BlobRegistryImpl::OnFileDone,
352 weak_ptr_factory_.GetWeakPtr(), uuid, b, p.first,
353 std::move(ptr), info_it->file_reference));
354 ++size_it;
355 ++info_it;
356 }
357 }
358
359 void BlobRegistryImpl::OnDataStreamDone(const std::string& uuid) {
360 if (context_->registry().HasEntry(uuid))
361 context_->NotifyTransportComplete(uuid);
362 }
363
364 void BlobRegistryImpl::OnFileDone(
365 const std::string& uuid,
366 BlobDataBuilder* builder,
367 size_t item_index,
368 mojom::BytesProviderPtr ptr,
369 const scoped_refptr<ShareableFileReference>& file_reference,
370 base::Optional<base::Time> time_file_modified) {
371 LOG(INFO) << "File done for " << uuid << " " << *time_file_modified;
372 builder->PopulateFutureFile(item_index, file_reference, *time_file_modified);
373 if (context_->registry().HasEntry(uuid))
374 context_->NotifyTransportComplete(uuid);
375 }
376
377 } // namespace storage
OLDNEW
« no previous file with comments | « storage/browser/blob/blob_registry_impl.h ('k') | third_party/WebKit/Source/platform/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698