| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "chrome/browser/chromeos/drive/sync_client.h" | |
| 6 | |
| 7 #include <vector> | |
| 8 | |
| 9 #include "base/bind.h" | |
| 10 #include "base/thread_task_runner_handle.h" | |
| 11 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h" | |
| 12 #include "components/drive/drive.pb.h" | |
| 13 #include "components/drive/file_cache.h" | |
| 14 #include "components/drive/file_system/download_operation.h" | |
| 15 #include "components/drive/file_system/operation_delegate.h" | |
| 16 #include "components/drive/file_system_core_util.h" | |
| 17 #include "components/drive/job_scheduler.h" | |
| 18 #include "google_apis/drive/task_util.h" | |
| 19 | |
| 20 namespace drive { | |
| 21 namespace internal { | |
| 22 | |
| 23 namespace { | |
| 24 | |
| 25 // The delay constant is used to delay processing a sync task. We should not | |
| 26 // process SyncTasks immediately for the following reasons: | |
| 27 // | |
| 28 // 1) For fetching, the user may accidentally click on "Make available | |
| 29 // offline" checkbox on a file, and immediately cancel it in a second. | |
| 30 // It's a waste to fetch the file in this case. | |
| 31 // | |
| 32 // 2) For uploading, file writing via HTML5 file system API is performed in | |
| 33 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We | |
| 34 // shouldn't start uploading right after the step 1). Besides, the user | |
| 35 // may edit the same file repeatedly in a short period of time. | |
| 36 // | |
| 37 // TODO(satorux): We should find a way to handle the upload case more nicely, | |
| 38 // and shorten the delay. crbug.com/134774 | |
| 39 const int kDelaySeconds = 1; | |
| 40 | |
| 41 // The delay constant is used to delay retrying a sync task on server errors. | |
| 42 const int kLongDelaySeconds = 600; | |
| 43 | |
| 44 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not | |
| 45 // fetched (not present locally), to |to_update| if the file needs update. | |
| 46 void CollectBacklog(ResourceMetadata* metadata, | |
| 47 std::vector<std::string>* to_fetch, | |
| 48 std::vector<std::string>* to_update) { | |
| 49 DCHECK(to_fetch); | |
| 50 DCHECK(to_update); | |
| 51 | |
| 52 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); | |
| 53 for (; !it->IsAtEnd(); it->Advance()) { | |
| 54 const std::string& local_id = it->GetID(); | |
| 55 const ResourceEntry& entry = it->GetValue(); | |
| 56 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) { | |
| 57 to_update->push_back(local_id); | |
| 58 continue; | |
| 59 } | |
| 60 | |
| 61 bool should_update = false; | |
| 62 switch (entry.metadata_edit_state()) { | |
| 63 case ResourceEntry::CLEAN: | |
| 64 break; | |
| 65 case ResourceEntry::SYNCING: | |
| 66 case ResourceEntry::DIRTY: | |
| 67 should_update = true; | |
| 68 break; | |
| 69 } | |
| 70 | |
| 71 if (entry.file_specific_info().cache_state().is_pinned() && | |
| 72 !entry.file_specific_info().cache_state().is_present()) | |
| 73 to_fetch->push_back(local_id); | |
| 74 | |
| 75 if (entry.file_specific_info().cache_state().is_dirty()) | |
| 76 should_update = true; | |
| 77 | |
| 78 if (should_update) | |
| 79 to_update->push_back(local_id); | |
| 80 } | |
| 81 DCHECK(!it->HasError()); | |
| 82 } | |
| 83 | |
| 84 // Iterates cache entries and collects IDs of ones with obsolete cache files. | |
| 85 void CheckExistingPinnedFiles(ResourceMetadata* metadata, | |
| 86 FileCache* cache, | |
| 87 std::vector<std::string>* local_ids) { | |
| 88 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); | |
| 89 for (; !it->IsAtEnd(); it->Advance()) { | |
| 90 const ResourceEntry& entry = it->GetValue(); | |
| 91 const FileCacheEntry& cache_state = | |
| 92 entry.file_specific_info().cache_state(); | |
| 93 const std::string& local_id = it->GetID(); | |
| 94 if (!cache_state.is_pinned() || !cache_state.is_present()) | |
| 95 continue; | |
| 96 | |
| 97 // If MD5s don't match, it indicates the local cache file is stale, unless | |
| 98 // the file is dirty (the MD5 is "local"). We should never re-fetch the | |
| 99 // file when we have a locally modified version. | |
| 100 if (entry.file_specific_info().md5() == cache_state.md5() || | |
| 101 cache_state.is_dirty()) | |
| 102 continue; | |
| 103 | |
| 104 FileError error = cache->Remove(local_id); | |
| 105 if (error != FILE_ERROR_OK) { | |
| 106 LOG(WARNING) << "Failed to remove cache entry: " << local_id; | |
| 107 continue; | |
| 108 } | |
| 109 | |
| 110 error = cache->Pin(local_id); | |
| 111 if (error != FILE_ERROR_OK) { | |
| 112 LOG(WARNING) << "Failed to pin cache entry: " << local_id; | |
| 113 continue; | |
| 114 } | |
| 115 | |
| 116 local_ids->push_back(local_id); | |
| 117 } | |
| 118 DCHECK(!it->HasError()); | |
| 119 } | |
| 120 | |
| 121 // Gets the parent entry of the entry specified by the ID. | |
| 122 FileError GetParentResourceEntry(ResourceMetadata* metadata, | |
| 123 const std::string& local_id, | |
| 124 ResourceEntry* parent) { | |
| 125 ResourceEntry entry; | |
| 126 FileError error = metadata->GetResourceEntryById(local_id, &entry); | |
| 127 if (error != FILE_ERROR_OK) | |
| 128 return error; | |
| 129 return metadata->GetResourceEntryById(entry.parent_local_id(), parent); | |
| 130 } | |
| 131 | |
| 132 } // namespace | |
| 133 | |
| 134 SyncClient::SyncTask::SyncTask() | |
| 135 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {} | |
| 136 SyncClient::SyncTask::~SyncTask() {} | |
| 137 | |
| 138 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, | |
| 139 file_system::OperationDelegate* delegate, | |
| 140 JobScheduler* scheduler, | |
| 141 ResourceMetadata* metadata, | |
| 142 FileCache* cache, | |
| 143 LoaderController* loader_controller, | |
| 144 const base::FilePath& temporary_file_directory) | |
| 145 : blocking_task_runner_(blocking_task_runner), | |
| 146 operation_delegate_(delegate), | |
| 147 metadata_(metadata), | |
| 148 cache_(cache), | |
| 149 download_operation_(new file_system::DownloadOperation( | |
| 150 blocking_task_runner, | |
| 151 delegate, | |
| 152 scheduler, | |
| 153 metadata, | |
| 154 cache, | |
| 155 temporary_file_directory)), | |
| 156 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner, | |
| 157 delegate, | |
| 158 scheduler, | |
| 159 metadata, | |
| 160 cache, | |
| 161 loader_controller)), | |
| 162 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)), | |
| 163 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)), | |
| 164 weak_ptr_factory_(this) { | |
| 165 } | |
| 166 | |
| 167 SyncClient::~SyncClient() { | |
| 168 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 169 } | |
| 170 | |
| 171 void SyncClient::StartProcessingBacklog() { | |
| 172 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 173 | |
| 174 std::vector<std::string>* to_fetch = new std::vector<std::string>; | |
| 175 std::vector<std::string>* to_update = new std::vector<std::string>; | |
| 176 blocking_task_runner_->PostTaskAndReply( | |
| 177 FROM_HERE, | |
| 178 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update), | |
| 179 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog, | |
| 180 weak_ptr_factory_.GetWeakPtr(), | |
| 181 base::Owned(to_fetch), | |
| 182 base::Owned(to_update))); | |
| 183 } | |
| 184 | |
| 185 void SyncClient::StartCheckingExistingPinnedFiles() { | |
| 186 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 187 | |
| 188 std::vector<std::string>* local_ids = new std::vector<std::string>; | |
| 189 blocking_task_runner_->PostTaskAndReply( | |
| 190 FROM_HERE, | |
| 191 base::Bind(&CheckExistingPinnedFiles, | |
| 192 metadata_, | |
| 193 cache_, | |
| 194 local_ids), | |
| 195 base::Bind(&SyncClient::AddFetchTasks, | |
| 196 weak_ptr_factory_.GetWeakPtr(), | |
| 197 base::Owned(local_ids))); | |
| 198 } | |
| 199 | |
| 200 void SyncClient::AddFetchTask(const std::string& local_id) { | |
| 201 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 202 AddFetchTaskInternal(local_id, delay_); | |
| 203 } | |
| 204 | |
| 205 void SyncClient::RemoveFetchTask(const std::string& local_id) { | |
| 206 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 207 | |
| 208 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); | |
| 209 if (it == tasks_.end()) | |
| 210 return; | |
| 211 | |
| 212 SyncTask* task = &it->second; | |
| 213 switch (task->state) { | |
| 214 case SUSPENDED: | |
| 215 case PENDING: | |
| 216 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT); | |
| 217 break; | |
| 218 case RUNNING: | |
| 219 if (!task->cancel_closure.is_null()) | |
| 220 task->cancel_closure.Run(); | |
| 221 break; | |
| 222 } | |
| 223 } | |
| 224 | |
| 225 void SyncClient::AddUpdateTask(const ClientContext& context, | |
| 226 const std::string& local_id) { | |
| 227 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 228 AddUpdateTaskInternal(context, local_id, delay_); | |
| 229 } | |
| 230 | |
| 231 bool SyncClient:: WaitForUpdateTaskToComplete( | |
| 232 const std::string& local_id, | |
| 233 const FileOperationCallback& callback) { | |
| 234 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 235 | |
| 236 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id)); | |
| 237 if (it == tasks_.end()) | |
| 238 return false; | |
| 239 | |
| 240 SyncTask* task = &it->second; | |
| 241 task->waiting_callbacks.push_back(callback); | |
| 242 return true; | |
| 243 } | |
| 244 | |
| 245 base::Closure SyncClient::PerformFetchTask(const std::string& local_id, | |
| 246 const ClientContext& context) { | |
| 247 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 248 return download_operation_->EnsureFileDownloadedByLocalId( | |
| 249 local_id, | |
| 250 context, | |
| 251 GetFileContentInitializedCallback(), | |
| 252 google_apis::GetContentCallback(), | |
| 253 base::Bind(&SyncClient::OnFetchFileComplete, | |
| 254 weak_ptr_factory_.GetWeakPtr(), | |
| 255 local_id)); | |
| 256 } | |
| 257 | |
| 258 void SyncClient::AddFetchTaskInternal(const std::string& local_id, | |
| 259 const base::TimeDelta& delay) { | |
| 260 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 261 | |
| 262 SyncTask task; | |
| 263 task.state = PENDING; | |
| 264 task.context = ClientContext(BACKGROUND); | |
| 265 task.task = base::Bind(&SyncClient::PerformFetchTask, | |
| 266 base::Unretained(this), | |
| 267 local_id); | |
| 268 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); | |
| 269 } | |
| 270 | |
| 271 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id, | |
| 272 const ClientContext& context) { | |
| 273 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 274 entry_update_performer_->UpdateEntry( | |
| 275 local_id, | |
| 276 context, | |
| 277 base::Bind(&SyncClient::OnTaskComplete, | |
| 278 weak_ptr_factory_.GetWeakPtr(), | |
| 279 UPDATE, | |
| 280 local_id)); | |
| 281 return base::Closure(); | |
| 282 } | |
| 283 | |
| 284 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, | |
| 285 const std::string& local_id, | |
| 286 const base::TimeDelta& delay) { | |
| 287 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 288 | |
| 289 SyncTask task; | |
| 290 task.state = PENDING; | |
| 291 task.context = context; | |
| 292 task.task = base::Bind(&SyncClient::PerformUpdateTask, | |
| 293 base::Unretained(this), | |
| 294 local_id); | |
| 295 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); | |
| 296 } | |
| 297 | |
| 298 void SyncClient::AddTask(const SyncTasks::key_type& key, | |
| 299 const SyncTask& task, | |
| 300 const base::TimeDelta& delay) { | |
| 301 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 302 | |
| 303 SyncTasks::iterator it = tasks_.find(key); | |
| 304 if (it != tasks_.end()) { | |
| 305 switch (it->second.state) { | |
| 306 case SUSPENDED: | |
| 307 // Activate the task. | |
| 308 it->second.state = PENDING; | |
| 309 break; | |
| 310 case PENDING: | |
| 311 // The same task will run, do nothing. | |
| 312 return; | |
| 313 case RUNNING: | |
| 314 // Something has changed since the task started. Schedule rerun. | |
| 315 it->second.should_run_again = true; | |
| 316 return; | |
| 317 } | |
| 318 } else { | |
| 319 tasks_[key] = task; | |
| 320 } | |
| 321 DCHECK_EQ(PENDING, task.state); | |
| 322 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
| 323 FROM_HERE, | |
| 324 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), | |
| 325 delay); | |
| 326 } | |
| 327 | |
| 328 void SyncClient::StartTask(const SyncTasks::key_type& key) { | |
| 329 ResourceEntry* parent = new ResourceEntry; | |
| 330 base::PostTaskAndReplyWithResult( | |
| 331 blocking_task_runner_.get(), | |
| 332 FROM_HERE, | |
| 333 base::Bind(&GetParentResourceEntry, metadata_, key.second, parent), | |
| 334 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry, | |
| 335 weak_ptr_factory_.GetWeakPtr(), | |
| 336 key, | |
| 337 base::Owned(parent))); | |
| 338 } | |
| 339 | |
| 340 void SyncClient::StartTaskAfterGetParentResourceEntry( | |
| 341 const SyncTasks::key_type& key, | |
| 342 const ResourceEntry* parent, | |
| 343 FileError error) { | |
| 344 const SyncType type = key.first; | |
| 345 const std::string& local_id = key.second; | |
| 346 SyncTasks::iterator it = tasks_.find(key); | |
| 347 if (it == tasks_.end()) | |
| 348 return; | |
| 349 | |
| 350 SyncTask* task = &it->second; | |
| 351 switch (task->state) { | |
| 352 case SUSPENDED: | |
| 353 case PENDING: | |
| 354 break; | |
| 355 case RUNNING: // Do nothing. | |
| 356 return; | |
| 357 } | |
| 358 | |
| 359 if (error != FILE_ERROR_OK) { | |
| 360 OnTaskComplete(type, local_id, error); | |
| 361 return; | |
| 362 } | |
| 363 | |
| 364 if (type == UPDATE && | |
| 365 parent->resource_id().empty() && | |
| 366 parent->local_id() != util::kDriveTrashDirLocalId) { | |
| 367 // Parent entry needs to be synced to get a resource ID. | |
| 368 // Suspend the task and register it as a dependent task of the parent. | |
| 369 const SyncTasks::key_type key_parent(type, parent->local_id()); | |
| 370 SyncTasks::iterator it_parent = tasks_.find(key_parent); | |
| 371 if (it_parent == tasks_.end()) { | |
| 372 OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION); | |
| 373 LOG(WARNING) << "Parent task not found: type = " << type << ", id = " | |
| 374 << local_id << ", parent_id = " << parent->local_id(); | |
| 375 return; | |
| 376 } | |
| 377 task->state = SUSPENDED; | |
| 378 it_parent->second.dependent_tasks.push_back(key); | |
| 379 return; | |
| 380 } | |
| 381 | |
| 382 // Run the task. | |
| 383 task->state = RUNNING; | |
| 384 task->cancel_closure = task->task.Run(task->context); | |
| 385 } | |
| 386 | |
| 387 void SyncClient::OnGetLocalIdsOfBacklog( | |
| 388 const std::vector<std::string>* to_fetch, | |
| 389 const std::vector<std::string>* to_update) { | |
| 390 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 391 | |
| 392 // Give priority to upload tasks over fetch tasks, so that dirty files are | |
| 393 // uploaded as soon as possible. | |
| 394 for (size_t i = 0; i < to_update->size(); ++i) { | |
| 395 const std::string& local_id = (*to_update)[i]; | |
| 396 DVLOG(1) << "Queuing to update: " << local_id; | |
| 397 AddUpdateTask(ClientContext(BACKGROUND), local_id); | |
| 398 } | |
| 399 | |
| 400 for (size_t i = 0; i < to_fetch->size(); ++i) { | |
| 401 const std::string& local_id = (*to_fetch)[i]; | |
| 402 DVLOG(1) << "Queuing to fetch: " << local_id; | |
| 403 AddFetchTaskInternal(local_id, delay_); | |
| 404 } | |
| 405 } | |
| 406 | |
| 407 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { | |
| 408 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 409 | |
| 410 for (size_t i = 0; i < local_ids->size(); ++i) | |
| 411 AddFetchTask((*local_ids)[i]); | |
| 412 } | |
| 413 | |
| 414 void SyncClient::OnTaskComplete(SyncType type, | |
| 415 const std::string& local_id, | |
| 416 FileError error) { | |
| 417 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 418 | |
| 419 const SyncTasks::key_type key(type, local_id); | |
| 420 SyncTasks::iterator it = tasks_.find(key); | |
| 421 DCHECK(it != tasks_.end()); | |
| 422 | |
| 423 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0); | |
| 424 | |
| 425 switch (error) { | |
| 426 case FILE_ERROR_OK: | |
| 427 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id; | |
| 428 break; | |
| 429 case FILE_ERROR_ABORT: | |
| 430 // Ignore it because this is caused by user's cancel operations. | |
| 431 break; | |
| 432 case FILE_ERROR_NO_CONNECTION: | |
| 433 // Run the task again so that we'll retry once the connection is back. | |
| 434 it->second.should_run_again = true; | |
| 435 it->second.context = ClientContext(BACKGROUND); | |
| 436 break; | |
| 437 case FILE_ERROR_SERVICE_UNAVAILABLE: | |
| 438 // Run the task again so that we'll retry once the service is back. | |
| 439 it->second.should_run_again = true; | |
| 440 it->second.context = ClientContext(BACKGROUND); | |
| 441 retry_delay = long_delay_; | |
| 442 operation_delegate_->OnDriveSyncError( | |
| 443 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id); | |
| 444 break; | |
| 445 default: | |
| 446 operation_delegate_->OnDriveSyncError( | |
| 447 file_system::DRIVE_SYNC_ERROR_MISC, local_id); | |
| 448 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id | |
| 449 << ": " << FileErrorToString(error); | |
| 450 } | |
| 451 | |
| 452 for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) { | |
| 453 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 454 FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error)); | |
| 455 } | |
| 456 it->second.waiting_callbacks.clear(); | |
| 457 | |
| 458 if (it->second.should_run_again) { | |
| 459 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; | |
| 460 it->second.state = PENDING; | |
| 461 it->second.should_run_again = false; | |
| 462 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
| 463 FROM_HERE, | |
| 464 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), | |
| 465 retry_delay); | |
| 466 } else { | |
| 467 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i) | |
| 468 StartTask(it->second.dependent_tasks[i]); | |
| 469 tasks_.erase(it); | |
| 470 } | |
| 471 } | |
| 472 | |
| 473 void SyncClient::OnFetchFileComplete(const std::string& local_id, | |
| 474 FileError error, | |
| 475 const base::FilePath& local_path, | |
| 476 scoped_ptr<ResourceEntry> entry) { | |
| 477 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 478 OnTaskComplete(FETCH, local_id, error); | |
| 479 if (error == FILE_ERROR_ABORT) { | |
| 480 // If user cancels download, unpin the file so that we do not sync the file | |
| 481 // again. | |
| 482 base::PostTaskAndReplyWithResult( | |
| 483 blocking_task_runner_.get(), | |
| 484 FROM_HERE, | |
| 485 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), | |
| 486 base::Bind(&util::EmptyFileOperationCallback)); | |
| 487 } | |
| 488 } | |
| 489 | |
| 490 } // namespace internal | |
| 491 } // namespace drive | |
| OLD | NEW |