Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Side by Side Diff: chrome/browser/chromeos/drive/sync_client.cc

Issue 391343002: Keep sync tasks alive as long as it's not finished (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Check in SyncClient Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/chromeos/drive/sync_client.h" 5 #include "chrome/browser/chromeos/drive/sync_client.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h" 10 #include "base/message_loop/message_loop_proxy.h"
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 if (error != FILE_ERROR_OK) { 114 if (error != FILE_ERROR_OK) {
115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; 115 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116 continue; 116 continue;
117 } 117 }
118 118
119 local_ids->push_back(local_id); 119 local_ids->push_back(local_id);
120 } 120 }
121 DCHECK(!it->HasError()); 121 DCHECK(!it->HasError());
122 } 122 }
123 123
124 // Runs the task and returns a dummy cancel closure. 124 // Gets the parent entry of the entry specified by the ID.
125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) { 125 FileError GetParentResourceEntry(ResourceMetadata* metadata,
126 task.Run(); 126 const std::string& local_id,
127 return base::Closure(); 127 ResourceEntry* parent) {
128 ResourceEntry entry;
129 FileError error = metadata->GetResourceEntryById(local_id, &entry);
130 if (error != FILE_ERROR_OK)
131 return error;
132 return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
128 } 133 }
129 134
130 } // namespace 135 } // namespace
131 136
132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} 137 SyncClient::SyncTask::SyncTask()
138 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
133 SyncClient::SyncTask::~SyncTask() {} 139 SyncClient::SyncTask::~SyncTask() {}
134 140
135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, 141 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
136 file_system::OperationObserver* observer, 142 file_system::OperationObserver* observer,
137 JobScheduler* scheduler, 143 JobScheduler* scheduler,
138 ResourceMetadata* metadata, 144 ResourceMetadata* metadata,
139 FileCache* cache, 145 FileCache* cache,
140 LoaderController* loader_controller, 146 LoaderController* loader_controller,
141 const base::FilePath& temporary_file_directory) 147 const base::FilePath& temporary_file_directory)
142 : blocking_task_runner_(blocking_task_runner), 148 : blocking_task_runner_(blocking_task_runner),
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 208
203 void SyncClient::RemoveFetchTask(const std::string& local_id) { 209 void SyncClient::RemoveFetchTask(const std::string& local_id) {
204 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 210 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
205 211
206 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); 212 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
207 if (it == tasks_.end()) 213 if (it == tasks_.end())
208 return; 214 return;
209 215
210 SyncTask* task = &it->second; 216 SyncTask* task = &it->second;
211 switch (task->state) { 217 switch (task->state) {
218 case SUSPENDED:
212 case PENDING: 219 case PENDING:
213 tasks_.erase(it); 220 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
214 break; 221 break;
215 case RUNNING: 222 case RUNNING:
216 if (!task->cancel_closure.is_null()) 223 if (!task->cancel_closure.is_null())
217 task->cancel_closure.Run(); 224 task->cancel_closure.Run();
218 break; 225 break;
219 } 226 }
220 } 227 }
221 228
222 void SyncClient::AddUpdateTask(const ClientContext& context, 229 void SyncClient::AddUpdateTask(const ClientContext& context,
223 const std::string& local_id) { 230 const std::string& local_id) {
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 231 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
225 AddUpdateTaskInternal(context, local_id, delay_); 232 AddUpdateTaskInternal(context, local_id, delay_);
226 } 233 }
227 234
235 base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
236 const ClientContext& context) {
237 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
238 return download_operation_->EnsureFileDownloadedByLocalId(
239 local_id,
240 context,
241 GetFileContentInitializedCallback(),
242 google_apis::GetContentCallback(),
243 base::Bind(&SyncClient::OnFetchFileComplete,
244 weak_ptr_factory_.GetWeakPtr(),
245 local_id));
246 }
247
228 void SyncClient::AddFetchTaskInternal(const std::string& local_id, 248 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
229 const base::TimeDelta& delay) { 249 const base::TimeDelta& delay) {
230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 250 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231 251
232 SyncTask task; 252 SyncTask task;
233 task.task = base::Bind( 253 task.state = PENDING;
234 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, 254 task.context = ClientContext(BACKGROUND);
235 base::Unretained(download_operation_.get()), 255 task.task = base::Bind(&SyncClient::PerformFetchTask,
256 base::Unretained(this),
257 local_id);
258 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
259 }
260
261 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
262 const ClientContext& context) {
263 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
264 entry_update_performer_->UpdateEntry(
236 local_id, 265 local_id,
237 ClientContext(BACKGROUND), 266 context,
238 GetFileContentInitializedCallback(), 267 base::Bind(&SyncClient::OnTaskComplete,
239 google_apis::GetContentCallback(),
240 base::Bind(&SyncClient::OnFetchFileComplete,
241 weak_ptr_factory_.GetWeakPtr(), 268 weak_ptr_factory_.GetWeakPtr(),
269 UPDATE,
242 local_id)); 270 local_id));
243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); 271 return base::Closure();
244 } 272 }
245 273
246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, 274 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
247 const std::string& local_id, 275 const std::string& local_id,
248 const base::TimeDelta& delay) { 276 const base::TimeDelta& delay) {
277 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
278
249 SyncTask task; 279 SyncTask task;
250 task.task = base::Bind( 280 task.state = PENDING;
251 &RunTaskAndReturnDummyCancelClosure, 281 task.context = context;
252 base::Bind(&EntryUpdatePerformer::UpdateEntry, 282 task.task = base::Bind(&SyncClient::PerformUpdateTask,
253 base::Unretained(entry_update_performer_.get()), 283 base::Unretained(this),
254 local_id, 284 local_id);
255 context,
256 base::Bind(&SyncClient::OnUpdateComplete,
257 weak_ptr_factory_.GetWeakPtr(),
258 local_id)));
259 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); 285 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
260 } 286 }
261 287
262 void SyncClient::AddTask(const SyncTasks::key_type& key, 288 void SyncClient::AddTask(const SyncTasks::key_type& key,
263 const SyncTask& task, 289 const SyncTask& task,
264 const base::TimeDelta& delay) { 290 const base::TimeDelta& delay) {
265 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 291 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
266 292
267 SyncTasks::iterator it = tasks_.find(key); 293 SyncTasks::iterator it = tasks_.find(key);
268 if (it != tasks_.end()) { 294 if (it != tasks_.end()) {
269 switch (it->second.state) { 295 switch (it->second.state) {
296 case SUSPENDED:
297 // Activate the task.
298 it->second.state = PENDING;
299 break;
270 case PENDING: 300 case PENDING:
271 // The same task will run, do nothing. 301 // The same task will run, do nothing.
272 break; 302 return;
273 case RUNNING: 303 case RUNNING:
274 // Something has changed since the task started. Schedule rerun. 304 // Something has changed since the task started. Schedule rerun.
275 it->second.should_run_again = true; 305 it->second.should_run_again = true;
276 break; 306 return;
277 } 307 }
278 return; 308 } else {
309 tasks_[key] = task;
279 } 310 }
280
281 DCHECK_EQ(PENDING, task.state); 311 DCHECK_EQ(PENDING, task.state);
282 tasks_[key] = task;
283
284 base::MessageLoopProxy::current()->PostDelayedTask( 312 base::MessageLoopProxy::current()->PostDelayedTask(
285 FROM_HERE, 313 FROM_HERE,
286 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 314 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
287 delay); 315 delay);
288 } 316 }
289 317
290 void SyncClient::StartTask(const SyncTasks::key_type& key) { 318 void SyncClient::StartTask(const SyncTasks::key_type& key) {
319 ResourceEntry* parent = new ResourceEntry;
320 base::PostTaskAndReplyWithResult(
321 blocking_task_runner_.get(),
322 FROM_HERE,
323 base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
324 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
325 weak_ptr_factory_.GetWeakPtr(),
326 key,
327 base::Owned(parent)));
328 }
329
330 void SyncClient::StartTaskAfterGetParentResourceEntry(
331 const SyncTasks::key_type& key,
332 const ResourceEntry* parent,
333 FileError error) {
334 const SyncType type = key.first;
335 const std::string& local_id = key.second;
291 SyncTasks::iterator it = tasks_.find(key); 336 SyncTasks::iterator it = tasks_.find(key);
292 if (it == tasks_.end()) 337 if (it == tasks_.end())
293 return; 338 return;
294 339
295 SyncTask* task = &it->second; 340 SyncTask* task = &it->second;
296 switch (task->state) { 341 switch (task->state) {
342 case SUSPENDED:
297 case PENDING: 343 case PENDING:
298 task->state = RUNNING;
299 task->cancel_closure = task->task.Run();
300 break; 344 break;
301 case RUNNING: // Do nothing. 345 case RUNNING: // Do nothing.
302 break; 346 return;
303 } 347 }
348
349 if (error != FILE_ERROR_OK) {
350 OnTaskComplete(type, local_id, error);
351 return;
352 }
353
354 if (type == UPDATE &&
355 parent->resource_id().empty() &&
356 parent->local_id() != util::kDriveTrashDirLocalId) {
357 // Parent entry needs to be synced to get a resource ID.
358 // Suspend the task and register it as a dependent task of the parent.
359 const SyncTasks::key_type key_parent(type, parent->local_id());
360 SyncTasks::iterator it_parent = tasks_.find(key_parent);
361 if (it_parent == tasks_.end()) {
362 OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
363 LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
364 << local_id << ", parent_id = " << parent->local_id();
365 return;
366 }
367 task->state = SUSPENDED;
368 it_parent->second.dependent_tasks.push_back(key);
369 return;
370 }
371
372 // Run the task.
373 task->state = RUNNING;
374 task->cancel_closure = task->task.Run(task->context);
304 } 375 }
305 376
306 void SyncClient::OnGetLocalIdsOfBacklog( 377 void SyncClient::OnGetLocalIdsOfBacklog(
307 const std::vector<std::string>* to_fetch, 378 const std::vector<std::string>* to_fetch,
308 const std::vector<std::string>* to_update) { 379 const std::vector<std::string>* to_update) {
309 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 380 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
310 381
311 // Give priority to upload tasks over fetch tasks, so that dirty files are 382 // Give priority to upload tasks over fetch tasks, so that dirty files are
312 // uploaded as soon as possible. 383 // uploaded as soon as possible.
313 for (size_t i = 0; i < to_update->size(); ++i) { 384 for (size_t i = 0; i < to_update->size(); ++i) {
314 const std::string& local_id = (*to_update)[i]; 385 const std::string& local_id = (*to_update)[i];
315 DVLOG(1) << "Queuing to update: " << local_id; 386 DVLOG(1) << "Queuing to update: " << local_id;
316 AddUpdateTask(ClientContext(BACKGROUND), local_id); 387 AddUpdateTask(ClientContext(BACKGROUND), local_id);
317 } 388 }
318 389
319 for (size_t i = 0; i < to_fetch->size(); ++i) { 390 for (size_t i = 0; i < to_fetch->size(); ++i) {
320 const std::string& local_id = (*to_fetch)[i]; 391 const std::string& local_id = (*to_fetch)[i];
321 DVLOG(1) << "Queuing to fetch: " << local_id; 392 DVLOG(1) << "Queuing to fetch: " << local_id;
322 AddFetchTaskInternal(local_id, delay_); 393 AddFetchTaskInternal(local_id, delay_);
323 } 394 }
324 } 395 }
325 396
326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { 397 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 398 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 399
329 for (size_t i = 0; i < local_ids->size(); ++i) 400 for (size_t i = 0; i < local_ids->size(); ++i)
330 AddFetchTask((*local_ids)[i]); 401 AddFetchTask((*local_ids)[i]);
331 } 402 }
332 403
333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { 404 void SyncClient::OnTaskComplete(SyncType type,
405 const std::string& local_id,
406 FileError error) {
334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 407 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
335 408
336 const SyncTasks::key_type key(type, local_id); 409 const SyncTasks::key_type key(type, local_id);
337 SyncTasks::iterator it = tasks_.find(key); 410 SyncTasks::iterator it = tasks_.find(key);
338 DCHECK(it != tasks_.end()); 411 DCHECK(it != tasks_.end());
339 412
413 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
414
415 switch (error) {
416 case FILE_ERROR_OK:
417 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
418 break;
419 case FILE_ERROR_ABORT:
420 // Ignore it because this is caused by user's cancel operations.
421 break;
422 case FILE_ERROR_NO_CONNECTION:
423 // Run the task again so that we'll retry once the connection is back.
424 it->second.should_run_again = true;
425 it->second.context = ClientContext(BACKGROUND);
426 break;
427 case FILE_ERROR_SERVICE_UNAVAILABLE:
428 // Run the task again so that we'll retry once the service is back.
429 it->second.should_run_again = true;
430 it->second.context = ClientContext(BACKGROUND);
431 retry_delay = long_delay_;
432 operation_observer_->OnDriveSyncError(
433 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
434 break;
435 default:
436 operation_observer_->OnDriveSyncError(
437 file_system::DRIVE_SYNC_ERROR_MISC, local_id);
438 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
439 << ": " << FileErrorToString(error);
440 }
441
340 if (it->second.should_run_again) { 442 if (it->second.should_run_again) {
341 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; 443 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
444 it->second.state = PENDING;
342 it->second.should_run_again = false; 445 it->second.should_run_again = false;
343 it->second.task.Run(); 446 base::MessageLoopProxy::current()->PostDelayedTask(
344 return false; 447 FROM_HERE,
448 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
449 retry_delay);
450 } else {
451 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
452 StartTask(it->second.dependent_tasks[i]);
453 tasks_.erase(it);
345 } 454 }
346
347 tasks_.erase(it);
348 return true;
349 } 455 }
350 456
351 void SyncClient::OnFetchFileComplete(const std::string& local_id, 457 void SyncClient::OnFetchFileComplete(const std::string& local_id,
352 FileError error, 458 FileError error,
353 const base::FilePath& local_path, 459 const base::FilePath& local_path,
354 scoped_ptr<ResourceEntry> entry) { 460 scoped_ptr<ResourceEntry> entry) {
355 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 461 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
356 462 OnTaskComplete(FETCH, local_id, error);
357 if (!OnTaskComplete(FETCH, local_id)) 463 if (error == FILE_ERROR_ABORT) {
358 return; 464 // If user cancels download, unpin the file so that we do not sync the file
359 465 // again.
360 if (error == FILE_ERROR_OK) { 466 base::PostTaskAndReplyWithResult(
361 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); 467 blocking_task_runner_,
362 } else { 468 FROM_HERE,
363 switch (error) { 469 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
364 case FILE_ERROR_ABORT: 470 base::Bind(&util::EmptyFileOperationCallback));
365 // If user cancels download, unpin the file so that we do not sync the
366 // file again.
367 base::PostTaskAndReplyWithResult(
368 blocking_task_runner_,
369 FROM_HERE,
370 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
371 base::Bind(&util::EmptyFileOperationCallback));
372 break;
373 case FILE_ERROR_NO_CONNECTION:
374 // Add the task again so that we'll retry once the connection is back.
375 AddFetchTaskInternal(local_id, delay_);
376 break;
377 case FILE_ERROR_SERVICE_UNAVAILABLE:
378 // Add the task again so that we'll retry once the service is back.
379 AddFetchTaskInternal(local_id, long_delay_);
380 operation_observer_->OnDriveSyncError(
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
382 local_id);
383 break;
384 default:
385 operation_observer_->OnDriveSyncError(
386 file_system::DRIVE_SYNC_ERROR_MISC,
387 local_id);
388 LOG(WARNING) << "Failed to fetch " << local_id
389 << ": " << FileErrorToString(error);
390 }
391 } 471 }
392 } 472 }
393 473
394 void SyncClient::OnUpdateComplete(const std::string& local_id,
395 FileError error) {
396 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
397
398 if (!OnTaskComplete(UPDATE, local_id))
399 return;
400
401 if (error == FILE_ERROR_OK) {
402 DVLOG(1) << "Updated " << local_id;
403
404 // Add update tasks for child entries which may be waiting for the parent to
405 // be updated.
406 ResourceEntryVector* entries = new ResourceEntryVector;
407 base::PostTaskAndReplyWithResult(
408 blocking_task_runner_.get(),
409 FROM_HERE,
410 base::Bind(&ResourceMetadata::ReadDirectoryById,
411 base::Unretained(metadata_), local_id, entries),
412 base::Bind(&SyncClient::AddChildUpdateTasks,
413 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
414 } else {
415 switch (error) {
416 case FILE_ERROR_ABORT:
417 // Ignore it because this is caused by user's cancel operations.
418 break;
419 case FILE_ERROR_NO_CONNECTION:
420 // Add the task again so that we'll retry once the connection is back.
421 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
422 base::TimeDelta::FromSeconds(0));
423 break;
424 case FILE_ERROR_SERVICE_UNAVAILABLE:
425 // Add the task again so that we'll retry once the service is back.
426 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
427 operation_observer_->OnDriveSyncError(
428 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
429 local_id);
430 break;
431 default:
432 operation_observer_->OnDriveSyncError(
433 file_system::DRIVE_SYNC_ERROR_MISC,
434 local_id);
435 LOG(WARNING) << "Failed to update " << local_id << ": "
436 << FileErrorToString(error);
437 }
438 }
439 }
440
441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
442 FileError error) {
443 if (error != FILE_ERROR_OK)
444 return;
445
446 for (size_t i = 0; i < entries->size(); ++i) {
447 const ResourceEntry& entry = (*entries)[i];
448 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
449 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
450 base::TimeDelta::FromSeconds(0));
451 }
452 }
453 }
454
455 } // namespace internal 474 } // namespace internal
456 } // namespace drive 475 } // namespace drive
OLDNEW
« no previous file with comments | « chrome/browser/chromeos/drive/sync_client.h ('k') | chrome/browser/chromeos/drive/sync_client_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698