Index: media/cast/test/utility/udp_proxy.cc |
diff --git a/media/cast/test/utility/udp_proxy.cc b/media/cast/test/utility/udp_proxy.cc |
index 9fc3b4a44db21e67f84d069e086c6e0a721c543f..5c765f6778325c18ab4f537cfd6b2ed57eaa6144 100644 |
--- a/media/cast/test/utility/udp_proxy.cc |
+++ b/media/cast/test/utility/udp_proxy.cc |
@@ -2,12 +2,13 @@ |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
+#include <math.h> |
#include <stdlib.h> |
+#include <vector> |
#include "media/cast/test/utility/udp_proxy.h" |
#include "base/logging.h" |
-#include "base/memory/linked_ptr.h" |
#include "base/rand_util.h" |
#include "base/synchronization/waitable_event.h" |
#include "base/threading/thread.h" |
@@ -310,6 +311,202 @@ scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, |
.Pass(); |
} |
+ |
+// Internal buffer object for a client of the IPP model. |
+class InterruptedPoissonProcess::InternalBuffer : public PacketPipe { |
+ public: |
+ InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp, |
+ size_t size) |
+ : ipp_(ipp), |
+ stored_size_(0), |
+ stored_limit_(size), |
+ clock_(NULL), |
+ weak_factory_(this) { |
+ } |
+ |
+ virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { |
+ // Drop if buffer is full. |
+ if (stored_size_ >= stored_limit_) |
+ return; |
+ stored_size_ += packet->size(); |
+ buffer_.push_back(linked_ptr<transport::Packet>(packet.release())); |
+ buffer_time_.push_back(clock_->NowTicks()); |
+ DCHECK(buffer_.size() == buffer_time_.size()); |
+ } |
+ |
+ virtual void InitOnIOThread( |
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
+ base::TickClock* clock) OVERRIDE { |
+ clock_ = clock; |
+ if (ipp_) |
+ ipp_->InitOnIOThread(task_runner, clock); |
+ PacketPipe::InitOnIOThread(task_runner, clock); |
+ } |
+ |
+ void SendOnePacket() { |
+ scoped_ptr<transport::Packet> packet(buffer_.front().release()); |
+ stored_size_ -= packet->size(); |
+ buffer_.pop_front(); |
+ buffer_time_.pop_front(); |
+ pipe_->Send(packet.Pass()); |
+ DCHECK(buffer_.size() == buffer_time_.size()); |
+ } |
+ |
+ bool Empty() const { |
+ return buffer_.empty(); |
+ } |
+ |
+ base::TimeTicks FirstPacketTime() const { |
+ DCHECK(!buffer_time_.empty()); |
+ return buffer_time_.front(); |
+ } |
+ |
+ base::WeakPtr<InternalBuffer> GetWeakPtr() { |
+ return weak_factory_.GetWeakPtr(); |
+ |
+ } |
+ |
+ private: |
+ const base::WeakPtr<InterruptedPoissonProcess> ipp_; |
+ size_t stored_size_; |
+ const size_t stored_limit_; |
+ std::deque<linked_ptr<transport::Packet> > buffer_; |
+ std::deque<base::TimeTicks> buffer_time_; |
+ base::TickClock* clock_; |
+ base::WeakPtrFactory<InternalBuffer> weak_factory_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(InternalBuffer); |
+}; |
+ |
+InterruptedPoissonProcess::InterruptedPoissonProcess( |
+ const std::vector<double>& average_rates, |
+ double coef_burstiness, |
+ double coef_variance, |
+ uint32 rand_seed) |
+ : clock_(NULL), |
+ average_rates_(average_rates), |
+ coef_burstiness_(coef_burstiness), |
+ coef_variance_(coef_variance), |
+ rate_index_(0), |
+ on_state_(true), |
+ weak_factory_(this) { |
+ mt_rand_.init_genrand(rand_seed); |
+ DCHECK(!average_rates.empty()); |
+ ComputeRates(); |
+} |
+ |
+void InterruptedPoissonProcess::InitOnIOThread( |
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
+ base::TickClock* clock) { |
+ // Already initialized and started. |
+ if (task_runner_ && clock_) |
+ return; |
+ task_runner_ = task_runner; |
+ clock_ = clock; |
+ UpdateRates(); |
+ SwitchOn(); |
+ SendPacket(); |
+} |
+ |
+scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) { |
+ scoped_ptr<InternalBuffer> buffer( |
+ new InternalBuffer(weak_factory_.GetWeakPtr(), size)); |
+ send_buffers_.push_back(buffer->GetWeakPtr()); |
+ return buffer.PassAs<PacketPipe>(); |
+} |
+ |
+base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) { |
+ // Rate is per milliseconds. |
+ // The time until next event is exponentially distributed to the |
+ // inverse of |rate|. |
+ return base::TimeDelta::FromMillisecondsD( |
+ fabs(-log(1.0 - RandDouble()) / rate)); |
+} |
+ |
+double InterruptedPoissonProcess::RandDouble() { |
+ // Generate a 64-bits random number from MT19937 and then convert |
+ // it to double. |
+ uint64 rand = mt_rand_.genrand_int32(); |
+ rand <<= 32; |
+ rand |= mt_rand_.genrand_int32(); |
+ return base::BitsToOpenEndedUnitInterval(rand); |
+} |
+ |
+void InterruptedPoissonProcess::ComputeRates() { |
+ double avg_rate = average_rates_[rate_index_]; |
+ |
+ send_rate_ = avg_rate / coef_burstiness_; |
+ switch_off_rate_ = |
+ 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) / |
+ coef_burstiness_ / (coef_variance_ - 1); |
+ switch_on_rate_ = |
+ 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1); |
+} |
+ |
+void InterruptedPoissonProcess::UpdateRates() { |
+ ComputeRates(); |
+ |
+ // Rates are updated once per second. |
+ rate_index_ = (rate_index_ + 1) % average_rates_.size(); |
+ task_runner_->PostDelayedTask( |
+ FROM_HERE, |
+ base::Bind(&InterruptedPoissonProcess::UpdateRates, |
+ weak_factory_.GetWeakPtr()), |
+ base::TimeDelta::FromSeconds(1)); |
+} |
+ |
+void InterruptedPoissonProcess::SwitchOff() { |
+ on_state_ = false; |
+ task_runner_->PostDelayedTask( |
+ FROM_HERE, |
+ base::Bind(&InterruptedPoissonProcess::SwitchOn, |
+ weak_factory_.GetWeakPtr()), |
+ NextEvent(switch_on_rate_)); |
+} |
+ |
+void InterruptedPoissonProcess::SwitchOn() { |
+ on_state_ = true; |
+ task_runner_->PostDelayedTask( |
+ FROM_HERE, |
+ base::Bind(&InterruptedPoissonProcess::SwitchOff, |
+ weak_factory_.GetWeakPtr()), |
+ NextEvent(switch_off_rate_)); |
+} |
+ |
+void InterruptedPoissonProcess::SendPacket() { |
+ task_runner_->PostDelayedTask( |
+ FROM_HERE, |
+ base::Bind(&InterruptedPoissonProcess::SendPacket, |
+ weak_factory_.GetWeakPtr()), |
+ NextEvent(send_rate_)); |
+ |
+ // If OFF then don't send. |
+ if (!on_state_) |
+ return; |
+ |
+ // Find the earliest packet to send. |
+ base::TimeTicks earliest_time; |
+ for (size_t i = 0; i < send_buffers_.size(); ++i) { |
+ if (!send_buffers_[i]) |
+ continue; |
+ if (send_buffers_[i]->Empty()) |
+ continue; |
+ if (earliest_time.is_null() || |
+ send_buffers_[i]->FirstPacketTime() < earliest_time) |
+ earliest_time = send_buffers_[i]->FirstPacketTime(); |
+ } |
+ for (size_t i = 0; i < send_buffers_.size(); ++i) { |
+ if (!send_buffers_[i]) |
+ continue; |
+ if (send_buffers_[i]->Empty()) |
+ continue; |
+ if (send_buffers_[i]->FirstPacketTime() != earliest_time) |
+ continue; |
+ send_buffers_[i]->SendOnePacket(); |
+ break; |
+ } |
+} |
+ |
class UDPProxyImpl; |
class PacketSender : public PacketPipe { |