Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(386)

Side by Side Diff: media/cast/test/utility/udp_proxy.cc

Issue 362123005: Cast: Update simulator tool with more inputs. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Diff Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <math.h>
5 #include <stdlib.h> 6 #include <stdlib.h>
7 #include <vector>
6 8
7 #include "media/cast/test/utility/udp_proxy.h" 9 #include "media/cast/test/utility/udp_proxy.h"
8 10
9 #include "base/logging.h" 11 #include "base/logging.h"
10 #include "base/memory/linked_ptr.h"
11 #include "base/rand_util.h" 12 #include "base/rand_util.h"
12 #include "base/synchronization/waitable_event.h" 13 #include "base/synchronization/waitable_event.h"
13 #include "base/threading/thread.h" 14 #include "base/threading/thread.h"
14 #include "base/time/default_tick_clock.h" 15 #include "base/time/default_tick_clock.h"
15 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
16 #include "net/base/net_errors.h" 17 #include "net/base/net_errors.h"
17 #include "net/udp/udp_socket.h" 18 #include "net/udp/udp_socket.h"
18 19
19 namespace media { 20 namespace media {
20 namespace cast { 21 namespace cast {
(...skipping 282 matching lines...) Expand 10 before | Expand all | Expand 10 after
303 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; 304 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
304 }; 305 };
305 306
306 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, 307 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
307 double average_outage_time) { 308 double average_outage_time) {
308 return scoped_ptr<PacketPipe>( 309 return scoped_ptr<PacketPipe>(
309 new NetworkGlitchPipe(average_work_time, average_outage_time)) 310 new NetworkGlitchPipe(average_work_time, average_outage_time))
310 .Pass(); 311 .Pass();
311 } 312 }
312 313
314
315 // Internal buffer object for a client of the IPP model.
316 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
317 public:
318 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
319 size_t size)
320 : ipp_(ipp),
321 stored_size_(0),
322 stored_limit_(size),
323 clock_(NULL),
324 weak_factory_(this) {
325 }
326
327 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
328 // Drop if buffer is full.
329 if (stored_size_ >= stored_limit_)
330 return;
331 stored_size_ += packet->size();
332 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
333 buffer_time_.push_back(clock_->NowTicks());
334 DCHECK(buffer_.size() == buffer_time_.size());
335 }
336
337 virtual void InitOnIOThread(
338 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
339 base::TickClock* clock) OVERRIDE {
340 clock_ = clock;
341 if (ipp_)
342 ipp_->InitOnIOThread(task_runner, clock);
343 PacketPipe::InitOnIOThread(task_runner, clock);
344 }
345
346 void SendOnePacket() {
347 scoped_ptr<transport::Packet> packet(buffer_.front().release());
348 stored_size_ -= packet->size();
349 buffer_.pop_front();
350 buffer_time_.pop_front();
351 pipe_->Send(packet.Pass());
352 DCHECK(buffer_.size() == buffer_time_.size());
353 }
354
355 bool Empty() const {
356 return buffer_.empty();
357 }
358
359 base::TimeTicks FirstPacketTime() const {
360 DCHECK(!buffer_time_.empty());
361 return buffer_time_.front();
362 }
363
364 base::WeakPtr<InternalBuffer> GetWeakPtr() {
365 return weak_factory_.GetWeakPtr();
366
367 }
368
369 private:
370 const base::WeakPtr<InterruptedPoissonProcess> ipp_;
371 size_t stored_size_;
372 const size_t stored_limit_;
373 std::deque<linked_ptr<transport::Packet> > buffer_;
374 std::deque<base::TimeTicks> buffer_time_;
375 base::TickClock* clock_;
376 base::WeakPtrFactory<InternalBuffer> weak_factory_;
377
378 DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
379 };
380
381 InterruptedPoissonProcess::InterruptedPoissonProcess(
382 const std::vector<double>& average_rates,
383 double coef_burstiness,
384 double coef_variance,
385 uint32 rand_seed)
386 : clock_(NULL),
387 average_rates_(average_rates),
388 coef_burstiness_(coef_burstiness),
389 coef_variance_(coef_variance),
390 rate_index_(0),
391 on_state_(true),
392 weak_factory_(this) {
393 mt_rand_.init_genrand(rand_seed);
394 DCHECK(!average_rates.empty());
395 ComputeRates();
396 }
397
398 void InterruptedPoissonProcess::InitOnIOThread(
399 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
400 base::TickClock* clock) {
401 // Already initialized and started.
402 if (task_runner_ && clock_)
403 return;
404 task_runner_ = task_runner;
405 clock_ = clock;
406 UpdateRates();
407 SwitchOn();
408 SendPacket();
409 }
410
411 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
412 scoped_ptr<InternalBuffer> buffer(
413 new InternalBuffer(weak_factory_.GetWeakPtr(), size));
414 send_buffers_.push_back(buffer->GetWeakPtr());
415 return buffer.PassAs<PacketPipe>();
416 }
417
418 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
419 // Rate is per milliseconds.
420 // The time until next event is exponentially distributed to the
421 // inverse of |rate|.
422 return base::TimeDelta::FromMillisecondsD(
423 fabs(-log(1.0 - RandDouble()) / rate));
424 }
425
426 double InterruptedPoissonProcess::RandDouble() {
427 // Generate a 64-bits random number from MT19937 and then convert
428 // it to double.
429 uint64 rand = mt_rand_.genrand_int32();
430 rand <<= 32;
431 rand |= mt_rand_.genrand_int32();
432 return base::BitsToOpenEndedUnitInterval(rand);
433 }
434
435 void InterruptedPoissonProcess::ComputeRates() {
436 double avg_rate = average_rates_[rate_index_];
437
438 send_rate_ = avg_rate / coef_burstiness_;
439 switch_off_rate_ =
440 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
441 coef_burstiness_ / (coef_variance_ - 1);
442 switch_on_rate_ =
443 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
444 }
445
446 void InterruptedPoissonProcess::UpdateRates() {
447 ComputeRates();
448
449 // Rates are updated once per second.
450 rate_index_ = (rate_index_ + 1) % average_rates_.size();
451 task_runner_->PostDelayedTask(
452 FROM_HERE,
453 base::Bind(&InterruptedPoissonProcess::UpdateRates,
454 weak_factory_.GetWeakPtr()),
455 base::TimeDelta::FromSeconds(1));
456 }
457
458 void InterruptedPoissonProcess::SwitchOff() {
459 on_state_ = false;
460 task_runner_->PostDelayedTask(
461 FROM_HERE,
462 base::Bind(&InterruptedPoissonProcess::SwitchOn,
463 weak_factory_.GetWeakPtr()),
464 NextEvent(switch_on_rate_));
465 }
466
467 void InterruptedPoissonProcess::SwitchOn() {
468 on_state_ = true;
469 task_runner_->PostDelayedTask(
470 FROM_HERE,
471 base::Bind(&InterruptedPoissonProcess::SwitchOff,
472 weak_factory_.GetWeakPtr()),
473 NextEvent(switch_off_rate_));
474 }
475
476 void InterruptedPoissonProcess::SendPacket() {
477 task_runner_->PostDelayedTask(
478 FROM_HERE,
479 base::Bind(&InterruptedPoissonProcess::SendPacket,
480 weak_factory_.GetWeakPtr()),
481 NextEvent(send_rate_));
482
483 // If OFF then don't send.
484 if (!on_state_)
485 return;
486
487 // Find the earliest packet to send.
488 base::TimeTicks earliest_time;
489 for (size_t i = 0; i < send_buffers_.size(); ++i) {
490 if (!send_buffers_[i])
491 continue;
492 if (send_buffers_[i]->Empty())
493 continue;
494 if (earliest_time.is_null() ||
495 send_buffers_[i]->FirstPacketTime() < earliest_time)
496 earliest_time = send_buffers_[i]->FirstPacketTime();
497 }
498 for (size_t i = 0; i < send_buffers_.size(); ++i) {
499 if (!send_buffers_[i])
500 continue;
501 if (send_buffers_[i]->Empty())
502 continue;
503 if (send_buffers_[i]->FirstPacketTime() != earliest_time)
504 continue;
505 send_buffers_[i]->SendOnePacket();
506 break;
507 }
508 }
509
313 class UDPProxyImpl; 510 class UDPProxyImpl;
314 511
315 class PacketSender : public PacketPipe { 512 class PacketSender : public PacketPipe {
316 public: 513 public:
317 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) 514 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
318 : udp_proxy_(udp_proxy), destination_(destination) {} 515 : udp_proxy_(udp_proxy), destination_(destination) {}
319 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE; 516 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
320 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { 517 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
321 NOTREACHED(); 518 NOTREACHED();
322 } 519 }
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after
584 destination, 781 destination,
585 to_dest_pipe.Pass(), 782 to_dest_pipe.Pass(),
586 from_dest_pipe.Pass(), 783 from_dest_pipe.Pass(),
587 net_log)); 784 net_log));
588 return ret.Pass(); 785 return ret.Pass();
589 } 786 }
590 787
591 } // namespace test 788 } // namespace test
592 } // namespace cast 789 } // namespace cast
593 } // namespace media 790 } // namespace media
OLDNEW
« media/cast/test/simulator.cc ('K') | « media/cast/test/utility/udp_proxy.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698