OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "media/cast/net/udp_transport.h" | 5 #include "media/cast/net/udp_transport.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <string> | 8 #include <string> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
58 send_buffer_size_(send_buffer_size), | 58 send_buffer_size_(send_buffer_size), |
59 status_callback_(status_callback), | 59 status_callback_(status_callback), |
60 bytes_sent_(0), | 60 bytes_sent_(0), |
61 weak_factory_(this) { | 61 weak_factory_(this) { |
62 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point)); | 62 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point)); |
63 } | 63 } |
64 | 64 |
65 UdpTransport::~UdpTransport() {} | 65 UdpTransport::~UdpTransport() {} |
66 | 66 |
67 void UdpTransport::StartReceiving( | 67 void UdpTransport::StartReceiving( |
68 const PacketReceiverCallback& packet_receiver) { | 68 const PacketReceiverCallbackWithStatus& packet_receiver) { |
69 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | 69 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
70 | 70 |
71 packet_receiver_ = packet_receiver; | 71 packet_receiver_ = packet_receiver; |
72 udp_socket_->SetMulticastLoopbackMode(true); | 72 udp_socket_->SetMulticastLoopbackMode(true); |
73 if (!IsEmpty(local_addr_)) { | 73 if (!IsEmpty(local_addr_)) { |
74 if (udp_socket_->Open(local_addr_.GetFamily()) < 0 || | 74 if (udp_socket_->Open(local_addr_.GetFamily()) < 0 || |
75 udp_socket_->AllowAddressReuse() < 0 || | 75 udp_socket_->AllowAddressReuse() < 0 || |
76 udp_socket_->Bind(local_addr_) < 0) { | 76 udp_socket_->Bind(local_addr_) < 0) { |
77 udp_socket_->Close(); | 77 udp_socket_->Close(); |
78 status_callback_.Run(TRANSPORT_SOCKET_ERROR); | 78 status_callback_.Run(TRANSPORT_SOCKET_ERROR); |
(...skipping 13 matching lines...) Expand all Loading... |
92 } else { | 92 } else { |
93 NOTREACHED() << "Either local or remote address has to be defined."; | 93 NOTREACHED() << "Either local or remote address has to be defined."; |
94 } | 94 } |
95 if (udp_socket_->SetSendBufferSize(send_buffer_size_) != net::OK) { | 95 if (udp_socket_->SetSendBufferSize(send_buffer_size_) != net::OK) { |
96 LOG(WARNING) << "Failed to set socket send buffer size."; | 96 LOG(WARNING) << "Failed to set socket send buffer size."; |
97 } | 97 } |
98 | 98 |
99 ScheduleReceiveNextPacket(); | 99 ScheduleReceiveNextPacket(); |
100 } | 100 } |
101 | 101 |
| 102 void UdpTransport::StopReceiving() { |
| 103 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
| 104 packet_receiver_ = PacketReceiverCallbackWithStatus(); |
| 105 } |
| 106 |
| 107 |
102 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) { | 108 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) { |
103 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | 109 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
104 next_dscp_value_ = dscp; | 110 next_dscp_value_ = dscp; |
105 } | 111 } |
106 | 112 |
107 void UdpTransport::ScheduleReceiveNextPacket() { | 113 void UdpTransport::ScheduleReceiveNextPacket() { |
108 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | 114 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
109 if (!packet_receiver_.is_null() && !receive_pending_) { | 115 if (!packet_receiver_.is_null() && !receive_pending_) { |
110 receive_pending_ = true; | 116 receive_pending_ = true; |
111 io_thread_proxy_->PostTask(FROM_HERE, | 117 io_thread_proxy_->PostTask(FROM_HERE, |
112 base::Bind(&UdpTransport::ReceiveNextPacket, | 118 base::Bind(&UdpTransport::ReceiveNextPacket, |
113 weak_factory_.GetWeakPtr(), | 119 weak_factory_.GetWeakPtr(), |
114 net::ERR_IO_PENDING)); | 120 net::ERR_IO_PENDING)); |
115 } | 121 } |
116 } | 122 } |
117 | 123 |
118 void UdpTransport::ReceiveNextPacket(int length_or_status) { | 124 void UdpTransport::ReceiveNextPacket(int length_or_status) { |
119 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | 125 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
120 | 126 |
| 127 if (packet_receiver_.is_null()) |
| 128 return; |
| 129 |
121 // Loop while UdpSocket is delivering data synchronously. When it responds | 130 // Loop while UdpSocket is delivering data synchronously. When it responds |
122 // with a "pending" status, break and expect this method to be called back in | 131 // with a "pending" status, break and expect this method to be called back in |
123 // the future when a packet is ready. | 132 // the future when a packet is ready. |
124 while (true) { | 133 while (true) { |
125 if (length_or_status == net::ERR_IO_PENDING) { | 134 if (length_or_status == net::ERR_IO_PENDING) { |
126 next_packet_.reset(new Packet(kMaxPacketSize)); | 135 next_packet_.reset(new Packet(kMaxPacketSize)); |
127 recv_buf_ = new net::WrappedIOBuffer( | 136 recv_buf_ = new net::WrappedIOBuffer( |
128 reinterpret_cast<char*>(&next_packet_->front())); | 137 reinterpret_cast<char*>(&next_packet_->front())); |
129 length_or_status = | 138 length_or_status = |
130 udp_socket_->RecvFrom(recv_buf_.get(), | 139 udp_socket_->RecvFrom(recv_buf_.get(), |
(...skipping 17 matching lines...) Expand all Loading... |
148 | 157 |
149 // Confirm the packet has come from the expected remote address; otherwise, | 158 // Confirm the packet has come from the expected remote address; otherwise, |
150 // ignore it. If this is the first packet being received and no remote | 159 // ignore it. If this is the first packet being received and no remote |
151 // address has been set, set the remote address and expect all future | 160 // address has been set, set the remote address and expect all future |
152 // packets to come from the same one. | 161 // packets to come from the same one. |
153 // TODO(hubbe): We should only do this if the caller used a valid ssrc. | 162 // TODO(hubbe): We should only do this if the caller used a valid ssrc. |
154 if (IsEmpty(remote_addr_)) { | 163 if (IsEmpty(remote_addr_)) { |
155 remote_addr_ = recv_addr_; | 164 remote_addr_ = recv_addr_; |
156 VLOG(1) << "Setting remote address from first received packet: " | 165 VLOG(1) << "Setting remote address from first received packet: " |
157 << remote_addr_.ToString(); | 166 << remote_addr_.ToString(); |
| 167 next_packet_->resize(length_or_status); |
| 168 if (!packet_receiver_.Run(next_packet_.Pass())) { |
| 169 VLOG(1) << "Packet was not valid, resetting remote address."; |
| 170 remote_addr_ = net::IPEndPoint(); |
| 171 } |
158 } else if (!IsEqual(remote_addr_, recv_addr_)) { | 172 } else if (!IsEqual(remote_addr_, recv_addr_)) { |
159 VLOG(1) << "Ignoring packet received from an unrecognized address: " | 173 VLOG(1) << "Ignoring packet received from an unrecognized address: " |
160 << recv_addr_.ToString() << "."; | 174 << recv_addr_.ToString() << "."; |
161 length_or_status = net::ERR_IO_PENDING; | 175 } else { |
162 continue; | 176 next_packet_->resize(length_or_status); |
| 177 packet_receiver_.Run(next_packet_.Pass()); |
163 } | 178 } |
164 | |
165 next_packet_->resize(length_or_status); | |
166 packet_receiver_.Run(next_packet_.Pass()); | |
167 length_or_status = net::ERR_IO_PENDING; | 179 length_or_status = net::ERR_IO_PENDING; |
168 } | 180 } |
169 } | 181 } |
170 | 182 |
171 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) { | 183 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) { |
172 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); | 184 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
173 | 185 |
174 // Increase byte count no matter the packet was sent or dropped. | 186 // Increase byte count no matter the packet was sent or dropped. |
175 bytes_sent_ += packet->data.size(); | 187 bytes_sent_ += packet->data.size(); |
176 | 188 |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 } | 255 } |
244 ScheduleReceiveNextPacket(); | 256 ScheduleReceiveNextPacket(); |
245 | 257 |
246 if (!cb.is_null()) { | 258 if (!cb.is_null()) { |
247 cb.Run(); | 259 cb.Run(); |
248 } | 260 } |
249 } | 261 } |
250 | 262 |
251 } // namespace cast | 263 } // namespace cast |
252 } // namespace media | 264 } // namespace media |
OLD | NEW |