Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(546)

Side by Side Diff: chrome/browser/chromeos/drive/sync_client.cc

Issue 391343002: Keep sync tasks alive as long as it's not finished (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/chromeos/drive/sync_client.h" 5 #include "chrome/browser/chromeos/drive/sync_client.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h" 10 #include "base/message_loop/message_loop_proxy.h"
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 if (error != FILE_ERROR_OK) { 114 if (error != FILE_ERROR_OK) {
115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; 115 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116 continue; 116 continue;
117 } 117 }
118 118
119 local_ids->push_back(local_id); 119 local_ids->push_back(local_id);
120 } 120 }
121 DCHECK(!it->HasError()); 121 DCHECK(!it->HasError());
122 } 122 }
123 123
124 // Runs the task and returns a dummy cancel closure.
125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
126 task.Run();
127 return base::Closure();
128 }
129
130 } // namespace 124 } // namespace
131 125
132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} 126 SyncClient::SyncTask::SyncTask()
127 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
133 SyncClient::SyncTask::~SyncTask() {} 128 SyncClient::SyncTask::~SyncTask() {}
134 129
135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, 130 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
136 file_system::OperationObserver* observer, 131 file_system::OperationObserver* observer,
137 JobScheduler* scheduler, 132 JobScheduler* scheduler,
138 ResourceMetadata* metadata, 133 ResourceMetadata* metadata,
139 FileCache* cache, 134 FileCache* cache,
140 LoaderController* loader_controller, 135 LoaderController* loader_controller,
141 const base::FilePath& temporary_file_directory) 136 const base::FilePath& temporary_file_directory)
142 : blocking_task_runner_(blocking_task_runner), 137 : blocking_task_runner_(blocking_task_runner),
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 197
203 void SyncClient::RemoveFetchTask(const std::string& local_id) { 198 void SyncClient::RemoveFetchTask(const std::string& local_id) {
204 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 199 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
205 200
206 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); 201 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
207 if (it == tasks_.end()) 202 if (it == tasks_.end())
208 return; 203 return;
209 204
210 SyncTask* task = &it->second; 205 SyncTask* task = &it->second;
211 switch (task->state) { 206 switch (task->state) {
207 case SUSPENDED:
212 case PENDING: 208 case PENDING:
213 tasks_.erase(it); 209 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
214 break; 210 break;
215 case RUNNING: 211 case RUNNING:
216 if (!task->cancel_closure.is_null()) 212 if (!task->cancel_closure.is_null())
217 task->cancel_closure.Run(); 213 task->cancel_closure.Run();
218 break; 214 break;
219 } 215 }
220 } 216 }
221 217
222 void SyncClient::AddUpdateTask(const ClientContext& context, 218 void SyncClient::AddUpdateTask(const ClientContext& context,
223 const std::string& local_id) { 219 const std::string& local_id) {
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 220 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
225 AddUpdateTaskInternal(context, local_id, delay_); 221 AddUpdateTaskInternal(context, local_id, delay_);
226 } 222 }
227 223
224 base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
225 const ClientContext& context) {
226 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
227 return download_operation_->EnsureFileDownloadedByLocalId(
228 local_id,
229 context,
230 GetFileContentInitializedCallback(),
231 google_apis::GetContentCallback(),
232 base::Bind(&SyncClient::OnFetchFileComplete,
233 weak_ptr_factory_.GetWeakPtr(),
234 local_id));
235 }
236
228 void SyncClient::AddFetchTaskInternal(const std::string& local_id, 237 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
229 const base::TimeDelta& delay) { 238 const base::TimeDelta& delay) {
230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 239 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231 240
232 SyncTask task; 241 SyncTask task;
233 task.task = base::Bind( 242 task.state = PENDING;
234 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, 243 task.context = ClientContext(BACKGROUND);
235 base::Unretained(download_operation_.get()), 244 task.task = base::Bind(&SyncClient::PerformFetchTask,
245 base::Unretained(this),
246 local_id);
247 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
248 }
249
250 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
251 const ClientContext& context) {
252 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
253 entry_update_performer_->UpdateEntry(
236 local_id, 254 local_id,
237 ClientContext(BACKGROUND), 255 context,
238 GetFileContentInitializedCallback(), 256 base::Bind(&SyncClient::OnTaskComplete,
239 google_apis::GetContentCallback(),
240 base::Bind(&SyncClient::OnFetchFileComplete,
241 weak_ptr_factory_.GetWeakPtr(), 257 weak_ptr_factory_.GetWeakPtr(),
258 UPDATE,
242 local_id)); 259 local_id));
243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); 260 return base::Closure();
244 } 261 }
245 262
246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, 263 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
247 const std::string& local_id, 264 const std::string& local_id,
248 const base::TimeDelta& delay) { 265 const base::TimeDelta& delay) {
266 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
267
249 SyncTask task; 268 SyncTask task;
250 task.task = base::Bind( 269 task.state = PENDING;
251 &RunTaskAndReturnDummyCancelClosure, 270 task.context = context;
252 base::Bind(&EntryUpdatePerformer::UpdateEntry, 271 task.task = base::Bind(&SyncClient::PerformUpdateTask,
253 base::Unretained(entry_update_performer_.get()), 272 base::Unretained(this),
254 local_id, 273 local_id);
255 context,
256 base::Bind(&SyncClient::OnUpdateComplete,
257 weak_ptr_factory_.GetWeakPtr(),
258 local_id)));
259 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); 274 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
260 } 275 }
261 276
262 void SyncClient::AddTask(const SyncTasks::key_type& key, 277 void SyncClient::AddTask(const SyncTasks::key_type& key,
263 const SyncTask& task, 278 const SyncTask& task,
264 const base::TimeDelta& delay) { 279 const base::TimeDelta& delay) {
265 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 280 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
266 281
267 SyncTasks::iterator it = tasks_.find(key); 282 SyncTasks::iterator it = tasks_.find(key);
268 if (it != tasks_.end()) { 283 if (it != tasks_.end()) {
269 switch (it->second.state) { 284 switch (it->second.state) {
285 case SUSPENDED:
286 // Activate the task.
287 it->second.state = PENDING;
288 break;
270 case PENDING: 289 case PENDING:
271 // The same task will run, do nothing. 290 // The same task will run, do nothing.
272 break; 291 return;
273 case RUNNING: 292 case RUNNING:
274 // Something has changed since the task started. Schedule rerun. 293 // Something has changed since the task started. Schedule rerun.
275 it->second.should_run_again = true; 294 it->second.should_run_again = true;
276 break; 295 return;
277 } 296 }
278 return; 297 } else {
298 tasks_[key] = task;
279 } 299 }
280
281 DCHECK_EQ(PENDING, task.state); 300 DCHECK_EQ(PENDING, task.state);
282 tasks_[key] = task;
283
284 base::MessageLoopProxy::current()->PostDelayedTask( 301 base::MessageLoopProxy::current()->PostDelayedTask(
285 FROM_HERE, 302 FROM_HERE,
286 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 303 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
287 delay); 304 delay);
288 } 305 }
289 306
290 void SyncClient::StartTask(const SyncTasks::key_type& key) { 307 void SyncClient::StartTask(const SyncTasks::key_type& key) {
291 SyncTasks::iterator it = tasks_.find(key); 308 SyncTasks::iterator it = tasks_.find(key);
292 if (it == tasks_.end()) 309 if (it == tasks_.end())
293 return; 310 return;
294 311
295 SyncTask* task = &it->second; 312 SyncTask* task = &it->second;
296 switch (task->state) { 313 switch (task->state) {
314 case SUSPENDED:
297 case PENDING: 315 case PENDING:
298 task->state = RUNNING; 316 task->state = RUNNING;
299 task->cancel_closure = task->task.Run(); 317 task->cancel_closure = task->task.Run(task->context);
300 break; 318 break;
301 case RUNNING: // Do nothing. 319 case RUNNING: // Do nothing.
302 break; 320 break;
303 } 321 }
304 } 322 }
305 323
306 void SyncClient::OnGetLocalIdsOfBacklog( 324 void SyncClient::OnGetLocalIdsOfBacklog(
307 const std::vector<std::string>* to_fetch, 325 const std::vector<std::string>* to_fetch,
308 const std::vector<std::string>* to_update) { 326 const std::vector<std::string>* to_update) {
309 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
(...skipping 13 matching lines...) Expand all
323 } 341 }
324 } 342 }
325 343
326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { 344 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 345 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328 346
329 for (size_t i = 0; i < local_ids->size(); ++i) 347 for (size_t i = 0; i < local_ids->size(); ++i)
330 AddFetchTask((*local_ids)[i]); 348 AddFetchTask((*local_ids)[i]);
331 } 349 }
332 350
333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { 351 void SyncClient::OnTaskComplete(SyncType type,
352 const std::string& local_id,
353 FileError error) {
334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 354 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
335 355
336 const SyncTasks::key_type key(type, local_id); 356 const SyncTasks::key_type key(type, local_id);
337 SyncTasks::iterator it = tasks_.find(key); 357 SyncTasks::iterator it = tasks_.find(key);
338 DCHECK(it != tasks_.end()); 358 DCHECK(it != tasks_.end());
339 359
360 bool should_erase_task = true;
361 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
362
363 switch (error) {
364 case FILE_ERROR_OK:
365 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
366 break;
367 case FILE_ERROR_ABORT:
368 // Ignore it because this is caused by user's cancel operations.
369 break;
370 case FILE_ERROR_NO_CONNECTION:
371 // Run the task again so that we'll retry once the connection is back.
372 it->second.should_run_again = true;
373 it->second.context = ClientContext(BACKGROUND);
374 break;
375 case FILE_ERROR_SERVICE_UNAVAILABLE:
376 // Run the task again so that we'll retry once the service is back.
377 it->second.should_run_again = true;
378 it->second.context = ClientContext(BACKGROUND);
379 retry_delay = long_delay_;
380 operation_observer_->OnDriveSyncError(
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
382 break;
383 case FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED: {
384 // Suspend the task and register it as a dependent task of the parent.
385 should_erase_task = false;
386 it->second.state = SUSPENDED;
387 ResourceEntry* entry = new ResourceEntry;
388 base::PostTaskAndReplyWithResult(
389 blocking_task_runner_.get(),
390 FROM_HERE,
391 base::Bind(&ResourceMetadata::GetResourceEntryById,
392 base::Unretained(metadata_),
393 local_id,
394 entry),
395 base::Bind(&SyncClient::AddDependentTask,
396 weak_ptr_factory_.GetWeakPtr(),
397 type,
398 local_id,
399 base::Owned(entry)));
400 break;
401 }
402 default:
403 operation_observer_->OnDriveSyncError(
404 file_system::DRIVE_SYNC_ERROR_MISC, local_id);
405 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
406 << ": " << FileErrorToString(error);
407 }
408
340 if (it->second.should_run_again) { 409 if (it->second.should_run_again) {
341 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; 410 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
411 it->second.state = PENDING;
342 it->second.should_run_again = false; 412 it->second.should_run_again = false;
343 it->second.task.Run(); 413 base::MessageLoopProxy::current()->PostDelayedTask(
344 return false; 414 FROM_HERE,
415 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
416 retry_delay);
417 } else if (should_erase_task) {
418 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
419 StartTask(it->second.dependent_tasks[i]);
420 tasks_.erase(it);
345 } 421 }
346
347 tasks_.erase(it);
348 return true;
349 } 422 }
350 423
351 void SyncClient::OnFetchFileComplete(const std::string& local_id, 424 void SyncClient::OnFetchFileComplete(const std::string& local_id,
352 FileError error, 425 FileError error,
353 const base::FilePath& local_path, 426 const base::FilePath& local_path,
354 scoped_ptr<ResourceEntry> entry) { 427 scoped_ptr<ResourceEntry> entry) {
355 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 428 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
356 429 OnTaskComplete(FETCH, local_id, error);
357 if (!OnTaskComplete(FETCH, local_id)) 430 if (error == FILE_ERROR_ABORT) {
358 return; 431 // If user cancels download, unpin the file so that we do not sync the file
359 432 // again.
360 if (error == FILE_ERROR_OK) { 433 base::PostTaskAndReplyWithResult(
361 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); 434 blocking_task_runner_,
362 } else { 435 FROM_HERE,
363 switch (error) { 436 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
364 case FILE_ERROR_ABORT: 437 base::Bind(&util::EmptyFileOperationCallback));
365 // If user cancels download, unpin the file so that we do not sync the
366 // file again.
367 base::PostTaskAndReplyWithResult(
368 blocking_task_runner_,
369 FROM_HERE,
370 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
371 base::Bind(&util::EmptyFileOperationCallback));
372 break;
373 case FILE_ERROR_NO_CONNECTION:
374 // Add the task again so that we'll retry once the connection is back.
375 AddFetchTaskInternal(local_id, delay_);
376 break;
377 case FILE_ERROR_SERVICE_UNAVAILABLE:
378 // Add the task again so that we'll retry once the service is back.
379 AddFetchTaskInternal(local_id, long_delay_);
380 operation_observer_->OnDriveSyncError(
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
382 local_id);
383 break;
384 default:
385 operation_observer_->OnDriveSyncError(
386 file_system::DRIVE_SYNC_ERROR_MISC,
387 local_id);
388 LOG(WARNING) << "Failed to fetch " << local_id
389 << ": " << FileErrorToString(error);
390 }
391 } 438 }
392 } 439 }
393 440
394 void SyncClient::OnUpdateComplete(const std::string& local_id, 441 void SyncClient::AddDependentTask(SyncType type,
442 const std::string& local_id,
443 const ResourceEntry* entry,
395 FileError error) { 444 FileError error) {
396 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 445 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
446 if (error != FILE_ERROR_OK) {
447 OnTaskComplete(type, local_id, error);
448 return;
449 }
397 450
398 if (!OnTaskComplete(UPDATE, local_id)) 451 const SyncTasks::key_type key_parent(type, entry->parent_local_id());
452 SyncTasks::iterator it_parent = tasks_.find(key_parent);
453 if (it_parent == tasks_.end()) {
kinaba 2014/07/17 06:07:46 Can't there be a danger that the parent task has c
hashimoto 2014/07/18 06:12:16 Extremely good catch. Changed EntryUpdatePerformer
454 OnTaskComplete(type, local_id, FILE_ERROR_NOT_FOUND);
399 return; 455 return;
456 }
400 457
401 if (error == FILE_ERROR_OK) { 458 it_parent->second.dependent_tasks.push_back(
402 DVLOG(1) << "Updated " << local_id; 459 SyncTasks::key_type(type, local_id));
403
404 // Add update tasks for child entries which may be waiting for the parent to
405 // be updated.
406 ResourceEntryVector* entries = new ResourceEntryVector;
407 base::PostTaskAndReplyWithResult(
408 blocking_task_runner_.get(),
409 FROM_HERE,
410 base::Bind(&ResourceMetadata::ReadDirectoryById,
411 base::Unretained(metadata_), local_id, entries),
412 base::Bind(&SyncClient::AddChildUpdateTasks,
413 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
414 } else {
415 switch (error) {
416 case FILE_ERROR_ABORT:
417 // Ignore it because this is caused by user's cancel operations.
418 break;
419 case FILE_ERROR_NO_CONNECTION:
420 // Add the task again so that we'll retry once the connection is back.
421 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
422 base::TimeDelta::FromSeconds(0));
423 break;
424 case FILE_ERROR_SERVICE_UNAVAILABLE:
425 // Add the task again so that we'll retry once the service is back.
426 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
427 operation_observer_->OnDriveSyncError(
428 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
429 local_id);
430 break;
431 default:
432 operation_observer_->OnDriveSyncError(
433 file_system::DRIVE_SYNC_ERROR_MISC,
434 local_id);
435 LOG(WARNING) << "Failed to update " << local_id << ": "
436 << FileErrorToString(error);
437 }
438 }
439 }
440
441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
442 FileError error) {
443 if (error != FILE_ERROR_OK)
444 return;
445
446 for (size_t i = 0; i < entries->size(); ++i) {
447 const ResourceEntry& entry = (*entries)[i];
448 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
449 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
450 base::TimeDelta::FromSeconds(0));
451 }
452 }
453 } 460 }
454 461
455 } // namespace internal 462 } // namespace internal
456 } // namespace drive 463 } // namespace drive
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698