OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/browser/chromeos/drive/sync_client.h" | 5 #include "chrome/browser/chromeos/drive/sync_client.h" |
6 | 6 |
7 #include <vector> | 7 #include <vector> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop_proxy.h" | 10 #include "base/message_loop/message_loop_proxy.h" |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
114 if (error != FILE_ERROR_OK) { | 114 if (error != FILE_ERROR_OK) { |
115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; | 115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; |
116 continue; | 116 continue; |
117 } | 117 } |
118 | 118 |
119 local_ids->push_back(local_id); | 119 local_ids->push_back(local_id); |
120 } | 120 } |
121 DCHECK(!it->HasError()); | 121 DCHECK(!it->HasError()); |
122 } | 122 } |
123 | 123 |
124 // Runs the task and returns a dummy cancel closure. | |
125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) { | |
126 task.Run(); | |
127 return base::Closure(); | |
128 } | |
129 | |
130 } // namespace | 124 } // namespace |
131 | 125 |
132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} | 126 SyncClient::SyncTask::SyncTask() |
127 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {} | |
133 SyncClient::SyncTask::~SyncTask() {} | 128 SyncClient::SyncTask::~SyncTask() {} |
134 | 129 |
135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, | 130 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, |
136 file_system::OperationObserver* observer, | 131 file_system::OperationObserver* observer, |
137 JobScheduler* scheduler, | 132 JobScheduler* scheduler, |
138 ResourceMetadata* metadata, | 133 ResourceMetadata* metadata, |
139 FileCache* cache, | 134 FileCache* cache, |
140 LoaderController* loader_controller, | 135 LoaderController* loader_controller, |
141 const base::FilePath& temporary_file_directory) | 136 const base::FilePath& temporary_file_directory) |
142 : blocking_task_runner_(blocking_task_runner), | 137 : blocking_task_runner_(blocking_task_runner), |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
202 | 197 |
203 void SyncClient::RemoveFetchTask(const std::string& local_id) { | 198 void SyncClient::RemoveFetchTask(const std::string& local_id) { |
204 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 199 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
205 | 200 |
206 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); | 201 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); |
207 if (it == tasks_.end()) | 202 if (it == tasks_.end()) |
208 return; | 203 return; |
209 | 204 |
210 SyncTask* task = &it->second; | 205 SyncTask* task = &it->second; |
211 switch (task->state) { | 206 switch (task->state) { |
207 case SUSPENDED: | |
212 case PENDING: | 208 case PENDING: |
213 tasks_.erase(it); | 209 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT); |
214 break; | 210 break; |
215 case RUNNING: | 211 case RUNNING: |
216 if (!task->cancel_closure.is_null()) | 212 if (!task->cancel_closure.is_null()) |
217 task->cancel_closure.Run(); | 213 task->cancel_closure.Run(); |
218 break; | 214 break; |
219 } | 215 } |
220 } | 216 } |
221 | 217 |
222 void SyncClient::AddUpdateTask(const ClientContext& context, | 218 void SyncClient::AddUpdateTask(const ClientContext& context, |
223 const std::string& local_id) { | 219 const std::string& local_id) { |
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 220 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
225 AddUpdateTaskInternal(context, local_id, delay_); | 221 AddUpdateTaskInternal(context, local_id, delay_); |
226 } | 222 } |
227 | 223 |
224 base::Closure SyncClient::PerformFetchTask(const std::string& local_id, | |
225 const ClientContext& context) { | |
226 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
227 return download_operation_->EnsureFileDownloadedByLocalId( | |
228 local_id, | |
229 context, | |
230 GetFileContentInitializedCallback(), | |
231 google_apis::GetContentCallback(), | |
232 base::Bind(&SyncClient::OnFetchFileComplete, | |
233 weak_ptr_factory_.GetWeakPtr(), | |
234 local_id)); | |
235 } | |
236 | |
228 void SyncClient::AddFetchTaskInternal(const std::string& local_id, | 237 void SyncClient::AddFetchTaskInternal(const std::string& local_id, |
229 const base::TimeDelta& delay) { | 238 const base::TimeDelta& delay) { |
230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 239 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
231 | 240 |
232 SyncTask task; | 241 SyncTask task; |
233 task.task = base::Bind( | 242 task.state = PENDING; |
234 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, | 243 task.context = ClientContext(BACKGROUND); |
235 base::Unretained(download_operation_.get()), | 244 task.task = base::Bind(&SyncClient::PerformFetchTask, |
245 base::Unretained(this), | |
246 local_id); | |
247 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); | |
248 } | |
249 | |
250 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id, | |
251 const ClientContext& context) { | |
252 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
253 entry_update_performer_->UpdateEntry( | |
236 local_id, | 254 local_id, |
237 ClientContext(BACKGROUND), | 255 context, |
238 GetFileContentInitializedCallback(), | 256 base::Bind(&SyncClient::OnTaskComplete, |
239 google_apis::GetContentCallback(), | |
240 base::Bind(&SyncClient::OnFetchFileComplete, | |
241 weak_ptr_factory_.GetWeakPtr(), | 257 weak_ptr_factory_.GetWeakPtr(), |
258 UPDATE, | |
242 local_id)); | 259 local_id)); |
243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); | 260 return base::Closure(); |
244 } | 261 } |
245 | 262 |
246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, | 263 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, |
247 const std::string& local_id, | 264 const std::string& local_id, |
248 const base::TimeDelta& delay) { | 265 const base::TimeDelta& delay) { |
266 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
267 | |
249 SyncTask task; | 268 SyncTask task; |
250 task.task = base::Bind( | 269 task.state = PENDING; |
251 &RunTaskAndReturnDummyCancelClosure, | 270 task.context = context; |
252 base::Bind(&EntryUpdatePerformer::UpdateEntry, | 271 task.task = base::Bind(&SyncClient::PerformUpdateTask, |
253 base::Unretained(entry_update_performer_.get()), | 272 base::Unretained(this), |
254 local_id, | 273 local_id); |
255 context, | |
256 base::Bind(&SyncClient::OnUpdateComplete, | |
257 weak_ptr_factory_.GetWeakPtr(), | |
258 local_id))); | |
259 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); | 274 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); |
260 } | 275 } |
261 | 276 |
262 void SyncClient::AddTask(const SyncTasks::key_type& key, | 277 void SyncClient::AddTask(const SyncTasks::key_type& key, |
263 const SyncTask& task, | 278 const SyncTask& task, |
264 const base::TimeDelta& delay) { | 279 const base::TimeDelta& delay) { |
265 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 280 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
266 | 281 |
267 SyncTasks::iterator it = tasks_.find(key); | 282 SyncTasks::iterator it = tasks_.find(key); |
268 if (it != tasks_.end()) { | 283 if (it != tasks_.end()) { |
269 switch (it->second.state) { | 284 switch (it->second.state) { |
285 case SUSPENDED: | |
286 // Activate the task. | |
287 it->second.state = PENDING; | |
288 break; | |
270 case PENDING: | 289 case PENDING: |
271 // The same task will run, do nothing. | 290 // The same task will run, do nothing. |
272 break; | 291 return; |
273 case RUNNING: | 292 case RUNNING: |
274 // Something has changed since the task started. Schedule rerun. | 293 // Something has changed since the task started. Schedule rerun. |
275 it->second.should_run_again = true; | 294 it->second.should_run_again = true; |
276 break; | 295 return; |
277 } | 296 } |
278 return; | 297 } else { |
298 tasks_[key] = task; | |
279 } | 299 } |
280 | |
281 DCHECK_EQ(PENDING, task.state); | 300 DCHECK_EQ(PENDING, task.state); |
282 tasks_[key] = task; | |
283 | |
284 base::MessageLoopProxy::current()->PostDelayedTask( | 301 base::MessageLoopProxy::current()->PostDelayedTask( |
285 FROM_HERE, | 302 FROM_HERE, |
286 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), | 303 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), |
287 delay); | 304 delay); |
288 } | 305 } |
289 | 306 |
290 void SyncClient::StartTask(const SyncTasks::key_type& key) { | 307 void SyncClient::StartTask(const SyncTasks::key_type& key) { |
291 SyncTasks::iterator it = tasks_.find(key); | 308 SyncTasks::iterator it = tasks_.find(key); |
292 if (it == tasks_.end()) | 309 if (it == tasks_.end()) |
293 return; | 310 return; |
294 | 311 |
295 SyncTask* task = &it->second; | 312 SyncTask* task = &it->second; |
296 switch (task->state) { | 313 switch (task->state) { |
314 case SUSPENDED: | |
297 case PENDING: | 315 case PENDING: |
298 task->state = RUNNING; | 316 task->state = RUNNING; |
299 task->cancel_closure = task->task.Run(); | 317 task->cancel_closure = task->task.Run(task->context); |
300 break; | 318 break; |
301 case RUNNING: // Do nothing. | 319 case RUNNING: // Do nothing. |
302 break; | 320 break; |
303 } | 321 } |
304 } | 322 } |
305 | 323 |
306 void SyncClient::OnGetLocalIdsOfBacklog( | 324 void SyncClient::OnGetLocalIdsOfBacklog( |
307 const std::vector<std::string>* to_fetch, | 325 const std::vector<std::string>* to_fetch, |
308 const std::vector<std::string>* to_update) { | 326 const std::vector<std::string>* to_update) { |
309 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
(...skipping 13 matching lines...) Expand all Loading... | |
323 } | 341 } |
324 } | 342 } |
325 | 343 |
326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { | 344 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { |
327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 345 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
328 | 346 |
329 for (size_t i = 0; i < local_ids->size(); ++i) | 347 for (size_t i = 0; i < local_ids->size(); ++i) |
330 AddFetchTask((*local_ids)[i]); | 348 AddFetchTask((*local_ids)[i]); |
331 } | 349 } |
332 | 350 |
333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { | 351 void SyncClient::OnTaskComplete(SyncType type, |
352 const std::string& local_id, | |
353 FileError error) { | |
334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 354 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
335 | 355 |
336 const SyncTasks::key_type key(type, local_id); | 356 const SyncTasks::key_type key(type, local_id); |
337 SyncTasks::iterator it = tasks_.find(key); | 357 SyncTasks::iterator it = tasks_.find(key); |
338 DCHECK(it != tasks_.end()); | 358 DCHECK(it != tasks_.end()); |
339 | 359 |
360 bool should_erase_task = true; | |
361 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0); | |
362 | |
363 switch (error) { | |
364 case FILE_ERROR_OK: | |
365 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id; | |
366 break; | |
367 case FILE_ERROR_ABORT: | |
368 // Ignore it because this is caused by user's cancel operations. | |
369 break; | |
370 case FILE_ERROR_NO_CONNECTION: | |
371 // Run the task again so that we'll retry once the connection is back. | |
372 it->second.should_run_again = true; | |
373 it->second.context = ClientContext(BACKGROUND); | |
374 break; | |
375 case FILE_ERROR_SERVICE_UNAVAILABLE: | |
376 // Run the task again so that we'll retry once the service is back. | |
377 it->second.should_run_again = true; | |
378 it->second.context = ClientContext(BACKGROUND); | |
379 retry_delay = long_delay_; | |
380 operation_observer_->OnDriveSyncError( | |
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id); | |
382 break; | |
383 case FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED: { | |
384 // Suspend the task and register it as a dependent task of the parent. | |
385 should_erase_task = false; | |
386 it->second.state = SUSPENDED; | |
387 ResourceEntry* entry = new ResourceEntry; | |
388 base::PostTaskAndReplyWithResult( | |
389 blocking_task_runner_.get(), | |
390 FROM_HERE, | |
391 base::Bind(&ResourceMetadata::GetResourceEntryById, | |
392 base::Unretained(metadata_), | |
393 local_id, | |
394 entry), | |
395 base::Bind(&SyncClient::AddDependentTask, | |
396 weak_ptr_factory_.GetWeakPtr(), | |
397 type, | |
398 local_id, | |
399 base::Owned(entry))); | |
400 break; | |
401 } | |
402 default: | |
403 operation_observer_->OnDriveSyncError( | |
404 file_system::DRIVE_SYNC_ERROR_MISC, local_id); | |
405 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id | |
406 << ": " << FileErrorToString(error); | |
407 } | |
408 | |
340 if (it->second.should_run_again) { | 409 if (it->second.should_run_again) { |
341 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; | 410 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; |
411 it->second.state = PENDING; | |
342 it->second.should_run_again = false; | 412 it->second.should_run_again = false; |
343 it->second.task.Run(); | 413 base::MessageLoopProxy::current()->PostDelayedTask( |
344 return false; | 414 FROM_HERE, |
415 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), | |
416 retry_delay); | |
417 } else if (should_erase_task) { | |
418 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i) | |
419 StartTask(it->second.dependent_tasks[i]); | |
420 tasks_.erase(it); | |
345 } | 421 } |
346 | |
347 tasks_.erase(it); | |
348 return true; | |
349 } | 422 } |
350 | 423 |
351 void SyncClient::OnFetchFileComplete(const std::string& local_id, | 424 void SyncClient::OnFetchFileComplete(const std::string& local_id, |
352 FileError error, | 425 FileError error, |
353 const base::FilePath& local_path, | 426 const base::FilePath& local_path, |
354 scoped_ptr<ResourceEntry> entry) { | 427 scoped_ptr<ResourceEntry> entry) { |
355 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 428 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
356 | 429 OnTaskComplete(FETCH, local_id, error); |
357 if (!OnTaskComplete(FETCH, local_id)) | 430 if (error == FILE_ERROR_ABORT) { |
358 return; | 431 // If user cancels download, unpin the file so that we do not sync the file |
359 | 432 // again. |
360 if (error == FILE_ERROR_OK) { | 433 base::PostTaskAndReplyWithResult( |
361 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); | 434 blocking_task_runner_, |
362 } else { | 435 FROM_HERE, |
363 switch (error) { | 436 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), |
364 case FILE_ERROR_ABORT: | 437 base::Bind(&util::EmptyFileOperationCallback)); |
365 // If user cancels download, unpin the file so that we do not sync the | |
366 // file again. | |
367 base::PostTaskAndReplyWithResult( | |
368 blocking_task_runner_, | |
369 FROM_HERE, | |
370 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), | |
371 base::Bind(&util::EmptyFileOperationCallback)); | |
372 break; | |
373 case FILE_ERROR_NO_CONNECTION: | |
374 // Add the task again so that we'll retry once the connection is back. | |
375 AddFetchTaskInternal(local_id, delay_); | |
376 break; | |
377 case FILE_ERROR_SERVICE_UNAVAILABLE: | |
378 // Add the task again so that we'll retry once the service is back. | |
379 AddFetchTaskInternal(local_id, long_delay_); | |
380 operation_observer_->OnDriveSyncError( | |
381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, | |
382 local_id); | |
383 break; | |
384 default: | |
385 operation_observer_->OnDriveSyncError( | |
386 file_system::DRIVE_SYNC_ERROR_MISC, | |
387 local_id); | |
388 LOG(WARNING) << "Failed to fetch " << local_id | |
389 << ": " << FileErrorToString(error); | |
390 } | |
391 } | 438 } |
392 } | 439 } |
393 | 440 |
394 void SyncClient::OnUpdateComplete(const std::string& local_id, | 441 void SyncClient::AddDependentTask(SyncType type, |
442 const std::string& local_id, | |
443 const ResourceEntry* entry, | |
395 FileError error) { | 444 FileError error) { |
396 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | 445 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
446 if (error != FILE_ERROR_OK) { | |
447 OnTaskComplete(type, local_id, error); | |
448 return; | |
449 } | |
397 | 450 |
398 if (!OnTaskComplete(UPDATE, local_id)) | 451 const SyncTasks::key_type key_parent(type, entry->parent_local_id()); |
452 SyncTasks::iterator it_parent = tasks_.find(key_parent); | |
453 if (it_parent == tasks_.end()) { | |
kinaba
2014/07/17 06:07:46
Can't there be a danger that the parent task has c
hashimoto
2014/07/18 06:12:16
Extremely good catch.
Changed EntryUpdatePerformer
| |
454 OnTaskComplete(type, local_id, FILE_ERROR_NOT_FOUND); | |
399 return; | 455 return; |
456 } | |
400 | 457 |
401 if (error == FILE_ERROR_OK) { | 458 it_parent->second.dependent_tasks.push_back( |
402 DVLOG(1) << "Updated " << local_id; | 459 SyncTasks::key_type(type, local_id)); |
403 | |
404 // Add update tasks for child entries which may be waiting for the parent to | |
405 // be updated. | |
406 ResourceEntryVector* entries = new ResourceEntryVector; | |
407 base::PostTaskAndReplyWithResult( | |
408 blocking_task_runner_.get(), | |
409 FROM_HERE, | |
410 base::Bind(&ResourceMetadata::ReadDirectoryById, | |
411 base::Unretained(metadata_), local_id, entries), | |
412 base::Bind(&SyncClient::AddChildUpdateTasks, | |
413 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries))); | |
414 } else { | |
415 switch (error) { | |
416 case FILE_ERROR_ABORT: | |
417 // Ignore it because this is caused by user's cancel operations. | |
418 break; | |
419 case FILE_ERROR_NO_CONNECTION: | |
420 // Add the task again so that we'll retry once the connection is back. | |
421 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, | |
422 base::TimeDelta::FromSeconds(0)); | |
423 break; | |
424 case FILE_ERROR_SERVICE_UNAVAILABLE: | |
425 // Add the task again so that we'll retry once the service is back. | |
426 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_); | |
427 operation_observer_->OnDriveSyncError( | |
428 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, | |
429 local_id); | |
430 break; | |
431 default: | |
432 operation_observer_->OnDriveSyncError( | |
433 file_system::DRIVE_SYNC_ERROR_MISC, | |
434 local_id); | |
435 LOG(WARNING) << "Failed to update " << local_id << ": " | |
436 << FileErrorToString(error); | |
437 } | |
438 } | |
439 } | |
440 | |
441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries, | |
442 FileError error) { | |
443 if (error != FILE_ERROR_OK) | |
444 return; | |
445 | |
446 for (size_t i = 0; i < entries->size(); ++i) { | |
447 const ResourceEntry& entry = (*entries)[i]; | |
448 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) { | |
449 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(), | |
450 base::TimeDelta::FromSeconds(0)); | |
451 } | |
452 } | |
453 } | 460 } |
454 | 461 |
455 } // namespace internal | 462 } // namespace internal |
456 } // namespace drive | 463 } // namespace drive |
OLD | NEW |