Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: jingle/notifier/listener/push_client.cc

Issue 10398051: [Sync] Replace TalkMediator*/MediatorThread* with PushClient (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "jingle/notifier/listener/push_client.h"
6
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/observer_list_threadsafe.h"
13 #include "jingle/notifier/base/notifier_options_util.h"
14 #include "jingle/notifier/communicator/login.h"
15 #include "jingle/notifier/listener/push_notifications_listen_task.h"
16 #include "jingle/notifier/listener/push_notifications_send_update_task.h"
17 #include "jingle/notifier/listener/push_notifications_subscribe_task.h"
18 #include "talk/xmpp/xmppclientsettings.h"
19
20 namespace notifier {
21
22 PushClient::Observer::~Observer() {}
23
24 // All member functions except for the constructor, destructor, and
25 // {Add,Remove}Observer() must be called on the IO thread (as taken from
26 // |notifier_options|).
27 class PushClient::Core
rlarocque 2012/05/16 17:46:00 There's a lot of scary thread code here. Is this
akalin 2012/05/16 17:53:16 This was recycled from MediatorThreadImpl, but git
rlarocque 2012/05/16 19:07:50 Don't worry about it. I doubt the code review sit
28 : public base::RefCountedThreadSafe<PushClient::Core>,
29 public LoginDelegate,
30 public PushNotificationsListenTaskDelegate,
31 public PushNotificationsSubscribeTaskDelegate {
32 public:
33 // Called on the parent thread.
34 explicit Core(const NotifierOptions& notifier_options);
35
36 // Must be called before being destroyed.
37 void DestroyOnIOThread();
38
39 // Login::Delegate implementation.
40 virtual void OnConnect(
41 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) OVERRIDE;
42 virtual void OnDisconnect();
43
44 // PushNotificationsListenTaskDelegate implementation.
45 virtual void OnNotificationReceived(
46 const Notification& notification) OVERRIDE;
47
48 // PushNotificationsSubscribeTaskDelegate implementation.
49 virtual void OnSubscribed() OVERRIDE;
50 virtual void OnSubscriptionError() OVERRIDE;
51
52 // Called on the parent thread.
53 void AddObserver(Observer* observer);
54 void RemoveObserver(Observer* observer);
55
56 void UpdateSubscriptions(const SubscriptionList& subscriptions);
57 void UpdateCredentials(const std::string& email, const std::string& token);
58 void SendNotification(const Notification& data);
59
60 private:
61 friend class base::RefCountedThreadSafe<PushClient::Core>;
62
63 // Called on either the parent thread or the I/O thread.
64 virtual ~Core();
65
66 const NotifierOptions notifier_options_;
67 const scoped_refptr<base::MessageLoopProxy> parent_message_loop_proxy_;
68 const scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_;
69 const scoped_refptr<ObserverListThreadSafe<Observer> > observers_;
70
71 // XMPP connection settings.
72 SubscriptionList subscriptions_;
73 buzz::XmppClientSettings xmpp_settings_;
74
75 // Must be created/used/destroyed only on the IO thread.
76 scoped_ptr<notifier::Login> login_;
77
78 // The XMPP connection.
79 base::WeakPtr<buzz::XmppTaskParentInterface> base_task_;
80
81 std::vector<Notification> pending_notifications_to_send_;
82
83 DISALLOW_COPY_AND_ASSIGN(Core);
84 };
85
86 PushClient::Core::Core(const NotifierOptions& notifier_options)
87 : notifier_options_(notifier_options),
88 parent_message_loop_proxy_(base::MessageLoopProxy::current()),
89 io_message_loop_proxy_(
90 notifier_options_.request_context_getter->GetIOMessageLoopProxy()),
91 observers_(new ObserverListThreadSafe<Observer>()) {}
92
93 PushClient::Core::~Core() {
94 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread() ||
95 io_message_loop_proxy_->BelongsToCurrentThread());
96 DCHECK(!login_.get());
97 DCHECK(!base_task_.get());
98 }
99
100 void PushClient::Core::DestroyOnIOThread() {
101 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
102 login_.reset();
103 base_task_.reset();
104 }
105
106 void PushClient::Core::OnConnect(
107 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) {
108 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
109 base_task_ = base_task;
110
111 if (!base_task_.get()) {
112 NOTREACHED();
113 return;
114 }
115
116 // Listen for notifications.
117 {
118 // Owned by |base_task_|.
119 PushNotificationsListenTask* listener =
120 new PushNotificationsListenTask(base_task_, this);
121 listener->Start();
122 }
123
124 // Send subscriptions.
125 {
126 // Owned by |base_task_|.
127 PushNotificationsSubscribeTask* subscribe_task =
128 new PushNotificationsSubscribeTask(base_task_, subscriptions_, this);
129 subscribe_task->Start();
130 }
131
132 std::vector<Notification> notifications_to_send;
133 notifications_to_send.swap(pending_notifications_to_send_);
134 for (std::vector<Notification>::const_iterator it =
135 notifications_to_send.begin();
136 it != notifications_to_send.end(); ++it) {
137 DVLOG(1) << "Push: Sending pending notification " << it->ToString();
138 SendNotification(*it);
139 }
140 }
141
142 void PushClient::Core::OnDisconnect() {
143 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
144 base_task_.reset();
145 observers_->Notify(&Observer::OnNotificationStateChange, false);
146 }
147
148 void PushClient::Core::OnNotificationReceived(
149 const Notification& notification) {
150 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
151 observers_->Notify(&Observer::OnIncomingNotification, notification);
152 }
153
154 void PushClient::Core::OnSubscribed() {
155 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
156 observers_->Notify(&Observer::OnNotificationStateChange, true);
157 }
158
159 void PushClient::Core::OnSubscriptionError() {
160 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
161 observers_->Notify(&Observer::OnNotificationStateChange, false);
162 }
163
164 void PushClient::Core::AddObserver(Observer* observer) {
165 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
166 observers_->AddObserver(observer);
167 }
168
169 void PushClient::Core::RemoveObserver(Observer* observer) {
170 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
171 observers_->RemoveObserver(observer);
172 }
173
174 void PushClient::Core::UpdateSubscriptions(
175 const SubscriptionList& subscriptions) {
176 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
177 subscriptions_ = subscriptions;
178 }
179
180 void PushClient::Core::UpdateCredentials(
181 const std::string& email, const std::string& token) {
182 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
183 DVLOG(1) << "Push: Updating credentials for " << email;
184 xmpp_settings_ = MakeXmppClientSettings(notifier_options_, email, token);
185 if (login_.get()) {
186 login_->UpdateXmppSettings(xmpp_settings_);
187 } else {
188 DVLOG(1) << "Push: Starting XMPP connection";
189 base_task_.reset();
190 login_.reset(new notifier::Login(this,
191 xmpp_settings_,
192 notifier_options_.request_context_getter,
193 GetServerList(notifier_options_),
194 notifier_options_.try_ssltcp_first,
195 notifier_options_.auth_mechanism));
196 login_->StartConnection();
197 }
198 }
199
200 void PushClient::Core::SendNotification(const Notification& data) {
201 DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
202 if (!base_task_.get()) {
203 DVLOG(1) << "Push: Cannot send notification " << data.ToString()
204 << "; sending later";
205 pending_notifications_to_send_.push_back(data);
206 return;
207 }
208 // Owned by |base_task_|.
209 PushNotificationsSendUpdateTask* task =
210 new PushNotificationsSendUpdateTask(base_task_, data);
211 task->Start();
212 }
213
214 PushClient::PushClient(const NotifierOptions& notifier_options)
215 : core_(new Core(notifier_options)),
216 parent_message_loop_proxy_(base::MessageLoopProxy::current()),
217 io_message_loop_proxy_(
218 notifier_options.request_context_getter->GetIOMessageLoopProxy()) {
219 }
220
221 PushClient::~PushClient() {
222 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
rlarocque 2012/05/16 19:07:50 Can we DCHECK in this function that the core's obs
akalin 2012/05/16 22:47:42 Done. Not here, but up in ~Core()
rlarocque 2012/05/16 23:04:31 Actually, I was hoping it could be done here. I s
223 io_message_loop_proxy_->PostTask(
224 FROM_HERE,
225 base::Bind(&PushClient::Core::DestroyOnIOThread, core_.get()));
226 }
227
228 void PushClient::AddObserver(Observer* observer) {
229 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
230 core_->AddObserver(observer);
231 }
232
233 void PushClient::RemoveObserver(Observer* observer) {
234 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
235 core_->RemoveObserver(observer);
236 }
237
238 void PushClient::UpdateSubscriptions(const SubscriptionList& subscriptions) {
239 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
240 io_message_loop_proxy_->PostTask(
241 FROM_HERE,
242 base::Bind(&PushClient::Core::UpdateSubscriptions,
243 core_.get(), subscriptions));
244 }
245
246 void PushClient::UpdateCredentials(
247 const std::string& email, const std::string& token) {
248 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
249 io_message_loop_proxy_->PostTask(
250 FROM_HERE,
251 base::Bind(&PushClient::Core::UpdateCredentials,
252 core_.get(), email, token));
253 }
254
255 void PushClient::SendNotification(const Notification& data) {
256 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
257 io_message_loop_proxy_->PostTask(
258 FROM_HERE,
259 base::Bind(&PushClient::Core::SendNotification, core_.get(),
260 data));
261 }
262
263 void PushClient::SimulateOnNotificationReceivedForTest(
264 const Notification& notification) {
265 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
266 io_message_loop_proxy_->PostTask(
267 FROM_HERE,
268 base::Bind(&PushClient::Core::OnNotificationReceived,
269 core_.get(), notification));
270 }
271
272 void PushClient::SimulateConnectAndSubscribeForTest(
273 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) {
274 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
275 io_message_loop_proxy_->PostTask(
276 FROM_HERE,
277 base::Bind(&PushClient::Core::OnConnect, core_.get(), base_task));
278 io_message_loop_proxy_->PostTask(
279 FROM_HERE,
280 base::Bind(&PushClient::Core::OnSubscribed, core_.get()));
281 }
282
283 void PushClient::SimulateDisconnectForTest() {
284 DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
285 io_message_loop_proxy_->PostTask(
286 FROM_HERE,
287 base::Bind(&PushClient::Core::OnDisconnect, core_.get()));
288 }
289
290 void PushClient::SimulateSubscriptionErrorForTest() {
291 io_message_loop_proxy_->PostTask(
292 FROM_HERE,
293 base::Bind(&PushClient::Core::OnSubscriptionError, core_.get()));
294 }
295
296 } // namespace notifier
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698