OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "remoting/jingle_glue/jingle_thread.h" | |
6 | |
7 #include "base/basictypes.h" | |
8 #include "base/logging.h" | |
9 #include "base/message_loop_proxy.h" | |
10 #include "base/message_pump.h" | |
11 #include "base/time.h" | |
12 #include "third_party/libjingle/source/talk/base/ssladapter.h" | |
13 | |
14 namespace remoting { | |
15 | |
16 const uint32 kRunTasksMessageId = 1; | |
17 const uint32 kStopMessageId = 2; | |
18 | |
19 namespace { | |
20 | |
21 class JingleMessagePump : public base::MessagePump, | |
22 public talk_base::MessageHandler { | |
23 public: | |
24 JingleMessagePump(talk_base::Thread* thread) | |
25 : thread_(thread), delegate_(NULL), stopping_(false) { | |
26 } | |
27 | |
28 virtual void Run(Delegate* delegate) { | |
29 delegate_ = delegate; | |
30 | |
31 thread_->Thread::Run(); | |
32 // Call Restart() so that we can run again. | |
33 thread_->Restart(); | |
34 | |
35 delegate_ = NULL; | |
36 } | |
37 | |
38 virtual void Quit() { | |
39 if (!stopping_) { | |
40 stopping_ = true; | |
41 | |
42 // Shutdown gracefully: make sure that we excute all messages | |
43 // left in the queue before exiting. Thread::Quit() would not do | |
44 // that. | |
45 thread_->Post(this, kStopMessageId); | |
46 } | |
47 } | |
48 | |
49 virtual void ScheduleWork() { | |
50 thread_->Post(this, kRunTasksMessageId); | |
51 } | |
52 | |
53 virtual void ScheduleDelayedWork(const base::TimeTicks& time) { | |
54 delayed_work_time_ = time; | |
55 ScheduleNextDelayedTask(); | |
56 } | |
57 | |
58 void OnMessage(talk_base::Message* msg) { | |
59 if (msg->message_id == kRunTasksMessageId) { | |
60 DCHECK(delegate_); | |
61 | |
62 // Clear currently pending messages in case there were delayed tasks. | |
63 // Will schedule it again from ScheduleNextDelayedTask() if neccessary. | |
64 thread_->Clear(this, kRunTasksMessageId); | |
65 | |
66 // Process all pending tasks. | |
67 while (true) { | |
68 if (delegate_->DoWork()) | |
69 continue; | |
70 if (delegate_->DoDelayedWork(&delayed_work_time_)) | |
71 continue; | |
72 if (delegate_->DoIdleWork()) | |
73 continue; | |
74 break; | |
75 } | |
76 | |
77 ScheduleNextDelayedTask(); | |
78 } else if (msg->message_id == kStopMessageId) { | |
79 DCHECK(stopping_); | |
80 // Stop the thread only if there are no more non-delayed | |
81 // messages left in the queue, otherwise post another task to | |
82 // try again later. | |
83 int delay = thread_->GetDelay(); | |
84 if (delay > 0 || delay == talk_base::kForever) { | |
85 stopping_ = false; | |
86 thread_->Quit(); | |
87 } else { | |
88 thread_->Post(this, kStopMessageId); | |
89 } | |
90 } else { | |
91 NOTREACHED(); | |
92 } | |
93 } | |
94 | |
95 protected: | |
96 virtual ~JingleMessagePump() {} | |
97 | |
98 private: | |
99 void ScheduleNextDelayedTask() { | |
100 if (!delayed_work_time_.is_null()) { | |
101 base::TimeTicks now = base::TimeTicks::Now(); | |
102 int delay = static_cast<int>((delayed_work_time_ - now).InMilliseconds()); | |
103 if (delay > 0) { | |
104 thread_->PostDelayed(delay, this, kRunTasksMessageId); | |
105 } else { | |
106 thread_->Post(this, kRunTasksMessageId); | |
107 } | |
108 } | |
109 } | |
110 | |
111 talk_base::Thread* thread_; | |
112 Delegate* delegate_; | |
113 base::TimeTicks delayed_work_time_; | |
114 bool stopping_; | |
115 }; | |
116 | |
117 } // namespace | |
118 | |
119 JingleThreadMessageLoop::JingleThreadMessageLoop(talk_base::Thread* thread) | |
120 : MessageLoop(MessageLoop::TYPE_IO) { | |
121 pump_ = new JingleMessagePump(thread); | |
122 } | |
123 | |
124 JingleThreadMessageLoop::~JingleThreadMessageLoop() { | |
125 } | |
126 | |
127 TaskPump::TaskPump() { | |
128 } | |
129 | |
130 void TaskPump::WakeTasks() { | |
131 talk_base::Thread::Current()->Post(this); | |
132 } | |
133 | |
134 int64 TaskPump::CurrentTime() { | |
135 return static_cast<int64>(talk_base::Time()); | |
136 } | |
137 | |
138 void TaskPump::OnMessage(talk_base::Message* pmsg) { | |
139 RunTasks(); | |
140 } | |
141 | |
142 JingleThread::JingleThread() | |
143 : task_pump_(NULL), | |
144 started_event_(true, false), | |
145 stopped_event_(true, false), | |
146 message_loop_(NULL) { | |
147 } | |
148 | |
149 JingleThread::~JingleThread() { | |
150 // It is important to call Stop here. If we wait for the base class to | |
151 // call Stop in its d'tor, then JingleThread::Run() will access member | |
152 // variables that are already gone. See similar comments in | |
153 // base/threading/thread.h. | |
154 if (message_loop_) | |
155 Stop(); | |
156 } | |
157 | |
158 bool JingleThread::Start() { | |
159 if (!Thread::Start()) | |
160 return false; | |
161 started_event_.Wait(); | |
162 return true; | |
163 } | |
164 | |
165 void JingleThread::Run() { | |
166 JingleThreadMessageLoop message_loop(this); | |
167 message_loop_ = &message_loop; | |
168 message_loop_proxy_ = base::MessageLoopProxy::current(); | |
169 | |
170 TaskPump task_pump; | |
171 task_pump_ = &task_pump; | |
172 | |
173 // Signal after we've initialized |message_loop_| and |task_pump_|. | |
174 started_event_.Signal(); | |
175 | |
176 message_loop.Run(); | |
177 | |
178 stopped_event_.Signal(); | |
179 | |
180 task_pump_ = NULL; | |
181 message_loop_ = NULL; | |
182 } | |
183 | |
184 void JingleThread::Stop() { | |
185 message_loop_->PostTask(FROM_HERE, MessageLoop::QuitClosure()); | |
186 stopped_event_.Wait(); | |
187 | |
188 // This will wait until the thread is actually finished. | |
189 Thread::Stop(); | |
190 } | |
191 | |
192 MessageLoop* JingleThread::message_loop() { | |
193 return message_loop_; | |
194 } | |
195 | |
196 base::MessageLoopProxy* JingleThread::message_loop_proxy() { | |
197 return message_loop_proxy_; | |
198 } | |
199 | |
200 TaskPump* JingleThread::task_pump() { | |
201 return task_pump_; | |
202 } | |
203 | |
204 } // namespace remoting | |
OLD | NEW |