OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "media/cast/test/utility/udp_proxy.h" | 5 #include "media/cast/test/utility/udp_proxy.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "base/memory/linked_ptr.h" | 8 #include "base/memory/linked_ptr.h" |
9 #include "base/rand_util.h" | 9 #include "base/rand_util.h" |
10 #include "base/synchronization/waitable_event.h" | 10 #include "base/synchronization/waitable_event.h" |
(...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; | 298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; |
299 }; | 299 }; |
300 | 300 |
301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, | 301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, |
302 double average_outage_time) { | 302 double average_outage_time) { |
303 return scoped_ptr<PacketPipe>( | 303 return scoped_ptr<PacketPipe>( |
304 new NetworkGlitchPipe(average_work_time, average_outage_time)) | 304 new NetworkGlitchPipe(average_work_time, average_outage_time)) |
305 .Pass(); | 305 .Pass(); |
306 } | 306 } |
307 | 307 |
| 308 class UDPProxyImpl; |
| 309 |
308 class PacketSender : public PacketPipe { | 310 class PacketSender : public PacketPipe { |
309 public: | 311 public: |
310 PacketSender(net::UDPSocket* udp_socket, | 312 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) |
311 const net::IPEndPoint* destination) : | 313 : udp_proxy_(udp_proxy), destination_(destination) {} |
312 blocked_(false), | 314 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE; |
313 udp_socket_(udp_socket), | |
314 destination_(destination), | |
315 weak_factory_(this) { | |
316 } | |
317 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { | |
318 if (blocked_) { | |
319 LOG(ERROR) << "Cannot write packet right now: blocked"; | |
320 return; | |
321 } | |
322 | |
323 VLOG(1) << "Sending packet, len = " << packet->size(); | |
324 // We ignore all problems, callbacks and errors. | |
325 // If it didn't work we just drop the packet at and call it a day. | |
326 scoped_refptr<net::IOBuffer> buf = | |
327 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front())); | |
328 size_t buf_size = packet->size(); | |
329 int result; | |
330 if (destination_->address().empty()) { | |
331 VLOG(1) << "Destination has not been set yet."; | |
332 result = net::ERR_INVALID_ARGUMENT; | |
333 } else { | |
334 VLOG(1) << "Destination:" << destination_->ToString(); | |
335 result = udp_socket_->SendTo(buf, | |
336 static_cast<int>(buf_size), | |
337 *destination_, | |
338 base::Bind(&PacketSender::AllowWrite, | |
339 weak_factory_.GetWeakPtr(), | |
340 buf, | |
341 base::Passed(&packet))); | |
342 } | |
343 if (result == net::ERR_IO_PENDING) { | |
344 blocked_ = true; | |
345 } else if (result < 0) { | |
346 LOG(ERROR) << "Failed to write packet."; | |
347 } | |
348 } | |
349 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { | 315 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { |
350 NOTREACHED(); | 316 NOTREACHED(); |
351 } | 317 } |
352 | 318 |
353 private: | 319 private: |
354 void AllowWrite(scoped_refptr<net::IOBuffer> buf, | 320 UDPProxyImpl* udp_proxy_; |
355 scoped_ptr<transport::Packet> packet, | |
356 int unused_len) { | |
357 DCHECK(blocked_); | |
358 blocked_ = false; | |
359 } | |
360 bool blocked_; | |
361 net::UDPSocket* udp_socket_; | |
362 const net::IPEndPoint* destination_; // not owned | 321 const net::IPEndPoint* destination_; // not owned |
363 base::WeakPtrFactory<PacketSender> weak_factory_; | |
364 }; | 322 }; |
365 | 323 |
366 namespace { | 324 namespace { |
367 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { | 325 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { |
368 if (*pipe) { | 326 if (*pipe) { |
369 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); | 327 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); |
370 } else { | 328 } else { |
371 pipe->reset(next); | 329 pipe->reset(next); |
372 } | 330 } |
373 } | 331 } |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
424 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s | 382 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s |
425 return pipe.Pass(); | 383 return pipe.Pass(); |
426 } | 384 } |
427 | 385 |
428 class UDPProxyImpl : public UDPProxy { | 386 class UDPProxyImpl : public UDPProxy { |
429 public: | 387 public: |
430 UDPProxyImpl(const net::IPEndPoint& local_port, | 388 UDPProxyImpl(const net::IPEndPoint& local_port, |
431 const net::IPEndPoint& destination, | 389 const net::IPEndPoint& destination, |
432 scoped_ptr<PacketPipe> to_dest_pipe, | 390 scoped_ptr<PacketPipe> to_dest_pipe, |
433 scoped_ptr<PacketPipe> from_dest_pipe, | 391 scoped_ptr<PacketPipe> from_dest_pipe, |
434 net::NetLog* net_log) : | 392 net::NetLog* net_log) |
435 local_port_(local_port), | 393 : local_port_(local_port), |
436 destination_(destination), | 394 destination_(destination), |
437 destination_is_mutable_(destination.address().empty()), | 395 destination_is_mutable_(destination.address().empty()), |
438 proxy_thread_("media::cast::test::UdpProxy Thread"), | 396 proxy_thread_("media::cast::test::UdpProxy Thread"), |
439 to_dest_pipe_(to_dest_pipe.Pass()), | 397 to_dest_pipe_(to_dest_pipe.Pass()), |
440 from_dest_pipe_(from_dest_pipe.Pass()) { | 398 from_dest_pipe_(from_dest_pipe.Pass()), |
| 399 blocked_(false), |
| 400 weak_factory_(this) { |
441 proxy_thread_.StartWithOptions( | 401 proxy_thread_.StartWithOptions( |
442 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); | 402 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); |
443 base::WaitableEvent start_event(false, false); | 403 base::WaitableEvent start_event(false, false); |
444 proxy_thread_.message_loop_proxy()->PostTask( | 404 proxy_thread_.message_loop_proxy()->PostTask( |
445 FROM_HERE, | 405 FROM_HERE, |
446 base::Bind(&UDPProxyImpl::Start, | 406 base::Bind(&UDPProxyImpl::Start, |
447 base::Unretained(this), | 407 base::Unretained(this), |
448 base::Unretained(&start_event), | 408 base::Unretained(&start_event), |
449 net_log)); | 409 net_log)); |
450 start_event.Wait(); | 410 start_event.Wait(); |
451 } | 411 } |
452 | 412 |
453 virtual ~UDPProxyImpl() { | 413 virtual ~UDPProxyImpl() { |
454 base::WaitableEvent stop_event(false, false); | 414 base::WaitableEvent stop_event(false, false); |
455 proxy_thread_.message_loop_proxy()->PostTask( | 415 proxy_thread_.message_loop_proxy()->PostTask( |
456 FROM_HERE, | 416 FROM_HERE, |
457 base::Bind(&UDPProxyImpl::Stop, | 417 base::Bind(&UDPProxyImpl::Stop, |
458 base::Unretained(this), | 418 base::Unretained(this), |
459 base::Unretained(&stop_event))); | 419 base::Unretained(&stop_event))); |
460 stop_event.Wait(); | 420 stop_event.Wait(); |
461 proxy_thread_.Stop(); | 421 proxy_thread_.Stop(); |
462 } | 422 } |
463 | 423 |
| 424 void Send(scoped_ptr<transport::Packet> packet, |
| 425 const net::IPEndPoint& destination) { |
| 426 if (blocked_) { |
| 427 LOG(ERROR) << "Cannot write packet right now: blocked"; |
| 428 return; |
| 429 } |
| 430 |
| 431 VLOG(1) << "Sending packet, len = " << packet->size(); |
| 432 // We ignore all problems, callbacks and errors. |
| 433 // If it didn't work we just drop the packet at and call it a day. |
| 434 scoped_refptr<net::IOBuffer> buf = |
| 435 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front())); |
| 436 size_t buf_size = packet->size(); |
| 437 int result; |
| 438 if (destination.address().empty()) { |
| 439 VLOG(1) << "Destination has not been set yet."; |
| 440 result = net::ERR_INVALID_ARGUMENT; |
| 441 } else { |
| 442 VLOG(1) << "Destination:" << destination.ToString(); |
| 443 result = socket_->SendTo(buf, |
| 444 static_cast<int>(buf_size), |
| 445 destination, |
| 446 base::Bind(&UDPProxyImpl::AllowWrite, |
| 447 weak_factory_.GetWeakPtr(), |
| 448 buf, |
| 449 base::Passed(&packet))); |
| 450 } |
| 451 if (result == net::ERR_IO_PENDING) { |
| 452 blocked_ = true; |
| 453 } else if (result < 0) { |
| 454 LOG(ERROR) << "Failed to write packet."; |
| 455 } |
| 456 } |
| 457 |
464 private: | 458 private: |
465 void Start(base::WaitableEvent* start_event, | 459 void Start(base::WaitableEvent* start_event, |
466 net::NetLog* net_log) { | 460 net::NetLog* net_log) { |
467 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, | 461 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, |
468 net::RandIntCallback(), | 462 net::RandIntCallback(), |
469 net_log, | 463 net_log, |
470 net::NetLog::Source())); | 464 net::NetLog::Source())); |
471 BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_)); | 465 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_)); |
472 BuildPipe(&from_dest_pipe_, | 466 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_)); |
473 new PacketSender(socket_.get(), &return_address_)); | |
474 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), | 467 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), |
475 &tick_clock_); | 468 &tick_clock_); |
476 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), | 469 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), |
477 &tick_clock_); | 470 &tick_clock_); |
478 | 471 |
479 VLOG(0) << "From:" << local_port_.ToString(); | 472 VLOG(0) << "From:" << local_port_.ToString(); |
480 if (!destination_is_mutable_) | 473 if (!destination_is_mutable_) |
481 VLOG(0) << "To:" << destination_.ToString(); | 474 VLOG(0) << "To:" << destination_.ToString(); |
482 | 475 |
483 CHECK_GE(socket_->Bind(local_port_), 0); | 476 CHECK_GE(socket_->Bind(local_port_), 0); |
(...skipping 10 matching lines...) Expand all Loading... |
494 } | 487 } |
495 | 488 |
496 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { | 489 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { |
497 DCHECK_NE(len, net::ERR_IO_PENDING); | 490 DCHECK_NE(len, net::ERR_IO_PENDING); |
498 VLOG(1) << "Got packet, len = " << len; | 491 VLOG(1) << "Got packet, len = " << len; |
499 if (len < 0) { | 492 if (len < 0) { |
500 LOG(WARNING) << "Socket read error: " << len; | 493 LOG(WARNING) << "Socket read error: " << len; |
501 return; | 494 return; |
502 } | 495 } |
503 packet_->resize(len); | 496 packet_->resize(len); |
504 if (destination_is_mutable_ && | 497 if (destination_is_mutable_ && set_destination_next_ && |
505 !(recv_address_ == return_address_) && | 498 !(recv_address_ == return_address_) && |
506 !(recv_address_ == destination_)) { | 499 !(recv_address_ == destination_)) { |
507 destination_ = recv_address_; | 500 destination_ = recv_address_; |
508 } | 501 } |
509 if (recv_address_ == destination_) { | 502 if (recv_address_ == destination_) { |
| 503 set_destination_next_ = false; |
510 from_dest_pipe_->Send(packet_.Pass()); | 504 from_dest_pipe_->Send(packet_.Pass()); |
511 } else { | 505 } else { |
| 506 set_destination_next_ = true; |
512 VLOG(1) << "Return address = " << recv_address_.ToString(); | 507 VLOG(1) << "Return address = " << recv_address_.ToString(); |
513 return_address_ = recv_address_; | 508 return_address_ = recv_address_; |
514 to_dest_pipe_->Send(packet_.Pass()); | 509 to_dest_pipe_->Send(packet_.Pass()); |
515 } | 510 } |
516 } | 511 } |
517 | 512 |
518 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { | 513 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { |
519 ProcessPacket(recv_buf, len); | 514 ProcessPacket(recv_buf, len); |
520 PollRead(); | 515 PollRead(); |
521 } | 516 } |
522 | 517 |
523 void PollRead() { | 518 void PollRead() { |
524 while (true) { | 519 while (true) { |
525 packet_.reset(new transport::Packet(kMaxPacketSize)); | 520 packet_.reset(new transport::Packet(kMaxPacketSize)); |
526 scoped_refptr<net::IOBuffer> recv_buf = | 521 scoped_refptr<net::IOBuffer> recv_buf = |
527 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); | 522 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); |
528 int len = socket_->RecvFrom( | 523 int len = socket_->RecvFrom( |
529 recv_buf, | 524 recv_buf, |
530 kMaxPacketSize, | 525 kMaxPacketSize, |
531 &recv_address_, | 526 &recv_address_, |
532 base::Bind(&UDPProxyImpl::ReadCallback, | 527 base::Bind(&UDPProxyImpl::ReadCallback, |
533 base::Unretained(this), | 528 base::Unretained(this), |
534 recv_buf)); | 529 recv_buf)); |
535 if (len == net::ERR_IO_PENDING) | 530 if (len == net::ERR_IO_PENDING) |
536 break; | 531 break; |
537 ProcessPacket(recv_buf, len); | 532 ProcessPacket(recv_buf, len); |
538 } | 533 } |
539 } | 534 } |
540 | 535 |
| 536 void AllowWrite(scoped_refptr<net::IOBuffer> buf, |
| 537 scoped_ptr<transport::Packet> packet, |
| 538 int unused_len) { |
| 539 DCHECK(blocked_); |
| 540 blocked_ = false; |
| 541 } |
| 542 |
| 543 // Input |
| 544 net::IPEndPoint local_port_; |
| 545 |
| 546 net::IPEndPoint destination_; |
| 547 bool destination_is_mutable_; |
| 548 |
| 549 net::IPEndPoint return_address_; |
| 550 bool set_destination_next_; |
541 | 551 |
542 base::DefaultTickClock tick_clock_; | 552 base::DefaultTickClock tick_clock_; |
543 net::IPEndPoint local_port_; | |
544 net::IPEndPoint destination_; | |
545 bool destination_is_mutable_; | |
546 net::IPEndPoint recv_address_; | |
547 net::IPEndPoint return_address_; | |
548 base::Thread proxy_thread_; | 553 base::Thread proxy_thread_; |
549 scoped_ptr<net::UDPSocket> socket_; | 554 scoped_ptr<net::UDPSocket> socket_; |
550 scoped_ptr<PacketPipe> to_dest_pipe_; | 555 scoped_ptr<PacketPipe> to_dest_pipe_; |
551 scoped_ptr<PacketPipe> from_dest_pipe_; | 556 scoped_ptr<PacketPipe> from_dest_pipe_; |
| 557 |
| 558 // For receiving. |
| 559 net::IPEndPoint recv_address_; |
552 scoped_ptr<transport::Packet> packet_; | 560 scoped_ptr<transport::Packet> packet_; |
| 561 |
| 562 // For sending. |
| 563 bool blocked_; |
| 564 |
| 565 base::WeakPtrFactory<UDPProxyImpl> weak_factory_; |
553 }; | 566 }; |
554 | 567 |
| 568 void PacketSender::Send(scoped_ptr<transport::Packet> packet) { |
| 569 udp_proxy_->Send(packet.Pass(), *destination_); |
| 570 } |
| 571 |
555 scoped_ptr<UDPProxy> UDPProxy::Create( | 572 scoped_ptr<UDPProxy> UDPProxy::Create( |
556 const net::IPEndPoint& local_port, | 573 const net::IPEndPoint& local_port, |
557 const net::IPEndPoint& destination, | 574 const net::IPEndPoint& destination, |
558 scoped_ptr<PacketPipe> to_dest_pipe, | 575 scoped_ptr<PacketPipe> to_dest_pipe, |
559 scoped_ptr<PacketPipe> from_dest_pipe, | 576 scoped_ptr<PacketPipe> from_dest_pipe, |
560 net::NetLog* net_log) { | 577 net::NetLog* net_log) { |
561 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, | 578 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, |
562 destination, | 579 destination, |
563 to_dest_pipe.Pass(), | 580 to_dest_pipe.Pass(), |
564 from_dest_pipe.Pass(), | 581 from_dest_pipe.Pass(), |
565 net_log)); | 582 net_log)); |
566 return ret.Pass(); | 583 return ret.Pass(); |
567 } | 584 } |
568 | 585 |
569 } // namespace test | 586 } // namespace test |
570 } // namespace cast | 587 } // namespace cast |
571 } // namespace media | 588 } // namespace media |
OLD | NEW |