Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(463)

Side by Side Diff: chrome/browser/sync/engine/syncapi.cc

Issue 265054: Sync: Convert AddressWatchThread from pthreads to chrome threads. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2006-2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncapi.h" 5 #include "chrome/browser/sync/engine/syncapi.h"
6 6
7 #include "build/build_config.h" 7 #include "build/build_config.h"
8 8
9 #if defined(OS_WIN) 9 #if defined(OS_WIN)
10 #include <windows.h> 10 #include <windows.h>
11 #include <iphlpapi.h> 11 #include <iphlpapi.h>
12 #endif 12 #endif
13 13
14 #include <iomanip> 14 #include <iomanip>
15 #include <list> 15 #include <list>
16 #include <string> 16 #include <string>
17 #include <vector> 17 #include <vector>
18 18
19 #include "base/basictypes.h" 19 #include "base/basictypes.h"
20 #include "base/command_line.h" 20 #include "base/command_line.h"
21 #include "base/platform_thread.h" 21 #include "base/platform_thread.h"
22 #include "base/scoped_ptr.h" 22 #include "base/scoped_ptr.h"
23 #include "base/string_util.h" 23 #include "base/string_util.h"
24 #include "base/task.h"
24 #include "chrome/browser/sync/engine/all_status.h" 25 #include "chrome/browser/sync/engine/all_status.h"
25 #include "chrome/browser/sync/engine/auth_watcher.h" 26 #include "chrome/browser/sync/engine/auth_watcher.h"
26 #include "chrome/browser/sync/engine/change_reorder_buffer.h" 27 #include "chrome/browser/sync/engine/change_reorder_buffer.h"
27 #include "chrome/browser/sync/engine/client_command_channel.h" 28 #include "chrome/browser/sync/engine/client_command_channel.h"
28 #include "chrome/browser/sync/engine/model_safe_worker.h" 29 #include "chrome/browser/sync/engine/model_safe_worker.h"
29 #include "chrome/browser/sync/engine/net/gaia_authenticator.h" 30 #include "chrome/browser/sync/engine/net/gaia_authenticator.h"
30 #include "chrome/browser/sync/engine/net/server_connection_manager.h" 31 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
31 #include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h" 32 #include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h"
32 #include "chrome/browser/sync/engine/syncer.h" 33 #include "chrome/browser/sync/engine/syncer.h"
33 #include "chrome/browser/sync/engine/syncer_thread.h" 34 #include "chrome/browser/sync/engine/syncer_thread.h"
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
65 using std::hex; 66 using std::hex;
66 using std::string; 67 using std::string;
67 using std::vector; 68 using std::vector;
68 using syncable::Directory; 69 using syncable::Directory;
69 using syncable::DirectoryManager; 70 using syncable::DirectoryManager;
70 71
71 static const int kServerReachablePollingIntervalMsec = 60000 * 60; 72 static const int kServerReachablePollingIntervalMsec = 60000 * 60;
72 static const int kThreadExitTimeoutMsec = 60000; 73 static const int kThreadExitTimeoutMsec = 60000;
73 static const int kSSLPort = 443; 74 static const int kSSLPort = 443;
74 75
75 struct ThreadParams { 76 struct AddressWatchTaskParams {
76 browser_sync::ServerConnectionManager* conn_mgr; 77 browser_sync::ServerConnectionManager* conn_mgr;
77 #if defined(OS_WIN) 78 #if defined(OS_WIN)
78 HANDLE exit_flag; 79 HANDLE exit_flag;
79 #endif 80 #endif
80 }; 81 };
81 82
82 // This thread calls CheckServerReachable() whenever a change occurs in the 83 // This thread calls CheckServerReachable() whenever a change occurs in the
83 // table that maps IP addresses to interfaces, for example when the user 84 // table that maps IP addresses to interfaces, for example when the user
84 // unplugs his network cable. 85 // unplugs his network cable.
85 void* AddressWatchThread(void* arg) { 86 class AddressWatchTask : public Task {
86 PlatformThread::SetName("SyncEngine_AddressWatcher"); 87 public:
87 LOG(INFO) << "starting the address watch thread"; 88 explicit AddressWatchTask(AddressWatchTaskParams* params)
89 : params_(params) {}
90 virtual ~AddressWatchTask() {}
91
92 virtual void Run() {
93 LOG(INFO) << "starting the address watch thread";
88 #if defined(OS_WIN) 94 #if defined(OS_WIN)
89 const ThreadParams* const params = reinterpret_cast<const ThreadParams*>(arg); 95 OVERLAPPED overlapped = {0};
90 OVERLAPPED overlapped = {0}; 96 overlapped.hEvent = CreateEvent(NULL, FALSE, TRUE, NULL);
91 overlapped.hEvent = CreateEvent(NULL, FALSE, TRUE, NULL); 97 HANDLE file;
92 HANDLE file; 98 DWORD rc = WAIT_OBJECT_0;
93 DWORD rc = WAIT_OBJECT_0; 99 while (true) {
94 while (true) { 100 // Only call NotifyAddrChange() after the IP address has changed or if
95 // Only call NotifyAddrChange() after the IP address has changed or if this 101 // this is the first time through the loop.
96 // is the first time through the loop. 102 if (WAIT_OBJECT_0 == rc) {
97 if (WAIT_OBJECT_0 == rc) { 103 ResetEvent(overlapped.hEvent);
98 ResetEvent(overlapped.hEvent); 104 DWORD notify_result = NotifyAddrChange(&file, &overlapped);
99 DWORD notify_result = NotifyAddrChange(&file, &overlapped); 105 if (ERROR_IO_PENDING != notify_result) {
100 if (ERROR_IO_PENDING != notify_result) { 106 LOG(ERROR) << "NotifyAddrChange() returned unexpected result "
101 LOG(ERROR) << "NotifyAddrChange() returned unexpected result " 107 << hex << notify_result;
102 << hex << notify_result; 108 break;
109 }
110 }
111 HANDLE events[] = { overlapped.hEvent, params_->exit_flag };
112 rc = WaitForMultipleObjects(ARRAYSIZE(events), events, FALSE,
113 kServerReachablePollingIntervalMsec);
114
115 // If the exit flag was signaled, the thread will exit.
116 if (WAIT_OBJECT_0 + 1 == rc)
103 break; 117 break;
104 } 118
119 params_->conn_mgr->CheckServerReachable();
105 } 120 }
106 HANDLE events[] = { overlapped.hEvent, params->exit_flag }; 121 CloseHandle(overlapped.hEvent);
107 rc = WaitForMultipleObjects(ARRAYSIZE(events), events, FALSE,
108 kServerReachablePollingIntervalMsec);
109
110 // If the exit flag was signaled, the thread will exit.
111 if (WAIT_OBJECT_0 + 1 == rc)
112 break;
113
114 params->conn_mgr->CheckServerReachable();
115 }
116 CloseHandle(overlapped.hEvent);
117 #else 122 #else
118 // TODO(zork): Add this functionality to Linux. 123 // TODO(zork): Add this functionality to Linux.
119 #endif 124 #endif
120 LOG(INFO) << "The address watch thread has stopped"; 125 LOG(INFO) << "The address watch thread has stopped";
121 return 0; 126 }
122 } 127
128 private:
129 AddressWatchTaskParams* const params_;
130 DISALLOW_COPY_AND_ASSIGN(AddressWatchTask);
131 };
123 132
124 namespace sync_api { 133 namespace sync_api {
125 class ModelSafeWorkerBridge; 134 class ModelSafeWorkerBridge;
126 135
127 static const PSTR_CHAR kBookmarkSyncUserSettingsDatabase[] = 136 static const PSTR_CHAR kBookmarkSyncUserSettingsDatabase[] =
128 PSTR("BookmarkSyncSettings.sqlite3"); 137 PSTR("BookmarkSyncSettings.sqlite3");
129 static const PSTR_CHAR kDefaultNameForNewNodes[] = PSTR(" "); 138 static const PSTR_CHAR kDefaultNameForNewNodes[] = PSTR(" ");
130 139
131 // The list of names which are reserved for use by the server. 140 // The list of names which are reserved for use by the server.
132 static const char16* kForbiddenServerNames[] = 141 static const char16* kForbiddenServerNames[] =
(...skipping 536 matching lines...) Expand 10 before | Expand all | Expand 10 after
669 private: 678 private:
670 const std::string gaia_source_; 679 const std::string gaia_source_;
671 scoped_ptr<HttpPostProviderFactory> post_factory_; 680 scoped_ptr<HttpPostProviderFactory> post_factory_;
672 DISALLOW_COPY_AND_ASSIGN(BridgedGaiaAuthenticator); 681 DISALLOW_COPY_AND_ASSIGN(BridgedGaiaAuthenticator);
673 }; 682 };
674 683
675 ////////////////////////////////////////////////////////////////////////// 684 //////////////////////////////////////////////////////////////////////////
676 // SyncManager's implementation: SyncManager::SyncInternal 685 // SyncManager's implementation: SyncManager::SyncInternal
677 class SyncManager::SyncInternal { 686 class SyncManager::SyncInternal {
678 public: 687 public:
679 typedef PThreadScopedLock<PThreadMutex> MutexLock;
680 explicit SyncInternal(SyncManager* sync_manager) 688 explicit SyncInternal(SyncManager* sync_manager)
681 : observer_(NULL), 689 : observer_(NULL),
682 command_channel_(0), 690 command_channel_(0),
683 auth_problem_(AUTH_PROBLEM_NONE), 691 auth_problem_(AUTH_PROBLEM_NONE),
684 sync_manager_(sync_manager), 692 sync_manager_(sync_manager),
685 notification_pending_(false), 693 notification_pending_(false),
694 address_watch_thread_("SyncEngine_AddressWatcher"),
686 initialized_(false) { 695 initialized_(false) {
687 } 696 }
688 697
689 ~SyncInternal() { } 698 ~SyncInternal() { }
690 699
691 bool Init(const PathString& database_location, 700 bool Init(const PathString& database_location,
692 const std::string& sync_server_and_path, 701 const std::string& sync_server_and_path,
693 int port, 702 int port,
694 const char* gaia_service_id, 703 const char* gaia_service_id,
695 const char* gaia_source, 704 const char* gaia_source,
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
775 784
776 // See SyncManager::SetupForTestMode for information. 785 // See SyncManager::SetupForTestMode for information.
777 void SetupForTestMode(const sync_char16* test_username); 786 void SetupForTestMode(const sync_char16* test_username);
778 787
779 // See SyncManager::Shutdown for information. 788 // See SyncManager::Shutdown for information.
780 void Shutdown(); 789 void Shutdown();
781 790
782 // Whether we're initialized to the point of being able to accept changes 791 // Whether we're initialized to the point of being able to accept changes
783 // (and hence allow transaction creation). See initialized_ for details. 792 // (and hence allow transaction creation). See initialized_ for details.
784 bool initialized() const { 793 bool initialized() const {
785 MutexLock lock(&initialized_mutex_); 794 AutoLock lock(initialized_mutex_);
786 return initialized_; 795 return initialized_;
787 } 796 }
788 private: 797 private:
789 // Try to authenticate using persisted credentials from a previous successful 798 // Try to authenticate using persisted credentials from a previous successful
790 // authentication. If no such credentials exist, calls OnAuthError on the 799 // authentication. If no such credentials exist, calls OnAuthError on the
791 // client to collect credentials. Otherwise, there exist local credentials 800 // client to collect credentials. Otherwise, there exist local credentials
792 // that were once used for a successful auth, so we'll try to re-use these. 801 // that were once used for a successful auth, so we'll try to re-use these.
793 // Failure of that attempt will be communicated as normal using OnAuthError. 802 // Failure of that attempt will be communicated as normal using OnAuthError.
794 // Since this entry point will bypass normal GAIA authentication and try to 803 // Since this entry point will bypass normal GAIA authentication and try to
795 // authenticate directly with the sync service using a cached token, 804 // authenticate directly with the sync service using a cached token,
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
900 909
901 // Our cache of a recent authentication problem. If no authentication problem 910 // Our cache of a recent authentication problem. If no authentication problem
902 // occurred, or if the last problem encountered has been cleared (by a 911 // occurred, or if the last problem encountered has been cleared (by a
903 // subsequent AuthWatcherEvent), this is set to AUTH_PROBLEM_NONE. 912 // subsequent AuthWatcherEvent), this is set to AUTH_PROBLEM_NONE.
904 AuthProblem auth_problem_; 913 AuthProblem auth_problem_;
905 914
906 // The sync dir_manager to which we belong. 915 // The sync dir_manager to which we belong.
907 SyncManager* const sync_manager_; 916 SyncManager* const sync_manager_;
908 917
909 // Parameters for our thread listening to network status changes. 918 // Parameters for our thread listening to network status changes.
910 ThreadParams address_watch_params_; 919 base::Thread address_watch_thread_;
911 thread_handle address_watch_thread_; 920 AddressWatchTaskParams address_watch_params_;
912 921
913 // True if the next SyncCycle should notify peers of an update. 922 // True if the next SyncCycle should notify peers of an update.
914 bool notification_pending_; 923 bool notification_pending_;
915 924
916 // Set to true once Init has been called, and we know of an authenticated 925 // Set to true once Init has been called, and we know of an authenticated
917 // valid) username either from a fresh authentication attempt (as in 926 // valid) username either from a fresh authentication attempt (as in
918 // first-use case) or from a previous attempt stored in our UserSettings 927 // first-use case) or from a previous attempt stored in our UserSettings
919 // (as in the steady-state), and the syncable::Directory has been opened, 928 // (as in the steady-state), and the syncable::Directory has been opened,
920 // meaning we are ready to accept changes. Protected by initialized_mutex_ 929 // meaning we are ready to accept changes. Protected by initialized_mutex_
921 // as it can get read/set by both the SyncerThread and the AuthWatcherThread. 930 // as it can get read/set by both the SyncerThread and the AuthWatcherThread.
922 bool initialized_; 931 bool initialized_;
923 mutable PThreadMutex initialized_mutex_; 932 mutable Lock initialized_mutex_;
924 }; 933 };
925 934
926 SyncManager::SyncManager() { 935 SyncManager::SyncManager() {
927 data_ = new SyncInternal(this); 936 data_ = new SyncInternal(this);
928 } 937 }
929 938
930 bool SyncManager::Init(const sync_char16* database_location, 939 bool SyncManager::Init(const sync_char16* database_location,
931 const char* sync_server_and_path, 940 const char* sync_server_and_path,
932 int sync_server_port, 941 int sync_server_port,
933 const char* gaia_service_id, 942 const char* gaia_service_id,
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
996 share_.dir_manager.reset(new DirectoryManager(database_location)); 1005 share_.dir_manager.reset(new DirectoryManager(database_location));
997 1006
998 string client_id = user_settings_->GetClientId(); 1007 string client_id = user_settings_->GetClientId();
999 connection_manager_.reset(new SyncAPIServerConnectionManager( 1008 connection_manager_.reset(new SyncAPIServerConnectionManager(
1000 sync_server_and_path, port, use_ssl, user_agent, client_id)); 1009 sync_server_and_path, port, use_ssl, user_agent, client_id));
1001 1010
1002 // TODO(timsteele): This is temporary windows crap needed to listen for 1011 // TODO(timsteele): This is temporary windows crap needed to listen for
1003 // network status changes. We should either pump this up to the embedder to 1012 // network status changes. We should either pump this up to the embedder to
1004 // do (and call us in CheckServerReachable, for ex), or at least make this 1013 // do (and call us in CheckServerReachable, for ex), or at least make this
1005 // platform independent in here. 1014 // platform independent in here.
1006 // TODO(ncarter): When this gets cleaned up, the implementation of
1007 // CreatePThread can also be removed.
1008 #if defined(OS_WIN) 1015 #if defined(OS_WIN)
1009 HANDLE exit_flag = CreateEvent(NULL, TRUE /*manual reset*/, FALSE, NULL); 1016 HANDLE exit_flag = CreateEvent(NULL, TRUE /*manual reset*/, FALSE, NULL);
1010 address_watch_params_.exit_flag = exit_flag; 1017 address_watch_params_.exit_flag = exit_flag;
1011 #endif 1018 #endif
1012 address_watch_params_.conn_mgr = connection_manager(); 1019 address_watch_params_.conn_mgr = connection_manager();
1013 address_watch_thread_ = CreatePThread(AddressWatchThread, 1020
1014 &address_watch_params_); 1021 bool address_watch_started = address_watch_thread_.Start();
1015 #if defined(OS_WIN) 1022 DCHECK(address_watch_started);
1016 DCHECK(NULL != address_watch_thread_); 1023 address_watch_thread_.message_loop()->PostTask(FROM_HERE,
1017 #endif 1024 new AddressWatchTask(&address_watch_params_));
1018 1025
1019 // Hand over the bridged POST factory to be owned by the connection 1026 // Hand over the bridged POST factory to be owned by the connection
1020 // dir_manager. 1027 // dir_manager.
1021 connection_manager()->SetHttpPostProviderFactory(post_factory); 1028 connection_manager()->SetHttpPostProviderFactory(post_factory);
1022 1029
1023 // Watch various objects for aggregated status. 1030 // Watch various objects for aggregated status.
1024 allstatus()->WatchConnectionManager(connection_manager()); 1031 allstatus()->WatchConnectionManager(connection_manager());
1025 1032
1026 std::string gaia_url = browser_sync::kGaiaUrl; 1033 std::string gaia_url = browser_sync::kGaiaUrl;
1027 const char* service_id = gaia_service_id ? 1034 const char* service_id = gaia_service_id ?
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1071 return true; 1078 return true;
1072 } 1079 }
1073 1080
1074 void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { 1081 void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
1075 // There is only one real time we need this mutex. If we get an auth 1082 // There is only one real time we need this mutex. If we get an auth
1076 // success, and before the initial sync ends we get an auth failure. In this 1083 // success, and before the initial sync ends we get an auth failure. In this
1077 // case we'll be listening to both the AuthWatcher and Syncer, and it's a race 1084 // case we'll be listening to both the AuthWatcher and Syncer, and it's a race
1078 // between their respective threads to call MarkAndNotify. We need to make 1085 // between their respective threads to call MarkAndNotify. We need to make
1079 // sure the observer is notified once and only once. 1086 // sure the observer is notified once and only once.
1080 { 1087 {
1081 MutexLock lock(&initialized_mutex_); 1088 AutoLock lock(initialized_mutex_);
1082 if (initialized_) 1089 if (initialized_)
1083 return; 1090 return;
1084 initialized_ = true; 1091 initialized_ = true;
1085 } 1092 }
1086 1093
1087 // Notify that initialization is complete. 1094 // Notify that initialization is complete.
1088 if (observer_) 1095 if (observer_)
1089 observer_->OnInitializationComplete(); 1096 observer_->OnInitializationComplete();
1090 } 1097 }
1091 1098
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
1199 1206
1200 // We don't want to process any more events. 1207 // We don't want to process any more events.
1201 dir_change_hookup_.reset(); 1208 dir_change_hookup_.reset();
1202 syncer_event_.reset(); 1209 syncer_event_.reset();
1203 authwatcher_hookup_.reset(); 1210 authwatcher_hookup_.reset();
1204 1211
1205 #if defined(OS_WIN) 1212 #if defined(OS_WIN)
1206 // Stop the address watch thread by signaling the exit flag. 1213 // Stop the address watch thread by signaling the exit flag.
1207 // TODO(timsteele): Same as todo in Init(). 1214 // TODO(timsteele): Same as todo in Init().
1208 SetEvent(address_watch_params_.exit_flag); 1215 SetEvent(address_watch_params_.exit_flag);
1209 const DWORD wait_result = WaitForSingleObject(address_watch_thread_, 1216 #endif
1210 kThreadExitTimeoutMsec); 1217
1211 LOG_IF(ERROR, WAIT_FAILED == wait_result) << "Waiting for addr change thread " 1218 address_watch_thread_.Stop();
1212 "to exit failed. GetLastError(): " << hex << GetLastError(); 1219
1213 LOG_IF(ERROR, WAIT_TIMEOUT == wait_result) << "Thread exit timeout expired"; 1220 #if defined(OS_WIN)
1214 CloseHandle(address_watch_params_.exit_flag); 1221 CloseHandle(address_watch_params_.exit_flag);
1215 #endif 1222 #endif
1216 } 1223 }
1217 1224
1218 // Listen to model changes, filter out ones initiated by the sync API, and 1225 // Listen to model changes, filter out ones initiated by the sync API, and
1219 // saves the rest (hopefully just backend Syncer changes resulting from 1226 // saves the rest (hopefully just backend Syncer changes resulting from
1220 // ApplyUpdates) to data_->changelist. 1227 // ApplyUpdates) to data_->changelist.
1221 void SyncManager::SyncInternal::HandleChangeEvent( 1228 void SyncManager::SyncInternal::HandleChangeEvent(
1222 const syncable::DirectoryChangeEvent& event) { 1229 const syncable::DirectoryChangeEvent& event) {
1223 if (event.todo == syncable::DirectoryChangeEvent::TRANSACTION_COMPLETE) { 1230 if (event.todo == syncable::DirectoryChangeEvent::TRANSACTION_COMPLETE) {
(...skipping 327 matching lines...) Expand 10 before | Expand all | Expand 10 after
1551 BaseTransaction::~BaseTransaction() { 1558 BaseTransaction::~BaseTransaction() {
1552 delete lookup_; 1559 delete lookup_;
1553 } 1560 }
1554 1561
1555 UserShare* SyncManager::GetUserShare() const { 1562 UserShare* SyncManager::GetUserShare() const {
1556 DCHECK(data_->initialized()) << "GetUserShare requires initialization!"; 1563 DCHECK(data_->initialized()) << "GetUserShare requires initialization!";
1557 return data_->GetUserShare(); 1564 return data_->GetUserShare();
1558 } 1565 }
1559 1566
1560 } // namespace sync_api 1567 } // namespace sync_api
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698