OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/api/quicdatachannel.h" | |
12 | |
13 #include "webrtc/base/bind.h" | |
14 #include "webrtc/base/bytebuffer.h" | |
15 #include "webrtc/base/copyonwritebuffer.h" | |
16 #include "webrtc/base/logging.h" | |
17 #include "webrtc/p2p/quic/quictransportchannel.h" | |
18 #include "webrtc/p2p/quic/reliablequicstream.h" | |
19 | |
20 namespace webrtc { | |
21 | |
22 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, | |
23 rtc::Thread* worker_thread, | |
24 const std::string& label, | |
25 const DataChannelInit* config, | |
26 const MessageHelper& message_helper) | |
27 : signaling_thread_(signaling_thread), | |
28 worker_thread_(worker_thread), | |
29 message_helper_(message_helper), | |
30 observer_(nullptr), | |
31 id_(config->id), | |
32 state_(kConnecting), | |
33 buffered_amount_(0), | |
34 num_sent_messages_(0), | |
35 label_(label), | |
36 protocol_(config->protocol) {} | |
37 | |
38 QuicDataChannel::~QuicDataChannel() {} | |
39 | |
40 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | |
41 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
42 observer_ = observer; | |
43 } | |
44 void QuicDataChannel::UnregisterObserver() { | |
45 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
46 observer_ = nullptr; | |
47 } | |
48 | |
49 bool QuicDataChannel::Send(const DataBuffer& buffer) { | |
50 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
51 return worker_thread_->Invoke<bool>( | |
52 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | |
53 } | |
54 | |
55 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | |
56 RTC_DCHECK(worker_thread_->IsCurrent()); | |
57 if (state_ != kOpen) { | |
pthatcher1
2016/04/12 00:58:38
Since state_ is set on the signaling thread, I thi
pthatcher1
2016/04/12 00:58:38
Since the state is set on the signaling, I think w
mikescarlett
2016/04/13 16:53:37
Ok done.
| |
58 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
59 << " is not open so cannot send."; | |
60 return false; | |
61 } | |
62 if (buffer.size() == 0) { | |
63 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
64 << " refuses to send an empty message."; | |
65 return true; | |
66 } | |
pthatcher1
2016/04/12 00:58:38
Why? If we prepend the message with the data chan
mikescarlett
2016/04/13 16:53:37
I wanted to be consistent with the SCTP data chann
| |
67 RTC_DCHECK(quic_transport_channel_ != nullptr); | |
68 rtc::CopyOnWriteBuffer payload; | |
69 message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload); | |
pthatcher1
2016/04/12 00:58:38
Instead of copying the whole message (which may be
mikescarlett
2016/04/13 16:53:37
Yes that's fine.
| |
70 return WriteData_w(payload.data<char>(), payload.size()); | |
pthatcher1
2016/04/12 00:58:38
Instead of making WriteData_w its own method, can
pthatcher1
2016/04/12 00:58:39
Rather than having WriteData_w be a separate metho
mikescarlett
2016/04/13 16:53:37
I'll combine them.
| |
71 } | |
72 | |
73 // TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at | |
74 // the stream and connection level, so that peers can set the maximum size of a | |
75 // message. Currently QUIC defaults to 16KB per stream and session and will | |
76 // block bytes exceeding this limit. | |
pthatcher1
2016/04/12 00:58:38
So we can't send data messages larger than 16KB?
mikescarlett
2016/04/13 16:53:37
I meant to say that the session buffers messages l
| |
77 bool QuicDataChannel::WriteData_w(const char* data, size_t len) { | |
78 RTC_DCHECK(worker_thread_->IsCurrent()); | |
79 cricket::ReliableQuicStream* stream = | |
80 quic_transport_channel_->CreateQuicStream(); | |
81 RTC_DCHECK(stream != nullptr); | |
82 // Send the message with FIN == true, which signals to the remote peer that | |
83 // there is no more data after this message. | |
84 uint64_t old_queued_bytes = stream->queued_data_bytes(); | |
85 rtc::StreamResult result = stream->Write(data, len, true); | |
86 if (result == rtc::SR_SUCCESS) { | |
87 // The message is sent and we don't need this QUIC stream. | |
88 LOG(LS_INFO) << "Stream " << stream->id() | |
89 << " successfully wrote data for QUIC data channel " << id_; | |
90 stream->Close(); | |
91 return true; | |
92 } | |
93 if (result == rtc::SR_BLOCK) { | |
94 // The QUIC stream is write blocked, so the message will be queued by the | |
95 // QUIC session. It might be due to the QUIC transport not being writable, | |
96 // or it may be due to exceeding the QUIC flow control limit. | |
97 LOG(LS_WARNING) << "Stream " << stream->id() | |
98 << " is write blocked for QUIC data channel " << id_; | |
99 if (observer_ != nullptr) { | |
100 observer_->OnBufferedAmountChange(buffered_amount_); | |
101 } | |
102 buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes; | |
103 stream->SignalQueuedBytesWritten.connect( | |
104 this, &QuicDataChannel::OnQueuedBytesWritten); | |
105 // Once the stream has stopped writing, it will be closed. | |
106 stream->StopReading(); | |
107 write_blocked_quic_streams_[stream->id()] = stream; | |
108 return true; | |
109 } | |
110 return false; | |
pthatcher1
2016/04/12 00:58:39
We should probably log what the result was, since
mikescarlett
2016/04/13 16:53:37
Done.
| |
111 } | |
112 | |
113 void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, | |
114 uint64_t queued_bytes_written) { | |
115 if (observer_ != nullptr) { | |
116 observer_->OnBufferedAmountChange(buffered_amount_); | |
pthatcher1
2016/04/12 00:58:39
What thread is this called on? Because I'm pretty
mikescarlett
2016/04/13 16:53:37
OnQueuedBytesWritten is on the worker thread. I fi
| |
117 } | |
118 buffered_amount_ -= queued_bytes_written; | |
119 } | |
120 | |
121 void QuicDataChannel::SetTransportChannel( | |
122 cricket::QuicTransportChannel* channel) { | |
123 RTC_DCHECK(channel != nullptr); | |
pthatcher1
2016/04/12 00:58:38
RTC_DCHECK(channel) is sufficient.
mikescarlett
2016/04/13 16:53:37
Done.
| |
124 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; | |
125 quic_transport_channel_ = channel; | |
126 quic_transport_channel_->SignalWritableState.connect( | |
127 this, &QuicDataChannel::OnWritableState); | |
128 quic_transport_channel_->SignalClosed.connect( | |
129 this, &QuicDataChannel::OnConnectionClosed); | |
pthatcher1
2016/04/12 00:58:39
We also need to disconnect from the previous quic_
mikescarlett
2016/04/13 16:53:37
This function should only be called once. I added
| |
130 if (quic_transport_channel_->writable()) { | |
131 SetState(kOpen); | |
132 } | |
133 } | |
134 | |
135 void QuicDataChannel::OnIncomingStream(uint64_t message_id, | |
136 const char* first_bytes, | |
137 size_t len, | |
138 cricket::ReliableQuicStream* stream) { | |
139 RTC_DCHECK(worker_thread_->IsCurrent()); | |
140 RTC_DCHECK(first_bytes != nullptr); | |
141 RTC_DCHECK(stream != nullptr); | |
142 if (!observer_) { | |
143 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
144 << " received a message but has no observer."; | |
145 stream->Close(); | |
146 return; | |
147 } | |
148 // A FIN is received if the message fits into a single QUIC stream frame and | |
149 // the remote peer is done sending. | |
150 if (!stream->fin_received()) { | |
151 // If FIN is not received, the message is divided across multiple QUIC | |
152 // stream frames, so queue the data. OnDataReceived() will be called each | |
153 // time the remaining QUIC stream frames arrive. | |
154 LOG(LS_INFO) << "QUIC data channel " << id_ | |
155 << " is queuing incoming data for stream " << stream->id(); | |
156 rtc::CopyOnWriteBuffer received_data; | |
157 received_data.AppendData(first_bytes, len); | |
158 Message message; | |
159 message.stream = stream; | |
160 message.buffer = std::move(received_data); | |
161 incoming_quic_streams_[stream->id()] = std::move(message); | |
162 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); | |
163 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); | |
164 return; | |
165 } | |
166 // Otherwise propagate the message to the observer. | |
167 LOG(LS_INFO) << "Stream " << stream->id() | |
168 << " has finished receiving data for QUIC data channel " << id_; | |
169 DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false); | |
170 invoker_.AsyncInvoke<void>( | |
171 signaling_thread_, | |
172 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | |
173 stream->Close(); | |
pthatcher1
2016/04/12 00:58:39
I think flipping this around to
if (stream->fin_r
mikescarlett
2016/04/13 16:53:37
Done.
| |
174 } | |
175 | |
176 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | |
177 const char* data, | |
178 size_t len) { | |
179 RTC_DCHECK(worker_thread_->IsCurrent()); | |
180 RTC_DCHECK(data != nullptr); | |
181 const auto& kv = incoming_quic_streams_.find(stream_id); | |
182 RTC_DCHECK(kv != incoming_quic_streams_.end()); | |
183 Message& message = kv->second; | |
184 cricket::ReliableQuicStream* stream = message.stream; | |
185 rtc::CopyOnWriteBuffer& received_data = message.buffer; | |
186 // If the QUIC stream has not received a FIN, then the remote peer is not | |
187 // finished sending data. | |
188 if (!stream->fin_received()) { | |
189 received_data.AppendData(data, len); | |
190 return; | |
191 } | |
192 // Otherwise we are done receiving and can provide the data channel observer | |
193 // with the message. | |
194 LOG(LS_INFO) << "Stream " << stream_id | |
195 << " has finished receiving data for QUIC data channel " << id_; | |
196 received_data.AppendData(data, len); | |
197 DataBuffer final_message(std::move(received_data), false); | |
198 invoker_.AsyncInvoke<void>( | |
199 signaling_thread_, | |
200 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | |
201 // Once the stream is closed, OnDataReceived should not be fired. | |
202 stream->Close(); | |
203 incoming_quic_streams_.erase(stream_id); | |
204 } | |
205 | |
206 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | |
207 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
208 RTC_DCHECK(observer_ != nullptr); | |
209 observer_->OnMessage(received_data); | |
210 } | |
211 | |
212 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { | |
213 RTC_DCHECK(worker_thread_->IsCurrent()); | |
214 RTC_DCHECK(channel == quic_transport_channel_); | |
215 LOG(LS_INFO) << "QuicTransportChannel is writable"; | |
216 SetState(kOpen); | |
pthatcher1
2016/04/12 00:58:38
Actually, you need to check channel->writable().
mikescarlett
2016/04/13 16:53:37
Done.
| |
217 } | |
218 | |
219 void QuicDataChannel::Close() { | |
220 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
221 if (state_ == kClosed) { | |
222 return; | |
223 } | |
224 LOG(LS_INFO) << "Closing QUIC data channel."; | |
225 SetState(kClosing); | |
226 | |
227 for (auto& kv : incoming_quic_streams_) { | |
228 Message& message = kv.second; | |
229 cricket::ReliableQuicStream* stream = message.stream; | |
230 stream->Close(); | |
231 } | |
232 incoming_quic_streams_.clear(); | |
233 | |
234 for (auto& kv : write_blocked_quic_streams_) { | |
235 cricket::ReliableQuicStream* stream = kv.second; | |
236 stream->Close(); | |
237 } | |
238 write_blocked_quic_streams_.clear(); | |
239 | |
240 SetState(kClosed); | |
241 } | |
242 | |
243 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { | |
244 RTC_DCHECK(worker_thread_->IsCurrent()); | |
245 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; | |
246 incoming_quic_streams_.erase(stream_id); | |
247 } | |
248 | |
249 void QuicDataChannel::OnConnectionClosed() { | |
250 RTC_DCHECK(worker_thread_->IsCurrent()); | |
251 SetState(kClosed); | |
252 } | |
253 | |
254 void QuicDataChannel::SetState(DataState state) { | |
255 if (state_ == state) { | |
256 return; | |
257 } | |
258 LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " | |
259 << id_; | |
260 state_ = state; | |
261 if (observer_) { | |
262 observer_->OnStateChange(); | |
263 } | |
264 if (state_ == kClosed) { | |
265 SignalClosed(this); | |
266 } | |
267 } | |
268 | |
269 } // namespace webrtc | |
OLD | NEW |