| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "media/cast/transport/transport/udp_transport.h" | |
| 6 | |
| 7 #include <algorithm> | |
| 8 #include <string> | |
| 9 | |
| 10 #include "base/bind.h" | |
| 11 #include "base/logging.h" | |
| 12 #include "base/memory/ref_counted.h" | |
| 13 #include "base/memory/scoped_ptr.h" | |
| 14 #include "base/message_loop/message_loop.h" | |
| 15 #include "base/rand_util.h" | |
| 16 #include "net/base/io_buffer.h" | |
| 17 #include "net/base/net_errors.h" | |
| 18 #include "net/base/rand_callback.h" | |
| 19 | |
| 20 namespace media { | |
| 21 namespace cast { | |
| 22 namespace transport { | |
| 23 | |
| 24 namespace { | |
| 25 const int kMaxPacketSize = 1500; | |
| 26 | |
| 27 bool IsEmpty(const net::IPEndPoint& addr) { | |
| 28 net::IPAddressNumber empty_addr(addr.address().size()); | |
| 29 return std::equal( | |
| 30 empty_addr.begin(), empty_addr.end(), addr.address().begin()) && | |
| 31 !addr.port(); | |
| 32 } | |
| 33 | |
| 34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) { | |
| 35 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(), | |
| 36 addr1.address().end(), | |
| 37 addr2.address().begin()); | |
| 38 } | |
| 39 } // namespace | |
| 40 | |
| 41 UdpTransport::UdpTransport( | |
| 42 net::NetLog* net_log, | |
| 43 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy, | |
| 44 const net::IPEndPoint& local_end_point, | |
| 45 const net::IPEndPoint& remote_end_point, | |
| 46 const CastTransportStatusCallback& status_callback) | |
| 47 : io_thread_proxy_(io_thread_proxy), | |
| 48 local_addr_(local_end_point), | |
| 49 remote_addr_(remote_end_point), | |
| 50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, | |
| 51 net::RandIntCallback(), | |
| 52 net_log, | |
| 53 net::NetLog::Source())), | |
| 54 send_pending_(false), | |
| 55 receive_pending_(false), | |
| 56 client_connected_(false), | |
| 57 next_dscp_value_(net::DSCP_NO_CHANGE), | |
| 58 status_callback_(status_callback), | |
| 59 weak_factory_(this) { | |
| 60 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point)); | |
| 61 } | |
| 62 | |
| 63 UdpTransport::~UdpTransport() {} | |
| 64 | |
| 65 void UdpTransport::StartReceiving( | |
| 66 const PacketReceiverCallback& packet_receiver) { | |
| 67 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 68 | |
| 69 packet_receiver_ = packet_receiver; | |
| 70 udp_socket_->AllowAddressReuse(); | |
| 71 udp_socket_->SetMulticastLoopbackMode(true); | |
| 72 if (!IsEmpty(local_addr_)) { | |
| 73 if (udp_socket_->Bind(local_addr_) < 0) { | |
| 74 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | |
| 75 LOG(ERROR) << "Failed to bind local address."; | |
| 76 return; | |
| 77 } | |
| 78 } else if (!IsEmpty(remote_addr_)) { | |
| 79 if (udp_socket_->Connect(remote_addr_) < 0) { | |
| 80 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | |
| 81 LOG(ERROR) << "Failed to connect to remote address."; | |
| 82 return; | |
| 83 } | |
| 84 client_connected_ = true; | |
| 85 } else { | |
| 86 NOTREACHED() << "Either local or remote address has to be defined."; | |
| 87 } | |
| 88 | |
| 89 ScheduleReceiveNextPacket(); | |
| 90 } | |
| 91 | |
| 92 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) { | |
| 93 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 94 next_dscp_value_ = dscp; | |
| 95 } | |
| 96 | |
| 97 void UdpTransport::ScheduleReceiveNextPacket() { | |
| 98 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 99 if (!packet_receiver_.is_null() && !receive_pending_) { | |
| 100 receive_pending_ = true; | |
| 101 io_thread_proxy_->PostTask(FROM_HERE, | |
| 102 base::Bind(&UdpTransport::ReceiveNextPacket, | |
| 103 weak_factory_.GetWeakPtr(), | |
| 104 net::ERR_IO_PENDING)); | |
| 105 } | |
| 106 } | |
| 107 | |
| 108 void UdpTransport::ReceiveNextPacket(int length_or_status) { | |
| 109 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 110 | |
| 111 // Loop while UdpSocket is delivering data synchronously. When it responds | |
| 112 // with a "pending" status, break and expect this method to be called back in | |
| 113 // the future when a packet is ready. | |
| 114 while (true) { | |
| 115 if (length_or_status == net::ERR_IO_PENDING) { | |
| 116 next_packet_.reset(new Packet(kMaxPacketSize)); | |
| 117 recv_buf_ = new net::WrappedIOBuffer( | |
| 118 reinterpret_cast<char*>(&next_packet_->front())); | |
| 119 length_or_status = udp_socket_->RecvFrom( | |
| 120 recv_buf_, | |
| 121 kMaxPacketSize, | |
| 122 &recv_addr_, | |
| 123 base::Bind(&UdpTransport::ReceiveNextPacket, | |
| 124 weak_factory_.GetWeakPtr())); | |
| 125 if (length_or_status == net::ERR_IO_PENDING) { | |
| 126 receive_pending_ = true; | |
| 127 return; | |
| 128 } | |
| 129 } | |
| 130 | |
| 131 // Note: At this point, either a packet is ready or an error has occurred. | |
| 132 if (length_or_status < 0) { | |
| 133 VLOG(1) << "Failed to receive packet: Status code is " | |
| 134 << length_or_status; | |
| 135 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | |
| 136 receive_pending_ = false; | |
| 137 return; | |
| 138 } | |
| 139 | |
| 140 // Confirm the packet has come from the expected remote address; otherwise, | |
| 141 // ignore it. If this is the first packet being received and no remote | |
| 142 // address has been set, set the remote address and expect all future | |
| 143 // packets to come from the same one. | |
| 144 // TODO(hubbe): We should only do this if the caller used a valid ssrc. | |
| 145 if (IsEmpty(remote_addr_)) { | |
| 146 remote_addr_ = recv_addr_; | |
| 147 VLOG(1) << "Setting remote address from first received packet: " | |
| 148 << remote_addr_.ToString(); | |
| 149 } else if (!IsEqual(remote_addr_, recv_addr_)) { | |
| 150 VLOG(1) << "Ignoring packet received from an unrecognized address: " | |
| 151 << recv_addr_.ToString() << "."; | |
| 152 length_or_status = net::ERR_IO_PENDING; | |
| 153 continue; | |
| 154 } | |
| 155 | |
| 156 next_packet_->resize(length_or_status); | |
| 157 packet_receiver_.Run(next_packet_.Pass()); | |
| 158 length_or_status = net::ERR_IO_PENDING; | |
| 159 } | |
| 160 } | |
| 161 | |
| 162 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) { | |
| 163 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 164 | |
| 165 DCHECK(!send_pending_); | |
| 166 if (send_pending_) { | |
| 167 VLOG(1) << "Cannot send because of pending IO."; | |
| 168 return true; | |
| 169 } | |
| 170 | |
| 171 if (next_dscp_value_ != net::DSCP_NO_CHANGE) { | |
| 172 int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_); | |
| 173 if (result != net::OK) { | |
| 174 LOG(ERROR) << "Unable to set DSCP: " << next_dscp_value_ | |
| 175 << " to socket; Error: " << result; | |
| 176 } | |
| 177 // Don't change DSCP in next send. | |
| 178 next_dscp_value_ = net::DSCP_NO_CHANGE; | |
| 179 } | |
| 180 | |
| 181 scoped_refptr<net::IOBuffer> buf = | |
| 182 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front())); | |
| 183 | |
| 184 int result; | |
| 185 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent, | |
| 186 weak_factory_.GetWeakPtr(), | |
| 187 buf, | |
| 188 packet, | |
| 189 cb); | |
| 190 if (client_connected_) { | |
| 191 // If we called Connect() before we must call Write() instead of | |
| 192 // SendTo(). Otherwise on some platforms we might get | |
| 193 // ERR_SOCKET_IS_CONNECTED. | |
| 194 result = udp_socket_->Write(buf, | |
| 195 static_cast<int>(packet->data.size()), | |
| 196 callback); | |
| 197 } else if (!IsEmpty(remote_addr_)) { | |
| 198 result = udp_socket_->SendTo(buf, | |
| 199 static_cast<int>(packet->data.size()), | |
| 200 remote_addr_, | |
| 201 callback); | |
| 202 } else { | |
| 203 return true; | |
| 204 } | |
| 205 | |
| 206 if (result == net::ERR_IO_PENDING) { | |
| 207 send_pending_ = true; | |
| 208 return false; | |
| 209 } else if (result < 0) { | |
| 210 LOG(ERROR) << "Failed to send packet: " << result << "."; | |
| 211 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | |
| 212 return true; | |
| 213 } else { | |
| 214 // Successful send, re-start reading if needed. | |
| 215 ScheduleReceiveNextPacket(); | |
| 216 return true; | |
| 217 } | |
| 218 } | |
| 219 | |
| 220 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, | |
| 221 PacketRef packet, | |
| 222 const base::Closure& cb, | |
| 223 int result) { | |
| 224 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | |
| 225 | |
| 226 send_pending_ = false; | |
| 227 if (result < 0) { | |
| 228 LOG(ERROR) << "Failed to send packet: " << result << "."; | |
| 229 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | |
| 230 } else { | |
| 231 // Successful send, re-start reading if needed. | |
| 232 ScheduleReceiveNextPacket(); | |
| 233 } | |
| 234 | |
| 235 if (!cb.is_null()) { | |
| 236 cb.Run(); | |
| 237 } | |
| 238 } | |
| 239 | |
| 240 } // namespace transport | |
| 241 } // namespace cast | |
| 242 } // namespace media | |
| OLD | NEW |