Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: jingle/glue/thread_wrapper.cc

Issue 10823224: Update JingleThreadWrapper to allow it to be created using task runner. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« jingle/glue/thread_wrapper.h ('K') | « jingle/glue/thread_wrapper.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "jingle/glue/thread_wrapper.h" 5 #include "jingle/glue/thread_wrapper.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/bind_helpers.h" 8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h" 9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h" 10 #include "base/threading/thread_local.h"
(...skipping 13 matching lines...) Expand all
24 talk_base::Message message; 24 talk_base::Message message;
25 base::WaitableEvent done_event; 25 base::WaitableEvent done_event;
26 }; 26 };
27 27
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > 28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; 29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30 30
31 // static 31 // static
32 void JingleThreadWrapper::EnsureForCurrentThread() { 32 void JingleThreadWrapper::EnsureForCurrentThread() {
33 if (JingleThreadWrapper::current() == NULL) { 33 if (JingleThreadWrapper::current() == NULL) {
34 g_jingle_thread_wrapper.Get().Set( 34 MessageLoop* message_loop = MessageLoop::current();
Wez 2012/08/09 23:30:25 This will only work if called from a MessageLoop;
Sergey Ulanov 2012/08/13 21:15:34 Yes. That's intentional. We need it to add destruc
Wez 2012/08/14 00:06:04 Would it be useful to have a LeakyEnsureForCurrent
Sergey Ulanov 2012/08/14 01:02:14 Don't think it's necessary. It's always possible t
35 new JingleThreadWrapper(MessageLoop::current())); 35 g_jingle_thread_wrapper.Get().Set(new JingleThreadWrapper(
36 message_loop->message_loop_proxy()));
37 message_loop->AddDestructionObserver(current());
36 } 38 }
37 39
38 DCHECK_EQ(talk_base::Thread::Current(), current()); 40 DCHECK_EQ(talk_base::Thread::Current(), current());
39 } 41 }
40 42
41 // static 43 // static
42 JingleThreadWrapper* JingleThreadWrapper::current() { 44 JingleThreadWrapper* JingleThreadWrapper::current() {
43 return g_jingle_thread_wrapper.Get().Get(); 45 return g_jingle_thread_wrapper.Get().Get();
44 } 46 }
45 47
46 JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) 48 JingleThreadWrapper::JingleThreadWrapper(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
47 : talk_base::Thread(new talk_base::NullSocketServer()), 50 : talk_base::Thread(new talk_base::NullSocketServer()),
48 message_loop_(message_loop), 51 task_runner_(task_runner),
49 send_allowed_(false), 52 send_allowed_(false),
50 last_task_id_(0), 53 last_task_id_(0),
51 pending_send_event_(true, false) { 54 pending_send_event_(true, false) {
52 DCHECK_EQ(message_loop_, MessageLoop::current()); 55 DCHECK(task_runner->BelongsToCurrentThread());
53 56 DCHECK(!talk_base::Thread::Current());
54 talk_base::ThreadManager::Instance()->UnwrapCurrentThread();
55 talk_base::ThreadManager::Instance()->SetCurrentThread(this);
56 talk_base::MessageQueueManager::Instance()->Add(this); 57 talk_base::MessageQueueManager::Instance()->Add(this);
57 message_loop_->AddDestructionObserver(this);
58
59 WrapCurrent(); 58 WrapCurrent();
60 } 59 }
61 60
62 JingleThreadWrapper::~JingleThreadWrapper() { 61 JingleThreadWrapper::~JingleThreadWrapper() {
63 Clear(NULL, talk_base::MQID_ANY, NULL); 62 Clear(NULL, talk_base::MQID_ANY, NULL);
64 } 63 }
65 64
66 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { 65 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
67 DCHECK_EQ(talk_base::Thread::Current(), current()); 66 DCHECK_EQ(talk_base::Thread::Current(), current());
68 UnwrapCurrent(); 67 UnwrapCurrent();
69 g_jingle_thread_wrapper.Get().Set(NULL); 68 g_jingle_thread_wrapper.Get().Set(NULL);
70 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL); 69 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
71 talk_base::MessageQueueManager::Instance()->Remove(this); 70 talk_base::MessageQueueManager::Instance()->Remove(this);
72 message_loop_->RemoveDestructionObserver(this);
73 talk_base::SocketServer* ss = socketserver(); 71 talk_base::SocketServer* ss = socketserver();
74 delete this; 72 delete this;
75 delete ss; 73 delete ss;
76 } 74 }
77 75
78 void JingleThreadWrapper::Post( 76 void JingleThreadWrapper::Post(
79 talk_base::MessageHandler* handler, uint32 message_id, 77 talk_base::MessageHandler* handler, uint32 message_id,
80 talk_base::MessageData* data, bool time_sensitive) { 78 talk_base::MessageData* data, bool time_sensitive) {
81 PostTaskInternal(0, handler, message_id, data); 79 PostTaskInternal(0, handler, message_id, data);
82 } 80 }
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
155 153
156 PendingSend pending_send(message); 154 PendingSend pending_send(message);
157 { 155 {
158 base::AutoLock auto_lock(lock_); 156 base::AutoLock auto_lock(lock_);
159 pending_send_messages_.push_back(&pending_send); 157 pending_send_messages_.push_back(&pending_send);
160 } 158 }
161 159
162 // Need to signal |pending_send_event_| here in case the thread is 160 // Need to signal |pending_send_event_| here in case the thread is
163 // sending message to another thread. 161 // sending message to another thread.
164 pending_send_event_.Signal(); 162 pending_send_event_.Signal();
165 message_loop_->PostTask(FROM_HERE, 163 task_runner_->PostTask(FROM_HERE,
166 base::Bind(&JingleThreadWrapper::ProcessPendingSends, 164 base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167 base::Unretained(this))); 165 base::Unretained(this)));
Wez 2012/08/09 23:30:25 Should this be a WeakPtr?
Sergey Ulanov 2012/08/13 21:15:34 Yes. Thanks for catching this. Fixed.
168 166
169 167
170 while (!pending_send.done_event.IsSignaled()) { 168 while (!pending_send.done_event.IsSignaled()) {
171 base::WaitableEvent* events[] = {&pending_send.done_event, 169 base::WaitableEvent* events[] = {&pending_send.done_event,
172 &current_thread->pending_send_event_}; 170 &current_thread->pending_send_event_};
173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); 171 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174 DCHECK(event == 0 || event == 1); 172 DCHECK(event == 0 || event == 1);
175 173
176 if (event == 1) 174 if (event == 1)
177 current_thread->ProcessPendingSends(); 175 current_thread->ProcessPendingSends();
(...skipping 29 matching lines...) Expand all
207 message.phandler = handler; 205 message.phandler = handler;
208 message.message_id = message_id; 206 message.message_id = message_id;
209 message.pdata = data; 207 message.pdata = data;
210 { 208 {
211 base::AutoLock auto_lock(lock_); 209 base::AutoLock auto_lock(lock_);
212 task_id = ++last_task_id_; 210 task_id = ++last_task_id_;
213 messages_.insert(std::pair<int, talk_base::Message>(task_id, message)); 211 messages_.insert(std::pair<int, talk_base::Message>(task_id, message));
214 } 212 }
215 213
216 if (delay_ms <= 0) { 214 if (delay_ms <= 0) {
217 message_loop_->PostTask(FROM_HERE, 215 task_runner_->PostTask(FROM_HERE,
218 base::Bind(&JingleThreadWrapper::RunTask, 216 base::Bind(&JingleThreadWrapper::RunTask,
219 base::Unretained(this), task_id)); 217 base::Unretained(this), task_id));
Wez 2012/08/09 23:30:25 WeakPtr?
Sergey Ulanov 2012/08/13 21:15:34 Done.
220 } else { 218 } else {
221 message_loop_->PostDelayedTask(FROM_HERE, 219 task_runner_->PostDelayedTask(FROM_HERE,
222 base::Bind(&JingleThreadWrapper::RunTask, 220 base::Bind(&JingleThreadWrapper::RunTask,
223 base::Unretained(this), task_id), 221 base::Unretained(this), task_id),
Wez 2012/08/09 23:30:25 WeakPtr?
Sergey Ulanov 2012/08/13 21:15:34 Done.
224 base::TimeDelta::FromMilliseconds(delay_ms)); 222 base::TimeDelta::FromMilliseconds(delay_ms));
225 } 223 }
226 } 224 }
227 225
228 void JingleThreadWrapper::RunTask(int task_id) { 226 void JingleThreadWrapper::RunTask(int task_id) {
229 bool have_message = false; 227 bool have_message = false;
230 talk_base::Message message; 228 talk_base::Message message;
231 { 229 {
232 base::AutoLock auto_lock(lock_); 230 base::AutoLock auto_lock(lock_);
233 MessagesQueue::iterator it = messages_.find(task_id); 231 MessagesQueue::iterator it = messages_.find(task_id);
234 if (it != messages_.end()) { 232 if (it != messages_.end()) {
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
293 291
294 void JingleThreadWrapper::Stop() { 292 void JingleThreadWrapper::Stop() {
295 NOTREACHED(); 293 NOTREACHED();
296 } 294 }
297 295
298 void JingleThreadWrapper::Run() { 296 void JingleThreadWrapper::Run() {
299 NOTREACHED(); 297 NOTREACHED();
300 } 298 }
301 299
302 } // namespace jingle_glue 300 } // namespace jingle_glue
OLDNEW
« jingle/glue/thread_wrapper.h ('K') | « jingle/glue/thread_wrapper.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698