OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/api/quicdatachannel.h" | |
12 | |
13 #include "webrtc/base/bind.h" | |
14 #include "webrtc/base/bytebuffer.h" | |
15 #include "webrtc/base/copyonwritebuffer.h" | |
16 #include "webrtc/base/logging.h" | |
17 #include "webrtc/p2p/quic/quictransportchannel.h" | |
18 #include "webrtc/p2p/quic/reliablequicstream.h" | |
19 | |
20 namespace webrtc { | |
21 | |
22 QuicDataChannel::QuicDataChannel( | |
23 rtc::Thread* signaling_thread, | |
24 rtc::Thread* worker_thread, | |
25 const std::string& label, | |
26 const DataChannelInit& config, | |
27 const QuicMessageDispatcher& message_dispatcher) | |
28 : signaling_thread_(signaling_thread), | |
29 worker_thread_(worker_thread), | |
30 message_dispatcher_(message_dispatcher), | |
31 id_(config.id), | |
32 state_(kConnecting), | |
33 buffered_amount_(0), | |
34 num_sent_messages_(0), | |
35 label_(label), | |
36 protocol_(config.protocol) {} | |
37 | |
38 QuicDataChannel::~QuicDataChannel() {} | |
39 | |
40 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | |
41 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
42 observer_ = observer; | |
43 } | |
44 | |
45 void QuicDataChannel::UnregisterObserver() { | |
46 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
47 observer_ = nullptr; | |
48 } | |
49 | |
50 bool QuicDataChannel::Send(const DataBuffer& buffer) { | |
51 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
52 if (state_ != kOpen) { | |
53 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
54 << " is not open so cannot send."; | |
55 return false; | |
56 } | |
57 return worker_thread_->Invoke<bool>( | |
58 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | |
59 } | |
60 | |
61 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | |
62 RTC_DCHECK(worker_thread_->IsCurrent()); | |
63 if (buffer.size() == 0) { | |
64 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
65 << " refuses to send an empty message."; | |
pthatcher1
2016/04/14 17:21:56
I think this was just a limitation of SCTP. I don
mikescarlett
2016/04/14 22:16:50
Ok I'll remove that.
| |
66 return true; | |
67 } | |
68 // Send the header containing the data channel ID and message ID. | |
69 rtc::CopyOnWriteBuffer header; | |
70 message_dispatcher_.EncodeHeader(id_, ++num_sent_messages_, &header); | |
pthatcher1
2016/04/14 17:21:56
It's strange to me that a class called MessageDisp
mikescarlett
2016/04/14 22:16:50
Done.
| |
71 RTC_DCHECK(quic_transport_channel_); | |
72 cricket::ReliableQuicStream* stream = | |
73 quic_transport_channel_->CreateQuicStream(); | |
74 RTC_DCHECK(stream); | |
75 uint64_t old_queued_bytes = stream->queued_data_bytes(); | |
76 stream->Write(header.data<char>(), header.size(), false); | |
pthatcher1
2016/04/14 17:21:56
Don't you need to check the Write call with the he
mikescarlett
2016/04/14 22:16:50
I added a check.
| |
77 // Send the message with FIN == true, which signals to the remote peer that | |
78 // there is no more data after this message. | |
pthatcher1
2016/04/14 17:21:56
Can you add a line like this?
bool fin = true;
mikescarlett
2016/04/14 22:16:50
Done.
| |
79 rtc::StreamResult result = | |
80 stream->Write(buffer.data.data<char>(), buffer.size(), true); | |
81 if (result == rtc::SR_SUCCESS) { | |
82 // The message is sent and we don't need this QUIC stream. | |
83 LOG(LS_INFO) << "Stream " << stream->id() | |
84 << " successfully wrote data for QUIC data channel " << id_; | |
85 stream->Close(); | |
86 return true; | |
87 } | |
88 // TODO(mikescarlett): Register the ReliableQuicStream's priority to the | |
89 // QuicWriteBlockedList so that the QUIC session doesn't drop messages when | |
90 // the QUIC transport channel becomes unwritable. | |
91 if (result == rtc::SR_BLOCK) { | |
92 // The QUIC stream is write blocked, so the message will be queued by the | |
93 // QUIC session. If this is due to the QUIC not being writable, it will be | |
94 // sent once QUIC becomes writable again. Otherwise it may be due to | |
95 // exceeding the QUIC flow control limit, in which case the remote peer's | |
96 // QUIC session will tell the QUIC stream to send more data. | |
97 LOG(LS_INFO) << "Stream " << stream->id() | |
98 << " is write blocked for QUIC data channel " << id_; | |
99 invoker_.AsyncInvoke<void>( | |
100 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, | |
101 this, buffered_amount_)); | |
102 buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes; | |
103 stream->SignalQueuedBytesWritten.connect( | |
104 this, &QuicDataChannel::OnQueuedBytesWritten); | |
105 write_blocked_quic_streams_[stream->id()] = stream; | |
106 // The QUIC stream will be removed from |write_blocked_quic_streams_| once | |
107 // it closes. | |
108 stream->SignalClosed.connect(this, | |
109 &QuicDataChannel::OnWriteBlockedStreamClosed); | |
110 return true; | |
111 } | |
112 LOG(LS_ERROR) << "Stream " << stream->id() | |
113 << " failed to write for QUIC data channel " << id_ | |
114 << ". Unexpected result: " << result; | |
115 return false; | |
116 } | |
117 | |
118 void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, | |
119 uint64_t queued_bytes_written) { | |
120 RTC_DCHECK(worker_thread_->IsCurrent()); | |
121 invoker_.AsyncInvoke<void>( | |
122 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, | |
123 this, buffered_amount_)); | |
124 buffered_amount_ -= queued_bytes_written; | |
125 const auto& kv = write_blocked_quic_streams_.find(stream_id); | |
126 RTC_DCHECK(kv != write_blocked_quic_streams_.end()); | |
127 cricket::ReliableQuicStream* stream = kv->second; | |
128 // True if the QUIC stream is done sending data. | |
129 if (stream->fin_sent()) { | |
130 LOG(LS_INFO) << "Stream " << stream->id() | |
131 << " successfully wrote data for QUIC data channel " << id_; | |
132 stream->Close(); | |
133 } | |
134 } | |
135 | |
136 void QuicDataChannel::Close() { | |
137 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
138 if (state_ == kClosed || state_ == kClosing) { | |
139 return; | |
140 } | |
141 LOG(LS_INFO) << "Closing QUIC data channel."; | |
142 SetState_s(kClosing); | |
143 worker_thread_->Invoke<void>(rtc::Bind(&QuicDataChannel::Close_w, this)); | |
144 } | |
145 | |
146 void QuicDataChannel::Close_w() { | |
147 RTC_DCHECK(worker_thread_->IsCurrent()); | |
148 for (auto& kv : incoming_quic_streams_) { | |
149 Message& message = kv.second; | |
150 cricket::ReliableQuicStream* stream = message.stream; | |
151 stream->Close(); | |
152 } | |
153 | |
154 for (auto& kv : write_blocked_quic_streams_) { | |
155 cricket::ReliableQuicStream* stream = kv.second; | |
156 stream->Close(); | |
157 } | |
158 | |
159 invoker_.AsyncInvoke<void>( | |
160 signaling_thread_, | |
161 rtc::Bind(&QuicDataChannel::SetState_s, this, kClosed)); | |
162 } | |
163 | |
164 void QuicDataChannel::SetTransportChannel( | |
165 cricket::QuicTransportChannel* channel) { | |
166 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
167 RTC_DCHECK(channel); | |
168 RTC_DCHECK(!quic_transport_channel_); | |
169 quic_transport_channel_ = channel; | |
170 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; | |
171 worker_thread_->Invoke<void>( | |
172 rtc::Bind(&QuicDataChannel::ConnectTransportChannel_w, this)); | |
173 if (quic_transport_channel_->writable()) { | |
174 SetState_s(kOpen); | |
175 } | |
176 } | |
177 | |
178 void QuicDataChannel::ConnectTransportChannel_w() { | |
179 RTC_DCHECK(worker_thread_->IsCurrent()); | |
180 quic_transport_channel_->SignalReadyToSend.connect( | |
181 this, &QuicDataChannel::OnReadyToSend); | |
182 quic_transport_channel_->SignalClosed.connect( | |
183 this, &QuicDataChannel::OnConnectionClosed); | |
184 } | |
185 | |
186 void QuicDataChannel::OnIncomingStream(uint64_t message_id, | |
187 const char* first_bytes, | |
188 size_t len, | |
189 cricket::ReliableQuicStream* stream) { | |
190 RTC_DCHECK(worker_thread_->IsCurrent()); | |
191 RTC_DCHECK(first_bytes); | |
192 RTC_DCHECK(stream); | |
193 if (!observer_) { | |
194 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
195 << " received a message but has no observer."; | |
196 stream->Close(); | |
197 return; | |
198 } | |
199 // A FIN is received if the message fits into a single QUIC stream frame and | |
200 // the remote peer is done sending. | |
201 if (stream->fin_received()) { | |
202 LOG(LS_INFO) << "Stream " << stream->id() | |
203 << " has finished receiving data for QUIC data channel " | |
204 << id_; | |
205 DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false); | |
206 invoker_.AsyncInvoke<void>(signaling_thread_, | |
207 rtc::Bind(&QuicDataChannel::OnMessage_s, this, | |
208 std::move(final_message))); | |
209 stream->Close(); | |
210 return; | |
211 } | |
212 // Otherwise the message is divided across multiple QUIC stream frames, so | |
213 // queue the data. OnDataReceived() will be called each time the remaining | |
214 // QUIC stream frames arrive. | |
215 LOG(LS_INFO) << "QUIC data channel " << id_ | |
216 << " is queuing incoming data for stream " << stream->id(); | |
217 rtc::CopyOnWriteBuffer received_data; | |
218 if (len > 0) { | |
219 received_data.AppendData(first_bytes, len); | |
220 } | |
221 Message message; | |
222 message.stream = stream; | |
223 message.buffer = std::move(received_data); | |
224 incoming_quic_streams_[stream->id()] = std::move(message); | |
225 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); | |
226 // The QUIC stream will be removed from |incoming_quic_streams_| once it | |
227 // closes. | |
228 stream->SignalClosed.connect(this, | |
229 &QuicDataChannel::OnIncomingQueuedStreamClosed); | |
230 } | |
231 | |
232 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | |
233 const char* data, | |
234 size_t len) { | |
235 RTC_DCHECK(worker_thread_->IsCurrent()); | |
236 RTC_DCHECK(data); | |
237 const auto& kv = incoming_quic_streams_.find(stream_id); | |
238 RTC_DCHECK(kv != incoming_quic_streams_.end()); | |
239 Message& message = kv->second; | |
240 cricket::ReliableQuicStream* stream = message.stream; | |
241 rtc::CopyOnWriteBuffer& received_data = message.buffer; | |
242 // If the QUIC stream has not received a FIN, then the remote peer is not | |
243 // finished sending data. | |
244 if (!stream->fin_received()) { | |
245 received_data.AppendData(data, len); | |
246 return; | |
247 } | |
248 // Otherwise we are done receiving and can provide the data channel observer | |
249 // with the message. | |
250 LOG(LS_INFO) << "Stream " << stream_id | |
251 << " has finished receiving data for QUIC data channel " << id_; | |
252 received_data.AppendData(data, len); | |
253 DataBuffer final_message(std::move(received_data), false); | |
254 invoker_.AsyncInvoke<void>( | |
255 signaling_thread_, | |
256 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | |
257 // Once the stream is closed, OnDataReceived will not fire for the stream. | |
258 stream->Close(); | |
259 } | |
260 | |
261 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { | |
262 RTC_DCHECK(worker_thread_->IsCurrent()); | |
263 RTC_DCHECK(channel == quic_transport_channel_); | |
264 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; | |
265 invoker_.AsyncInvoke<void>( | |
266 signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); | |
267 } | |
268 | |
269 // Called when a write blocked QUIC stream that has been added to | |
270 // |write_blocked_quic_streams_| is closed. | |
271 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, | |
272 int error) { | |
273 RTC_DCHECK(worker_thread_->IsCurrent()); | |
274 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; | |
275 write_blocked_quic_streams_.erase(stream_id); | |
276 } | |
277 | |
278 // Called when an incoming QUIC stream that has been added to | |
279 // |incoming_quic_streams_| is closed. | |
280 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, | |
281 int error) { | |
282 RTC_DCHECK(worker_thread_->IsCurrent()); | |
283 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; | |
284 incoming_quic_streams_.erase(stream_id); | |
285 } | |
286 | |
287 void QuicDataChannel::OnConnectionClosed() { | |
288 RTC_DCHECK(worker_thread_->IsCurrent()); | |
289 invoker_.AsyncInvoke<void>( | |
290 signaling_thread_, | |
291 rtc::Bind(&QuicDataChannel::SetState_s, this, kClosed)); | |
292 } | |
293 | |
294 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | |
295 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
296 if (observer_) { | |
297 observer_->OnMessage(received_data); | |
298 } | |
299 } | |
300 | |
301 void QuicDataChannel::SetState_s(DataState state) { | |
302 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
303 if (state_ == state || state_ == kClosed) { | |
304 return; | |
305 } | |
306 if (state_ == kClosing && state != kClosed) { | |
307 return; | |
308 } | |
309 LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " | |
310 << id_; | |
311 state_ = state; | |
312 if (observer_) { | |
313 observer_->OnStateChange(); | |
314 } | |
315 } | |
316 | |
317 void QuicDataChannel::OnBufferedAmountChange_s(uint64_t buffered_amount) { | |
318 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
319 if (observer_) { | |
320 observer_->OnBufferedAmountChange(buffered_amount); | |
321 } | |
322 } | |
323 | |
324 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { | |
325 return write_blocked_quic_streams_.size(); | |
326 } | |
327 | |
328 size_t QuicDataChannel::GetNumIncomingStreams() const { | |
329 return incoming_quic_streams_.size(); | |
330 } | |
331 | |
332 } // namespace webrtc | |
OLD | NEW |