Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: chrome/browser/sync/engine/syncapi.cc

Issue 7044059: [Sync] Ensure all accesses to SyncInternal's observers are thread safe. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Review Created 9 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tools/valgrind/tsan/suppressions.txt » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncapi.h" 5 #include "chrome/browser/sync/engine/syncapi.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <bitset> 8 #include <bitset>
9 #include <iomanip> 9 #include <iomanip>
10 #include <list> 10 #include <list>
(...skipping 1219 matching lines...) Expand 10 before | Expand all | Expand 10 after
1230 1230
1231 // SyncNotifierObserver implementation. 1231 // SyncNotifierObserver implementation.
1232 virtual void OnNotificationStateChange( 1232 virtual void OnNotificationStateChange(
1233 bool notifications_enabled); 1233 bool notifications_enabled);
1234 1234
1235 virtual void OnIncomingNotification( 1235 virtual void OnIncomingNotification(
1236 const syncable::ModelTypePayloadMap& type_payloads); 1236 const syncable::ModelTypePayloadMap& type_payloads);
1237 1237
1238 virtual void StoreState(const std::string& cookie); 1238 virtual void StoreState(const std::string& cookie);
1239 1239
1240 // Thread-safe observers_ accessors.
1241 void CopyObservers(ObserverList<SyncManager::Observer>* observers_copy);
akalin 2011/06/08 23:14:41 this should be const, too
Nicolas Zea 2011/06/08 23:25:33 ObserverList does not have a const iterator, so th
1242 bool HaveObservers() const;
1240 void AddObserver(SyncManager::Observer* observer); 1243 void AddObserver(SyncManager::Observer* observer);
1241
1242 void RemoveObserver(SyncManager::Observer* observer); 1244 void RemoveObserver(SyncManager::Observer* observer);
1243 1245
1244 // Accessors for the private members. 1246 // Accessors for the private members.
1245 DirectoryManager* dir_manager() { return share_.dir_manager.get(); } 1247 DirectoryManager* dir_manager() { return share_.dir_manager.get(); }
1246 SyncAPIServerConnectionManager* connection_manager() { 1248 SyncAPIServerConnectionManager* connection_manager() {
1247 return connection_manager_.get(); 1249 return connection_manager_.get();
1248 } 1250 }
1249 SyncerThread* syncer_thread() { return syncer_thread_.get(); } 1251 SyncerThread* syncer_thread() { return syncer_thread_.get(); }
1250 UserShare* GetUserShare() { return &share_; } 1252 UserShare* GetUserShare() { return &share_; }
1251 1253
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after
1520 browser_sync::JsArgList FindNodesContainingString( 1522 browser_sync::JsArgList FindNodesContainingString(
1521 const browser_sync::JsArgList& args); 1523 const browser_sync::JsArgList& args);
1522 1524
1523 // We couple the DirectoryManager and username together in a UserShare member 1525 // We couple the DirectoryManager and username together in a UserShare member
1524 // so we can return a handle to share_ to clients of the API for use when 1526 // so we can return a handle to share_ to clients of the API for use when
1525 // constructing any transaction type. 1527 // constructing any transaction type.
1526 UserShare share_; 1528 UserShare share_;
1527 1529
1528 MessageLoop* core_message_loop_; 1530 MessageLoop* core_message_loop_;
1529 1531
1532 // We have to lock around every observers_ access because it can get accessed
1533 // from any thread and added to/removed from on the core thread.
1534 mutable base::Lock observers_lock_;
1530 ObserverList<SyncManager::Observer> observers_; 1535 ObserverList<SyncManager::Observer> observers_;
1531 1536
1532 browser_sync::JsEventRouter* parent_router_; 1537 browser_sync::JsEventRouter* parent_router_;
1533 1538
1534 // The ServerConnectionManager used to abstract communication between the 1539 // The ServerConnectionManager used to abstract communication between the
1535 // client (the Syncer) and the sync server. 1540 // client (the Syncer) and the sync server.
1536 scoped_ptr<SyncAPIServerConnectionManager> connection_manager_; 1541 scoped_ptr<SyncAPIServerConnectionManager> connection_manager_;
1537 1542
1538 // The thread that runs the Syncer. Needs to be explicitly Start()ed. 1543 // The thread that runs the Syncer. Needs to be explicitly Start()ed.
1539 scoped_ptr<SyncerThread> syncer_thread_; 1544 scoped_ptr<SyncerThread> syncer_thread_;
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after
1801 1806
1802 ReadNode node(&trans); 1807 ReadNode node(&trans);
1803 if (!node.InitByTagLookup(kNigoriTag)) { 1808 if (!node.InitByTagLookup(kNigoriTag)) {
1804 NOTREACHED(); 1809 NOTREACHED();
1805 return; 1810 return;
1806 } 1811 }
1807 1812
1808 nigori.CopyFrom(node.GetNigoriSpecifics()); 1813 nigori.CopyFrom(node.GetNigoriSpecifics());
1809 Cryptographer::UpdateResult result = cryptographer->Update(nigori); 1814 Cryptographer::UpdateResult result = cryptographer->Update(nigori);
1810 if (result == Cryptographer::NEEDS_PASSPHRASE) { 1815 if (result == Cryptographer::NEEDS_PASSPHRASE) {
1811 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 1816 ObserverList<SyncManager::Observer> temp_obs_list;
1812 OnPassphraseRequired(sync_api::REASON_DECRYPTION)); 1817 CopyObservers(&temp_obs_list);
1818 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1819 OnPassphraseRequired(sync_api::REASON_DECRYPTION));
1813 } 1820 }
1814 1821
1815 // Refresh list of encrypted datatypes. 1822 // Refresh list of encrypted datatypes.
1816 encrypted_types = GetEncryptedTypes(&trans); 1823 encrypted_types = GetEncryptedTypes(&trans);
1817 } 1824 }
1818 1825
1819 1826
1820 1827
1821 // Ensure any datatypes that need encryption are encrypted. 1828 // Ensure any datatypes that need encryption are encrypted.
1822 EncryptDataTypes(encrypted_types); 1829 EncryptDataTypes(encrypted_types);
1823 } 1830 }
1824 1831
1825 void SyncManager::SyncInternal::StartSyncingNormally() { 1832 void SyncManager::SyncInternal::StartSyncingNormally() {
1826 // Start the syncer thread. This won't actually 1833 // Start the syncer thread. This won't actually
1827 // result in any syncing until at least the 1834 // result in any syncing until at least the
1828 // DirectoryManager broadcasts the OPENED event, 1835 // DirectoryManager broadcasts the OPENED event,
1829 // and a valid server connection is detected. 1836 // and a valid server connection is detected.
1830 if (syncer_thread()) // NULL during certain unittests. 1837 if (syncer_thread()) // NULL during certain unittests.
1831 syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); 1838 syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL);
1832 } 1839 }
1833 1840
1834 void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { 1841 void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
1835 // There is only one real time we need this mutex. If we get an auth 1842 // There is only one real time we need this mutex. If we get an auth
1836 // success, and before the initial sync ends we get an auth failure. In this 1843 // success, and before the initial sync ends we get an auth failure. In this
1837 // case we'll be listening to both the AuthWatcher and Syncer, and it's a race 1844 // case we'll be listening to both the AuthWatcher and Syncer, and it's a race
1838 // between their respective threads to call MarkAndNotify. We need to make 1845 // between their respective threads to call MarkAndNotify. We need to make
1839 // sure the observer is notified once and only once. 1846 // sure the observer is notified once and only once.
1840 { 1847 {
1841 base::AutoLock lock(initialized_mutex_); 1848 base::AutoLock lock(initialized_mutex_);
1842 if (initialized_) 1849 if (initialized_)
1843 return; 1850 return;
1844 initialized_ = true; 1851 initialized_ = true;
1845 } 1852 }
1846 1853
1847 // Notify that initialization is complete. 1854 // Notify that initialization is complete.
1848 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 1855 ObserverList<SyncManager::Observer> temp_obs_list;
1856 CopyObservers(&temp_obs_list);
1857 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1849 OnInitializationComplete()); 1858 OnInitializationComplete());
1850 } 1859 }
1851 1860
1852 void SyncManager::SyncInternal::SendNotification() { 1861 void SyncManager::SyncInternal::SendNotification() {
1853 DCHECK_EQ(MessageLoop::current(), core_message_loop_); 1862 DCHECK_EQ(MessageLoop::current(), core_message_loop_);
1854 if (!sync_notifier_) { 1863 if (!sync_notifier_) {
1855 VLOG(1) << "Not sending notification: sync_notifier_ is NULL"; 1864 VLOG(1) << "Not sending notification: sync_notifier_ is NULL";
1856 return; 1865 return;
1857 } 1866 }
1858 sync_notifier_->SendNotification(); 1867 sync_notifier_->SendNotification();
1859 } 1868 }
1860 1869
1861 bool SyncManager::SyncInternal::OpenDirectory() { 1870 bool SyncManager::SyncInternal::OpenDirectory() {
1862 DCHECK(!initialized()) << "Should only happen once"; 1871 DCHECK(!initialized()) << "Should only happen once";
1863 1872
1864 bool share_opened = dir_manager()->Open(username_for_share()); 1873 bool share_opened = dir_manager()->Open(username_for_share());
1865 DCHECK(share_opened); 1874 DCHECK(share_opened);
1866 if (!share_opened) { 1875 if (!share_opened) {
1867 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 1876 ObserverList<SyncManager::Observer> temp_obs_list;
1877 CopyObservers(&temp_obs_list);
1878 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1868 OnStopSyncingPermanently()); 1879 OnStopSyncingPermanently());
1869 1880
1870 LOG(ERROR) << "Could not open share for:" << username_for_share(); 1881 LOG(ERROR) << "Could not open share for:" << username_for_share();
1871 return false; 1882 return false;
1872 } 1883 }
1873 1884
1874 // Database has to be initialized for the guid to be available. 1885 // Database has to be initialized for the guid to be available.
1875 syncable::ScopedDirLookup lookup(dir_manager(), username_for_share()); 1886 syncable::ScopedDirLookup lookup(dir_manager(), username_for_share());
1876 if (!lookup.good()) { 1887 if (!lookup.good()) {
1877 NOTREACHED(); 1888 NOTREACHED();
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
1942 registrar_->GetModelSafeRoutingInfo(&routes); 1953 registrar_->GetModelSafeRoutingInfo(&routes);
1943 syncable::ModelTypeSet enabled_types; 1954 syncable::ModelTypeSet enabled_types;
1944 for (ModelSafeRoutingInfo::const_iterator it = routes.begin(); 1955 for (ModelSafeRoutingInfo::const_iterator it = routes.begin();
1945 it != routes.end(); ++it) { 1956 it != routes.end(); ++it) {
1946 enabled_types.insert(it->first); 1957 enabled_types.insert(it->first);
1947 } 1958 }
1948 sync_notifier_->UpdateEnabledTypes(enabled_types); 1959 sync_notifier_->UpdateEnabledTypes(enabled_types);
1949 } 1960 }
1950 1961
1951 void SyncManager::SyncInternal::RaiseAuthNeededEvent() { 1962 void SyncManager::SyncInternal::RaiseAuthNeededEvent() {
1952 FOR_EACH_OBSERVER( 1963 ObserverList<SyncManager::Observer> temp_obs_list;
1953 SyncManager::Observer, observers_, 1964 CopyObservers(&temp_obs_list);
1965 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1954 OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS))); 1966 OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS)));
1955 } 1967 }
1956 1968
1957 void SyncManager::SyncInternal::SetUsingExplicitPassphrasePrefForMigration( 1969 void SyncManager::SyncInternal::SetUsingExplicitPassphrasePrefForMigration(
1958 WriteTransaction* const trans) { 1970 WriteTransaction* const trans) {
1959 WriteNode node(trans); 1971 WriteNode node(trans);
1960 if (!node.InitByTagLookup(kNigoriTag)) { 1972 if (!node.InitByTagLookup(kNigoriTag)) {
1961 // TODO(albertb): Plumb an UnrecoverableError all the way back to the PSS. 1973 // TODO(albertb): Plumb an UnrecoverableError all the way back to the PSS.
1962 NOTREACHED(); 1974 NOTREACHED();
1963 return; 1975 return;
1964 } 1976 }
1965 sync_pb::NigoriSpecifics specifics(node.GetNigoriSpecifics()); 1977 sync_pb::NigoriSpecifics specifics(node.GetNigoriSpecifics());
1966 specifics.set_using_explicit_passphrase(true); 1978 specifics.set_using_explicit_passphrase(true);
1967 node.SetNigoriSpecifics(specifics); 1979 node.SetNigoriSpecifics(specifics);
1968 } 1980 }
1969 1981
1970 void SyncManager::SyncInternal::SetPassphrase( 1982 void SyncManager::SyncInternal::SetPassphrase(
1971 const std::string& passphrase, bool is_explicit) { 1983 const std::string& passphrase, bool is_explicit) {
1972 // We do not accept empty passphrases. 1984 // We do not accept empty passphrases.
1973 if (passphrase.empty()) { 1985 if (passphrase.empty()) {
1974 VLOG(1) << "Rejecting empty passphrase."; 1986 VLOG(1) << "Rejecting empty passphrase.";
1975 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 1987 ObserverList<SyncManager::Observer> temp_obs_list;
1988 CopyObservers(&temp_obs_list);
1989 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1976 OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED)); 1990 OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED));
1977 return; 1991 return;
1978 } 1992 }
1979 1993
1980 // All accesses to the cryptographer are protected by a transaction. 1994 // All accesses to the cryptographer are protected by a transaction.
1981 WriteTransaction trans(GetUserShare()); 1995 WriteTransaction trans(GetUserShare());
1982 Cryptographer* cryptographer = trans.GetCryptographer(); 1996 Cryptographer* cryptographer = trans.GetCryptographer();
1983 KeyParams params = {"localhost", "dummy", passphrase}; 1997 KeyParams params = {"localhost", "dummy", passphrase};
1984 1998
1985 if (cryptographer->has_pending_keys()) { 1999 if (cryptographer->has_pending_keys()) {
1986 if (!cryptographer->DecryptPendingKeys(params)) { 2000 if (!cryptographer->DecryptPendingKeys(params)) {
1987 VLOG(1) << "Passphrase failed to decrypt pending keys."; 2001 VLOG(1) << "Passphrase failed to decrypt pending keys.";
1988 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2002 ObserverList<SyncManager::Observer> temp_obs_list;
2003 CopyObservers(&temp_obs_list);
2004 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
1989 OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED)); 2005 OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED));
1990 return; 2006 return;
1991 } 2007 }
1992 2008
1993 // TODO(tim): If this is the first time the user has entered a passphrase 2009 // TODO(tim): If this is the first time the user has entered a passphrase
1994 // since the protocol changed to store passphrase preferences in the cloud, 2010 // since the protocol changed to store passphrase preferences in the cloud,
1995 // make sure we update this preference. See bug 62103. 2011 // make sure we update this preference. See bug 62103.
1996 // TODO(jhawkins): Verify that this logic may be removed now that the 2012 // TODO(jhawkins): Verify that this logic may be removed now that the
1997 // migration is no longer supported. 2013 // migration is no longer supported.
1998 if (is_explicit) 2014 if (is_explicit)
(...skipping 25 matching lines...) Expand all
2024 sync_pb::NigoriSpecifics specifics(node.GetNigoriSpecifics()); 2040 sync_pb::NigoriSpecifics specifics(node.GetNigoriSpecifics());
2025 specifics.clear_encrypted(); 2041 specifics.clear_encrypted();
2026 cryptographer->GetKeys(specifics.mutable_encrypted()); 2042 cryptographer->GetKeys(specifics.mutable_encrypted());
2027 specifics.set_using_explicit_passphrase(is_explicit); 2043 specifics.set_using_explicit_passphrase(is_explicit);
2028 node.SetNigoriSpecifics(specifics); 2044 node.SetNigoriSpecifics(specifics);
2029 ReEncryptEverything(&trans); 2045 ReEncryptEverything(&trans);
2030 } 2046 }
2031 2047
2032 std::string bootstrap_token; 2048 std::string bootstrap_token;
2033 cryptographer->GetBootstrapToken(&bootstrap_token); 2049 cryptographer->GetBootstrapToken(&bootstrap_token);
2034 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2050 ObserverList<SyncManager::Observer> temp_obs_list;
2051 CopyObservers(&temp_obs_list);
2052 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2035 OnPassphraseAccepted(bootstrap_token)); 2053 OnPassphraseAccepted(bootstrap_token));
2036 } 2054 }
2037 2055
2038 bool SyncManager::SyncInternal::IsUsingExplicitPassphrase() { 2056 bool SyncManager::SyncInternal::IsUsingExplicitPassphrase() {
2039 ReadTransaction trans(&share_); 2057 ReadTransaction trans(&share_);
2040 ReadNode node(&trans); 2058 ReadNode node(&trans);
2041 if (!node.InitByTagLookup(kNigoriTag)) { 2059 if (!node.InitByTagLookup(kNigoriTag)) {
2042 // TODO(albertb): Plumb an UnrecoverableError all the way back to the PSS. 2060 // TODO(albertb): Plumb an UnrecoverableError all the way back to the PSS.
2043 NOTREACHED(); 2061 NOTREACHED();
2044 return false; 2062 return false;
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
2143 WriteNode child(trans); 2161 WriteNode child(trans);
2144 if (!child.InitByIdLookup(child_id)) { 2162 if (!child.InitByIdLookup(child_id)) {
2145 NOTREACHED(); 2163 NOTREACHED();
2146 return; 2164 return;
2147 } 2165 }
2148 child.SetPasswordSpecifics(child.GetPasswordSpecifics()); 2166 child.SetPasswordSpecifics(child.GetPasswordSpecifics());
2149 child_id = child.GetSuccessorId(); 2167 child_id = child.GetSuccessorId();
2150 } 2168 }
2151 } 2169 }
2152 2170
2153 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2171 ObserverList<SyncManager::Observer> temp_obs_list;
2172 CopyObservers(&temp_obs_list);
2173 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2154 OnEncryptionComplete(encrypted_types)); 2174 OnEncryptionComplete(encrypted_types));
2155 } 2175 }
2156 2176
2157 SyncManager::~SyncManager() { 2177 SyncManager::~SyncManager() {
2158 delete data_; 2178 delete data_;
2159 } 2179 }
2160 2180
2161 void SyncManager::AddObserver(Observer* observer) { 2181 void SyncManager::AddObserver(Observer* observer) {
2162 data_->AddObserver(observer); 2182 data_->AddObserver(observer);
2163 } 2183 }
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
2237 // TODO(akalin): CheckServerReachable() can block, which may cause 2257 // TODO(akalin): CheckServerReachable() can block, which may cause
2238 // jank if we try to shut down sync. Fix this. 2258 // jank if we try to shut down sync. Fix this.
2239 connection_manager()->CheckServerReachable(); 2259 connection_manager()->CheckServerReachable();
2240 } 2260 }
2241 2261
2242 void SyncManager::SyncInternal::OnServerConnectionEvent( 2262 void SyncManager::SyncInternal::OnServerConnectionEvent(
2243 const ServerConnectionEvent& event) { 2263 const ServerConnectionEvent& event) {
2244 allstatus_.HandleServerConnectionEvent(event); 2264 allstatus_.HandleServerConnectionEvent(event);
2245 if (event.connection_code == 2265 if (event.connection_code ==
2246 browser_sync::HttpResponse::SERVER_CONNECTION_OK) { 2266 browser_sync::HttpResponse::SERVER_CONNECTION_OK) {
2247 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2267 ObserverList<SyncManager::Observer> temp_obs_list;
2268 CopyObservers(&temp_obs_list);
2269 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2248 OnAuthError(AuthError::None())); 2270 OnAuthError(AuthError::None()));
2249 } 2271 }
2250 2272
2251 if (event.connection_code == browser_sync::HttpResponse::SYNC_AUTH_ERROR) { 2273 if (event.connection_code == browser_sync::HttpResponse::SYNC_AUTH_ERROR) {
2252 FOR_EACH_OBSERVER( 2274 ObserverList<SyncManager::Observer> temp_obs_list;
2253 SyncManager::Observer, observers_, 2275 CopyObservers(&temp_obs_list);
2276 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2254 OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS))); 2277 OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS)));
2255 } 2278 }
2256 } 2279 }
2257 2280
2258 void SyncManager::SyncInternal::HandleTransactionCompleteChangeEvent( 2281 void SyncManager::SyncInternal::HandleTransactionCompleteChangeEvent(
2259 const syncable::ModelTypeBitSet& models_with_changes) { 2282 const syncable::ModelTypeBitSet& models_with_changes) {
2260 // This notification happens immediately after the transaction mutex is 2283 // This notification happens immediately after the transaction mutex is
2261 // released. This allows work to be performed without blocking other threads 2284 // released. This allows work to be performed without blocking other threads
2262 // from acquiring a transaction. 2285 // from acquiring a transaction.
2263 if (observers_.size() <= 0) 2286 if (!HaveObservers())
2264 return; 2287 return;
2265 2288
2266 // Call commit. 2289 // Call commit.
2267 for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) { 2290 for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) {
2268 if (models_with_changes.test(i)) { 2291 if (models_with_changes.test(i)) {
2269 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2292 ObserverList<SyncManager::Observer> temp_obs_list;
2293 CopyObservers(&temp_obs_list);
2294 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2270 OnChangesComplete(syncable::ModelTypeFromInt(i))); 2295 OnChangesComplete(syncable::ModelTypeFromInt(i)));
2271 } 2296 }
2272 } 2297 }
2273 } 2298 }
2274 2299
2275 ModelTypeBitSet SyncManager::SyncInternal::HandleTransactionEndingChangeEvent( 2300 ModelTypeBitSet SyncManager::SyncInternal::HandleTransactionEndingChangeEvent(
2276 syncable::BaseTransaction* trans) { 2301 syncable::BaseTransaction* trans) {
2277 // This notification happens immediately before a syncable WriteTransaction 2302 // This notification happens immediately before a syncable WriteTransaction
2278 // falls out of scope. It happens while the channel mutex is still held, 2303 // falls out of scope. It happens while the channel mutex is still held,
2279 // and while the transaction mutex is held, so it cannot be re-entrant. 2304 // and while the transaction mutex is held, so it cannot be re-entrant.
2280 if (observers_.size() <= 0 || ChangeBuffersAreEmpty()) 2305 if (!HaveObservers() || ChangeBuffersAreEmpty())
2281 return ModelTypeBitSet(); 2306 return ModelTypeBitSet();
2282 2307
2283 // This will continue the WriteTransaction using a read only wrapper. 2308 // This will continue the WriteTransaction using a read only wrapper.
2284 // This is the last chance for read to occur in the WriteTransaction 2309 // This is the last chance for read to occur in the WriteTransaction
2285 // that's closing. This special ReadTransaction will not close the 2310 // that's closing. This special ReadTransaction will not close the
2286 // underlying transaction. 2311 // underlying transaction.
2287 ReadTransaction read_trans(GetUserShare(), trans); 2312 ReadTransaction read_trans(GetUserShare(), trans);
2288 2313
2289 syncable::ModelTypeBitSet models_with_changes; 2314 syncable::ModelTypeBitSet models_with_changes;
2290 for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) { 2315 for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) {
2291 if (change_buffers_[i].IsEmpty()) 2316 if (change_buffers_[i].IsEmpty())
2292 continue; 2317 continue;
2293 2318
2294 vector<ChangeRecord> ordered_changes; 2319 vector<ChangeRecord> ordered_changes;
2295 change_buffers_[i].GetAllChangesInTreeOrder(&read_trans, &ordered_changes); 2320 change_buffers_[i].GetAllChangesInTreeOrder(&read_trans, &ordered_changes);
2296 if (!ordered_changes.empty()) { 2321 if (!ordered_changes.empty()) {
2297 FOR_EACH_OBSERVER( 2322 ObserverList<SyncManager::Observer> temp_obs_list;
2298 SyncManager::Observer, observers_, 2323 CopyObservers(&temp_obs_list);
2324 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2299 OnChangesApplied(syncable::ModelTypeFromInt(i), &read_trans, 2325 OnChangesApplied(syncable::ModelTypeFromInt(i), &read_trans,
2300 &ordered_changes[0], ordered_changes.size())); 2326 &ordered_changes[0], ordered_changes.size()));
2301 models_with_changes.set(i, true); 2327 models_with_changes.set(i, true);
2302 } 2328 }
2303 change_buffers_[i].Clear(); 2329 change_buffers_[i].Clear();
2304 } 2330 }
2305 return models_with_changes; 2331 return models_with_changes;
2306 } 2332 }
2307 2333
2308 void SyncManager::SyncInternal::HandleCalculateChangesChangeEventFromSyncApi( 2334 void SyncManager::SyncInternal::HandleCalculateChangesChangeEventFromSyncApi(
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
2437 void SyncManager::SyncInternal::RequestNudgeWithDataTypes( 2463 void SyncManager::SyncInternal::RequestNudgeWithDataTypes(
2438 const TimeDelta& delay, 2464 const TimeDelta& delay,
2439 browser_sync::NudgeSource source, const ModelTypeBitSet& types, 2465 browser_sync::NudgeSource source, const ModelTypeBitSet& types,
2440 const tracked_objects::Location& nudge_location) { 2466 const tracked_objects::Location& nudge_location) {
2441 if (syncer_thread()) 2467 if (syncer_thread())
2442 syncer_thread()->ScheduleNudge(delay, source, types, nudge_location); 2468 syncer_thread()->ScheduleNudge(delay, source, types, nudge_location);
2443 } 2469 }
2444 2470
2445 void SyncManager::SyncInternal::OnSyncEngineEvent( 2471 void SyncManager::SyncInternal::OnSyncEngineEvent(
2446 const SyncEngineEvent& event) { 2472 const SyncEngineEvent& event) {
2447 if (observers_.size() <= 0) { 2473 if (!HaveObservers()) {
2448 VLOG(0) << "OnSyncEngineEvent returning because observers_.size() is zero"; 2474 VLOG(0) << "OnSyncEngineEvent returning because observers_.size() is zero";
2449 return; 2475 return;
2450 } 2476 }
2451 2477
2452 // Only send an event if this is due to a cycle ending and this cycle 2478 // Only send an event if this is due to a cycle ending and this cycle
2453 // concludes a canonical "sync" process; that is, based on what is known 2479 // concludes a canonical "sync" process; that is, based on what is known
2454 // locally we are "all happy" and up-to-date. There may be new changes on 2480 // locally we are "all happy" and up-to-date. There may be new changes on
2455 // the server, but we'll get them on a subsequent sync. 2481 // the server, but we'll get them on a subsequent sync.
2456 // 2482 //
2457 // Notifications are sent at the end of every sync cycle, regardless of 2483 // Notifications are sent at the end of every sync cycle, regardless of
(...skipping 27 matching lines...) Expand all
2485 if (!nigori.encrypted().blob().empty()) { 2511 if (!nigori.encrypted().blob().empty()) {
2486 DCHECK(!cryptographer->CanDecrypt(nigori.encrypted())); 2512 DCHECK(!cryptographer->CanDecrypt(nigori.encrypted()));
2487 cryptographer->SetPendingKeys(nigori.encrypted()); 2513 cryptographer->SetPendingKeys(nigori.encrypted());
2488 } 2514 }
2489 } 2515 }
2490 2516
2491 // If we've completed a sync cycle and the cryptographer isn't ready 2517 // If we've completed a sync cycle and the cryptographer isn't ready
2492 // yet, prompt the user for a passphrase. 2518 // yet, prompt the user for a passphrase.
2493 if (cryptographer->has_pending_keys()) { 2519 if (cryptographer->has_pending_keys()) {
2494 VLOG(1) << "OnPassPhraseRequired Sent"; 2520 VLOG(1) << "OnPassPhraseRequired Sent";
2495 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2521 ObserverList<SyncManager::Observer> temp_obs_list;
2522 CopyObservers(&temp_obs_list);
2523 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2496 OnPassphraseRequired(sync_api::REASON_DECRYPTION)); 2524 OnPassphraseRequired(sync_api::REASON_DECRYPTION));
2497 } else if (!cryptographer->is_ready()) { 2525 } else if (!cryptographer->is_ready()) {
2498 VLOG(1) << "OnPassphraseRequired sent because cryptographer is not " 2526 VLOG(1) << "OnPassphraseRequired sent because cryptographer is not "
2499 << "ready"; 2527 << "ready";
2500 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2528 ObserverList<SyncManager::Observer> temp_obs_list;
2529 CopyObservers(&temp_obs_list);
2530 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2501 OnPassphraseRequired(sync_api::REASON_ENCRYPTION)); 2531 OnPassphraseRequired(sync_api::REASON_ENCRYPTION));
2502 } 2532 }
2503 // If everything is in order(we have the passphrase) then there is no 2533 // If everything is in order(we have the passphrase) then there is no
2504 // need to inform the listeners. They will just wait for sync 2534 // need to inform the listeners. They will just wait for sync
2505 // completion event and if no errors have been raised it means 2535 // completion event and if no errors have been raised it means
2506 // encryption was succesful. 2536 // encryption was succesful.
2507 } 2537 }
2508 } 2538 }
2509 2539
2510 if (!initialized()) { 2540 if (!initialized()) {
2511 VLOG(0) << "OnSyncCycleCompleted not sent because sync api is not " 2541 VLOG(0) << "OnSyncCycleCompleted not sent because sync api is not "
2512 << "initialized"; 2542 << "initialized";
2513 return; 2543 return;
2514 } 2544 }
2515 2545
2516 if (!event.snapshot->has_more_to_sync) { 2546 if (!event.snapshot->has_more_to_sync) {
2517 VLOG(1) << "OnSyncCycleCompleted sent"; 2547 VLOG(1) << "OnSyncCycleCompleted sent";
2518 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2548 ObserverList<SyncManager::Observer> temp_obs_list;
2549 CopyObservers(&temp_obs_list);
2550 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2519 OnSyncCycleCompleted(event.snapshot)); 2551 OnSyncCycleCompleted(event.snapshot));
2520 } 2552 }
2521 2553
2522 // This is here for tests, which are still using p2p notifications. 2554 // This is here for tests, which are still using p2p notifications.
2523 // SendNotification does not do anything if we are using server based 2555 // SendNotification does not do anything if we are using server based
2524 // notifications. 2556 // notifications.
2525 // TODO(chron): Consider changing this back to track has_more_to_sync 2557 // TODO(chron): Consider changing this back to track has_more_to_sync
2526 // only notify peers if a successful commit has occurred. 2558 // only notify peers if a successful commit has occurred.
2527 bool is_notifiable_commit = 2559 bool is_notifiable_commit =
2528 (event.snapshot->syncer_status.num_successful_commits > 0); 2560 (event.snapshot->syncer_status.num_successful_commits > 0);
2529 if (is_notifiable_commit) { 2561 if (is_notifiable_commit) {
2530 allstatus_.IncrementNotifiableCommits(); 2562 allstatus_.IncrementNotifiableCommits();
2531 core_message_loop_->PostTask( 2563 core_message_loop_->PostTask(
2532 FROM_HERE, 2564 FROM_HERE,
2533 NewRunnableMethod( 2565 NewRunnableMethod(
2534 this, 2566 this,
2535 &SyncManager::SyncInternal::SendNotification)); 2567 &SyncManager::SyncInternal::SendNotification));
2536 } 2568 }
2537 } 2569 }
2538 2570
2539 if (event.what_happened == SyncEngineEvent::STOP_SYNCING_PERMANENTLY) { 2571 if (event.what_happened == SyncEngineEvent::STOP_SYNCING_PERMANENTLY) {
2540 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2572 ObserverList<SyncManager::Observer> temp_obs_list;
2573 CopyObservers(&temp_obs_list);
2574 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2541 OnStopSyncingPermanently()); 2575 OnStopSyncingPermanently());
2542 return; 2576 return;
2543 } 2577 }
2544 2578
2545 if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_SUCCEEDED) { 2579 if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_SUCCEEDED) {
2546 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2580 ObserverList<SyncManager::Observer> temp_obs_list;
2581 CopyObservers(&temp_obs_list);
2582 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2547 OnClearServerDataSucceeded()); 2583 OnClearServerDataSucceeded());
2548 return; 2584 return;
2549 } 2585 }
2550 2586
2551 if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_FAILED) { 2587 if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_FAILED) {
2552 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2588 ObserverList<SyncManager::Observer> temp_obs_list;
2589 CopyObservers(&temp_obs_list);
2590 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2553 OnClearServerDataFailed()); 2591 OnClearServerDataFailed());
2554 return; 2592 return;
2555 } 2593 }
2556 2594
2557 if (event.what_happened == SyncEngineEvent::UPDATED_TOKEN) { 2595 if (event.what_happened == SyncEngineEvent::UPDATED_TOKEN) {
2558 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 2596 ObserverList<SyncManager::Observer> temp_obs_list;
2597 CopyObservers(&temp_obs_list);
2598 FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
2559 OnUpdatedToken(event.updated_token)); 2599 OnUpdatedToken(event.updated_token));
2560 return; 2600 return;
2561 } 2601 }
2562 } 2602 }
2563 2603
2564 void SyncManager::SyncInternal::SetParentJsEventRouter( 2604 void SyncManager::SyncInternal::SetParentJsEventRouter(
2565 browser_sync::JsEventRouter* router) { 2605 browser_sync::JsEventRouter* router) {
2566 DCHECK(router); 2606 DCHECK(router);
2567 parent_router_ = router; 2607 parent_router_ = router;
2568 2608
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after
2853 } 2893 }
2854 if (VLOG_IS_ON(1)) { 2894 if (VLOG_IS_ON(1)) {
2855 std::string encoded_state; 2895 std::string encoded_state;
2856 base::Base64Encode(state, &encoded_state); 2896 base::Base64Encode(state, &encoded_state);
2857 VLOG(1) << "Writing notification state: " << encoded_state; 2897 VLOG(1) << "Writing notification state: " << encoded_state;
2858 } 2898 }
2859 lookup->SetNotificationState(state); 2899 lookup->SetNotificationState(state);
2860 lookup->SaveChanges(); 2900 lookup->SaveChanges();
2861 } 2901 }
2862 2902
2903
2904 // Note: it is possible that an observer will remove itself after we have made
2905 // a copy, but before the copy is consumed. This could theoretically result
2906 // in accessing a garbage pointer, but can only occur when an about:sync window
2907 // is closed in the middle of a notification.
2908 // See crbug.com/85481.
2909 void SyncManager::SyncInternal::CopyObservers(
2910 ObserverList<SyncManager::Observer>* observers_copy) {
2911 DCHECK_EQ(0U, observers_copy->size());
2912 base::AutoLock lock(observers_lock_);
2913 if (observers_.size() == 0)
2914 return;
2915 ObserverListBase<SyncManager::Observer>::Iterator it(observers_);
2916 SyncManager::Observer* obs;
2917 while ((obs = it.GetNext()) != NULL)
2918 observers_copy->AddObserver(obs);
2919 }
2920
2921 bool SyncManager::SyncInternal::HaveObservers() const {
2922 base::AutoLock lock(observers_lock_);
2923 return observers_.size() > 0;
2924 }
2925
2863 void SyncManager::SyncInternal::AddObserver( 2926 void SyncManager::SyncInternal::AddObserver(
2864 SyncManager::Observer* observer) { 2927 SyncManager::Observer* observer) {
2928 base::AutoLock lock(observers_lock_);
2865 observers_.AddObserver(observer); 2929 observers_.AddObserver(observer);
2866 } 2930 }
2867 2931
2868 void SyncManager::SyncInternal::RemoveObserver( 2932 void SyncManager::SyncInternal::RemoveObserver(
2869 SyncManager::Observer* observer) { 2933 SyncManager::Observer* observer) {
2934 base::AutoLock lock(observers_lock_);
2870 observers_.RemoveObserver(observer); 2935 observers_.RemoveObserver(observer);
2871 } 2936 }
2872 2937
2873 SyncManager::Status::Summary SyncManager::GetStatusSummary() const { 2938 SyncManager::Status::Summary SyncManager::GetStatusSummary() const {
2874 return data_->GetStatus().summary; 2939 return data_->GetStatus().summary;
2875 } 2940 }
2876 2941
2877 SyncManager::Status SyncManager::GetDetailedStatus() const { 2942 SyncManager::Status SyncManager::GetDetailedStatus() const {
2878 return data_->GetStatus(); 2943 return data_->GetStatus();
2879 } 2944 }
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
2950 void SyncManager::TriggerOnIncomingNotificationForTest( 3015 void SyncManager::TriggerOnIncomingNotificationForTest(
2951 const syncable::ModelTypeBitSet& model_types) { 3016 const syncable::ModelTypeBitSet& model_types) {
2952 syncable::ModelTypePayloadMap model_types_with_payloads = 3017 syncable::ModelTypePayloadMap model_types_with_payloads =
2953 syncable::ModelTypePayloadMapFromBitSet(model_types, 3018 syncable::ModelTypePayloadMapFromBitSet(model_types,
2954 std::string()); 3019 std::string());
2955 3020
2956 data_->OnIncomingNotification(model_types_with_payloads); 3021 data_->OnIncomingNotification(model_types_with_payloads);
2957 } 3022 }
2958 3023
2959 } // namespace sync_api 3024 } // namespace sync_api
OLDNEW
« no previous file with comments | « no previous file | tools/valgrind/tsan/suppressions.txt » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698