Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: media/cast/transport/transport/udp_transport.cc

Issue 388663003: Cast: Reshuffle files under media/cast (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: DEPS Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/transport/transport/udp_transport.h"
6
7 #include <algorithm>
8 #include <string>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
19
20 namespace media {
21 namespace cast {
22 namespace transport {
23
24 namespace {
25 const int kMaxPacketSize = 1500;
26
27 bool IsEmpty(const net::IPEndPoint& addr) {
28 net::IPAddressNumber empty_addr(addr.address().size());
29 return std::equal(
30 empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
31 !addr.port();
32 }
33
34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
35 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
36 addr1.address().end(),
37 addr2.address().begin());
38 }
39 } // namespace
40
41 UdpTransport::UdpTransport(
42 net::NetLog* net_log,
43 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
44 const net::IPEndPoint& local_end_point,
45 const net::IPEndPoint& remote_end_point,
46 const CastTransportStatusCallback& status_callback)
47 : io_thread_proxy_(io_thread_proxy),
48 local_addr_(local_end_point),
49 remote_addr_(remote_end_point),
50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
51 net::RandIntCallback(),
52 net_log,
53 net::NetLog::Source())),
54 send_pending_(false),
55 receive_pending_(false),
56 client_connected_(false),
57 next_dscp_value_(net::DSCP_NO_CHANGE),
58 status_callback_(status_callback),
59 weak_factory_(this) {
60 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
61 }
62
63 UdpTransport::~UdpTransport() {}
64
65 void UdpTransport::StartReceiving(
66 const PacketReceiverCallback& packet_receiver) {
67 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
68
69 packet_receiver_ = packet_receiver;
70 udp_socket_->AllowAddressReuse();
71 udp_socket_->SetMulticastLoopbackMode(true);
72 if (!IsEmpty(local_addr_)) {
73 if (udp_socket_->Bind(local_addr_) < 0) {
74 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
75 LOG(ERROR) << "Failed to bind local address.";
76 return;
77 }
78 } else if (!IsEmpty(remote_addr_)) {
79 if (udp_socket_->Connect(remote_addr_) < 0) {
80 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
81 LOG(ERROR) << "Failed to connect to remote address.";
82 return;
83 }
84 client_connected_ = true;
85 } else {
86 NOTREACHED() << "Either local or remote address has to be defined.";
87 }
88
89 ScheduleReceiveNextPacket();
90 }
91
92 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
93 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
94 next_dscp_value_ = dscp;
95 }
96
97 void UdpTransport::ScheduleReceiveNextPacket() {
98 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
99 if (!packet_receiver_.is_null() && !receive_pending_) {
100 receive_pending_ = true;
101 io_thread_proxy_->PostTask(FROM_HERE,
102 base::Bind(&UdpTransport::ReceiveNextPacket,
103 weak_factory_.GetWeakPtr(),
104 net::ERR_IO_PENDING));
105 }
106 }
107
108 void UdpTransport::ReceiveNextPacket(int length_or_status) {
109 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
110
111 // Loop while UdpSocket is delivering data synchronously. When it responds
112 // with a "pending" status, break and expect this method to be called back in
113 // the future when a packet is ready.
114 while (true) {
115 if (length_or_status == net::ERR_IO_PENDING) {
116 next_packet_.reset(new Packet(kMaxPacketSize));
117 recv_buf_ = new net::WrappedIOBuffer(
118 reinterpret_cast<char*>(&next_packet_->front()));
119 length_or_status = udp_socket_->RecvFrom(
120 recv_buf_,
121 kMaxPacketSize,
122 &recv_addr_,
123 base::Bind(&UdpTransport::ReceiveNextPacket,
124 weak_factory_.GetWeakPtr()));
125 if (length_or_status == net::ERR_IO_PENDING) {
126 receive_pending_ = true;
127 return;
128 }
129 }
130
131 // Note: At this point, either a packet is ready or an error has occurred.
132 if (length_or_status < 0) {
133 VLOG(1) << "Failed to receive packet: Status code is "
134 << length_or_status;
135 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
136 receive_pending_ = false;
137 return;
138 }
139
140 // Confirm the packet has come from the expected remote address; otherwise,
141 // ignore it. If this is the first packet being received and no remote
142 // address has been set, set the remote address and expect all future
143 // packets to come from the same one.
144 // TODO(hubbe): We should only do this if the caller used a valid ssrc.
145 if (IsEmpty(remote_addr_)) {
146 remote_addr_ = recv_addr_;
147 VLOG(1) << "Setting remote address from first received packet: "
148 << remote_addr_.ToString();
149 } else if (!IsEqual(remote_addr_, recv_addr_)) {
150 VLOG(1) << "Ignoring packet received from an unrecognized address: "
151 << recv_addr_.ToString() << ".";
152 length_or_status = net::ERR_IO_PENDING;
153 continue;
154 }
155
156 next_packet_->resize(length_or_status);
157 packet_receiver_.Run(next_packet_.Pass());
158 length_or_status = net::ERR_IO_PENDING;
159 }
160 }
161
162 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
163 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
164
165 DCHECK(!send_pending_);
166 if (send_pending_) {
167 VLOG(1) << "Cannot send because of pending IO.";
168 return true;
169 }
170
171 if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
172 int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
173 if (result != net::OK) {
174 LOG(ERROR) << "Unable to set DSCP: " << next_dscp_value_
175 << " to socket; Error: " << result;
176 }
177 // Don't change DSCP in next send.
178 next_dscp_value_ = net::DSCP_NO_CHANGE;
179 }
180
181 scoped_refptr<net::IOBuffer> buf =
182 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
183
184 int result;
185 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
186 weak_factory_.GetWeakPtr(),
187 buf,
188 packet,
189 cb);
190 if (client_connected_) {
191 // If we called Connect() before we must call Write() instead of
192 // SendTo(). Otherwise on some platforms we might get
193 // ERR_SOCKET_IS_CONNECTED.
194 result = udp_socket_->Write(buf,
195 static_cast<int>(packet->data.size()),
196 callback);
197 } else if (!IsEmpty(remote_addr_)) {
198 result = udp_socket_->SendTo(buf,
199 static_cast<int>(packet->data.size()),
200 remote_addr_,
201 callback);
202 } else {
203 return true;
204 }
205
206 if (result == net::ERR_IO_PENDING) {
207 send_pending_ = true;
208 return false;
209 } else if (result < 0) {
210 LOG(ERROR) << "Failed to send packet: " << result << ".";
211 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
212 return true;
213 } else {
214 // Successful send, re-start reading if needed.
215 ScheduleReceiveNextPacket();
216 return true;
217 }
218 }
219
220 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
221 PacketRef packet,
222 const base::Closure& cb,
223 int result) {
224 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
225
226 send_pending_ = false;
227 if (result < 0) {
228 LOG(ERROR) << "Failed to send packet: " << result << ".";
229 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
230 } else {
231 // Successful send, re-start reading if needed.
232 ScheduleReceiveNextPacket();
233 }
234
235 if (!cb.is_null()) {
236 cb.Run();
237 }
238 }
239
240 } // namespace transport
241 } // namespace cast
242 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698