Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Side by Side Diff: media/cast/transport/pacing/paced_sender.cc

Issue 388663003: Cast: Reshuffle files under media/cast (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: missing includes Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/transport/pacing/paced_sender.h"
6
7 #include "base/big_endian.h"
8 #include "base/bind.h"
9 #include "base/message_loop/message_loop.h"
10
11 namespace media {
12 namespace cast {
13 namespace transport {
14
15 namespace {
16
17 static const int64 kPacingIntervalMs = 10;
18 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
19 // bursts of packets.
20 static const size_t kPacingMaxBurstsPerFrame = 3;
21 static const size_t kTargetBurstSize = 10;
22 static const size_t kMaxBurstSize = 20;
23 static const size_t kMaxDedupeWindowMs = 500;
24
25 } // namespace
26
27 // static
28 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
29 uint32 ssrc,
30 uint16 packet_id) {
31 return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
32 }
33
34 PacedSender::PacedSender(
35 base::TickClock* clock,
36 LoggingImpl* logging,
37 PacketSender* transport,
38 const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
39 : clock_(clock),
40 logging_(logging),
41 transport_(transport),
42 transport_task_runner_(transport_task_runner),
43 audio_ssrc_(0),
44 video_ssrc_(0),
45 max_burst_size_(kTargetBurstSize),
46 next_max_burst_size_(kTargetBurstSize),
47 next_next_max_burst_size_(kTargetBurstSize),
48 current_burst_size_(0),
49 state_(State_Unblocked),
50 weak_factory_(this) {
51 }
52
53 PacedSender::~PacedSender() {}
54
55 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
56 audio_ssrc_ = audio_ssrc;
57 }
58
59 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
60 video_ssrc_ = video_ssrc;
61 }
62
63 bool PacedSender::SendPackets(const SendPacketVector& packets) {
64 if (packets.empty()) {
65 return true;
66 }
67 for (size_t i = 0; i < packets.size(); i++) {
68 packet_list_[packets[i].first] =
69 make_pair(PacketType_Normal, packets[i].second);
70 }
71 if (state_ == State_Unblocked) {
72 SendStoredPackets();
73 }
74 return true;
75 }
76
77 bool PacedSender::ResendPackets(const SendPacketVector& packets,
78 base::TimeDelta dedupe_window) {
79 if (packets.empty()) {
80 return true;
81 }
82 base::TimeTicks now = clock_->NowTicks();
83 for (size_t i = 0; i < packets.size(); i++) {
84 std::map<PacketKey, base::TimeTicks>::const_iterator j =
85 sent_time_.find(packets[i].first);
86
87 if (j != sent_time_.end() && now - j->second < dedupe_window) {
88 LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
89 continue;
90 }
91
92 packet_list_[packets[i].first] =
93 make_pair(PacketType_Resend, packets[i].second);
94 }
95 if (state_ == State_Unblocked) {
96 SendStoredPackets();
97 }
98 return true;
99 }
100
101 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
102 if (state_ == State_TransportBlocked) {
103 packet_list_[PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
104 make_pair(PacketType_RTCP, packet);
105 } else {
106 // We pass the RTCP packets straight through.
107 if (!transport_->SendPacket(
108 packet,
109 base::Bind(&PacedSender::SendStoredPackets,
110 weak_factory_.GetWeakPtr()))) {
111 state_ = State_TransportBlocked;
112 }
113
114 }
115 return true;
116 }
117
118 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
119 packet_list_.erase(packet_key);
120 }
121
122 PacketRef PacedSender::GetNextPacket(PacketType* packet_type,
123 PacketKey* packet_key) {
124 std::map<PacketKey, std::pair<PacketType, PacketRef> >::iterator i;
125 i = packet_list_.begin();
126 DCHECK(i != packet_list_.end());
127 *packet_type = i->second.first;
128 *packet_key = i->first;
129 PacketRef ret = i->second.second;
130 packet_list_.erase(i);
131 return ret;
132 }
133
134 bool PacedSender::empty() const {
135 return packet_list_.empty();
136 }
137
138 size_t PacedSender::size() const {
139 return packet_list_.size();
140 }
141
142 // This function can be called from three places:
143 // 1. User called one of the Send* functions and we were in an unblocked state.
144 // 2. state_ == State_TransportBlocked and the transport is calling us to
145 // let us know that it's ok to send again.
146 // 3. state_ == State_BurstFull and there are still packets to send. In this
147 // case we called PostDelayedTask on this function to start a new burst.
148 void PacedSender::SendStoredPackets() {
149 State previous_state = state_;
150 state_ = State_Unblocked;
151 if (empty()) {
152 return;
153 }
154
155 base::TimeTicks now = clock_->NowTicks();
156 // I don't actually trust that PostDelayTask(x - now) will mean that
157 // now >= x when the call happens, so check if the previous state was
158 // State_BurstFull too.
159 if (now >= burst_end_ || previous_state == State_BurstFull) {
160 // Start a new burst.
161 current_burst_size_ = 0;
162 burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
163
164 // The goal here is to try to send out the queued packets over the next
165 // three bursts, while trying to keep the burst size below 10 if possible.
166 // We have some evidence that sending more than 12 packets in a row doesn't
167 // work very well, but we don't actually know why yet. Sending out packets
168 // sooner is better than sending out packets later as that gives us more
169 // time to re-send them if needed. So if we have less than 30 packets, just
170 // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
171 // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
172 // which is more bandwidth than the cast library should need, and sending
173 // out more data per second is unlikely to be helpful.
174 size_t max_burst_size = std::min(
175 kMaxBurstSize,
176 std::max(kTargetBurstSize, size() / kPacingMaxBurstsPerFrame));
177
178 // If the queue is long, issue a warning. Try to limit the number of
179 // warnings issued by only issuing the warning when the burst size
180 // grows. Otherwise we might get 100 warnings per second.
181 if (max_burst_size > next_next_max_burst_size_ && size() > 100) {
182 LOG(WARNING) << "Packet queue is very long:" << size();
183 }
184
185 max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
186 next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
187 next_next_max_burst_size_ = max_burst_size;
188 }
189
190 base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
191 weak_factory_.GetWeakPtr());
192 while (!empty()) {
193 if (current_burst_size_ >= max_burst_size_) {
194 transport_task_runner_->PostDelayedTask(FROM_HERE,
195 cb,
196 burst_end_ - now);
197 state_ = State_BurstFull;
198 return;
199 }
200 PacketType packet_type;
201 PacketKey packet_key;
202 PacketRef packet = GetNextPacket(&packet_type, &packet_key);
203 sent_time_[packet_key] = now;
204 sent_time_buffer_[packet_key] = now;
205
206 switch (packet_type) {
207 case PacketType_Resend:
208 LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
209 break;
210 case PacketType_Normal:
211 LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
212 break;
213 case PacketType_RTCP:
214 break;
215 }
216 if (!transport_->SendPacket(packet, cb)) {
217 state_ = State_TransportBlocked;
218 return;
219 }
220 current_burst_size_++;
221 }
222 // Keep ~0.5 seconds of data (1000 packets)
223 if (sent_time_buffer_.size() >=
224 kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs) {
225 sent_time_.swap(sent_time_buffer_);
226 sent_time_buffer_.clear();
227 }
228 DCHECK_LE(sent_time_buffer_.size(),
229 kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
230 DCHECK_LE(sent_time_.size(),
231 2 * kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
232 state_ = State_Unblocked;
233 }
234
235 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
236 // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
237 // if the packet is audio or video.
238 DCHECK_GE(packet.size(), 12u);
239 base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
240 uint32 ssrc;
241 bool success = reader.ReadU32(&ssrc);
242 DCHECK(success);
243 bool is_audio;
244 if (ssrc == audio_ssrc_) {
245 is_audio = true;
246 } else if (ssrc == video_ssrc_) {
247 is_audio = false;
248 } else {
249 DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
250 return;
251 }
252
253 EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
254 logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
255 packet);
256 }
257
258 } // namespace transport
259 } // namespace cast
260 } // namespace media
OLDNEW
« no previous file with comments | « media/cast/transport/pacing/paced_sender.h ('k') | media/cast/transport/pacing/paced_sender_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698