| Index: chrome/browser/sync/engine/syncer.cc
|
| ===================================================================
|
| --- chrome/browser/sync/engine/syncer.cc (revision 32731)
|
| +++ chrome/browser/sync/engine/syncer.cc (working copy)
|
| @@ -6,6 +6,7 @@
|
|
|
| #include "base/format_macros.h"
|
| #include "base/message_loop.h"
|
| +#include "base/time.h"
|
| #include "chrome/browser/chrome_thread.h"
|
| #include "chrome/browser/sync/engine/apply_updates_command.h"
|
| #include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
|
| @@ -28,6 +29,7 @@
|
| #include "chrome/browser/sync/syncable/syncable.h"
|
| #include "chrome/browser/sync/util/closure.h"
|
|
|
| +using base::TimeDelta;
|
| using sync_pb::ClientCommand;
|
| using syncable::Blob;
|
| using syncable::IS_UNAPPLIED_UPDATE;
|
| @@ -48,60 +50,41 @@
|
|
|
| namespace browser_sync {
|
|
|
| -Syncer::Syncer(
|
| - syncable::DirectoryManager* dirman,
|
| - const PathString& account_name,
|
| - ServerConnectionManager* connection_manager,
|
| - ModelSafeWorker* model_safe_worker)
|
| - : account_name_(account_name),
|
| - early_exit_requested_(false),
|
| +using sessions::StatusController;
|
| +using sessions::SyncSession;
|
| +using sessions::ConflictProgress;
|
| +
|
| +Syncer::Syncer(sessions::SyncSessionContext* context)
|
| + : early_exit_requested_(false),
|
| max_commit_batch_size_(kDefaultMaxCommitBatchSize),
|
| - connection_manager_(connection_manager),
|
| - dirman_(dirman),
|
| - command_channel_(NULL),
|
| - model_safe_worker_(model_safe_worker),
|
| + syncer_event_channel_(new SyncerEventChannel(SyncerEvent(
|
| + SyncerEvent::SHUTDOWN_USE_WITH_CARE))),
|
| + resolver_scoper_(context, &resolver_),
|
| + event_channel_scoper_(context, syncer_event_channel_.get()),
|
| + context_(context),
|
| updates_source_(sync_pb::GetUpdatesCallerInfo::UNKNOWN),
|
| - notifications_enabled_(false),
|
| pre_conflict_resolution_closure_(NULL) {
|
| - SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
|
| - syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
|
| shutdown_channel_.reset(new ShutdownChannel(this));
|
|
|
| - extensions_monitor_ = new ExtensionsActivityMonitor();
|
| -
|
| - ScopedDirLookup dir(dirman_, account_name_);
|
| + ScopedDirLookup dir(context->directory_manager(), context->account_name());
|
| // The directory must be good here.
|
| CHECK(dir.good());
|
| }
|
|
|
| -Syncer::~Syncer() {
|
| - if (!ChromeThread::DeleteSoon(ChromeThread::UI, FROM_HERE,
|
| - extensions_monitor_)) {
|
| - // In unittests, there may be no UI thread, so the above will fail.
|
| - delete extensions_monitor_;
|
| - }
|
| - extensions_monitor_ = NULL;
|
| -}
|
| -
|
| void Syncer::RequestNudge(int milliseconds) {
|
| - SyncerEvent event;
|
| - event.what_happened = SyncerEvent::REQUEST_SYNC_NUDGE;
|
| + SyncerEvent event(SyncerEvent::REQUEST_SYNC_NUDGE);
|
| event.nudge_delay_milliseconds = milliseconds;
|
| - channel()->NotifyListeners(event);
|
| + syncer_event_channel_->NotifyListeners(event);
|
| }
|
|
|
| -bool Syncer::SyncShare() {
|
| - SyncProcessState state(dirman_, account_name_, connection_manager_,
|
| - &resolver_, syncer_event_channel_.get(),
|
| - model_safe_worker());
|
| - return SyncShare(&state);
|
| +bool Syncer::SyncShare(sessions::SyncSession::Delegate* delegate) {
|
| + sessions::SyncSession session(context_, delegate);
|
| + return SyncShare(&session);
|
| }
|
|
|
| -bool Syncer::SyncShare(SyncProcessState* process_state) {
|
| - SyncCycleState cycle_state;
|
| - SyncerSession session(&cycle_state, process_state);
|
| - session.set_source(TestAndSetUpdatesSource());
|
| - session.set_notifications_enabled(notifications_enabled());
|
| +bool Syncer::SyncShare(sessions::SyncSession* session) {
|
| + session->status_controller()->ResetTransientState();
|
| + session->set_source(TestAndSetUpdatesSource());
|
| // This isn't perfect, as we can end up bundling extensions activity
|
| // intended for the next session into the current one. We could do a
|
| // test-and-reset as with the source, but note that also falls short if
|
| @@ -111,34 +94,24 @@
|
| // the records set here on the original attempt. This should provide us
|
| // with the right data "most of the time", and we're only using this for
|
| // analysis purposes, so Law of Large Numbers FTW.
|
| - extensions_monitor_->GetAndClearRecords(
|
| - session.mutable_extensions_activity());
|
| - SyncShare(&session, SYNCER_BEGIN, SYNCER_END);
|
| - return session.HasMoreToSync();
|
| + context_->extensions_monitor()->GetAndClearRecords(
|
| + session->mutable_extensions_activity());
|
| + SyncShare(session, SYNCER_BEGIN, SYNCER_END);
|
| + return session->HasMoreToSync();
|
| }
|
|
|
| -bool Syncer::SyncShare(SyncerStep first_step, SyncerStep last_step) {
|
| - SyncCycleState cycle_state;
|
| - SyncProcessState state(dirman_, account_name_, connection_manager_,
|
| - &resolver_, syncer_event_channel_.get(),
|
| - model_safe_worker());
|
| - SyncerSession session(&cycle_state, &state);
|
| +bool Syncer::SyncShare(SyncerStep first_step, SyncerStep last_step,
|
| + sessions::SyncSession::Delegate* delegate) {
|
| + sessions::SyncSession session(context_, delegate);
|
| SyncShare(&session, first_step, last_step);
|
| return session.HasMoreToSync();
|
| }
|
|
|
| -void Syncer::SyncShare(SyncerSession* session) {
|
| - SyncShare(session, SYNCER_BEGIN, SYNCER_END);
|
| -}
|
| -
|
| -void Syncer::SyncShare(SyncerSession* session,
|
| +void Syncer::SyncShare(sessions::SyncSession* session,
|
| const SyncerStep first_step,
|
| const SyncerStep last_step) {
|
| SyncerStep current_step = first_step;
|
|
|
| - // Reset silenced_until_, it is the callers responsibility to honor throttles.
|
| - silenced_until_ = session->silenced_until();
|
| -
|
| SyncerStep next_step = current_step;
|
| while (!ExitRequested()) {
|
| switch (current_step) {
|
| @@ -172,7 +145,7 @@
|
| process_updates.Execute(session);
|
| // We should download all of the updates before attempting to process
|
| // them.
|
| - if (session->CountUpdates() == 0) {
|
| + if (session->status_controller()->CountUpdates() == 0) {
|
| next_step = APPLY_UPDATES;
|
| } else {
|
| next_step = DOWNLOAD_UPDATES;
|
| @@ -189,23 +162,23 @@
|
| // These two steps are combined since they are executed within the same
|
| // write transaction.
|
| case BUILD_COMMIT_REQUEST: {
|
| - SyncerStatus status(session);
|
| - status.set_syncing(true);
|
| + session->status_controller()->set_syncing(true);
|
|
|
| LOG(INFO) << "Processing Commit Request";
|
| - ScopedDirLookup dir(session->dirman(), session->account_name());
|
| + ScopedDirLookup dir(context_->directory_manager(),
|
| + context_->account_name());
|
| if (!dir.good()) {
|
| LOG(ERROR) << "Scoped dir lookup failed!";
|
| return;
|
| }
|
| WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
|
| - SyncerSession::ScopedSetWriteTransaction set_trans(session, &trans);
|
| + sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
|
|
|
| LOG(INFO) << "Getting the Commit IDs";
|
| GetCommitIdsCommand get_commit_ids_command(max_commit_batch_size_);
|
| get_commit_ids_command.Execute(session);
|
|
|
| - if (!session->commit_ids().empty()) {
|
| + if (!session->status_controller()->commit_ids().empty()) {
|
| LOG(INFO) << "Building a commit message";
|
| BuildCommitCommand build_commit_command;
|
| build_commit_command.Execute(session);
|
| @@ -226,8 +199,7 @@
|
| }
|
| case PROCESS_COMMIT_RESPONSE: {
|
| LOG(INFO) << "Processing the commit response";
|
| - ProcessCommitResponseCommand process_response_command(
|
| - extensions_monitor_);
|
| + ProcessCommitResponseCommand process_response_command;
|
| process_response_command.Execute(session);
|
| next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
|
| break;
|
| @@ -236,7 +208,7 @@
|
| LOG(INFO) << "Building and Processing Conflict Sets";
|
| BuildAndProcessConflictSetsCommand build_process_conflict_sets;
|
| build_process_conflict_sets.Execute(session);
|
| - if (session->conflict_sets_built())
|
| + if (session->status_controller()->conflict_sets_built())
|
| next_step = SYNCER_END;
|
| else
|
| next_step = RESOLVE_CONFLICTS;
|
| @@ -253,22 +225,24 @@
|
|
|
| ResolveConflictsCommand resolve_conflicts_command;
|
| resolve_conflicts_command.Execute(session);
|
| - if (session->HasConflictingUpdates())
|
| + StatusController* status = session->status_controller();
|
| + if (status->update_progress().HasConflictingUpdates())
|
| next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
|
| else
|
| next_step = SYNCER_END;
|
| break;
|
| }
|
| case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
|
| + StatusController* status = session->status_controller();
|
| + const ConflictProgress* progress = status->conflict_progress();
|
| LOG(INFO) << "Applying updates to resolve conflicts";
|
| ApplyUpdatesCommand apply_updates;
|
| - int num_conflicting_updates = session->conflicting_update_count();
|
| + int num_conflicting_updates = progress->ConflictingItemsSize();
|
| apply_updates.Execute(session);
|
| - int post_facto_conflicting_updates =
|
| - session->conflicting_update_count();
|
| - session->set_conflicts_resolved(session->conflicts_resolved() ||
|
| + int post_facto_conflicting_updates = progress->ConflictingItemsSize();
|
| + status->set_conflicts_resolved(status->conflicts_resolved() ||
|
| num_conflicting_updates > post_facto_conflicting_updates);
|
| - if (session->conflicts_resolved())
|
| + if (status->conflicts_resolved())
|
| next_step = RESOLVE_CONFLICTS;
|
| else
|
| next_step = SYNCER_END;
|
| @@ -289,21 +263,27 @@
|
| current_step = next_step;
|
| }
|
| post_while:
|
| - // Copy any lingering useful state out of the session.
|
| - silenced_until_ = session->silenced_until();
|
| return;
|
| }
|
|
|
| -void Syncer::ProcessClientCommand(SyncerSession* session) {
|
| - if (!session->update_response().has_client_command())
|
| +void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
|
| + const ClientToServerResponse& response =
|
| + session->status_controller()->updates_response();
|
| + if (!response.has_client_command())
|
| return;
|
| - const ClientCommand command = session->update_response().client_command();
|
| - if (command_channel_)
|
| - command_channel_->NotifyListeners(&command);
|
| + const ClientCommand& command = response.client_command();
|
|
|
| // The server limits the number of items a client can commit in one batch.
|
| if (command.has_max_commit_batch_size())
|
| max_commit_batch_size_ = command.max_commit_batch_size();
|
| + if (command.has_set_sync_long_poll_interval()) {
|
| + session->delegate()->OnReceivedLongPollIntervalUpdate(
|
| + TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
|
| + }
|
| + if (command.has_set_sync_poll_interval()) {
|
| + session->delegate()->OnReceivedShortPollIntervalUpdate(
|
| + TimeDelta::FromSeconds(command.set_sync_poll_interval()));
|
| + }
|
| }
|
|
|
| void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
|
|
|