Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Side by Side Diff: chrome/browser/sync_file_system/drive_backend/sync_engine.cc

Issue 236313009: [SyncFS] Post tasks between SyncEngine and SyncWorker (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Work for comments Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 5 #include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/threading/sequenced_worker_pool.h" 10 #include "base/threading/sequenced_worker_pool.h"
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 drive_service->Initialize(signin_manager->GetAuthenticatedAccountId()); 83 drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
84 84
85 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 85 scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
86 new drive::DriveUploader(drive_service.get(), drive_task_runner.get())); 86 new drive::DriveUploader(drive_service.get(), drive_task_runner.get()));
87 87
88 drive::DriveNotificationManager* notification_manager = 88 drive::DriveNotificationManager* notification_manager =
89 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 89 drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
90 ExtensionService* extension_service = 90 ExtensionService* extension_service =
91 extensions::ExtensionSystem::Get(context)->extension_service(); 91 extensions::ExtensionSystem::Get(context)->extension_service();
92 92
93 scoped_refptr<base::SequencedTaskRunner> task_runner( 93 scoped_refptr<base::SequencedTaskRunner> file_task_runner(
94 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 94 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
95 worker_pool->GetSequenceToken(), 95 worker_pool->GetSequenceToken(),
96 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 96 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
97 97
98 // TODO(peria): Create another task runner to manage SyncWorker.
99 base::SequencedTaskRunner*
tzik 2014/04/21 04:09:10 s/base::SequencedTaskRunner/scoped_refptr<base::Si
peria 2014/04/21 04:50:06 Done.
100 worker_task_runner = base::MessageLoopProxy::current();
101
98 scoped_ptr<drive_backend::SyncEngine> sync_engine( 102 scoped_ptr<drive_backend::SyncEngine> sync_engine(
99 new SyncEngine(drive_service.Pass(), 103 new SyncEngine(drive_service.Pass(),
100 drive_uploader.Pass(), 104 drive_uploader.Pass(),
105 worker_task_runner,
101 notification_manager, 106 notification_manager,
102 extension_service, 107 extension_service,
103 signin_manager)); 108 signin_manager));
104 sync_engine->Initialize(GetSyncFileSystemDir(context->GetPath()), 109 sync_engine->Initialize(
105 task_runner.get(), 110 GetSyncFileSystemDir(context->GetPath()),
106 NULL); 111 file_task_runner.get(),
112 NULL);
107 113
108 return sync_engine.Pass(); 114 return sync_engine.Pass();
109 } 115 }
110 116
111 void SyncEngine::AppendDependsOnFactories( 117 void SyncEngine::AppendDependsOnFactories(
112 std::set<BrowserContextKeyedServiceFactory*>* factories) { 118 std::set<BrowserContextKeyedServiceFactory*>* factories) {
113 DCHECK(factories); 119 DCHECK(factories);
114 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 120 factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
115 factories->insert(SigninManagerFactory::GetInstance()); 121 factories->insert(SigninManagerFactory::GetInstance());
116 factories->insert( 122 factories->insert(
117 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 123 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
118 } 124 }
119 125
120 SyncEngine::~SyncEngine() { 126 SyncEngine::~SyncEngine() {
121 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 127 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
122 GetDriveService()->RemoveObserver(this); 128 GetDriveService()->RemoveObserver(this);
123 if (notification_manager_) 129 if (notification_manager_)
124 notification_manager_->RemoveObserver(this); 130 notification_manager_->RemoveObserver(this);
125 } 131 }
126 132
127 void SyncEngine::Initialize(const base::FilePath& base_dir, 133 void SyncEngine::Initialize(const base::FilePath& base_dir,
128 base::SequencedTaskRunner* task_runner, 134 base::SequencedTaskRunner* file_task_runner,
129 leveldb::Env* env_override) { 135 leveldb::Env* env_override) {
130 scoped_ptr<SyncEngineContext> sync_engine_context( 136 scoped_ptr<SyncEngineContext> sync_engine_context(
131 new SyncEngineContext(drive_service_.get(), 137 new SyncEngineContext(drive_service_.get(),
132 drive_uploader_.get(), 138 drive_uploader_.get(),
133 task_runner)); 139 base::MessageLoopProxy::current(),
tzik 2014/04/21 04:09:10 worker_task_runner_?
peria 2014/04/21 04:50:06 UI task runner should be set here, so base::Messag
tzik 2014/04/21 05:10:05 Ah, right I misread that.
134 // TODO(peria): Move this create function to thread pool. 140 file_task_runner));
141 // TODO(peria): Use PostTask on |worker_task_runner_| to call this function.
135 sync_worker_ = SyncWorker::CreateOnWorker(weak_ptr_factory_.GetWeakPtr(), 142 sync_worker_ = SyncWorker::CreateOnWorker(weak_ptr_factory_.GetWeakPtr(),
136 base_dir, 143 base_dir,
137 sync_engine_context.Pass(), 144 sync_engine_context.Pass(),
138 env_override); 145 env_override);
139 146
140 if (notification_manager_) 147 if (notification_manager_)
141 notification_manager_->AddObserver(this); 148 notification_manager_->AddObserver(this);
142 GetDriveService()->AddObserver(this); 149 GetDriveService()->AddObserver(this);
143 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 150 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
144 } 151 }
145 152
146 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 153 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
147 service_observers_.AddObserver(observer); 154 service_observers_.AddObserver(observer);
148 } 155 }
149 156
150 void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 157 void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
151 file_status_observers_.AddObserver(observer); 158 file_status_observers_.AddObserver(observer);
152 } 159 }
153 160
154 void SyncEngine::RegisterOrigin( 161 void SyncEngine::RegisterOrigin(
155 const GURL& origin, const SyncStatusCallback& callback) { 162 const GURL& origin, const SyncStatusCallback& callback) {
156 sync_worker_->RegisterOrigin(origin, callback); 163 worker_task_runner_->PostTask(
164 FROM_HERE,
165 base::Bind(&SyncWorker::RegisterOrigin,
166 base::Unretained(sync_worker_.get()),
167 origin, callback));
157 } 168 }
158 169
159 void SyncEngine::EnableOrigin( 170 void SyncEngine::EnableOrigin(
160 const GURL& origin, const SyncStatusCallback& callback) { 171 const GURL& origin, const SyncStatusCallback& callback) {
161 sync_worker_->EnableOrigin(origin, callback); 172 worker_task_runner_->PostTask(
173 FROM_HERE,
174 base::Bind(&SyncWorker::EnableOrigin,
175 base::Unretained(sync_worker_.get()),
176 origin, callback));
162 } 177 }
163 178
164 void SyncEngine::DisableOrigin( 179 void SyncEngine::DisableOrigin(
165 const GURL& origin, const SyncStatusCallback& callback) { 180 const GURL& origin, const SyncStatusCallback& callback) {
166 sync_worker_->DisableOrigin(origin, callback); 181 worker_task_runner_->PostTask(
182 FROM_HERE,
183 base::Bind(&SyncWorker::DisableOrigin,
184 base::Unretained(sync_worker_.get()),
185 origin, callback));
167 } 186 }
168 187
169 void SyncEngine::UninstallOrigin( 188 void SyncEngine::UninstallOrigin(
170 const GURL& origin, 189 const GURL& origin,
171 UninstallFlag flag, 190 UninstallFlag flag,
172 const SyncStatusCallback& callback) { 191 const SyncStatusCallback& callback) {
173 sync_worker_->UninstallOrigin(origin, flag, callback); 192 worker_task_runner_->PostTask(
193 FROM_HERE,
194 base::Bind(&SyncWorker::UninstallOrigin,
195 base::Unretained(sync_worker_.get()),
196 origin, flag, callback));
174 } 197 }
175 198
176 void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 199 void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
177 sync_worker_->ProcessRemoteChange(callback); 200 worker_task_runner_->PostTask(
201 FROM_HERE,
202 base::Bind(&SyncWorker::ProcessRemoteChange,
203 base::Unretained(sync_worker_.get()),
204 callback));
178 } 205 }
179 206
180 void SyncEngine::SetRemoteChangeProcessor( 207 void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
181 RemoteChangeProcessor* processor) { 208 worker_task_runner_->PostTask(
182 sync_worker_->SetRemoteChangeProcessor(processor); 209 FROM_HERE,
210 base::Bind(&SyncWorker::SetRemoteChangeProcessor,
211 base::Unretained(sync_worker_.get()),
212 processor));
183 } 213 }
184 214
185 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 215 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
186 return this; 216 return this;
187 } 217 }
188 218
189 bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) { 219 bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) {
190 // TODO(tzik): Implement this before we support manual conflict resolution. 220 // TODO(tzik): Implement this before we support manual conflict resolution.
191 return false; 221 return false;
192 } 222 }
193 223
194 RemoteServiceState SyncEngine::GetCurrentState() const { 224 RemoteServiceState SyncEngine::GetCurrentState() const {
225 // TODO(peria): Post task
195 return sync_worker_->GetCurrentState(); 226 return sync_worker_->GetCurrentState();
196 } 227 }
197 228
198 void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) { 229 void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) {
230 // TODO(peria): Make this route asynchronous.
199 sync_worker_->GetOriginStatusMap(status_map); 231 sync_worker_->GetOriginStatusMap(status_map);
200 } 232 }
201 233
202 scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) { 234 scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) {
235 // TODO(peria): Make this route asynchronous.
203 return sync_worker_->DumpFiles(origin); 236 return sync_worker_->DumpFiles(origin);
204 } 237 }
205 238
206 scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() { 239 scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() {
240 // TODO(peria): Make this route asynchronous.
207 return sync_worker_->DumpDatabase(); 241 return sync_worker_->DumpDatabase();
208 } 242 }
209 243
210 void SyncEngine::SetSyncEnabled(bool enabled) { 244 void SyncEngine::SetSyncEnabled(bool enabled) {
211 sync_worker_->SetSyncEnabled(enabled); 245 worker_task_runner_->PostTask(
246 FROM_HERE,
247 base::Bind(&SyncWorker::SetSyncEnabled,
248 base::Unretained(sync_worker_.get()),
249 enabled));
212 } 250 }
213 251
214 void SyncEngine::UpdateSyncEnabled(bool enabled) { 252 void SyncEngine::UpdateSyncEnabled(bool enabled) {
215 const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled"; 253 const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled";
216 FOR_EACH_OBSERVER( 254 FOR_EACH_OBSERVER(
217 Observer, service_observers_, 255 Observer, service_observers_,
218 OnRemoteServiceStateUpdated(GetCurrentState(), status_message)); 256 OnRemoteServiceStateUpdated(GetCurrentState(), status_message));
219 } 257 }
220 258
221 SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy( 259 SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy(
222 ConflictResolutionPolicy policy) { 260 ConflictResolutionPolicy policy) {
261 // TODO(peria): Make this route asynchronous.
223 return sync_worker_->SetDefaultConflictResolutionPolicy(policy); 262 return sync_worker_->SetDefaultConflictResolutionPolicy(policy);
224 } 263 }
225 264
226 SyncStatusCode SyncEngine::SetConflictResolutionPolicy( 265 SyncStatusCode SyncEngine::SetConflictResolutionPolicy(
227 const GURL& origin, 266 const GURL& origin,
228 ConflictResolutionPolicy policy) { 267 ConflictResolutionPolicy policy) {
268 // TODO(peria): Make this route asynchronous.
229 return sync_worker_->SetConflictResolutionPolicy(origin, policy); 269 return sync_worker_->SetConflictResolutionPolicy(origin, policy);
230 } 270 }
231 271
232 ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy() 272 ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy()
233 const { 273 const {
274 // TODO(peria): Make this route asynchronous.
234 return sync_worker_->GetDefaultConflictResolutionPolicy(); 275 return sync_worker_->GetDefaultConflictResolutionPolicy();
235 } 276 }
236 277
237 ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy( 278 ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy(
238 const GURL& origin) const { 279 const GURL& origin) const {
280 // TODO(peria): Make this route asynchronous.
239 return sync_worker_->GetConflictResolutionPolicy(origin); 281 return sync_worker_->GetConflictResolutionPolicy(origin);
240 } 282 }
241 283
242 void SyncEngine::GetRemoteVersions( 284 void SyncEngine::GetRemoteVersions(
243 const fileapi::FileSystemURL& url, 285 const fileapi::FileSystemURL& url,
244 const RemoteVersionsCallback& callback) { 286 const RemoteVersionsCallback& callback) {
245 // TODO(tzik): Implement this before we support manual conflict resolution. 287 // TODO(tzik): Implement this before we support manual conflict resolution.
246 callback.Run(SYNC_STATUS_FAILED, std::vector<Version>()); 288 callback.Run(SYNC_STATUS_FAILED, std::vector<Version>());
247 } 289 }
248 290
(...skipping 15 matching lines...) Expand all
264 OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker())); 306 OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
265 } 307 }
266 } 308 }
267 309
268 void SyncEngine::ApplyLocalChange( 310 void SyncEngine::ApplyLocalChange(
269 const FileChange& local_change, 311 const FileChange& local_change,
270 const base::FilePath& local_path, 312 const base::FilePath& local_path,
271 const SyncFileMetadata& local_metadata, 313 const SyncFileMetadata& local_metadata,
272 const fileapi::FileSystemURL& url, 314 const fileapi::FileSystemURL& url,
273 const SyncStatusCallback& callback) { 315 const SyncStatusCallback& callback) {
274 sync_worker_->ApplyLocalChange( 316 worker_task_runner_->PostTask(
275 local_change, local_path, local_metadata, url, callback); 317 FROM_HERE,
318 base::Bind(&SyncWorker::ApplyLocalChange,
319 base::Unretained(sync_worker_.get()),
320 local_change,
321 local_path,
322 local_metadata,
323 url,
324 callback));
276 } 325 }
277 326
278 SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() { 327 SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
328 // TODO(peria): Post task
279 return sync_worker_->GetSyncTaskManager(); 329 return sync_worker_->GetSyncTaskManager();
280 } 330 }
281 331
282 void SyncEngine::OnNotificationReceived() { 332 void SyncEngine::OnNotificationReceived() {
283 sync_worker_->OnNotificationReceived(); 333 worker_task_runner_->PostTask(
334 FROM_HERE,
335 base::Bind(&SyncWorker::OnNotificationReceived,
336 base::Unretained(sync_worker_.get())));
284 } 337 }
285 338
286 void SyncEngine::OnPushNotificationEnabled(bool) {} 339 void SyncEngine::OnPushNotificationEnabled(bool) {}
287 340
288 void SyncEngine::OnReadyToSendRequests() { 341 void SyncEngine::OnReadyToSendRequests() {
289 sync_worker_->OnReadyToSendRequests( 342 const std::string account_id =
290 signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : ""); 343 signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
344
345 worker_task_runner_->PostTask(
346 FROM_HERE,
347 base::Bind(&SyncWorker::OnReadyToSendRequests,
348 base::Unretained(sync_worker_.get()),
349 account_id));
291 } 350 }
292 351
293 void SyncEngine::OnRefreshTokenInvalid() { 352 void SyncEngine::OnRefreshTokenInvalid() {
294 sync_worker_->OnRefreshTokenInvalid(); 353 worker_task_runner_->PostTask(
354 FROM_HERE,
355 base::Bind(&SyncWorker::OnRefreshTokenInvalid,
356 base::Unretained(sync_worker_.get())));
295 } 357 }
296 358
297 void SyncEngine::OnNetworkChanged( 359 void SyncEngine::OnNetworkChanged(
298 net::NetworkChangeNotifier::ConnectionType type) { 360 net::NetworkChangeNotifier::ConnectionType type) {
299 sync_worker_->OnNetworkChanged(type); 361 worker_task_runner_->PostTask(
362 FROM_HERE,
363 base::Bind(&SyncWorker::OnNetworkChanged,
364 base::Unretained(sync_worker_.get()),
365 type));
300 } 366 }
301 367
302 drive::DriveServiceInterface* SyncEngine::GetDriveService() { 368 drive::DriveServiceInterface* SyncEngine::GetDriveService() {
303 return sync_worker_->GetDriveService(); 369 return drive_service_.get();
304 } 370 }
305 371
306 drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() { 372 drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
307 return sync_worker_->GetDriveUploader(); 373 return drive_uploader_.get();
308 } 374 }
309 375
310 MetadataDatabase* SyncEngine::GetMetadataDatabase() { 376 MetadataDatabase* SyncEngine::GetMetadataDatabase() {
377 // TODO(peria): Post task
311 return sync_worker_->GetMetadataDatabase(); 378 return sync_worker_->GetMetadataDatabase();
312 } 379 }
313 380
314 SyncEngine::SyncEngine( 381 SyncEngine::SyncEngine(
315 scoped_ptr<drive::DriveServiceInterface> drive_service, 382 scoped_ptr<drive::DriveServiceInterface> drive_service,
316 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 383 scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
384 base::SequencedTaskRunner* worker_task_runner,
317 drive::DriveNotificationManager* notification_manager, 385 drive::DriveNotificationManager* notification_manager,
318 ExtensionServiceInterface* extension_service, 386 ExtensionServiceInterface* extension_service,
319 SigninManagerBase* signin_manager) 387 SigninManagerBase* signin_manager)
320 : drive_service_(drive_service.Pass()), 388 : drive_service_(drive_service.Pass()),
321 drive_uploader_(drive_uploader.Pass()), 389 drive_uploader_(drive_uploader.Pass()),
322 notification_manager_(notification_manager), 390 notification_manager_(notification_manager),
323 extension_service_(extension_service), 391 extension_service_(extension_service),
324 signin_manager_(signin_manager), 392 signin_manager_(signin_manager),
393 worker_task_runner_(worker_task_runner),
325 weak_ptr_factory_(this) {} 394 weak_ptr_factory_(this) {}
326 395
327 void SyncEngine::DidProcessRemoteChange(RemoteToLocalSyncer* syncer) { 396 void SyncEngine::DidProcessRemoteChange(RemoteToLocalSyncer* syncer) {
328 if (syncer->sync_action() != SYNC_ACTION_NONE && syncer->url().is_valid()) { 397 if (syncer->sync_action() != SYNC_ACTION_NONE && syncer->url().is_valid()) {
329 FOR_EACH_OBSERVER(FileStatusObserver, 398 FOR_EACH_OBSERVER(FileStatusObserver,
330 file_status_observers_, 399 file_status_observers_,
331 OnFileStatusChanged(syncer->url(), 400 OnFileStatusChanged(syncer->url(),
332 SYNC_FILE_STATUS_SYNCED, 401 SYNC_FILE_STATUS_SYNCED,
333 syncer->sync_action(), 402 syncer->sync_action(),
334 SYNC_DIRECTION_REMOTE_TO_LOCAL)); 403 SYNC_DIRECTION_REMOTE_TO_LOCAL));
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
402 void SyncEngine::NotifyLastOperationStatus() { 471 void SyncEngine::NotifyLastOperationStatus() {
403 FOR_EACH_OBSERVER( 472 FOR_EACH_OBSERVER(
404 Observer, 473 Observer,
405 service_observers_, 474 service_observers_,
406 OnRemoteChangeQueueUpdated( 475 OnRemoteChangeQueueUpdated(
407 GetMetadataDatabase()->CountDirtyTracker())); 476 GetMetadataDatabase()->CountDirtyTracker()));
408 } 477 }
409 478
410 } // namespace drive_backend 479 } // namespace drive_backend
411 } // namespace sync_file_system 480 } // namespace sync_file_system
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698