Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | |
| 3 * | |
| 4 * Use of this source code is governed by a BSD-style license | |
| 5 * that can be found in the LICENSE file in the root of the source | |
| 6 * tree. An additional intellectual property rights grant can be found | |
| 7 * in the file PATENTS. All contributing project authors may | |
| 8 * be found in the AUTHORS file in the root of the source tree. | |
| 9 */ | |
| 10 | |
| 11 #include "webrtc/api/quicdatachannel.h" | |
| 12 | |
| 13 #include "webrtc/base/bind.h" | |
| 14 #include "webrtc/base/bytebuffer.h" | |
| 15 #include "webrtc/base/copyonwritebuffer.h" | |
| 16 #include "webrtc/base/logging.h" | |
| 17 #include "webrtc/p2p/quic/quictransportchannel.h" | |
| 18 #include "webrtc/p2p/quic/reliablequicstream.h" | |
| 19 | |
| 20 namespace webrtc { | |
| 21 | |
| 22 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, | |
| 23 rtc::Thread* worker_thread, | |
| 24 const std::string& label, | |
| 25 const DataChannelInit* config, | |
| 26 const MessageHelper& message_helper) | |
| 27 : signaling_thread_(signaling_thread), | |
| 28 worker_thread_(worker_thread), | |
| 29 message_helper_(message_helper), | |
| 30 observer_(nullptr), | |
| 31 id_(config->id), | |
| 32 state_(kConnecting), | |
| 33 buffered_amount_(0), | |
| 34 num_sent_messages_(0), | |
| 35 label_(label), | |
| 36 protocol_(config->protocol) {} | |
| 37 | |
| 38 QuicDataChannel::~QuicDataChannel() {} | |
| 39 | |
| 40 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | |
| 41 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 42 observer_ = observer; | |
| 43 } | |
| 44 void QuicDataChannel::UnregisterObserver() { | |
| 45 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 46 observer_ = nullptr; | |
| 47 } | |
| 48 | |
| 49 bool QuicDataChannel::Send(const DataBuffer& buffer) { | |
| 50 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 51 return worker_thread_->Invoke<bool>( | |
| 52 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | |
| 53 } | |
| 54 | |
| 55 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | |
| 56 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 57 if (state_ != kOpen) { | |
|
pthatcher1
2016/04/12 00:58:38
Since state_ is set on the signaling thread, I thi
pthatcher1
2016/04/12 00:58:38
Since the state is set on the signaling, I think w
mikescarlett
2016/04/13 16:53:37
Ok done.
| |
| 58 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
| 59 << " is not open so cannot send."; | |
| 60 return false; | |
| 61 } | |
| 62 if (buffer.size() == 0) { | |
| 63 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
| 64 << " refuses to send an empty message."; | |
| 65 return true; | |
| 66 } | |
|
pthatcher1
2016/04/12 00:58:38
Why? If we prepend the message with the data chan
mikescarlett
2016/04/13 16:53:37
I wanted to be consistent with the SCTP data chann
| |
| 67 RTC_DCHECK(quic_transport_channel_ != nullptr); | |
| 68 rtc::CopyOnWriteBuffer payload; | |
| 69 message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload); | |
|
pthatcher1
2016/04/12 00:58:38
Instead of copying the whole message (which may be
mikescarlett
2016/04/13 16:53:37
Yes that's fine.
| |
| 70 return WriteData_w(payload.data<char>(), payload.size()); | |
|
pthatcher1
2016/04/12 00:58:38
Instead of making WriteData_w its own method, can
pthatcher1
2016/04/12 00:58:39
Rather than having WriteData_w be a separate metho
mikescarlett
2016/04/13 16:53:37
I'll combine them.
| |
| 71 } | |
| 72 | |
| 73 // TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at | |
| 74 // the stream and connection level, so that peers can set the maximum size of a | |
| 75 // message. Currently QUIC defaults to 16KB per stream and session and will | |
| 76 // block bytes exceeding this limit. | |
|
pthatcher1
2016/04/12 00:58:38
So we can't send data messages larger than 16KB?
mikescarlett
2016/04/13 16:53:37
I meant to say that the session buffers messages l
| |
| 77 bool QuicDataChannel::WriteData_w(const char* data, size_t len) { | |
| 78 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 79 cricket::ReliableQuicStream* stream = | |
| 80 quic_transport_channel_->CreateQuicStream(); | |
| 81 RTC_DCHECK(stream != nullptr); | |
| 82 // Send the message with FIN == true, which signals to the remote peer that | |
| 83 // there is no more data after this message. | |
| 84 uint64_t old_queued_bytes = stream->queued_data_bytes(); | |
| 85 rtc::StreamResult result = stream->Write(data, len, true); | |
| 86 if (result == rtc::SR_SUCCESS) { | |
| 87 // The message is sent and we don't need this QUIC stream. | |
| 88 LOG(LS_INFO) << "Stream " << stream->id() | |
| 89 << " successfully wrote data for QUIC data channel " << id_; | |
| 90 stream->Close(); | |
| 91 return true; | |
| 92 } | |
| 93 if (result == rtc::SR_BLOCK) { | |
| 94 // The QUIC stream is write blocked, so the message will be queued by the | |
| 95 // QUIC session. It might be due to the QUIC transport not being writable, | |
| 96 // or it may be due to exceeding the QUIC flow control limit. | |
| 97 LOG(LS_WARNING) << "Stream " << stream->id() | |
| 98 << " is write blocked for QUIC data channel " << id_; | |
| 99 if (observer_ != nullptr) { | |
| 100 observer_->OnBufferedAmountChange(buffered_amount_); | |
| 101 } | |
| 102 buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes; | |
| 103 stream->SignalQueuedBytesWritten.connect( | |
| 104 this, &QuicDataChannel::OnQueuedBytesWritten); | |
| 105 // Once the stream has stopped writing, it will be closed. | |
| 106 stream->StopReading(); | |
| 107 write_blocked_quic_streams_[stream->id()] = stream; | |
| 108 return true; | |
| 109 } | |
| 110 return false; | |
|
pthatcher1
2016/04/12 00:58:39
We should probably log what the result was, since
mikescarlett
2016/04/13 16:53:37
Done.
| |
| 111 } | |
| 112 | |
| 113 void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, | |
| 114 uint64_t queued_bytes_written) { | |
| 115 if (observer_ != nullptr) { | |
| 116 observer_->OnBufferedAmountChange(buffered_amount_); | |
|
pthatcher1
2016/04/12 00:58:39
What thread is this called on? Because I'm pretty
mikescarlett
2016/04/13 16:53:37
OnQueuedBytesWritten is on the worker thread. I fi
| |
| 117 } | |
| 118 buffered_amount_ -= queued_bytes_written; | |
| 119 } | |
| 120 | |
| 121 void QuicDataChannel::SetTransportChannel( | |
| 122 cricket::QuicTransportChannel* channel) { | |
| 123 RTC_DCHECK(channel != nullptr); | |
|
pthatcher1
2016/04/12 00:58:38
RTC_DCHECK(channel) is sufficient.
mikescarlett
2016/04/13 16:53:37
Done.
| |
| 124 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; | |
| 125 quic_transport_channel_ = channel; | |
| 126 quic_transport_channel_->SignalWritableState.connect( | |
| 127 this, &QuicDataChannel::OnWritableState); | |
| 128 quic_transport_channel_->SignalClosed.connect( | |
| 129 this, &QuicDataChannel::OnConnectionClosed); | |
|
pthatcher1
2016/04/12 00:58:39
We also need to disconnect from the previous quic_
mikescarlett
2016/04/13 16:53:37
This function should only be called once. I added
| |
| 130 if (quic_transport_channel_->writable()) { | |
| 131 SetState(kOpen); | |
| 132 } | |
| 133 } | |
| 134 | |
| 135 void QuicDataChannel::OnIncomingStream(uint64_t message_id, | |
| 136 const char* first_bytes, | |
| 137 size_t len, | |
| 138 cricket::ReliableQuicStream* stream) { | |
| 139 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 140 RTC_DCHECK(first_bytes != nullptr); | |
| 141 RTC_DCHECK(stream != nullptr); | |
| 142 if (!observer_) { | |
| 143 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
| 144 << " received a message but has no observer."; | |
| 145 stream->Close(); | |
| 146 return; | |
| 147 } | |
| 148 // A FIN is received if the message fits into a single QUIC stream frame and | |
| 149 // the remote peer is done sending. | |
| 150 if (!stream->fin_received()) { | |
| 151 // If FIN is not received, the message is divided across multiple QUIC | |
| 152 // stream frames, so queue the data. OnDataReceived() will be called each | |
| 153 // time the remaining QUIC stream frames arrive. | |
| 154 LOG(LS_INFO) << "QUIC data channel " << id_ | |
| 155 << " is queuing incoming data for stream " << stream->id(); | |
| 156 rtc::CopyOnWriteBuffer received_data; | |
| 157 received_data.AppendData(first_bytes, len); | |
| 158 Message message; | |
| 159 message.stream = stream; | |
| 160 message.buffer = std::move(received_data); | |
| 161 incoming_quic_streams_[stream->id()] = std::move(message); | |
| 162 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); | |
| 163 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); | |
| 164 return; | |
| 165 } | |
| 166 // Otherwise propagate the message to the observer. | |
| 167 LOG(LS_INFO) << "Stream " << stream->id() | |
| 168 << " has finished receiving data for QUIC data channel " << id_; | |
| 169 DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false); | |
| 170 invoker_.AsyncInvoke<void>( | |
| 171 signaling_thread_, | |
| 172 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | |
| 173 stream->Close(); | |
|
pthatcher1
2016/04/12 00:58:39
I think flipping this around to
if (stream->fin_r
mikescarlett
2016/04/13 16:53:37
Done.
| |
| 174 } | |
| 175 | |
| 176 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | |
| 177 const char* data, | |
| 178 size_t len) { | |
| 179 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 180 RTC_DCHECK(data != nullptr); | |
| 181 const auto& kv = incoming_quic_streams_.find(stream_id); | |
| 182 RTC_DCHECK(kv != incoming_quic_streams_.end()); | |
| 183 Message& message = kv->second; | |
| 184 cricket::ReliableQuicStream* stream = message.stream; | |
| 185 rtc::CopyOnWriteBuffer& received_data = message.buffer; | |
| 186 // If the QUIC stream has not received a FIN, then the remote peer is not | |
| 187 // finished sending data. | |
| 188 if (!stream->fin_received()) { | |
| 189 received_data.AppendData(data, len); | |
| 190 return; | |
| 191 } | |
| 192 // Otherwise we are done receiving and can provide the data channel observer | |
| 193 // with the message. | |
| 194 LOG(LS_INFO) << "Stream " << stream_id | |
| 195 << " has finished receiving data for QUIC data channel " << id_; | |
| 196 received_data.AppendData(data, len); | |
| 197 DataBuffer final_message(std::move(received_data), false); | |
| 198 invoker_.AsyncInvoke<void>( | |
| 199 signaling_thread_, | |
| 200 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | |
| 201 // Once the stream is closed, OnDataReceived should not be fired. | |
| 202 stream->Close(); | |
| 203 incoming_quic_streams_.erase(stream_id); | |
| 204 } | |
| 205 | |
| 206 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | |
| 207 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 208 RTC_DCHECK(observer_ != nullptr); | |
| 209 observer_->OnMessage(received_data); | |
| 210 } | |
| 211 | |
| 212 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { | |
| 213 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 214 RTC_DCHECK(channel == quic_transport_channel_); | |
| 215 LOG(LS_INFO) << "QuicTransportChannel is writable"; | |
| 216 SetState(kOpen); | |
|
pthatcher1
2016/04/12 00:58:38
Actually, you need to check channel->writable().
mikescarlett
2016/04/13 16:53:37
Done.
| |
| 217 } | |
| 218 | |
| 219 void QuicDataChannel::Close() { | |
| 220 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 221 if (state_ == kClosed) { | |
| 222 return; | |
| 223 } | |
| 224 LOG(LS_INFO) << "Closing QUIC data channel."; | |
| 225 SetState(kClosing); | |
| 226 | |
| 227 for (auto& kv : incoming_quic_streams_) { | |
| 228 Message& message = kv.second; | |
| 229 cricket::ReliableQuicStream* stream = message.stream; | |
| 230 stream->Close(); | |
| 231 } | |
| 232 incoming_quic_streams_.clear(); | |
| 233 | |
| 234 for (auto& kv : write_blocked_quic_streams_) { | |
| 235 cricket::ReliableQuicStream* stream = kv.second; | |
| 236 stream->Close(); | |
| 237 } | |
| 238 write_blocked_quic_streams_.clear(); | |
| 239 | |
| 240 SetState(kClosed); | |
| 241 } | |
| 242 | |
| 243 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { | |
| 244 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 245 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; | |
| 246 incoming_quic_streams_.erase(stream_id); | |
| 247 } | |
| 248 | |
| 249 void QuicDataChannel::OnConnectionClosed() { | |
| 250 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 251 SetState(kClosed); | |
| 252 } | |
| 253 | |
| 254 void QuicDataChannel::SetState(DataState state) { | |
| 255 if (state_ == state) { | |
| 256 return; | |
| 257 } | |
| 258 LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " | |
| 259 << id_; | |
| 260 state_ = state; | |
| 261 if (observer_) { | |
| 262 observer_->OnStateChange(); | |
| 263 } | |
| 264 if (state_ == kClosed) { | |
| 265 SignalClosed(this); | |
| 266 } | |
| 267 } | |
| 268 | |
| 269 } // namespace webrtc | |
| OLD | NEW |