Chromium Code Reviews| Index: media/cast/transport/transport/transport.cc |
| diff --git a/media/cast/transport/transport/transport.cc b/media/cast/transport/transport/transport.cc |
| index dfa155a4c72cad8e3cb46f3117e50ae0e5aff560..3050e8b5d1a61f0a3a565f6361b59dd6c3632397 100644 |
| --- a/media/cast/transport/transport/transport.cc |
| +++ b/media/cast/transport/transport/transport.cc |
| @@ -4,6 +4,7 @@ |
| #include "media/cast/transport/transport/transport.h" |
| +#include <algorithm> |
| #include <string> |
| #include "base/bind.h" |
| @@ -20,184 +21,131 @@ namespace media { |
| namespace cast { |
| namespace transport { |
| +namespace { |
| const int kMaxPacketSize = 1500; |
| -class LocalUdpTransportData; |
| - |
| -void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) { |
| - net::IPAddressNumber ip_number; |
| - bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number); |
| - if (!rv) |
| - return; |
| - *address = net::IPEndPoint(ip_number, port); |
| +bool IsEmpty(const net::IPEndPoint& addr) { |
| + net::IPAddressNumber empty_addr(addr.address().size()); |
| + return std::equal(empty_addr.begin(), |
| + empty_addr.end(), |
| + addr.address().begin()); |
| } |
| -class LocalUdpTransportData |
| - : public base::RefCountedThreadSafe<LocalUdpTransportData> { |
| - public: |
| - LocalUdpTransportData(net::UDPServerSocket* udp_socket, |
| - scoped_refptr<base::TaskRunner> io_thread_proxy) |
| - : udp_socket_(udp_socket), |
| - buffer_(new net::IOBufferWithSize(kMaxPacketSize)), |
| - io_thread_proxy_(io_thread_proxy) { |
| - } |
| - |
| - void ListenTo(net::IPEndPoint bind_address) { |
| - DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| +bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) { |
| + return addr1.port() == addr2.port() && |
| + std::equal(addr1.address().begin(), |
| + addr1.address().end(), |
| + addr2.address().begin()); |
| +} |
| - bind_address_ = bind_address; |
| - io_thread_proxy_->PostTask(FROM_HERE, |
| - base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this)); |
| - } |
| +} // namespace |
| + |
| +UdpTransport::UdpTransport( |
| + const scoped_refptr<base::TaskRunner>& io_thread_proxy, |
| + const net::IPEndPoint& local_end_point, |
| + const net::IPEndPoint& remote_end_point) |
| + : io_thread_proxy_(io_thread_proxy), |
| + local_addr_(local_end_point), |
| + remote_addr_(remote_end_point), |
| + udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), |
| + recv_buf_(new net::IOBuffer(kMaxPacketSize)), |
| + packet_receiver_(NULL), |
| + weak_factory_(this) { |
| +} |
| - void DeletePacket(uint8* data) { |
| - // Should be called from the receiver (not on the transport thread). |
| - DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread())); |
| - delete [] data; |
| - } |
| +UdpTransport::~UdpTransport() { |
| +} |
| - void PacketReceived(int size) { |
| - DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - // Got a packet with length result. |
| - uint8* data = new uint8[size]; |
| - memcpy(data, buffer_->data(), size); |
| - packet_receiver_->ReceivedPacket(data, size, |
| - base::Bind(&LocalUdpTransportData::DeletePacket, this, data)); |
| - RecvFromSocketLoop(); |
| +void UdpTransport::StartReceiving(PacketReceiver* packet_receiver) { |
| + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| + DCHECK(!packet_receiver_); |
| - } |
| + packet_receiver_ = packet_receiver; |
| + udp_socket_->AllowAddressReuse(); |
| + udp_socket_->SetMulticastLoopbackMode(true); |
| + udp_socket_->Listen(local_addr_); |
| + ReceiveOnePacket(); |
| +} |
| - void RecvFromSocketLoop() { |
| - DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - // Callback should always trigger with a packet. |
| - int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize, |
| - &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived, |
| - this)); |
| - DCHECK(res >= net::ERR_IO_PENDING); |
| - if (res > 0) { |
| - PacketReceived(res); |
| - } |
| - } |
| +void UdpTransport::ReceiveOnePacket() { |
| + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - void set_packet_receiver(PacketReceiver* packet_receiver) { |
| - packet_receiver_ = packet_receiver; |
| + int result = udp_socket_->RecvFrom( |
| + recv_buf_, |
| + kMaxPacketSize, |
| + &recv_addr_, |
| + base::Bind(&UdpTransport::OnReceived, |
| + weak_factory_.GetWeakPtr())); |
|
mikhal1
2014/01/07 20:50:06
nit:Can't this be in one line?
Alpha Left Google
2014/01/08 00:37:11
Done.
|
| + if (result > 0) { |
| + OnReceived(result); |
| + } else if (result != net::ERR_IO_PENDING) { |
| + LOG(ERROR) << "Failed to receive packet: " << result << "." |
| + << " Stop receiving packets."; |
| } |
| +} |
| - void Close() { |
| - udp_socket_->Close(); |
| - } |
| +void UdpTransport::OnReceived(int result) { |
| + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - protected: |
| - virtual ~LocalUdpTransportData() {} |
| - |
| - private: |
| - friend class base::RefCountedThreadSafe<LocalUdpTransportData>; |
| - |
| - net::UDPServerSocket* udp_socket_; |
| - net::IPEndPoint bind_address_; |
| - PacketReceiver* packet_receiver_; |
| - scoped_refptr<net::IOBufferWithSize> buffer_; |
| - scoped_refptr<base::TaskRunner> io_thread_proxy_; |
| - |
| - DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData); |
| -}; |
| - |
| -class LocalPacketSender : public PacketSender, |
| - public base::RefCountedThreadSafe<LocalPacketSender> { |
| - public: |
| - LocalPacketSender(net::UDPServerSocket* udp_socket, |
| - scoped_refptr<base::TaskRunner> io_thread_proxy) |
| - : udp_socket_(udp_socket), |
| - send_address_(), |
| - io_thread_proxy_(io_thread_proxy) {} |
| - |
| - virtual bool SendPacket(const Packet& packet) OVERRIDE { |
| - io_thread_proxy_->PostTask(FROM_HERE, |
| - base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet)); |
| - return true; |
| + if (result < 0) { |
| + LOG(ERROR) << "Failed to receive packet: " << result << "." |
| + << " Stop receiving packets."; |
| + return; |
| } |
| - virtual void SendPacketToNetwork(const Packet& packet) { |
| - DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - const uint8* data = &packet[0]; |
| - scoped_refptr<net::WrappedIOBuffer> buffer( |
| - new net::WrappedIOBuffer(reinterpret_cast<const char*>(data))); |
| - udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()), |
| - send_address_, |
| - base::Bind(&LocalPacketSender::OnSendCompleted, |
| - base::Unretained(this))); |
| + if (IsEmpty(remote_addr_)) { |
| + remote_addr_ = recv_addr_; |
| + VLOG(1) << "First packet received from: " |
| + << remote_addr_.ToString() << "."; |
| + } else if (!IsEqual(remote_addr_, recv_addr_)) { |
| + VLOG(1) << "Received from an unrecognized address: " |
| + << recv_addr_.ToString() << "."; |
| + return; |
| } |
| - virtual void OnSendCompleted(int result) { |
| - if (result < 0) { |
| - // TODO(mikhal): Add to error messages. |
| - VLOG(0) << "Send failed on UDP socket : " << result; |
| - } |
| - } |
| + // TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy. |
| + uint8* data = new uint8[result]; |
| + memcpy(data, recv_buf_->data(), result); |
| + packet_receiver_->ReceivedPacket( |
| + data, |
| + result, |
| + base::Bind(&PacketReceiver::DeletePacket, data)); |
| + ReceiveOnePacket(); |
| +} |
| - virtual bool SendPackets(const PacketList& packets) OVERRIDE { |
| - bool out_val = true; |
| - for (size_t i = 0; i < packets.size(); ++i) { |
| - const Packet& packet = packets[i]; |
| - out_val |= SendPacket(packet); |
| - } |
| - return out_val; |
| - } |
| +bool UdpTransport::SendPackets(const PacketList& packets) { |
| + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - void SetSendAddress(const net::IPEndPoint& send_address) { |
| - send_address_ = send_address; |
| + bool result = true; |
| + for (size_t i = 0; i < packets.size(); ++i) { |
| + result |= SendPacket(packets[i]); |
| } |
| - |
| - protected: |
| - virtual ~LocalPacketSender() {} |
| - |
| - private: |
| - friend class base::RefCountedThreadSafe<LocalPacketSender>; |
| - |
| - net::UDPServerSocket* udp_socket_; // Not owned by this class. |
| - net::IPEndPoint send_address_; |
| - scoped_refptr<base::TaskRunner> io_thread_proxy_; |
| -}; |
| - |
| -Transport::Transport( |
| - scoped_refptr<base::TaskRunner> io_thread_proxy) |
| - : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), |
| - local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(), |
| - io_thread_proxy)), |
| - packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)), |
| - io_thread_proxy_(io_thread_proxy) {} |
| - |
| -Transport::~Transport() {} |
| - |
| -PacketSender* Transport::packet_sender() { |
| - return static_cast<PacketSender*>(packet_sender_.get()); |
| + return result; |
| } |
| -void Transport::StopReceiving() { |
| - local_udp_transport_data_->Close(); |
| -} |
| - |
| -void Transport::SetLocalReceiver(PacketReceiver* packet_receiver, |
| - std::string ip_address, |
| - std::string local_ip_address, |
| - int port) { |
| +bool UdpTransport::SendPacket(const Packet& packet) { |
| DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| - net::IPEndPoint bind_address, local_bind_address; |
| - CreateUDPAddress(ip_address, port, &bind_address); |
| - CreateUDPAddress(local_ip_address, port, &local_bind_address); |
| - local_udp_transport_data_->set_packet_receiver(packet_receiver); |
| - udp_socket_->AllowAddressReuse(); |
| - udp_socket_->SetMulticastLoopbackMode(true); |
| - udp_socket_->Listen(local_bind_address); |
| - // Start listening once receiver has been set. |
| - local_udp_transport_data_->ListenTo(bind_address); |
| + // TODO(hclam): This interface should take a net::IOBuffer to minimize |
| + // memcpy. |
| + scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(packet.size()); |
| + memcpy(buf->data(), &packet[0], packet.size()); |
| + bool ret = udp_socket_->SendTo( |
| + buf, |
| + static_cast<int>(packet.size()), |
| + remote_addr_, |
| + base::Bind(&UdpTransport::OnSent, |
| + weak_factory_.GetWeakPtr(), buf)); |
| + return ret == net::OK; |
| } |
| -void Transport::SetSendDestination(std::string ip_address, int port) { |
| - net::IPEndPoint send_address; |
| - CreateUDPAddress(ip_address, port, &send_address); |
| - packet_sender_->SetSendAddress(send_address); |
| +void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, |
| + int result) { |
| + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| + |
| + if (result < 0) { |
| + VLOG(1) << "Failed to send packet: " << result << "."; |
| + } |
| } |
| } // namespace transport |