Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(144)

Side by Side Diff: webrtc/api/quicdatachannel.cc

Issue 1844803002: Modify PeerConnection for end-to-end QuicDataChannel usage (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Remove webrtcsdp.cc from this CL Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/api/quicdatachannel.h"
12
13 #include "webrtc/base/bind.h"
14 #include "webrtc/base/bytebuffer.h"
15 #include "webrtc/base/copyonwritebuffer.h"
16 #include "webrtc/base/logging.h"
17 #include "webrtc/p2p/quic/quictransportchannel.h"
18 #include "webrtc/p2p/quic/reliablequicstream.h"
19
20 namespace webrtc {
21
22 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread,
23 rtc::Thread* worker_thread,
24 const std::string& label,
25 const DataChannelInit* config,
26 const MessageHelper& message_helper)
27 : signaling_thread_(signaling_thread),
28 worker_thread_(worker_thread),
29 message_helper_(message_helper),
30 observer_(nullptr),
31 id_(config->id),
32 state_(kConnecting),
33 buffered_amount_(0),
34 num_sent_messages_(0),
35 label_(label),
36 protocol_(config->protocol) {}
37
38 QuicDataChannel::~QuicDataChannel() {}
39
40 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
41 RTC_DCHECK(signaling_thread_->IsCurrent());
42 observer_ = observer;
43 }
44 void QuicDataChannel::UnregisterObserver() {
45 RTC_DCHECK(signaling_thread_->IsCurrent());
46 observer_ = nullptr;
47 }
48
49 bool QuicDataChannel::Send(const DataBuffer& buffer) {
50 RTC_DCHECK(signaling_thread_->IsCurrent());
51 return worker_thread_->Invoke<bool>(
52 rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
53 }
54
55 bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
56 RTC_DCHECK(worker_thread_->IsCurrent());
57 if (state_ != kOpen) {
pthatcher1 2016/04/12 00:58:38 Since state_ is set on the signaling thread, I thi
pthatcher1 2016/04/12 00:58:38 Since the state is set on the signaling, I think w
mikescarlett 2016/04/13 16:53:37 Ok done.
58 LOG(LS_ERROR) << "QUIC data channel " << id_
59 << " is not open so cannot send.";
60 return false;
61 }
62 if (buffer.size() == 0) {
63 LOG(LS_WARNING) << "QUIC data channel " << id_
64 << " refuses to send an empty message.";
65 return true;
66 }
pthatcher1 2016/04/12 00:58:38 Why? If we prepend the message with the data chan
mikescarlett 2016/04/13 16:53:37 I wanted to be consistent with the SCTP data chann
67 RTC_DCHECK(quic_transport_channel_ != nullptr);
68 rtc::CopyOnWriteBuffer payload;
69 message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload);
pthatcher1 2016/04/12 00:58:38 Instead of copying the whole message (which may be
mikescarlett 2016/04/13 16:53:37 Yes that's fine.
70 return WriteData_w(payload.data<char>(), payload.size());
pthatcher1 2016/04/12 00:58:38 Instead of making WriteData_w its own method, can
pthatcher1 2016/04/12 00:58:39 Rather than having WriteData_w be a separate metho
mikescarlett 2016/04/13 16:53:37 I'll combine them.
71 }
72
73 // TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at
74 // the stream and connection level, so that peers can set the maximum size of a
75 // message. Currently QUIC defaults to 16KB per stream and session and will
76 // block bytes exceeding this limit.
pthatcher1 2016/04/12 00:58:38 So we can't send data messages larger than 16KB?
mikescarlett 2016/04/13 16:53:37 I meant to say that the session buffers messages l
77 bool QuicDataChannel::WriteData_w(const char* data, size_t len) {
78 RTC_DCHECK(worker_thread_->IsCurrent());
79 cricket::ReliableQuicStream* stream =
80 quic_transport_channel_->CreateQuicStream();
81 RTC_DCHECK(stream != nullptr);
82 // Send the message with FIN == true, which signals to the remote peer that
83 // there is no more data after this message.
84 uint64_t old_queued_bytes = stream->queued_data_bytes();
85 rtc::StreamResult result = stream->Write(data, len, true);
86 if (result == rtc::SR_SUCCESS) {
87 // The message is sent and we don't need this QUIC stream.
88 LOG(LS_INFO) << "Stream " << stream->id()
89 << " successfully wrote data for QUIC data channel " << id_;
90 stream->Close();
91 return true;
92 }
93 if (result == rtc::SR_BLOCK) {
94 // The QUIC stream is write blocked, so the message will be queued by the
95 // QUIC session. It might be due to the QUIC transport not being writable,
96 // or it may be due to exceeding the QUIC flow control limit.
97 LOG(LS_WARNING) << "Stream " << stream->id()
98 << " is write blocked for QUIC data channel " << id_;
99 if (observer_ != nullptr) {
100 observer_->OnBufferedAmountChange(buffered_amount_);
101 }
102 buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes;
103 stream->SignalQueuedBytesWritten.connect(
104 this, &QuicDataChannel::OnQueuedBytesWritten);
105 // Once the stream has stopped writing, it will be closed.
106 stream->StopReading();
107 write_blocked_quic_streams_[stream->id()] = stream;
108 return true;
109 }
110 return false;
pthatcher1 2016/04/12 00:58:39 We should probably log what the result was, since
mikescarlett 2016/04/13 16:53:37 Done.
111 }
112
113 void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id,
114 uint64_t queued_bytes_written) {
115 if (observer_ != nullptr) {
116 observer_->OnBufferedAmountChange(buffered_amount_);
pthatcher1 2016/04/12 00:58:39 What thread is this called on? Because I'm pretty
mikescarlett 2016/04/13 16:53:37 OnQueuedBytesWritten is on the worker thread. I fi
117 }
118 buffered_amount_ -= queued_bytes_written;
119 }
120
121 void QuicDataChannel::SetTransportChannel(
122 cricket::QuicTransportChannel* channel) {
123 RTC_DCHECK(channel != nullptr);
pthatcher1 2016/04/12 00:58:38 RTC_DCHECK(channel) is sufficient.
mikescarlett 2016/04/13 16:53:37 Done.
124 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_;
125 quic_transport_channel_ = channel;
126 quic_transport_channel_->SignalWritableState.connect(
127 this, &QuicDataChannel::OnWritableState);
128 quic_transport_channel_->SignalClosed.connect(
129 this, &QuicDataChannel::OnConnectionClosed);
pthatcher1 2016/04/12 00:58:39 We also need to disconnect from the previous quic_
mikescarlett 2016/04/13 16:53:37 This function should only be called once. I added
130 if (quic_transport_channel_->writable()) {
131 SetState(kOpen);
132 }
133 }
134
135 void QuicDataChannel::OnIncomingStream(uint64_t message_id,
136 const char* first_bytes,
137 size_t len,
138 cricket::ReliableQuicStream* stream) {
139 RTC_DCHECK(worker_thread_->IsCurrent());
140 RTC_DCHECK(first_bytes != nullptr);
141 RTC_DCHECK(stream != nullptr);
142 if (!observer_) {
143 LOG(LS_WARNING) << "QUIC data channel " << id_
144 << " received a message but has no observer.";
145 stream->Close();
146 return;
147 }
148 // A FIN is received if the message fits into a single QUIC stream frame and
149 // the remote peer is done sending.
150 if (!stream->fin_received()) {
151 // If FIN is not received, the message is divided across multiple QUIC
152 // stream frames, so queue the data. OnDataReceived() will be called each
153 // time the remaining QUIC stream frames arrive.
154 LOG(LS_INFO) << "QUIC data channel " << id_
155 << " is queuing incoming data for stream " << stream->id();
156 rtc::CopyOnWriteBuffer received_data;
157 received_data.AppendData(first_bytes, len);
158 Message message;
159 message.stream = stream;
160 message.buffer = std::move(received_data);
161 incoming_quic_streams_[stream->id()] = std::move(message);
162 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived);
163 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed);
164 return;
165 }
166 // Otherwise propagate the message to the observer.
167 LOG(LS_INFO) << "Stream " << stream->id()
168 << " has finished receiving data for QUIC data channel " << id_;
169 DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false);
170 invoker_.AsyncInvoke<void>(
171 signaling_thread_,
172 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
173 stream->Close();
pthatcher1 2016/04/12 00:58:39 I think flipping this around to if (stream->fin_r
mikescarlett 2016/04/13 16:53:37 Done.
174 }
175
176 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
177 const char* data,
178 size_t len) {
179 RTC_DCHECK(worker_thread_->IsCurrent());
180 RTC_DCHECK(data != nullptr);
181 const auto& kv = incoming_quic_streams_.find(stream_id);
182 RTC_DCHECK(kv != incoming_quic_streams_.end());
183 Message& message = kv->second;
184 cricket::ReliableQuicStream* stream = message.stream;
185 rtc::CopyOnWriteBuffer& received_data = message.buffer;
186 // If the QUIC stream has not received a FIN, then the remote peer is not
187 // finished sending data.
188 if (!stream->fin_received()) {
189 received_data.AppendData(data, len);
190 return;
191 }
192 // Otherwise we are done receiving and can provide the data channel observer
193 // with the message.
194 LOG(LS_INFO) << "Stream " << stream_id
195 << " has finished receiving data for QUIC data channel " << id_;
196 received_data.AppendData(data, len);
197 DataBuffer final_message(std::move(received_data), false);
198 invoker_.AsyncInvoke<void>(
199 signaling_thread_,
200 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
201 // Once the stream is closed, OnDataReceived should not be fired.
202 stream->Close();
203 incoming_quic_streams_.erase(stream_id);
204 }
205
206 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
207 RTC_DCHECK(signaling_thread_->IsCurrent());
208 RTC_DCHECK(observer_ != nullptr);
209 observer_->OnMessage(received_data);
210 }
211
212 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) {
213 RTC_DCHECK(worker_thread_->IsCurrent());
214 RTC_DCHECK(channel == quic_transport_channel_);
215 LOG(LS_INFO) << "QuicTransportChannel is writable";
216 SetState(kOpen);
pthatcher1 2016/04/12 00:58:38 Actually, you need to check channel->writable().
mikescarlett 2016/04/13 16:53:37 Done.
217 }
218
219 void QuicDataChannel::Close() {
220 RTC_DCHECK(signaling_thread_->IsCurrent());
221 if (state_ == kClosed) {
222 return;
223 }
224 LOG(LS_INFO) << "Closing QUIC data channel.";
225 SetState(kClosing);
226
227 for (auto& kv : incoming_quic_streams_) {
228 Message& message = kv.second;
229 cricket::ReliableQuicStream* stream = message.stream;
230 stream->Close();
231 }
232 incoming_quic_streams_.clear();
233
234 for (auto& kv : write_blocked_quic_streams_) {
235 cricket::ReliableQuicStream* stream = kv.second;
236 stream->Close();
237 }
238 write_blocked_quic_streams_.clear();
239
240 SetState(kClosed);
241 }
242
243 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) {
244 RTC_DCHECK(worker_thread_->IsCurrent());
245 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed.";
246 incoming_quic_streams_.erase(stream_id);
247 }
248
249 void QuicDataChannel::OnConnectionClosed() {
250 RTC_DCHECK(worker_thread_->IsCurrent());
251 SetState(kClosed);
252 }
253
254 void QuicDataChannel::SetState(DataState state) {
255 if (state_ == state) {
256 return;
257 }
258 LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel "
259 << id_;
260 state_ = state;
261 if (observer_) {
262 observer_->OnStateChange();
263 }
264 if (state_ == kClosed) {
265 SignalClosed(this);
266 }
267 }
268
269 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698