Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: media/cast/test/utility/udp_proxy.cc

Issue 282333003: Cast: Make the udp proxy actually work for v1 mirroring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: added missing initializer Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/cast/test/utility/udp_proxy.h" 5 #include "media/cast/test/utility/udp_proxy.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h" 8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h" 10 #include "base/synchronization/waitable_event.h"
(...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; 298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
299 }; 299 };
300 300
301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, 301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
302 double average_outage_time) { 302 double average_outage_time) {
303 return scoped_ptr<PacketPipe>( 303 return scoped_ptr<PacketPipe>(
304 new NetworkGlitchPipe(average_work_time, average_outage_time)) 304 new NetworkGlitchPipe(average_work_time, average_outage_time))
305 .Pass(); 305 .Pass();
306 } 306 }
307 307
308 class UDPProxyImpl;
309
308 class PacketSender : public PacketPipe { 310 class PacketSender : public PacketPipe {
309 public: 311 public:
310 PacketSender(net::UDPSocket* udp_socket, 312 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
311 const net::IPEndPoint* destination) : 313 : udp_proxy_(udp_proxy), destination_(destination) {}
312 blocked_(false), 314 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
313 udp_socket_(udp_socket),
314 destination_(destination),
315 weak_factory_(this) {
316 }
317 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
318 if (blocked_) {
319 LOG(ERROR) << "Cannot write packet right now: blocked";
320 return;
321 }
322
323 VLOG(1) << "Sending packet, len = " << packet->size();
324 // We ignore all problems, callbacks and errors.
325 // If it didn't work we just drop the packet at and call it a day.
326 scoped_refptr<net::IOBuffer> buf =
327 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
328 size_t buf_size = packet->size();
329 int result;
330 if (destination_->address().empty()) {
331 VLOG(1) << "Destination has not been set yet.";
332 result = net::ERR_INVALID_ARGUMENT;
333 } else {
334 VLOG(1) << "Destination:" << destination_->ToString();
335 result = udp_socket_->SendTo(buf,
336 static_cast<int>(buf_size),
337 *destination_,
338 base::Bind(&PacketSender::AllowWrite,
339 weak_factory_.GetWeakPtr(),
340 buf,
341 base::Passed(&packet)));
342 }
343 if (result == net::ERR_IO_PENDING) {
344 blocked_ = true;
345 } else if (result < 0) {
346 LOG(ERROR) << "Failed to write packet.";
347 }
348 }
349 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { 315 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
350 NOTREACHED(); 316 NOTREACHED();
351 } 317 }
352 318
353 private: 319 private:
354 void AllowWrite(scoped_refptr<net::IOBuffer> buf, 320 UDPProxyImpl* udp_proxy_;
355 scoped_ptr<transport::Packet> packet,
356 int unused_len) {
357 DCHECK(blocked_);
358 blocked_ = false;
359 }
360 bool blocked_;
361 net::UDPSocket* udp_socket_;
362 const net::IPEndPoint* destination_; // not owned 321 const net::IPEndPoint* destination_; // not owned
363 base::WeakPtrFactory<PacketSender> weak_factory_;
364 }; 322 };
365 323
366 namespace { 324 namespace {
367 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { 325 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
368 if (*pipe) { 326 if (*pipe) {
369 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); 327 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
370 } else { 328 } else {
371 pipe->reset(next); 329 pipe->reset(next);
372 } 330 }
373 } 331 }
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
424 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 382 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
425 return pipe.Pass(); 383 return pipe.Pass();
426 } 384 }
427 385
428 class UDPProxyImpl : public UDPProxy { 386 class UDPProxyImpl : public UDPProxy {
429 public: 387 public:
430 UDPProxyImpl(const net::IPEndPoint& local_port, 388 UDPProxyImpl(const net::IPEndPoint& local_port,
431 const net::IPEndPoint& destination, 389 const net::IPEndPoint& destination,
432 scoped_ptr<PacketPipe> to_dest_pipe, 390 scoped_ptr<PacketPipe> to_dest_pipe,
433 scoped_ptr<PacketPipe> from_dest_pipe, 391 scoped_ptr<PacketPipe> from_dest_pipe,
434 net::NetLog* net_log) : 392 net::NetLog* net_log)
435 local_port_(local_port), 393 : local_port_(local_port),
436 destination_(destination), 394 destination_(destination),
437 destination_is_mutable_(destination.address().empty()), 395 destination_is_mutable_(destination.address().empty()),
438 proxy_thread_("media::cast::test::UdpProxy Thread"), 396 proxy_thread_("media::cast::test::UdpProxy Thread"),
439 to_dest_pipe_(to_dest_pipe.Pass()), 397 to_dest_pipe_(to_dest_pipe.Pass()),
440 from_dest_pipe_(from_dest_pipe.Pass()) { 398 from_dest_pipe_(from_dest_pipe.Pass()),
399 blocked_(false),
400 weak_factory_(this) {
441 proxy_thread_.StartWithOptions( 401 proxy_thread_.StartWithOptions(
442 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 402 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
443 base::WaitableEvent start_event(false, false); 403 base::WaitableEvent start_event(false, false);
444 proxy_thread_.message_loop_proxy()->PostTask( 404 proxy_thread_.message_loop_proxy()->PostTask(
445 FROM_HERE, 405 FROM_HERE,
446 base::Bind(&UDPProxyImpl::Start, 406 base::Bind(&UDPProxyImpl::Start,
447 base::Unretained(this), 407 base::Unretained(this),
448 base::Unretained(&start_event), 408 base::Unretained(&start_event),
449 net_log)); 409 net_log));
450 start_event.Wait(); 410 start_event.Wait();
451 } 411 }
452 412
453 virtual ~UDPProxyImpl() { 413 virtual ~UDPProxyImpl() {
454 base::WaitableEvent stop_event(false, false); 414 base::WaitableEvent stop_event(false, false);
455 proxy_thread_.message_loop_proxy()->PostTask( 415 proxy_thread_.message_loop_proxy()->PostTask(
456 FROM_HERE, 416 FROM_HERE,
457 base::Bind(&UDPProxyImpl::Stop, 417 base::Bind(&UDPProxyImpl::Stop,
458 base::Unretained(this), 418 base::Unretained(this),
459 base::Unretained(&stop_event))); 419 base::Unretained(&stop_event)));
460 stop_event.Wait(); 420 stop_event.Wait();
461 proxy_thread_.Stop(); 421 proxy_thread_.Stop();
462 } 422 }
463 423
424 void Send(scoped_ptr<transport::Packet> packet,
425 const net::IPEndPoint& destination) {
426 if (blocked_) {
427 LOG(ERROR) << "Cannot write packet right now: blocked";
428 return;
429 }
430
431 VLOG(1) << "Sending packet, len = " << packet->size();
432 // We ignore all problems, callbacks and errors.
433 // If it didn't work we just drop the packet at and call it a day.
434 scoped_refptr<net::IOBuffer> buf =
435 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
436 size_t buf_size = packet->size();
437 int result;
438 if (destination.address().empty()) {
439 VLOG(1) << "Destination has not been set yet.";
440 result = net::ERR_INVALID_ARGUMENT;
441 } else {
442 VLOG(1) << "Destination:" << destination.ToString();
443 result = socket_->SendTo(buf,
444 static_cast<int>(buf_size),
445 destination,
446 base::Bind(&UDPProxyImpl::AllowWrite,
447 weak_factory_.GetWeakPtr(),
448 buf,
449 base::Passed(&packet)));
450 }
451 if (result == net::ERR_IO_PENDING) {
452 blocked_ = true;
453 } else if (result < 0) {
454 LOG(ERROR) << "Failed to write packet.";
455 }
456 }
457
464 private: 458 private:
465 void Start(base::WaitableEvent* start_event, 459 void Start(base::WaitableEvent* start_event,
466 net::NetLog* net_log) { 460 net::NetLog* net_log) {
467 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, 461 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
468 net::RandIntCallback(), 462 net::RandIntCallback(),
469 net_log, 463 net_log,
470 net::NetLog::Source())); 464 net::NetLog::Source()));
471 BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_)); 465 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
472 BuildPipe(&from_dest_pipe_, 466 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
473 new PacketSender(socket_.get(), &return_address_));
474 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 467 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
475 &tick_clock_); 468 &tick_clock_);
476 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 469 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
477 &tick_clock_); 470 &tick_clock_);
478 471
479 VLOG(0) << "From:" << local_port_.ToString(); 472 VLOG(0) << "From:" << local_port_.ToString();
480 if (!destination_is_mutable_) 473 if (!destination_is_mutable_)
481 VLOG(0) << "To:" << destination_.ToString(); 474 VLOG(0) << "To:" << destination_.ToString();
482 475
483 CHECK_GE(socket_->Bind(local_port_), 0); 476 CHECK_GE(socket_->Bind(local_port_), 0);
(...skipping 10 matching lines...) Expand all
494 } 487 }
495 488
496 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { 489 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
497 DCHECK_NE(len, net::ERR_IO_PENDING); 490 DCHECK_NE(len, net::ERR_IO_PENDING);
498 VLOG(1) << "Got packet, len = " << len; 491 VLOG(1) << "Got packet, len = " << len;
499 if (len < 0) { 492 if (len < 0) {
500 LOG(WARNING) << "Socket read error: " << len; 493 LOG(WARNING) << "Socket read error: " << len;
501 return; 494 return;
502 } 495 }
503 packet_->resize(len); 496 packet_->resize(len);
504 if (destination_is_mutable_ && 497 if (destination_is_mutable_ && set_destination_next_ &&
505 !(recv_address_ == return_address_) && 498 !(recv_address_ == return_address_) &&
506 !(recv_address_ == destination_)) { 499 !(recv_address_ == destination_)) {
507 destination_ = recv_address_; 500 destination_ = recv_address_;
508 } 501 }
509 if (recv_address_ == destination_) { 502 if (recv_address_ == destination_) {
503 set_destination_next_ = false;
510 from_dest_pipe_->Send(packet_.Pass()); 504 from_dest_pipe_->Send(packet_.Pass());
511 } else { 505 } else {
506 set_destination_next_ = true;
512 VLOG(1) << "Return address = " << recv_address_.ToString(); 507 VLOG(1) << "Return address = " << recv_address_.ToString();
513 return_address_ = recv_address_; 508 return_address_ = recv_address_;
514 to_dest_pipe_->Send(packet_.Pass()); 509 to_dest_pipe_->Send(packet_.Pass());
515 } 510 }
516 } 511 }
517 512
518 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { 513 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
519 ProcessPacket(recv_buf, len); 514 ProcessPacket(recv_buf, len);
520 PollRead(); 515 PollRead();
521 } 516 }
522 517
523 void PollRead() { 518 void PollRead() {
524 while (true) { 519 while (true) {
525 packet_.reset(new transport::Packet(kMaxPacketSize)); 520 packet_.reset(new transport::Packet(kMaxPacketSize));
526 scoped_refptr<net::IOBuffer> recv_buf = 521 scoped_refptr<net::IOBuffer> recv_buf =
527 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); 522 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
528 int len = socket_->RecvFrom( 523 int len = socket_->RecvFrom(
529 recv_buf, 524 recv_buf,
530 kMaxPacketSize, 525 kMaxPacketSize,
531 &recv_address_, 526 &recv_address_,
532 base::Bind(&UDPProxyImpl::ReadCallback, 527 base::Bind(&UDPProxyImpl::ReadCallback,
533 base::Unretained(this), 528 base::Unretained(this),
534 recv_buf)); 529 recv_buf));
535 if (len == net::ERR_IO_PENDING) 530 if (len == net::ERR_IO_PENDING)
536 break; 531 break;
537 ProcessPacket(recv_buf, len); 532 ProcessPacket(recv_buf, len);
538 } 533 }
539 } 534 }
540 535
536 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
537 scoped_ptr<transport::Packet> packet,
538 int unused_len) {
539 DCHECK(blocked_);
540 blocked_ = false;
541 }
542
543 // Input
544 net::IPEndPoint local_port_;
545
546 net::IPEndPoint destination_;
547 bool destination_is_mutable_;
548
549 net::IPEndPoint return_address_;
550 bool set_destination_next_;
541 551
542 base::DefaultTickClock tick_clock_; 552 base::DefaultTickClock tick_clock_;
543 net::IPEndPoint local_port_;
544 net::IPEndPoint destination_;
545 bool destination_is_mutable_;
546 net::IPEndPoint recv_address_;
547 net::IPEndPoint return_address_;
548 base::Thread proxy_thread_; 553 base::Thread proxy_thread_;
549 scoped_ptr<net::UDPSocket> socket_; 554 scoped_ptr<net::UDPSocket> socket_;
550 scoped_ptr<PacketPipe> to_dest_pipe_; 555 scoped_ptr<PacketPipe> to_dest_pipe_;
551 scoped_ptr<PacketPipe> from_dest_pipe_; 556 scoped_ptr<PacketPipe> from_dest_pipe_;
557
558 // For receiving.
559 net::IPEndPoint recv_address_;
552 scoped_ptr<transport::Packet> packet_; 560 scoped_ptr<transport::Packet> packet_;
561
562 // For sending.
563 bool blocked_;
564
565 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
553 }; 566 };
554 567
568 void PacketSender::Send(scoped_ptr<transport::Packet> packet) {
569 udp_proxy_->Send(packet.Pass(), *destination_);
570 }
571
555 scoped_ptr<UDPProxy> UDPProxy::Create( 572 scoped_ptr<UDPProxy> UDPProxy::Create(
556 const net::IPEndPoint& local_port, 573 const net::IPEndPoint& local_port,
557 const net::IPEndPoint& destination, 574 const net::IPEndPoint& destination,
558 scoped_ptr<PacketPipe> to_dest_pipe, 575 scoped_ptr<PacketPipe> to_dest_pipe,
559 scoped_ptr<PacketPipe> from_dest_pipe, 576 scoped_ptr<PacketPipe> from_dest_pipe,
560 net::NetLog* net_log) { 577 net::NetLog* net_log) {
561 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, 578 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
562 destination, 579 destination,
563 to_dest_pipe.Pass(), 580 to_dest_pipe.Pass(),
564 from_dest_pipe.Pass(), 581 from_dest_pipe.Pass(),
565 net_log)); 582 net_log));
566 return ret.Pass(); 583 return ret.Pass();
567 } 584 }
568 585
569 } // namespace test 586 } // namespace test
570 } // namespace cast 587 } // namespace cast
571 } // namespace media 588 } // namespace media
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698