Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(394)

Side by Side Diff: services/media/framework_mojo/mojo_pull_mode_producer.cc

Issue 1692443002: Motown: Framework parts for mojo transport (producer/consumer/mediapipe) and control (audiotrack). (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Sync and fix: InterfaceHandle<X> vs XPtr, changes to MediaPipe Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/logging.h"
6 #include "services/media/framework_mojo/mojo_pull_mode_producer.h"
7
8 namespace mojo {
9 namespace media {
10
11 MojoPullModeProducer::MojoPullModeProducer() :
12 state_(MediaState::UNPREPARED),
13 demand_(Demand::kNegative),
14 presentation_time_(0),
15 cached_packet_(nullptr) {}
16
17 MojoPullModeProducer::~MojoPullModeProducer() {
18 base::AutoLock lock(lock_);
19 }
20
21 void MojoPullModeProducer::AddBinding(
22 InterfaceRequest<MediaPullModeProducer> producer) {
23 bindings_.AddBinding(this, producer.Pass());
24 }
25
26 void MojoPullModeProducer::GetBuffer(const GetBufferCallback& callback) {
27 if (!mojo_allocator_.initialized()) {
28 mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up!
29 }
30
31 {
32 base::AutoLock lock(lock_);
33 if (state_ == MediaState::UNPREPARED) {
34 state_ = MediaState::PAUSED;
35 }
36 }
37
38 callback.Run(mojo_allocator_.GetDuplicateHandle());
39
40 DCHECK(!cached_packet_);
41 DCHECK(demand_callback_);
42 demand_callback_(Demand::kPositive);
43 }
44
45 void MojoPullModeProducer::PullPacket(
46 MediaPacketPtr to_release,
47 const PullPacketCallback& callback) {
48 if (to_release) {
49 // The client has piggy-backed a release on this pull request.
50 ReleasePacket(to_release.Pass());
51 }
52
53 {
54 base::AutoLock lock(lock_);
55
56 if (state_ == MediaState::UNPREPARED) {
57 // The consumer has yet to call GetBuffer. This request will have to wait.
58 pending_pulls_.push_back(callback);
59 return;
60 }
61
62 DCHECK(mojo_allocator_.initialized());
63
64 // If there are no pending requests, see if we can handle this now. If
65 // requests are pending, add the callback to the pending queue.
66 if (!pending_pulls_.empty() || !MaybeHandlePullUnsafe(callback)) {
67 pending_pulls_.push_back(callback);
68 }
69
70 DCHECK(!cached_packet_);
71 }
72
73 DCHECK(demand_callback_);
74 demand_callback_(Demand::kPositive);
75 }
76
77 void MojoPullModeProducer::ReleasePacket(MediaPacketPtr to_release) {
78 {
79 base::AutoLock lock(lock_);
80 uint64_t size = to_release->payload ? to_release->payload->length : 0;
81 void* payload = size == 0 ? nullptr :
82 mojo_allocator_.PtrFromOffset(to_release->payload->offset);
83
84 for (auto iterator = unreleased_packets_.begin(); true; ++iterator) {
85 if (iterator == unreleased_packets_.end()) {
86 DCHECK(false) << "released packet has bad offset";
87 break;
88 }
89
90 if ((*iterator)->payload() == payload) {
johngro 2016/02/23 00:49:42 what happens if there are multiple payloads of 0 l
dalesat 2016/02/23 20:34:34 I've changed this to look for a match on both payl
91 DCHECK_EQ(size, (*iterator)->size()) << "released packet has bad size";
92 unreleased_packets_.erase(iterator);
93 break;
94 }
95 }
96
97 // TODO(dalesat): What if the allocator has starved?
98 }
99
100 DCHECK(demand_callback_);
101 demand_callback_(cached_packet_ ? Demand::kNegative : Demand::kPositive);
102 }
103
104 PayloadAllocator* MojoPullModeProducer::allocator() {
105 return mojo_allocator_.initialized() ? &mojo_allocator_ : nullptr;
106 }
107
108 void MojoPullModeProducer::SetDemandCallback(
109 const DemandCallback& demand_callback) {
110 demand_callback_ = demand_callback;
111 }
112
113 void MojoPullModeProducer::Prime() {
114 DCHECK(demand_callback_);
115 demand_callback_(Demand::kNeutral);
116 }
117
118 Demand MojoPullModeProducer::SupplyPacket(PacketPtr packet) {
119 base::AutoLock lock(lock_);
120 DCHECK(demand_ != Demand::kNegative) << "packet pushed with negative demand";
121 DCHECK(state_ != MediaState::ENDED) << "packet pushed after end-of-stream";
122
123 DCHECK(!cached_packet_);
124
125 // If there's no binding on the stream, throw the packet away. This can
126 // happen if a pull client disconnects unexpectedly.
127 if (bindings_.size() == 0) {
128 demand_ = Demand::kNegative;
129 state_ = MediaState::UNPREPARED;
130 // TODO(dalesat): More shutdown?
131 return demand_;
132 }
133
134 // Accept the packet and handle pending pulls with it.
135 cached_packet_ = std::move(packet);
136
137 HandlePendingPullsUnsafe();
138
139 demand_ = cached_packet_ ? Demand::kNegative : Demand::kPositive;
140 return demand_;
141 }
142
143 void MojoPullModeProducer::HandlePendingPullsUnsafe() {
144 lock_.AssertAcquired();
145
146 while (!pending_pulls_.empty()) {
147 DCHECK(mojo_allocator_.initialized());
148
149 if (MaybeHandlePullUnsafe(pending_pulls_.front())) {
150 pending_pulls_.pop_front();
151 } else {
152 break;
153 }
154 }
155 }
156
157 bool MojoPullModeProducer::MaybeHandlePullUnsafe(
158 const PullPacketCallback& callback) {
159 DCHECK(!callback.is_null());
160 lock_.AssertAcquired();
161
162 if (state_ == MediaState::ENDED) {
johngro 2016/02/23 00:49:42 If we are in State::ENDED, and there are N pending
dalesat 2016/02/23 20:34:34 This behavior is correct, I think. Keep in mind th
163 // At end-of-stream. Respond with empty end-of-stream packet.
164 HandlePullWithPacketUnsafe(
165 callback,
166 Packet::CreateEndOfStream(presentation_time_));
167 return true;
168 }
169
170 if (!cached_packet_) {
171 // Waiting for packet or end-of-stream indication.
172 return false;
173 }
174
175 HandlePullWithPacketUnsafe(callback, std::move(cached_packet_));
176 return true;
177 }
178
179 void MojoPullModeProducer::HandlePullWithPacketUnsafe(
180 const PullPacketCallback& callback,
181 PacketPtr packet) {
182 DCHECK(packet);
183 lock_.AssertAcquired();
184
185 // TODO(dalesat): Use TaskRunner for this callback.
186 callback.Run(CreateMediaPacket(packet));
187 unreleased_packets_.push_back(std::move(packet));
188 }
189
190 MediaPacketPtr MojoPullModeProducer::CreateMediaPacket(
191 const PacketPtr& packet) {
192 DCHECK(packet);
193
194 MediaPacketRegionPtr region = MediaPacketRegion::New();
195 region->offset = mojo_allocator_.OffsetFromPtr(packet->payload());
196 region->length = packet->size();
197
198 MediaPacketPtr media_packet = MediaPacket::New();
199 media_packet->pts = packet->presentation_time();
200 media_packet->duration = packet->duration();
201 media_packet->end_of_stream = packet->end_of_stream();
202 media_packet->payload = region.Pass();
203 presentation_time_ = packet->presentation_time() + packet->duration();
204
205 return media_packet.Pass();
206 }
207
208 } // namespace media
209 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698