OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "media/cast/transport/pacing/paced_sender.h" | |
6 | |
7 #include "base/big_endian.h" | |
8 #include "base/bind.h" | |
9 #include "base/message_loop/message_loop.h" | |
10 | |
11 namespace media { | |
12 namespace cast { | |
13 namespace transport { | |
14 | |
15 namespace { | |
16 | |
17 static const int64 kPacingIntervalMs = 10; | |
18 // Each frame will be split into no more than kPacingMaxBurstsPerFrame | |
19 // bursts of packets. | |
20 static const size_t kPacingMaxBurstsPerFrame = 3; | |
21 static const size_t kTargetBurstSize = 10; | |
22 static const size_t kMaxBurstSize = 20; | |
23 static const size_t kMaxDedupeWindowMs = 500; | |
24 | |
25 } // namespace | |
26 | |
27 // static | |
28 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks, | |
29 uint32 ssrc, | |
30 uint16 packet_id) { | |
31 return std::make_pair(ticks, std::make_pair(ssrc, packet_id)); | |
32 } | |
33 | |
34 PacedSender::PacedSender( | |
35 base::TickClock* clock, | |
36 LoggingImpl* logging, | |
37 PacketSender* transport, | |
38 const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner) | |
39 : clock_(clock), | |
40 logging_(logging), | |
41 transport_(transport), | |
42 transport_task_runner_(transport_task_runner), | |
43 audio_ssrc_(0), | |
44 video_ssrc_(0), | |
45 max_burst_size_(kTargetBurstSize), | |
46 next_max_burst_size_(kTargetBurstSize), | |
47 next_next_max_burst_size_(kTargetBurstSize), | |
48 current_burst_size_(0), | |
49 state_(State_Unblocked), | |
50 weak_factory_(this) { | |
51 } | |
52 | |
53 PacedSender::~PacedSender() {} | |
54 | |
55 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) { | |
56 audio_ssrc_ = audio_ssrc; | |
57 } | |
58 | |
59 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) { | |
60 video_ssrc_ = video_ssrc; | |
61 } | |
62 | |
63 bool PacedSender::SendPackets(const SendPacketVector& packets) { | |
64 if (packets.empty()) { | |
65 return true; | |
66 } | |
67 for (size_t i = 0; i < packets.size(); i++) { | |
68 packet_list_[packets[i].first] = | |
69 make_pair(PacketType_Normal, packets[i].second); | |
70 } | |
71 if (state_ == State_Unblocked) { | |
72 SendStoredPackets(); | |
73 } | |
74 return true; | |
75 } | |
76 | |
77 bool PacedSender::ResendPackets(const SendPacketVector& packets, | |
78 base::TimeDelta dedupe_window) { | |
79 if (packets.empty()) { | |
80 return true; | |
81 } | |
82 base::TimeTicks now = clock_->NowTicks(); | |
83 for (size_t i = 0; i < packets.size(); i++) { | |
84 std::map<PacketKey, base::TimeTicks>::const_iterator j = | |
85 sent_time_.find(packets[i].first); | |
86 | |
87 if (j != sent_time_.end() && now - j->second < dedupe_window) { | |
88 LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED); | |
89 continue; | |
90 } | |
91 | |
92 packet_list_[packets[i].first] = | |
93 make_pair(PacketType_Resend, packets[i].second); | |
94 } | |
95 if (state_ == State_Unblocked) { | |
96 SendStoredPackets(); | |
97 } | |
98 return true; | |
99 } | |
100 | |
101 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) { | |
102 if (state_ == State_TransportBlocked) { | |
103 packet_list_[PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] = | |
104 make_pair(PacketType_RTCP, packet); | |
105 } else { | |
106 // We pass the RTCP packets straight through. | |
107 if (!transport_->SendPacket( | |
108 packet, | |
109 base::Bind(&PacedSender::SendStoredPackets, | |
110 weak_factory_.GetWeakPtr()))) { | |
111 state_ = State_TransportBlocked; | |
112 } | |
113 | |
114 } | |
115 return true; | |
116 } | |
117 | |
118 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) { | |
119 packet_list_.erase(packet_key); | |
120 } | |
121 | |
122 PacketRef PacedSender::GetNextPacket(PacketType* packet_type, | |
123 PacketKey* packet_key) { | |
124 std::map<PacketKey, std::pair<PacketType, PacketRef> >::iterator i; | |
125 i = packet_list_.begin(); | |
126 DCHECK(i != packet_list_.end()); | |
127 *packet_type = i->second.first; | |
128 *packet_key = i->first; | |
129 PacketRef ret = i->second.second; | |
130 packet_list_.erase(i); | |
131 return ret; | |
132 } | |
133 | |
134 bool PacedSender::empty() const { | |
135 return packet_list_.empty(); | |
136 } | |
137 | |
138 size_t PacedSender::size() const { | |
139 return packet_list_.size(); | |
140 } | |
141 | |
142 // This function can be called from three places: | |
143 // 1. User called one of the Send* functions and we were in an unblocked state. | |
144 // 2. state_ == State_TransportBlocked and the transport is calling us to | |
145 // let us know that it's ok to send again. | |
146 // 3. state_ == State_BurstFull and there are still packets to send. In this | |
147 // case we called PostDelayedTask on this function to start a new burst. | |
148 void PacedSender::SendStoredPackets() { | |
149 State previous_state = state_; | |
150 state_ = State_Unblocked; | |
151 if (empty()) { | |
152 return; | |
153 } | |
154 | |
155 base::TimeTicks now = clock_->NowTicks(); | |
156 // I don't actually trust that PostDelayTask(x - now) will mean that | |
157 // now >= x when the call happens, so check if the previous state was | |
158 // State_BurstFull too. | |
159 if (now >= burst_end_ || previous_state == State_BurstFull) { | |
160 // Start a new burst. | |
161 current_burst_size_ = 0; | |
162 burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs); | |
163 | |
164 // The goal here is to try to send out the queued packets over the next | |
165 // three bursts, while trying to keep the burst size below 10 if possible. | |
166 // We have some evidence that sending more than 12 packets in a row doesn't | |
167 // work very well, but we don't actually know why yet. Sending out packets | |
168 // sooner is better than sending out packets later as that gives us more | |
169 // time to re-send them if needed. So if we have less than 30 packets, just | |
170 // send 10 at a time. If we have less than 60 packets, send n / 3 at a time. | |
171 // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s | |
172 // which is more bandwidth than the cast library should need, and sending | |
173 // out more data per second is unlikely to be helpful. | |
174 size_t max_burst_size = std::min( | |
175 kMaxBurstSize, | |
176 std::max(kTargetBurstSize, size() / kPacingMaxBurstsPerFrame)); | |
177 | |
178 // If the queue is long, issue a warning. Try to limit the number of | |
179 // warnings issued by only issuing the warning when the burst size | |
180 // grows. Otherwise we might get 100 warnings per second. | |
181 if (max_burst_size > next_next_max_burst_size_ && size() > 100) { | |
182 LOG(WARNING) << "Packet queue is very long:" << size(); | |
183 } | |
184 | |
185 max_burst_size_ = std::max(next_max_burst_size_, max_burst_size); | |
186 next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size); | |
187 next_next_max_burst_size_ = max_burst_size; | |
188 } | |
189 | |
190 base::Closure cb = base::Bind(&PacedSender::SendStoredPackets, | |
191 weak_factory_.GetWeakPtr()); | |
192 while (!empty()) { | |
193 if (current_burst_size_ >= max_burst_size_) { | |
194 transport_task_runner_->PostDelayedTask(FROM_HERE, | |
195 cb, | |
196 burst_end_ - now); | |
197 state_ = State_BurstFull; | |
198 return; | |
199 } | |
200 PacketType packet_type; | |
201 PacketKey packet_key; | |
202 PacketRef packet = GetNextPacket(&packet_type, &packet_key); | |
203 sent_time_[packet_key] = now; | |
204 sent_time_buffer_[packet_key] = now; | |
205 | |
206 switch (packet_type) { | |
207 case PacketType_Resend: | |
208 LogPacketEvent(packet->data, PACKET_RETRANSMITTED); | |
209 break; | |
210 case PacketType_Normal: | |
211 LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK); | |
212 break; | |
213 case PacketType_RTCP: | |
214 break; | |
215 } | |
216 if (!transport_->SendPacket(packet, cb)) { | |
217 state_ = State_TransportBlocked; | |
218 return; | |
219 } | |
220 current_burst_size_++; | |
221 } | |
222 // Keep ~0.5 seconds of data (1000 packets) | |
223 if (sent_time_buffer_.size() >= | |
224 kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs) { | |
225 sent_time_.swap(sent_time_buffer_); | |
226 sent_time_buffer_.clear(); | |
227 } | |
228 DCHECK_LE(sent_time_buffer_.size(), | |
229 kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs); | |
230 DCHECK_LE(sent_time_.size(), | |
231 2 * kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs); | |
232 state_ = State_Unblocked; | |
233 } | |
234 | |
235 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) { | |
236 // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see | |
237 // if the packet is audio or video. | |
238 DCHECK_GE(packet.size(), 12u); | |
239 base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4); | |
240 uint32 ssrc; | |
241 bool success = reader.ReadU32(&ssrc); | |
242 DCHECK(success); | |
243 bool is_audio; | |
244 if (ssrc == audio_ssrc_) { | |
245 is_audio = true; | |
246 } else if (ssrc == video_ssrc_) { | |
247 is_audio = false; | |
248 } else { | |
249 DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event"; | |
250 return; | |
251 } | |
252 | |
253 EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT; | |
254 logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type, | |
255 packet); | |
256 } | |
257 | |
258 } // namespace transport | |
259 } // namespace cast | |
260 } // namespace media | |
OLD | NEW |