| Index: services/media/framework_mojo/mojo_pull_mode_producer.cc
|
| diff --git a/services/media/framework_mojo/mojo_pull_mode_producer.cc b/services/media/framework_mojo/mojo_pull_mode_producer.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..596d618b14cc462fc00308821123a5861cbc4c36
|
| --- /dev/null
|
| +++ b/services/media/framework_mojo/mojo_pull_mode_producer.cc
|
| @@ -0,0 +1,208 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/logging.h"
|
| +#include "services/media/framework_mojo/mojo_pull_mode_producer.h"
|
| +
|
| +namespace mojo {
|
| +namespace media {
|
| +
|
| +MojoPullModeProducer::MojoPullModeProducer() :
|
| + state_(MediaState::UNPREPARED),
|
| + demand_(Demand::kNegative),
|
| + presentation_time_(0),
|
| + cached_packet_(nullptr) {}
|
| +
|
| +MojoPullModeProducer::~MojoPullModeProducer() {
|
| + base::AutoLock lock(lock_);
|
| +}
|
| +
|
| +void MojoPullModeProducer::AddBinding(
|
| + InterfaceRequest<MediaPullModeProducer> producer) {
|
| + bindings_.AddBinding(this, producer.Pass());
|
| +}
|
| +
|
| +void MojoPullModeProducer::GetBuffer(const GetBufferCallback& callback) {
|
| + if (!mojo_allocator_.initialized()) {
|
| + mojo_allocator_.InitNew(256 * 1024); // TODO(dalesat): Made up!
|
| + }
|
| +
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + if (state_ == MediaState::UNPREPARED) {
|
| + state_ = MediaState::PAUSED;
|
| + }
|
| + }
|
| +
|
| + callback.Run(mojo_allocator_.GetDuplicateHandle());
|
| +
|
| + DCHECK(!cached_packet_);
|
| + DCHECK(demand_callback_);
|
| + demand_callback_(Demand::kPositive);
|
| +}
|
| +
|
| +void MojoPullModeProducer::PullPacket(
|
| + MediaPacketPtr to_release,
|
| + const PullPacketCallback& callback) {
|
| + if (to_release) {
|
| + // The client has piggy-backed a release on this pull request.
|
| + ReleasePacket(to_release.Pass());
|
| + }
|
| +
|
| + {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (state_ == MediaState::UNPREPARED) {
|
| + // The consumer has yet to call GetBuffer. This request will have to wait.
|
| + pending_pulls_.push_back(callback);
|
| + return;
|
| + }
|
| +
|
| + DCHECK(mojo_allocator_.initialized());
|
| +
|
| + // If there are no pending requests, see if we can handle this now. If
|
| + // requests are pending, add the callback to the pending queue.
|
| + if (!pending_pulls_.empty() || !MaybeHandlePullUnsafe(callback)) {
|
| + pending_pulls_.push_back(callback);
|
| + }
|
| +
|
| + DCHECK(!cached_packet_);
|
| + }
|
| +
|
| + DCHECK(demand_callback_);
|
| + demand_callback_(Demand::kPositive);
|
| +}
|
| +
|
| +void MojoPullModeProducer::ReleasePacket(MediaPacketPtr to_release) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + uint64_t size = to_release->payload ? to_release->payload->length : 0;
|
| + void* payload = size == 0 ? nullptr :
|
| + mojo_allocator_.PtrFromOffset(to_release->payload->offset);
|
| +
|
| + for (auto iterator = unreleased_packets_.begin(); true; ++iterator) {
|
| + if (iterator == unreleased_packets_.end()) {
|
| + DCHECK(false) << "released packet has bad offset and/or size";
|
| + break;
|
| + }
|
| +
|
| + if ((*iterator)->payload() == payload && (*iterator)->size() == size) {
|
| + unreleased_packets_.erase(iterator);
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // TODO(dalesat): What if the allocator has starved?
|
| + }
|
| +
|
| + DCHECK(demand_callback_);
|
| + demand_callback_(cached_packet_ ? Demand::kNegative : Demand::kPositive);
|
| +}
|
| +
|
| +PayloadAllocator* MojoPullModeProducer::allocator() {
|
| + return mojo_allocator_.initialized() ? &mojo_allocator_ : nullptr;
|
| +}
|
| +
|
| +void MojoPullModeProducer::SetDemandCallback(
|
| + const DemandCallback& demand_callback) {
|
| + demand_callback_ = demand_callback;
|
| +}
|
| +
|
| +void MojoPullModeProducer::Prime() {
|
| + DCHECK(demand_callback_);
|
| + demand_callback_(Demand::kNeutral);
|
| +}
|
| +
|
| +Demand MojoPullModeProducer::SupplyPacket(PacketPtr packet) {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(demand_ != Demand::kNegative) << "packet pushed with negative demand";
|
| + DCHECK(state_ != MediaState::ENDED) << "packet pushed after end-of-stream";
|
| +
|
| + DCHECK(!cached_packet_);
|
| +
|
| + // If there's no binding on the stream, throw the packet away. This can
|
| + // happen if a pull client disconnects unexpectedly.
|
| + if (bindings_.size() == 0) {
|
| + demand_ = Demand::kNegative;
|
| + state_ = MediaState::UNPREPARED;
|
| + // TODO(dalesat): More shutdown?
|
| + return demand_;
|
| + }
|
| +
|
| + // Accept the packet and handle pending pulls with it.
|
| + cached_packet_ = std::move(packet);
|
| +
|
| + HandlePendingPullsUnsafe();
|
| +
|
| + demand_ = cached_packet_ ? Demand::kNegative : Demand::kPositive;
|
| + return demand_;
|
| +}
|
| +
|
| +void MojoPullModeProducer::HandlePendingPullsUnsafe() {
|
| + lock_.AssertAcquired();
|
| +
|
| + while (!pending_pulls_.empty()) {
|
| + DCHECK(mojo_allocator_.initialized());
|
| +
|
| + if (MaybeHandlePullUnsafe(pending_pulls_.front())) {
|
| + pending_pulls_.pop_front();
|
| + } else {
|
| + break;
|
| + }
|
| + }
|
| +}
|
| +
|
| +bool MojoPullModeProducer::MaybeHandlePullUnsafe(
|
| + const PullPacketCallback& callback) {
|
| + DCHECK(!callback.is_null());
|
| + lock_.AssertAcquired();
|
| +
|
| + if (state_ == MediaState::ENDED) {
|
| + // At end-of-stream. Respond with empty end-of-stream packet.
|
| + HandlePullWithPacketUnsafe(
|
| + callback,
|
| + Packet::CreateEndOfStream(presentation_time_));
|
| + return true;
|
| + }
|
| +
|
| + if (!cached_packet_) {
|
| + // Waiting for packet or end-of-stream indication.
|
| + return false;
|
| + }
|
| +
|
| + HandlePullWithPacketUnsafe(callback, std::move(cached_packet_));
|
| + return true;
|
| +}
|
| +
|
| +void MojoPullModeProducer::HandlePullWithPacketUnsafe(
|
| + const PullPacketCallback& callback,
|
| + PacketPtr packet) {
|
| + DCHECK(packet);
|
| + lock_.AssertAcquired();
|
| +
|
| + // TODO(dalesat): Use TaskRunner for this callback.
|
| + callback.Run(CreateMediaPacket(packet));
|
| + unreleased_packets_.push_back(std::move(packet));
|
| +}
|
| +
|
| +MediaPacketPtr MojoPullModeProducer::CreateMediaPacket(
|
| + const PacketPtr& packet) {
|
| + DCHECK(packet);
|
| +
|
| + MediaPacketRegionPtr region = MediaPacketRegion::New();
|
| + region->offset = mojo_allocator_.OffsetFromPtr(packet->payload());
|
| + region->length = packet->size();
|
| +
|
| + MediaPacketPtr media_packet = MediaPacket::New();
|
| + media_packet->pts = packet->presentation_time();
|
| + media_packet->duration = packet->duration();
|
| + media_packet->end_of_stream = packet->end_of_stream();
|
| + media_packet->payload = region.Pass();
|
| + presentation_time_ = packet->presentation_time() + packet->duration();
|
| +
|
| + return media_packet.Pass();
|
| +}
|
| +
|
| +} // namespace media
|
| +} // namespace mojo
|
|
|