Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Side by Side Diff: webrtc/api/quicdatachannel.cc

Issue 1886623002: Add QuicDataChannel and QuicDataTransport classes (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/api/quicdatachannel.h"
12
13 #include "webrtc/base/bind.h"
14 #include "webrtc/base/bytebuffer.h"
15 #include "webrtc/base/copyonwritebuffer.h"
16 #include "webrtc/base/logging.h"
17 #include "webrtc/p2p/quic/quictransportchannel.h"
18 #include "webrtc/p2p/quic/reliablequicstream.h"
19
20 namespace webrtc {
21
22 QuicDataChannel::QuicDataChannel(
23 rtc::Thread* signaling_thread,
24 rtc::Thread* worker_thread,
25 const std::string& label,
26 const DataChannelInit& config,
27 const QuicMessageDispatcher& message_dispatcher)
28 : signaling_thread_(signaling_thread),
29 worker_thread_(worker_thread),
30 message_dispatcher_(message_dispatcher),
31 id_(config.id),
32 state_(kConnecting),
33 buffered_amount_(0),
34 num_sent_messages_(0),
35 label_(label),
36 protocol_(config.protocol) {}
37
38 QuicDataChannel::~QuicDataChannel() {}
39
40 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
41 RTC_DCHECK(signaling_thread_->IsCurrent());
42 observer_ = observer;
43 }
44
45 void QuicDataChannel::UnregisterObserver() {
46 RTC_DCHECK(signaling_thread_->IsCurrent());
47 observer_ = nullptr;
48 }
49
50 bool QuicDataChannel::Send(const DataBuffer& buffer) {
51 RTC_DCHECK(signaling_thread_->IsCurrent());
52 if (state_ != kOpen) {
53 LOG(LS_ERROR) << "QUIC data channel " << id_
54 << " is not open so cannot send.";
55 return false;
56 }
57 return worker_thread_->Invoke<bool>(
58 rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
59 }
60
61 bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
62 RTC_DCHECK(worker_thread_->IsCurrent());
63 if (buffer.size() == 0) {
64 LOG(LS_WARNING) << "QUIC data channel " << id_
65 << " refuses to send an empty message.";
pthatcher1 2016/04/14 17:21:56 I think this was just a limitation of SCTP. I don
mikescarlett 2016/04/14 22:16:50 Ok I'll remove that.
66 return true;
67 }
68 // Send the header containing the data channel ID and message ID.
69 rtc::CopyOnWriteBuffer header;
70 message_dispatcher_.EncodeHeader(id_, ++num_sent_messages_, &header);
pthatcher1 2016/04/14 17:21:56 It's strange to me that a class called MessageDisp
mikescarlett 2016/04/14 22:16:50 Done.
71 RTC_DCHECK(quic_transport_channel_);
72 cricket::ReliableQuicStream* stream =
73 quic_transport_channel_->CreateQuicStream();
74 RTC_DCHECK(stream);
75 uint64_t old_queued_bytes = stream->queued_data_bytes();
76 stream->Write(header.data<char>(), header.size(), false);
pthatcher1 2016/04/14 17:21:56 Don't you need to check the Write call with the he
mikescarlett 2016/04/14 22:16:50 I added a check.
77 // Send the message with FIN == true, which signals to the remote peer that
78 // there is no more data after this message.
pthatcher1 2016/04/14 17:21:56 Can you add a line like this? bool fin = true;
mikescarlett 2016/04/14 22:16:50 Done.
79 rtc::StreamResult result =
80 stream->Write(buffer.data.data<char>(), buffer.size(), true);
81 if (result == rtc::SR_SUCCESS) {
82 // The message is sent and we don't need this QUIC stream.
83 LOG(LS_INFO) << "Stream " << stream->id()
84 << " successfully wrote data for QUIC data channel " << id_;
85 stream->Close();
86 return true;
87 }
88 // TODO(mikescarlett): Register the ReliableQuicStream's priority to the
89 // QuicWriteBlockedList so that the QUIC session doesn't drop messages when
90 // the QUIC transport channel becomes unwritable.
91 if (result == rtc::SR_BLOCK) {
92 // The QUIC stream is write blocked, so the message will be queued by the
93 // QUIC session. If this is due to the QUIC not being writable, it will be
94 // sent once QUIC becomes writable again. Otherwise it may be due to
95 // exceeding the QUIC flow control limit, in which case the remote peer's
96 // QUIC session will tell the QUIC stream to send more data.
97 LOG(LS_INFO) << "Stream " << stream->id()
98 << " is write blocked for QUIC data channel " << id_;
99 invoker_.AsyncInvoke<void>(
100 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s,
101 this, buffered_amount_));
102 buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes;
103 stream->SignalQueuedBytesWritten.connect(
104 this, &QuicDataChannel::OnQueuedBytesWritten);
105 write_blocked_quic_streams_[stream->id()] = stream;
106 // The QUIC stream will be removed from |write_blocked_quic_streams_| once
107 // it closes.
108 stream->SignalClosed.connect(this,
109 &QuicDataChannel::OnWriteBlockedStreamClosed);
110 return true;
111 }
112 LOG(LS_ERROR) << "Stream " << stream->id()
113 << " failed to write for QUIC data channel " << id_
114 << ". Unexpected result: " << result;
115 return false;
116 }
117
118 void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id,
119 uint64_t queued_bytes_written) {
120 RTC_DCHECK(worker_thread_->IsCurrent());
121 invoker_.AsyncInvoke<void>(
122 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s,
123 this, buffered_amount_));
124 buffered_amount_ -= queued_bytes_written;
125 const auto& kv = write_blocked_quic_streams_.find(stream_id);
126 RTC_DCHECK(kv != write_blocked_quic_streams_.end());
127 cricket::ReliableQuicStream* stream = kv->second;
128 // True if the QUIC stream is done sending data.
129 if (stream->fin_sent()) {
130 LOG(LS_INFO) << "Stream " << stream->id()
131 << " successfully wrote data for QUIC data channel " << id_;
132 stream->Close();
133 }
134 }
135
136 void QuicDataChannel::Close() {
137 RTC_DCHECK(signaling_thread_->IsCurrent());
138 if (state_ == kClosed || state_ == kClosing) {
139 return;
140 }
141 LOG(LS_INFO) << "Closing QUIC data channel.";
142 SetState_s(kClosing);
143 worker_thread_->Invoke<void>(rtc::Bind(&QuicDataChannel::Close_w, this));
144 }
145
146 void QuicDataChannel::Close_w() {
147 RTC_DCHECK(worker_thread_->IsCurrent());
148 for (auto& kv : incoming_quic_streams_) {
149 Message& message = kv.second;
150 cricket::ReliableQuicStream* stream = message.stream;
151 stream->Close();
152 }
153
154 for (auto& kv : write_blocked_quic_streams_) {
155 cricket::ReliableQuicStream* stream = kv.second;
156 stream->Close();
157 }
158
159 invoker_.AsyncInvoke<void>(
160 signaling_thread_,
161 rtc::Bind(&QuicDataChannel::SetState_s, this, kClosed));
162 }
163
164 void QuicDataChannel::SetTransportChannel(
165 cricket::QuicTransportChannel* channel) {
166 RTC_DCHECK(signaling_thread_->IsCurrent());
167 RTC_DCHECK(channel);
168 RTC_DCHECK(!quic_transport_channel_);
169 quic_transport_channel_ = channel;
170 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_;
171 worker_thread_->Invoke<void>(
172 rtc::Bind(&QuicDataChannel::ConnectTransportChannel_w, this));
173 if (quic_transport_channel_->writable()) {
174 SetState_s(kOpen);
175 }
176 }
177
178 void QuicDataChannel::ConnectTransportChannel_w() {
179 RTC_DCHECK(worker_thread_->IsCurrent());
180 quic_transport_channel_->SignalReadyToSend.connect(
181 this, &QuicDataChannel::OnReadyToSend);
182 quic_transport_channel_->SignalClosed.connect(
183 this, &QuicDataChannel::OnConnectionClosed);
184 }
185
186 void QuicDataChannel::OnIncomingStream(uint64_t message_id,
187 const char* first_bytes,
188 size_t len,
189 cricket::ReliableQuicStream* stream) {
190 RTC_DCHECK(worker_thread_->IsCurrent());
191 RTC_DCHECK(first_bytes);
192 RTC_DCHECK(stream);
193 if (!observer_) {
194 LOG(LS_WARNING) << "QUIC data channel " << id_
195 << " received a message but has no observer.";
196 stream->Close();
197 return;
198 }
199 // A FIN is received if the message fits into a single QUIC stream frame and
200 // the remote peer is done sending.
201 if (stream->fin_received()) {
202 LOG(LS_INFO) << "Stream " << stream->id()
203 << " has finished receiving data for QUIC data channel "
204 << id_;
205 DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false);
206 invoker_.AsyncInvoke<void>(signaling_thread_,
207 rtc::Bind(&QuicDataChannel::OnMessage_s, this,
208 std::move(final_message)));
209 stream->Close();
210 return;
211 }
212 // Otherwise the message is divided across multiple QUIC stream frames, so
213 // queue the data. OnDataReceived() will be called each time the remaining
214 // QUIC stream frames arrive.
215 LOG(LS_INFO) << "QUIC data channel " << id_
216 << " is queuing incoming data for stream " << stream->id();
217 rtc::CopyOnWriteBuffer received_data;
218 if (len > 0) {
219 received_data.AppendData(first_bytes, len);
220 }
221 Message message;
222 message.stream = stream;
223 message.buffer = std::move(received_data);
224 incoming_quic_streams_[stream->id()] = std::move(message);
225 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived);
226 // The QUIC stream will be removed from |incoming_quic_streams_| once it
227 // closes.
228 stream->SignalClosed.connect(this,
229 &QuicDataChannel::OnIncomingQueuedStreamClosed);
230 }
231
232 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
233 const char* data,
234 size_t len) {
235 RTC_DCHECK(worker_thread_->IsCurrent());
236 RTC_DCHECK(data);
237 const auto& kv = incoming_quic_streams_.find(stream_id);
238 RTC_DCHECK(kv != incoming_quic_streams_.end());
239 Message& message = kv->second;
240 cricket::ReliableQuicStream* stream = message.stream;
241 rtc::CopyOnWriteBuffer& received_data = message.buffer;
242 // If the QUIC stream has not received a FIN, then the remote peer is not
243 // finished sending data.
244 if (!stream->fin_received()) {
245 received_data.AppendData(data, len);
246 return;
247 }
248 // Otherwise we are done receiving and can provide the data channel observer
249 // with the message.
250 LOG(LS_INFO) << "Stream " << stream_id
251 << " has finished receiving data for QUIC data channel " << id_;
252 received_data.AppendData(data, len);
253 DataBuffer final_message(std::move(received_data), false);
254 invoker_.AsyncInvoke<void>(
255 signaling_thread_,
256 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
257 // Once the stream is closed, OnDataReceived will not fire for the stream.
258 stream->Close();
259 }
260
261 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) {
262 RTC_DCHECK(worker_thread_->IsCurrent());
263 RTC_DCHECK(channel == quic_transport_channel_);
264 LOG(LS_INFO) << "QuicTransportChannel is ready to send";
265 invoker_.AsyncInvoke<void>(
266 signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen));
267 }
268
269 // Called when a write blocked QUIC stream that has been added to
270 // |write_blocked_quic_streams_| is closed.
271 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id,
272 int error) {
273 RTC_DCHECK(worker_thread_->IsCurrent());
274 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed.";
275 write_blocked_quic_streams_.erase(stream_id);
276 }
277
278 // Called when an incoming QUIC stream that has been added to
279 // |incoming_quic_streams_| is closed.
280 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id,
281 int error) {
282 RTC_DCHECK(worker_thread_->IsCurrent());
283 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed.";
284 incoming_quic_streams_.erase(stream_id);
285 }
286
287 void QuicDataChannel::OnConnectionClosed() {
288 RTC_DCHECK(worker_thread_->IsCurrent());
289 invoker_.AsyncInvoke<void>(
290 signaling_thread_,
291 rtc::Bind(&QuicDataChannel::SetState_s, this, kClosed));
292 }
293
294 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
295 RTC_DCHECK(signaling_thread_->IsCurrent());
296 if (observer_) {
297 observer_->OnMessage(received_data);
298 }
299 }
300
301 void QuicDataChannel::SetState_s(DataState state) {
302 RTC_DCHECK(signaling_thread_->IsCurrent());
303 if (state_ == state || state_ == kClosed) {
304 return;
305 }
306 if (state_ == kClosing && state != kClosed) {
307 return;
308 }
309 LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel "
310 << id_;
311 state_ = state;
312 if (observer_) {
313 observer_->OnStateChange();
314 }
315 }
316
317 void QuicDataChannel::OnBufferedAmountChange_s(uint64_t buffered_amount) {
318 RTC_DCHECK(signaling_thread_->IsCurrent());
319 if (observer_) {
320 observer_->OnBufferedAmountChange(buffered_amount);
321 }
322 }
323
324 size_t QuicDataChannel::GetNumWriteBlockedStreams() const {
325 return write_blocked_quic_streams_.size();
326 }
327
328 size_t QuicDataChannel::GetNumIncomingStreams() const {
329 return incoming_quic_streams_.size();
330 }
331
332 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698