Chromium Code Reviews| Index: services/media/framework_mojo/mojo_pull_mode_producer.cc |
| diff --git a/services/media/framework_mojo/mojo_pull_mode_producer.cc b/services/media/framework_mojo/mojo_pull_mode_producer.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a6c5c914141a807c604d85909a16cc861cadae66 |
| --- /dev/null |
| +++ b/services/media/framework_mojo/mojo_pull_mode_producer.cc |
| @@ -0,0 +1,209 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "base/logging.h" |
| +#include "services/media/framework_mojo/mojo_pull_mode_producer.h" |
| + |
| +namespace mojo { |
| +namespace media { |
| + |
| +MojoPullModeProducer::MojoPullModeProducer() : |
| + state_(MediaState::UNPREPARED), |
| + demand_(Demand::kNegative), |
| + presentation_time_(0), |
| + cached_packet_(nullptr) {} |
| + |
| +MojoPullModeProducer::~MojoPullModeProducer() { |
| + base::AutoLock lock(lock_); |
| +} |
| + |
| +void MojoPullModeProducer::AddBinding( |
| + InterfaceRequest<MediaPullModeProducer> producer) { |
| + bindings_.AddBinding(this, producer.Pass()); |
| +} |
| + |
| +void MojoPullModeProducer::GetBuffer(const GetBufferCallback& callback) { |
| + if (!mojo_allocator_.initialized()) { |
| + mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up! |
| + } |
| + |
| + { |
| + base::AutoLock lock(lock_); |
| + if (state_ == MediaState::UNPREPARED) { |
| + state_ = MediaState::PAUSED; |
| + } |
| + } |
| + |
| + callback.Run(mojo_allocator_.GetDuplicateHandle()); |
| + |
| + DCHECK(!cached_packet_); |
| + DCHECK(demand_callback_); |
| + demand_callback_(Demand::kPositive); |
| +} |
| + |
| +void MojoPullModeProducer::PullPacket( |
| + MediaPacketPtr to_release, |
| + const PullPacketCallback& callback) { |
| + if (to_release) { |
| + // The client has piggy-backed a release on this pull request. |
| + ReleasePacket(to_release.Pass()); |
| + } |
| + |
| + { |
| + base::AutoLock lock(lock_); |
| + |
| + if (state_ == MediaState::UNPREPARED) { |
| + // The consumer has yet to call GetBuffer. This request will have to wait. |
| + pending_pulls_.push_back(callback); |
| + return; |
| + } |
| + |
| + DCHECK(mojo_allocator_.initialized()); |
| + |
| + // If there are no pending requests, see if we can handle this now. If |
| + // requests are pending, add the callback to the pending queue. |
| + if (!pending_pulls_.empty() || !MaybeHandlePullUnsafe(callback)) { |
| + pending_pulls_.push_back(callback); |
| + } |
| + |
| + DCHECK(!cached_packet_); |
| + } |
| + |
| + DCHECK(demand_callback_); |
| + demand_callback_(Demand::kPositive); |
| +} |
| + |
| +void MojoPullModeProducer::ReleasePacket(MediaPacketPtr to_release) { |
| + { |
| + base::AutoLock lock(lock_); |
| + uint64_t size = to_release->payload ? to_release->payload->length : 0; |
| + void* payload = size == 0 ? nullptr : |
| + mojo_allocator_.PtrFromOffset(to_release->payload->offset); |
| + |
| + for (auto iterator = unreleased_packets_.begin(); true; ++iterator) { |
| + if (iterator == unreleased_packets_.end()) { |
| + DCHECK(false) << "released packet has bad offset"; |
| + break; |
| + } |
| + |
| + if ((*iterator)->payload() == payload) { |
|
johngro
2016/02/23 00:49:42
what happens if there are multiple payloads of 0 l
dalesat
2016/02/23 20:34:34
I've changed this to look for a match on both payl
|
| + DCHECK_EQ(size, (*iterator)->size()) << "released packet has bad size"; |
| + unreleased_packets_.erase(iterator); |
| + break; |
| + } |
| + } |
| + |
| + // TODO(dalesat): What if the allocator has starved? |
| + } |
| + |
| + DCHECK(demand_callback_); |
| + demand_callback_(cached_packet_ ? Demand::kNegative : Demand::kPositive); |
| +} |
| + |
| +PayloadAllocator* MojoPullModeProducer::allocator() { |
| + return mojo_allocator_.initialized() ? &mojo_allocator_ : nullptr; |
| +} |
| + |
| +void MojoPullModeProducer::SetDemandCallback( |
| + const DemandCallback& demand_callback) { |
| + demand_callback_ = demand_callback; |
| +} |
| + |
| +void MojoPullModeProducer::Prime() { |
| + DCHECK(demand_callback_); |
| + demand_callback_(Demand::kNeutral); |
| +} |
| + |
| +Demand MojoPullModeProducer::SupplyPacket(PacketPtr packet) { |
| + base::AutoLock lock(lock_); |
| + DCHECK(demand_ != Demand::kNegative) << "packet pushed with negative demand"; |
| + DCHECK(state_ != MediaState::ENDED) << "packet pushed after end-of-stream"; |
| + |
| + DCHECK(!cached_packet_); |
| + |
| + // If there's no binding on the stream, throw the packet away. This can |
| + // happen if a pull client disconnects unexpectedly. |
| + if (bindings_.size() == 0) { |
| + demand_ = Demand::kNegative; |
| + state_ = MediaState::UNPREPARED; |
| + // TODO(dalesat): More shutdown? |
| + return demand_; |
| + } |
| + |
| + // Accept the packet and handle pending pulls with it. |
| + cached_packet_ = std::move(packet); |
| + |
| + HandlePendingPullsUnsafe(); |
| + |
| + demand_ = cached_packet_ ? Demand::kNegative : Demand::kPositive; |
| + return demand_; |
| +} |
| + |
| +void MojoPullModeProducer::HandlePendingPullsUnsafe() { |
| + lock_.AssertAcquired(); |
| + |
| + while (!pending_pulls_.empty()) { |
| + DCHECK(mojo_allocator_.initialized()); |
| + |
| + if (MaybeHandlePullUnsafe(pending_pulls_.front())) { |
| + pending_pulls_.pop_front(); |
| + } else { |
| + break; |
| + } |
| + } |
| +} |
| + |
| +bool MojoPullModeProducer::MaybeHandlePullUnsafe( |
| + const PullPacketCallback& callback) { |
| + DCHECK(!callback.is_null()); |
| + lock_.AssertAcquired(); |
| + |
| + if (state_ == MediaState::ENDED) { |
|
johngro
2016/02/23 00:49:42
If we are in State::ENDED, and there are N pending
dalesat
2016/02/23 20:34:34
This behavior is correct, I think. Keep in mind th
|
| + // At end-of-stream. Respond with empty end-of-stream packet. |
| + HandlePullWithPacketUnsafe( |
| + callback, |
| + Packet::CreateEndOfStream(presentation_time_)); |
| + return true; |
| + } |
| + |
| + if (!cached_packet_) { |
| + // Waiting for packet or end-of-stream indication. |
| + return false; |
| + } |
| + |
| + HandlePullWithPacketUnsafe(callback, std::move(cached_packet_)); |
| + return true; |
| +} |
| + |
| +void MojoPullModeProducer::HandlePullWithPacketUnsafe( |
| + const PullPacketCallback& callback, |
| + PacketPtr packet) { |
| + DCHECK(packet); |
| + lock_.AssertAcquired(); |
| + |
| + // TODO(dalesat): Use TaskRunner for this callback. |
| + callback.Run(CreateMediaPacket(packet)); |
| + unreleased_packets_.push_back(std::move(packet)); |
| +} |
| + |
| +MediaPacketPtr MojoPullModeProducer::CreateMediaPacket( |
| + const PacketPtr& packet) { |
| + DCHECK(packet); |
| + |
| + MediaPacketRegionPtr region = MediaPacketRegion::New(); |
| + region->offset = mojo_allocator_.OffsetFromPtr(packet->payload()); |
| + region->length = packet->size(); |
| + |
| + MediaPacketPtr media_packet = MediaPacket::New(); |
| + media_packet->pts = packet->presentation_time(); |
| + media_packet->duration = packet->duration(); |
| + media_packet->end_of_stream = packet->end_of_stream(); |
| + media_packet->payload = region.Pass(); |
| + presentation_time_ = packet->presentation_time() + packet->duration(); |
| + |
| + return media_packet.Pass(); |
| +} |
| + |
| +} // namespace media |
| +} // namespace mojo |