Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(104)

Side by Side Diff: media/cast/test/utility/udp_proxy.cc

Issue 282333003: Cast: Make the udp proxy actually work for v1 mirroring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tested and works Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/cast/test/utility/udp_proxy.h" 5 #include "media/cast/test/utility/udp_proxy.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h" 8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h" 10 #include "base/synchronization/waitable_event.h"
(...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; 298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
299 }; 299 };
300 300
301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, 301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
302 double average_outage_time) { 302 double average_outage_time) {
303 return scoped_ptr<PacketPipe>( 303 return scoped_ptr<PacketPipe>(
304 new NetworkGlitchPipe(average_work_time, average_outage_time)) 304 new NetworkGlitchPipe(average_work_time, average_outage_time))
305 .Pass(); 305 .Pass();
306 } 306 }
307 307
308 class UDPProxyImpl;
309
308 class PacketSender : public PacketPipe { 310 class PacketSender : public PacketPipe {
309 public: 311 public:
310 PacketSender(net::UDPSocket* udp_socket, 312 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
311 const net::IPEndPoint* destination) : 313 : udp_proxy_(udp_proxy), destination_(destination) {}
312 blocked_(false), 314 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
313 udp_socket_(udp_socket),
314 destination_(destination),
315 weak_factory_(this) {
316 }
317 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
318 if (blocked_) {
319 LOG(ERROR) << "Cannot write packet right now: blocked";
320 return;
321 }
322
323 VLOG(1) << "Sending packet, len = " << packet->size();
324 // We ignore all problems, callbacks and errors.
325 // If it didn't work we just drop the packet at and call it a day.
326 scoped_refptr<net::IOBuffer> buf =
327 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
328 size_t buf_size = packet->size();
329 int result;
330 if (destination_->address().empty()) {
331 VLOG(1) << "Destination has not been set yet.";
332 result = net::ERR_INVALID_ARGUMENT;
333 } else {
334 VLOG(1) << "Destination:" << destination_->ToString();
335 result = udp_socket_->SendTo(buf,
336 static_cast<int>(buf_size),
337 *destination_,
338 base::Bind(&PacketSender::AllowWrite,
339 weak_factory_.GetWeakPtr(),
340 buf,
341 base::Passed(&packet)));
342 }
343 if (result == net::ERR_IO_PENDING) {
344 blocked_ = true;
345 } else if (result < 0) {
346 LOG(ERROR) << "Failed to write packet.";
347 }
348 }
349 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { 315 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
350 NOTREACHED(); 316 NOTREACHED();
351 } 317 }
352 318
353 private: 319 private:
354 void AllowWrite(scoped_refptr<net::IOBuffer> buf, 320 UDPProxyImpl* udp_proxy_;
355 scoped_ptr<transport::Packet> packet,
356 int unused_len) {
357 DCHECK(blocked_);
358 blocked_ = false;
359 }
360 bool blocked_;
361 net::UDPSocket* udp_socket_;
362 const net::IPEndPoint* destination_; // not owned 321 const net::IPEndPoint* destination_; // not owned
363 base::WeakPtrFactory<PacketSender> weak_factory_;
364 }; 322 };
365 323
366 namespace { 324 namespace {
367 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { 325 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
368 if (*pipe) { 326 if (*pipe) {
369 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); 327 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
370 } else { 328 } else {
371 pipe->reset(next); 329 pipe->reset(next);
372 } 330 }
373 } 331 }
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
424 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 382 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
425 return pipe.Pass(); 383 return pipe.Pass();
426 } 384 }
427 385
428 class UDPProxyImpl : public UDPProxy { 386 class UDPProxyImpl : public UDPProxy {
429 public: 387 public:
430 UDPProxyImpl(const net::IPEndPoint& local_port, 388 UDPProxyImpl(const net::IPEndPoint& local_port,
431 const net::IPEndPoint& destination, 389 const net::IPEndPoint& destination,
432 scoped_ptr<PacketPipe> to_dest_pipe, 390 scoped_ptr<PacketPipe> to_dest_pipe,
433 scoped_ptr<PacketPipe> from_dest_pipe, 391 scoped_ptr<PacketPipe> from_dest_pipe,
434 net::NetLog* net_log) : 392 net::NetLog* net_log)
435 local_port_(local_port), 393 : local_port_(local_port),
436 destination_(destination), 394 destination_(destination),
437 destination_is_mutable_(destination.address().empty()), 395 destination_is_mutable_(destination.address().empty()),
438 proxy_thread_("media::cast::test::UdpProxy Thread"), 396 proxy_thread_("media::cast::test::UdpProxy Thread"),
439 to_dest_pipe_(to_dest_pipe.Pass()), 397 to_dest_pipe_(to_dest_pipe.Pass()),
440 from_dest_pipe_(from_dest_pipe.Pass()) { 398 from_dest_pipe_(from_dest_pipe.Pass()),
399 weak_factory_(this) {
441 proxy_thread_.StartWithOptions( 400 proxy_thread_.StartWithOptions(
442 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 401 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
443 base::WaitableEvent start_event(false, false); 402 base::WaitableEvent start_event(false, false);
444 proxy_thread_.message_loop_proxy()->PostTask( 403 proxy_thread_.message_loop_proxy()->PostTask(
445 FROM_HERE, 404 FROM_HERE,
446 base::Bind(&UDPProxyImpl::Start, 405 base::Bind(&UDPProxyImpl::Start,
447 base::Unretained(this), 406 base::Unretained(this),
448 base::Unretained(&start_event), 407 base::Unretained(&start_event),
449 net_log)); 408 net_log));
450 start_event.Wait(); 409 start_event.Wait();
451 } 410 }
452 411
453 virtual ~UDPProxyImpl() { 412 virtual ~UDPProxyImpl() {
454 base::WaitableEvent stop_event(false, false); 413 base::WaitableEvent stop_event(false, false);
455 proxy_thread_.message_loop_proxy()->PostTask( 414 proxy_thread_.message_loop_proxy()->PostTask(
456 FROM_HERE, 415 FROM_HERE,
457 base::Bind(&UDPProxyImpl::Stop, 416 base::Bind(&UDPProxyImpl::Stop,
458 base::Unretained(this), 417 base::Unretained(this),
459 base::Unretained(&stop_event))); 418 base::Unretained(&stop_event)));
460 stop_event.Wait(); 419 stop_event.Wait();
461 proxy_thread_.Stop(); 420 proxy_thread_.Stop();
462 } 421 }
463 422
423 void Send(scoped_ptr<transport::Packet> packet,
424 const net::IPEndPoint& destination) {
425 if (blocked_) {
426 LOG(ERROR) << "Cannot write packet right now: blocked";
427 return;
428 }
429
430 VLOG(1) << "Sending packet, len = " << packet->size();
431 // We ignore all problems, callbacks and errors.
432 // If it didn't work we just drop the packet at and call it a day.
433 scoped_refptr<net::IOBuffer> buf =
434 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
435 size_t buf_size = packet->size();
436 int result;
437 if (destination.address().empty()) {
438 VLOG(1) << "Destination has not been set yet.";
439 result = net::ERR_INVALID_ARGUMENT;
440 } else {
441 VLOG(1) << "Destination:" << destination.ToString();
442 result = socket_->SendTo(buf,
443 static_cast<int>(buf_size),
444 destination,
445 base::Bind(&UDPProxyImpl::AllowWrite,
446 weak_factory_.GetWeakPtr(),
447 buf,
448 base::Passed(&packet)));
449 }
450 if (result == net::ERR_IO_PENDING) {
451 blocked_ = true;
452 } else if (result < 0) {
453 LOG(ERROR) << "Failed to write packet.";
454 }
455 }
456
464 private: 457 private:
465 void Start(base::WaitableEvent* start_event, 458 void Start(base::WaitableEvent* start_event,
466 net::NetLog* net_log) { 459 net::NetLog* net_log) {
467 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, 460 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
468 net::RandIntCallback(), 461 net::RandIntCallback(),
469 net_log, 462 net_log,
470 net::NetLog::Source())); 463 net::NetLog::Source()));
471 BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_)); 464 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
472 BuildPipe(&from_dest_pipe_, 465 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
473 new PacketSender(socket_.get(), &return_address_));
474 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 466 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
475 &tick_clock_); 467 &tick_clock_);
476 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 468 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
477 &tick_clock_); 469 &tick_clock_);
478 470
479 VLOG(0) << "From:" << local_port_.ToString(); 471 VLOG(0) << "From:" << local_port_.ToString();
480 if (!destination_is_mutable_) 472 if (!destination_is_mutable_)
481 VLOG(0) << "To:" << destination_.ToString(); 473 VLOG(0) << "To:" << destination_.ToString();
482 474
483 CHECK_GE(socket_->Bind(local_port_), 0); 475 CHECK_GE(socket_->Bind(local_port_), 0);
(...skipping 10 matching lines...) Expand all
494 } 486 }
495 487
496 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { 488 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
497 DCHECK_NE(len, net::ERR_IO_PENDING); 489 DCHECK_NE(len, net::ERR_IO_PENDING);
498 VLOG(1) << "Got packet, len = " << len; 490 VLOG(1) << "Got packet, len = " << len;
499 if (len < 0) { 491 if (len < 0) {
500 LOG(WARNING) << "Socket read error: " << len; 492 LOG(WARNING) << "Socket read error: " << len;
501 return; 493 return;
502 } 494 }
503 packet_->resize(len); 495 packet_->resize(len);
504 if (destination_is_mutable_ && 496 if (destination_is_mutable_ && set_destination_next_ &&
505 !(recv_address_ == return_address_) && 497 !(recv_address_ == return_address_) &&
506 !(recv_address_ == destination_)) { 498 !(recv_address_ == destination_)) {
507 destination_ = recv_address_; 499 destination_ = recv_address_;
508 } 500 }
509 if (recv_address_ == destination_) { 501 if (recv_address_ == destination_) {
502 set_destination_next_ = false;
510 from_dest_pipe_->Send(packet_.Pass()); 503 from_dest_pipe_->Send(packet_.Pass());
511 } else { 504 } else {
505 set_destination_next_ = true;
512 VLOG(1) << "Return address = " << recv_address_.ToString(); 506 VLOG(1) << "Return address = " << recv_address_.ToString();
513 return_address_ = recv_address_; 507 return_address_ = recv_address_;
514 to_dest_pipe_->Send(packet_.Pass()); 508 to_dest_pipe_->Send(packet_.Pass());
515 } 509 }
516 } 510 }
517 511
518 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { 512 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
519 ProcessPacket(recv_buf, len); 513 ProcessPacket(recv_buf, len);
520 PollRead(); 514 PollRead();
521 } 515 }
522 516
523 void PollRead() { 517 void PollRead() {
524 while (true) { 518 while (true) {
525 packet_.reset(new transport::Packet(kMaxPacketSize)); 519 packet_.reset(new transport::Packet(kMaxPacketSize));
526 scoped_refptr<net::IOBuffer> recv_buf = 520 scoped_refptr<net::IOBuffer> recv_buf =
527 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); 521 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
528 int len = socket_->RecvFrom( 522 int len = socket_->RecvFrom(
529 recv_buf, 523 recv_buf,
530 kMaxPacketSize, 524 kMaxPacketSize,
531 &recv_address_, 525 &recv_address_,
532 base::Bind(&UDPProxyImpl::ReadCallback, 526 base::Bind(&UDPProxyImpl::ReadCallback,
533 base::Unretained(this), 527 base::Unretained(this),
534 recv_buf)); 528 recv_buf));
535 if (len == net::ERR_IO_PENDING) 529 if (len == net::ERR_IO_PENDING)
536 break; 530 break;
537 ProcessPacket(recv_buf, len); 531 ProcessPacket(recv_buf, len);
538 } 532 }
539 } 533 }
540 534
535 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
536 scoped_ptr<transport::Packet> packet,
537 int unused_len) {
538 DCHECK(blocked_);
539 blocked_ = false;
540 }
541
542 // Input
543 net::IPEndPoint local_port_;
544
545 net::IPEndPoint destination_;
546 bool destination_is_mutable_;
547
548 net::IPEndPoint return_address_;
549 bool set_destination_next_;
541 550
542 base::DefaultTickClock tick_clock_; 551 base::DefaultTickClock tick_clock_;
543 net::IPEndPoint local_port_;
544 net::IPEndPoint destination_;
545 bool destination_is_mutable_;
546 net::IPEndPoint recv_address_;
547 net::IPEndPoint return_address_;
548 base::Thread proxy_thread_; 552 base::Thread proxy_thread_;
549 scoped_ptr<net::UDPSocket> socket_; 553 scoped_ptr<net::UDPSocket> socket_;
550 scoped_ptr<PacketPipe> to_dest_pipe_; 554 scoped_ptr<PacketPipe> to_dest_pipe_;
551 scoped_ptr<PacketPipe> from_dest_pipe_; 555 scoped_ptr<PacketPipe> from_dest_pipe_;
556
557 // For receiving.
558 net::IPEndPoint recv_address_;
552 scoped_ptr<transport::Packet> packet_; 559 scoped_ptr<transport::Packet> packet_;
560
561 // For sending.
562 bool blocked_;
miu 2014/05/23 22:20:43 Need to initialize this to false in ctor.
hubbe 2014/05/23 22:23:48 Done.
563
564 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
553 }; 565 };
554 566
567 void PacketSender::Send(scoped_ptr<transport::Packet> packet) {
568 udp_proxy_->Send(packet.Pass(), *destination_);
569 }
570
555 scoped_ptr<UDPProxy> UDPProxy::Create( 571 scoped_ptr<UDPProxy> UDPProxy::Create(
556 const net::IPEndPoint& local_port, 572 const net::IPEndPoint& local_port,
557 const net::IPEndPoint& destination, 573 const net::IPEndPoint& destination,
558 scoped_ptr<PacketPipe> to_dest_pipe, 574 scoped_ptr<PacketPipe> to_dest_pipe,
559 scoped_ptr<PacketPipe> from_dest_pipe, 575 scoped_ptr<PacketPipe> from_dest_pipe,
560 net::NetLog* net_log) { 576 net::NetLog* net_log) {
561 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, 577 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
562 destination, 578 destination,
563 to_dest_pipe.Pass(), 579 to_dest_pipe.Pass(),
564 from_dest_pipe.Pass(), 580 from_dest_pipe.Pass(),
565 net_log)); 581 net_log));
566 return ret.Pass(); 582 return ret.Pass();
567 } 583 }
568 584
569 } // namespace test 585 } // namespace test
570 } // namespace cast 586 } // namespace cast
571 } // namespace media 587 } // namespace media
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698