OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
| 5 #include <math.h> |
5 #include <stdlib.h> | 6 #include <stdlib.h> |
| 7 #include <vector> |
6 | 8 |
7 #include "media/cast/test/utility/udp_proxy.h" | 9 #include "media/cast/test/utility/udp_proxy.h" |
8 | 10 |
9 #include "base/logging.h" | 11 #include "base/logging.h" |
10 #include "base/memory/linked_ptr.h" | |
11 #include "base/rand_util.h" | 12 #include "base/rand_util.h" |
12 #include "base/synchronization/waitable_event.h" | 13 #include "base/synchronization/waitable_event.h" |
13 #include "base/threading/thread.h" | 14 #include "base/threading/thread.h" |
14 #include "base/time/default_tick_clock.h" | 15 #include "base/time/default_tick_clock.h" |
15 #include "net/base/io_buffer.h" | 16 #include "net/base/io_buffer.h" |
16 #include "net/base/net_errors.h" | 17 #include "net/base/net_errors.h" |
17 #include "net/udp/udp_socket.h" | 18 #include "net/udp/udp_socket.h" |
18 | 19 |
19 namespace media { | 20 namespace media { |
20 namespace cast { | 21 namespace cast { |
(...skipping 282 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
303 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; | 304 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; |
304 }; | 305 }; |
305 | 306 |
306 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, | 307 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, |
307 double average_outage_time) { | 308 double average_outage_time) { |
308 return scoped_ptr<PacketPipe>( | 309 return scoped_ptr<PacketPipe>( |
309 new NetworkGlitchPipe(average_work_time, average_outage_time)) | 310 new NetworkGlitchPipe(average_work_time, average_outage_time)) |
310 .Pass(); | 311 .Pass(); |
311 } | 312 } |
312 | 313 |
| 314 |
| 315 // Internal buffer object for a client of the IPP model. |
| 316 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe { |
| 317 public: |
| 318 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp, |
| 319 size_t size) |
| 320 : ipp_(ipp), |
| 321 stored_size_(0), |
| 322 stored_limit_(size), |
| 323 clock_(NULL), |
| 324 weak_factory_(this) { |
| 325 } |
| 326 |
| 327 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { |
| 328 // Drop if buffer is full. |
| 329 if (stored_size_ >= stored_limit_) |
| 330 return; |
| 331 stored_size_ += packet->size(); |
| 332 buffer_.push_back(linked_ptr<transport::Packet>(packet.release())); |
| 333 buffer_time_.push_back(clock_->NowTicks()); |
| 334 DCHECK(buffer_.size() == buffer_time_.size()); |
| 335 } |
| 336 |
| 337 virtual void InitOnIOThread( |
| 338 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
| 339 base::TickClock* clock) OVERRIDE { |
| 340 clock_ = clock; |
| 341 if (ipp_) |
| 342 ipp_->InitOnIOThread(task_runner, clock); |
| 343 PacketPipe::InitOnIOThread(task_runner, clock); |
| 344 } |
| 345 |
| 346 void SendOnePacket() { |
| 347 scoped_ptr<transport::Packet> packet(buffer_.front().release()); |
| 348 stored_size_ -= packet->size(); |
| 349 buffer_.pop_front(); |
| 350 buffer_time_.pop_front(); |
| 351 pipe_->Send(packet.Pass()); |
| 352 DCHECK(buffer_.size() == buffer_time_.size()); |
| 353 } |
| 354 |
| 355 bool Empty() const { |
| 356 return buffer_.empty(); |
| 357 } |
| 358 |
| 359 base::TimeTicks FirstPacketTime() const { |
| 360 DCHECK(!buffer_time_.empty()); |
| 361 return buffer_time_.front(); |
| 362 } |
| 363 |
| 364 base::WeakPtr<InternalBuffer> GetWeakPtr() { |
| 365 return weak_factory_.GetWeakPtr(); |
| 366 |
| 367 } |
| 368 |
| 369 private: |
| 370 const base::WeakPtr<InterruptedPoissonProcess> ipp_; |
| 371 size_t stored_size_; |
| 372 const size_t stored_limit_; |
| 373 std::deque<linked_ptr<transport::Packet> > buffer_; |
| 374 std::deque<base::TimeTicks> buffer_time_; |
| 375 base::TickClock* clock_; |
| 376 base::WeakPtrFactory<InternalBuffer> weak_factory_; |
| 377 |
| 378 DISALLOW_COPY_AND_ASSIGN(InternalBuffer); |
| 379 }; |
| 380 |
| 381 InterruptedPoissonProcess::InterruptedPoissonProcess( |
| 382 const std::vector<double>& average_rates, |
| 383 double coef_burstiness, |
| 384 double coef_variance, |
| 385 uint32 rand_seed) |
| 386 : clock_(NULL), |
| 387 average_rates_(average_rates), |
| 388 coef_burstiness_(coef_burstiness), |
| 389 coef_variance_(coef_variance), |
| 390 rate_index_(0), |
| 391 on_state_(true), |
| 392 weak_factory_(this) { |
| 393 mt_rand_.init_genrand(rand_seed); |
| 394 DCHECK(!average_rates.empty()); |
| 395 ComputeRates(); |
| 396 } |
| 397 |
| 398 void InterruptedPoissonProcess::InitOnIOThread( |
| 399 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
| 400 base::TickClock* clock) { |
| 401 // Already initialized and started. |
| 402 if (task_runner_ && clock_) |
| 403 return; |
| 404 task_runner_ = task_runner; |
| 405 clock_ = clock; |
| 406 UpdateRates(); |
| 407 SwitchOn(); |
| 408 SendPacket(); |
| 409 } |
| 410 |
| 411 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) { |
| 412 scoped_ptr<InternalBuffer> buffer( |
| 413 new InternalBuffer(weak_factory_.GetWeakPtr(), size)); |
| 414 send_buffers_.push_back(buffer->GetWeakPtr()); |
| 415 return buffer.PassAs<PacketPipe>(); |
| 416 } |
| 417 |
| 418 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) { |
| 419 // Rate is per milliseconds. |
| 420 // The time until next event is exponentially distributed to the |
| 421 // inverse of |rate|. |
| 422 return base::TimeDelta::FromMillisecondsD( |
| 423 fabs(-log(1.0 - RandDouble()) / rate)); |
| 424 } |
| 425 |
| 426 double InterruptedPoissonProcess::RandDouble() { |
| 427 // Generate a 64-bits random number from MT19937 and then convert |
| 428 // it to double. |
| 429 uint64 rand = mt_rand_.genrand_int32(); |
| 430 rand <<= 32; |
| 431 rand |= mt_rand_.genrand_int32(); |
| 432 return base::BitsToOpenEndedUnitInterval(rand); |
| 433 } |
| 434 |
| 435 void InterruptedPoissonProcess::ComputeRates() { |
| 436 double avg_rate = average_rates_[rate_index_]; |
| 437 |
| 438 send_rate_ = avg_rate / coef_burstiness_; |
| 439 switch_off_rate_ = |
| 440 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) / |
| 441 coef_burstiness_ / (coef_variance_ - 1); |
| 442 switch_on_rate_ = |
| 443 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1); |
| 444 } |
| 445 |
| 446 void InterruptedPoissonProcess::UpdateRates() { |
| 447 ComputeRates(); |
| 448 |
| 449 // Rates are updated once per second. |
| 450 rate_index_ = (rate_index_ + 1) % average_rates_.size(); |
| 451 task_runner_->PostDelayedTask( |
| 452 FROM_HERE, |
| 453 base::Bind(&InterruptedPoissonProcess::UpdateRates, |
| 454 weak_factory_.GetWeakPtr()), |
| 455 base::TimeDelta::FromSeconds(1)); |
| 456 } |
| 457 |
| 458 void InterruptedPoissonProcess::SwitchOff() { |
| 459 on_state_ = false; |
| 460 task_runner_->PostDelayedTask( |
| 461 FROM_HERE, |
| 462 base::Bind(&InterruptedPoissonProcess::SwitchOn, |
| 463 weak_factory_.GetWeakPtr()), |
| 464 NextEvent(switch_on_rate_)); |
| 465 } |
| 466 |
| 467 void InterruptedPoissonProcess::SwitchOn() { |
| 468 on_state_ = true; |
| 469 task_runner_->PostDelayedTask( |
| 470 FROM_HERE, |
| 471 base::Bind(&InterruptedPoissonProcess::SwitchOff, |
| 472 weak_factory_.GetWeakPtr()), |
| 473 NextEvent(switch_off_rate_)); |
| 474 } |
| 475 |
| 476 void InterruptedPoissonProcess::SendPacket() { |
| 477 task_runner_->PostDelayedTask( |
| 478 FROM_HERE, |
| 479 base::Bind(&InterruptedPoissonProcess::SendPacket, |
| 480 weak_factory_.GetWeakPtr()), |
| 481 NextEvent(send_rate_)); |
| 482 |
| 483 // If OFF then don't send. |
| 484 if (!on_state_) |
| 485 return; |
| 486 |
| 487 // Find the earliest packet to send. |
| 488 base::TimeTicks earliest_time; |
| 489 for (size_t i = 0; i < send_buffers_.size(); ++i) { |
| 490 if (!send_buffers_[i]) |
| 491 continue; |
| 492 if (send_buffers_[i]->Empty()) |
| 493 continue; |
| 494 if (earliest_time.is_null() || |
| 495 send_buffers_[i]->FirstPacketTime() < earliest_time) |
| 496 earliest_time = send_buffers_[i]->FirstPacketTime(); |
| 497 } |
| 498 for (size_t i = 0; i < send_buffers_.size(); ++i) { |
| 499 if (!send_buffers_[i]) |
| 500 continue; |
| 501 if (send_buffers_[i]->Empty()) |
| 502 continue; |
| 503 if (send_buffers_[i]->FirstPacketTime() != earliest_time) |
| 504 continue; |
| 505 send_buffers_[i]->SendOnePacket(); |
| 506 break; |
| 507 } |
| 508 } |
| 509 |
313 class UDPProxyImpl; | 510 class UDPProxyImpl; |
314 | 511 |
315 class PacketSender : public PacketPipe { | 512 class PacketSender : public PacketPipe { |
316 public: | 513 public: |
317 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) | 514 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) |
318 : udp_proxy_(udp_proxy), destination_(destination) {} | 515 : udp_proxy_(udp_proxy), destination_(destination) {} |
319 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE; | 516 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE; |
320 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { | 517 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { |
321 NOTREACHED(); | 518 NOTREACHED(); |
322 } | 519 } |
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
584 destination, | 781 destination, |
585 to_dest_pipe.Pass(), | 782 to_dest_pipe.Pass(), |
586 from_dest_pipe.Pass(), | 783 from_dest_pipe.Pass(), |
587 net_log)); | 784 net_log)); |
588 return ret.Pass(); | 785 return ret.Pass(); |
589 } | 786 } |
590 | 787 |
591 } // namespace test | 788 } // namespace test |
592 } // namespace cast | 789 } // namespace cast |
593 } // namespace media | 790 } // namespace media |
OLD | NEW |