Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(757)

Side by Side Diff: media/base/callback_util.cc

Issue 10830146: Replace RunInSeries() and RunInParallel() with CallbackSeries helper class. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/base/callback_util.h" 5 #include "media/base/callback_util.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/synchronization/lock.h" 8 #include "base/callback_helpers.h"
9 #include "base/memory/ref_counted.h"
10 #include "base/message_loop.h" 9 #include "base/message_loop.h"
11 #include "base/message_loop_proxy.h" 10 #include "base/message_loop_proxy.h"
12 11
13 namespace media { 12 namespace media {
14 13
15 // Executes the given closure if and only if the closure returned by 14 void RunClosureFunc(const ClosureFunc& closure,
16 // GetClosure() has been executed exactly |count| times. 15 const PipelineStatusCB& status_cb) {
17 // 16 closure.Run(base::Bind(status_cb, PIPELINE_OK));
18 // |done_cb| will be executed on the same thread that created the CountingCB. 17 }
19 class CountingCB : public base::RefCountedThreadSafe<CountingCB> {
20 public:
21 CountingCB(int count, const base::Closure& done_cb)
22 : message_loop_(base::MessageLoopProxy::current()),
23 count_(count),
24 done_cb_(done_cb) {
25 }
26 18
27 // Returns a closure bound to this object. 19 // Runs |status_cb| with |last_status| on |message_loop|, trampolining if
28 base::Closure GetClosure() { 20 // necessary.
29 return base::Bind(&CountingCB::OnCallback, this); 21 static void RunOnMessageLoop(
30 } 22 const scoped_refptr<base::MessageLoopProxy>& message_loop,
31 23 const PipelineStatusCB& status_cb,
32 protected: 24 PipelineStatus last_status) {
33 friend class base::RefCountedThreadSafe<CountingCB>;
34 virtual ~CountingCB() {}
35
36 private:
37 void OnCallback() {
38 {
39 base::AutoLock l(lock_);
40 count_--;
41 DCHECK_GE(count_, 0) << "CountingCB executed too many times";
42 if (count_ != 0)
43 return;
44 }
45
46 if (!message_loop_->BelongsToCurrentThread()) {
47 message_loop_->PostTask(FROM_HERE, done_cb_);
48 return;
49 }
50
51 done_cb_.Run();
52 }
53
54 scoped_refptr<base::MessageLoopProxy> message_loop_;
55 base::Lock lock_;
56 int count_;
57 base::Closure done_cb_;
58
59 DISALLOW_COPY_AND_ASSIGN(CountingCB);
60 };
61
62 static void OnSeriesCallback(
63 scoped_refptr<base::MessageLoopProxy> message_loop,
64 scoped_ptr<std::queue<ClosureFunc> > closures,
65 const base::Closure& done_cb) {
66 if (!message_loop->BelongsToCurrentThread()) { 25 if (!message_loop->BelongsToCurrentThread()) {
67 message_loop->PostTask(FROM_HERE, base::Bind( 26 message_loop->PostTask(FROM_HERE, base::Bind(
68 &OnSeriesCallback, message_loop, base::Passed(&closures), done_cb)); 27 &RunOnMessageLoop, message_loop, status_cb, last_status));
28 return;
29 }
30 status_cb.Run(last_status);
31 }
32
33 CallbackSeries::CallbackSeries(
34 scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
35 const PipelineStatusCB& done_cb)
36 : weak_this_(this),
37 message_loop_(base::MessageLoopProxy::current()),
38 status_cbs_(status_cbs.Pass()),
39 done_cb_(done_cb) {
40 }
41
42 CallbackSeries::~CallbackSeries() {}
43
44 scoped_ptr<CallbackSeries> CallbackSeries::Run(
45 scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
46 const PipelineStatusCB& done_cb) {
47 scoped_ptr<CallbackSeries> callback_series(
48 new CallbackSeries(status_cbs.Pass(), done_cb));
49 callback_series->RunNextInSeries(PIPELINE_OK);
50 return callback_series.Pass();
51 }
52
53 void CallbackSeries::RunNextInSeries(PipelineStatus last_status) {
54 DCHECK(message_loop_->BelongsToCurrentThread());
55 DCHECK(!done_cb_.is_null());
56
57 if (status_cbs_->empty() || last_status != PIPELINE_OK) {
58 base::ResetAndReturn(&done_cb_).Run(last_status);
69 return; 59 return;
70 } 60 }
71 61
72 if (closures->empty()) { 62 PipelineStatusCBFunc status_cb = status_cbs_->front();
73 done_cb.Run(); 63 status_cbs_->pop();
74 return; 64 status_cb.Run(base::Bind(&RunOnMessageLoop, message_loop_, base::Bind(
75 } 65 &CallbackSeries::RunNextInSeries, weak_this_.GetWeakPtr())));
76
77 ClosureFunc cb = closures->front();
78 closures->pop();
79 cb.Run(base::Bind(
80 &OnSeriesCallback, message_loop, base::Passed(&closures), done_cb));
81 }
82
83 void RunInSeries(scoped_ptr<std::queue<ClosureFunc> > closures,
84 const base::Closure& done_cb) {
85 OnSeriesCallback(base::MessageLoopProxy::current(),
86 closures.Pass(), done_cb);
87 }
88
89 static void OnStatusCallback(
90 scoped_refptr<base::MessageLoopProxy> message_loop,
91 scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
92 const PipelineStatusCB& done_cb,
93 PipelineStatus last_status) {
94 if (!message_loop->BelongsToCurrentThread()) {
95 message_loop->PostTask(FROM_HERE, base::Bind(
96 &OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb,
97 last_status));
98 return;
99 }
100
101 if (status_cbs->empty() || last_status != PIPELINE_OK) {
102 done_cb.Run(last_status);
103 return;
104 }
105
106 PipelineStatusCBFunc status_cb = status_cbs->front();
107 status_cbs->pop();
108 status_cb.Run(base::Bind(
109 &OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb));
110 }
111
112 void RunInSeriesWithStatus(
113 scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
114 const PipelineStatusCB& done_cb) {
115 OnStatusCallback(base::MessageLoopProxy::current(),
116 status_cbs.Pass(), done_cb, PIPELINE_OK);
117 }
118
119 void RunInParallel(scoped_ptr<std::queue<ClosureFunc> > closures,
120 const base::Closure& done_cb) {
121 if (closures->empty()) {
122 done_cb.Run();
123 return;
124 }
125
126 scoped_refptr<CountingCB> counting_cb =
127 new CountingCB(closures->size(), done_cb);
128 while (!closures->empty()) {
129 closures->front().Run(counting_cb->GetClosure());
130 closures->pop();
131 }
132 } 66 }
133 67
134 } // namespace media 68 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698