Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | |
| 3 * | |
| 4 * Use of this source code is governed by a BSD-style license | |
| 5 * that can be found in the LICENSE file in the root of the source | |
| 6 * tree. An additional intellectual property rights grant can be found | |
| 7 * in the file PATENTS. All contributing project authors may | |
| 8 * be found in the AUTHORS file in the root of the source tree. | |
| 9 */ | |
| 10 | |
| 11 #include "webrtc/api/quicdatachannel.h" | |
| 12 | |
| 13 #include "webrtc/base/bind.h" | |
| 14 #include "webrtc/base/bytebuffer.h" | |
| 15 #include "webrtc/base/logging.h" | |
| 16 #include "webrtc/p2p/quic/quictransportchannel.h" | |
| 17 #include "webrtc/p2p/quic/reliablequicstream.h" | |
| 18 | |
| 19 namespace webrtc { | |
| 20 | |
| 21 QuicDataChannel::QuicDataChannel( | |
| 22 cricket::QuicTransportChannel* quic_transport_channel, | |
| 23 rtc::Thread* signaling_thread, | |
| 24 rtc::Thread* worker_thread, | |
| 25 const std::string& label, | |
| 26 const DataChannelInit* config) | |
|
Taylor Brandstetter
2016/04/01 23:23:41
What should happen if the data channel init is som
mikescarlett
2016/04/05 19:58:50
By error do you mean something like "RTC_DCHECK(co
Taylor Brandstetter
2016/04/05 22:31:17
It definitely shouldn't crash, but the API should
| |
| 27 : quic_transport_channel_(quic_transport_channel), | |
| 28 signaling_thread_(signaling_thread), | |
| 29 worker_thread_(worker_thread), | |
| 30 observer_(nullptr), | |
| 31 id_(config->id), | |
| 32 state_(quic_transport_channel_->quic_state() == | |
| 33 cricket::QUIC_TRANSPORT_CONNECTED | |
| 34 ? kOpen | |
| 35 : kConnecting), | |
| 36 buffered_amount_(0), | |
| 37 num_sent_messages_(0), | |
| 38 label_(label), | |
| 39 protocol_(config->protocol), | |
| 40 send_open_message_(false) { | |
| 41 quic_transport_channel_->SignalWritableState.connect( | |
| 42 this, &QuicDataChannel::OnWritableState); | |
| 43 quic_transport_channel_->SignalClosed.connect( | |
| 44 this, &QuicDataChannel::OnConnectionClosed); | |
| 45 if (state_ == kOpen && send_open_message_) { | |
|
Taylor Brandstetter
2016/04/01 23:23:41
If send_open_message_ is always false I'd just rem
mikescarlett
2016/04/05 19:58:49
Removed.
| |
| 46 worker_thread_->Invoke<void>( | |
| 47 rtc::Bind(&QuicDataChannel::SendOpenMessage_w, this)); | |
| 48 } | |
| 49 } | |
| 50 | |
| 51 QuicDataChannel::~QuicDataChannel() {} | |
| 52 | |
| 53 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | |
| 54 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 55 observer_ = observer; | |
| 56 } | |
| 57 void QuicDataChannel::UnregisterObserver() { | |
| 58 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 59 observer_ = nullptr; | |
| 60 } | |
| 61 | |
| 62 bool QuicDataChannel::Send(const DataBuffer& buffer) { | |
| 63 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 64 return worker_thread_->Invoke<bool>( | |
| 65 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | |
| 66 } | |
| 67 | |
| 68 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | |
| 69 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 70 if (state_ != kOpen) { | |
| 71 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
| 72 << " is not open so cannot send."; | |
| 73 return false; | |
| 74 } | |
| 75 if (buffer.size() == 0) { | |
| 76 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
| 77 << " refuses to send an empty message."; | |
| 78 return true; | |
| 79 } | |
| 80 size_t max_length = buffer.size() + 15; | |
| 81 rtc::ByteBuffer byte_buffer(nullptr, max_length, | |
| 82 rtc::ByteBuffer::ByteOrder::ORDER_HOST); | |
| 83 byte_buffer.WriteVarint(id_); | |
| 84 byte_buffer.WriteVarint(++num_sent_messages_); | |
| 85 byte_buffer.WriteBytes(buffer.data.data<char>(), buffer.size()); | |
| 86 return WriteData_w(byte_buffer.Data(), byte_buffer.Length()); | |
| 87 } | |
| 88 | |
| 89 bool QuicDataChannel::WriteData_w(const char* data, size_t len) { | |
| 90 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 91 cricket::ReliableQuicStream* stream = | |
| 92 quic_transport_channel_->CreateQuicStream(); | |
| 93 // The stream should not be NULL. Otherwise an internal error is preventing | |
| 94 // QUIC streams from being created or the data channel state is invalid. | |
|
Taylor Brandstetter
2016/04/01 23:23:41
If this can only occur due to an internal error, w
mikescarlett
2016/04/05 19:58:50
I agree now. A minor issue is this will crash if a
| |
| 95 if (stream == nullptr) { | |
| 96 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
| 97 << " is open but failed to allocate a QUIC stream."; | |
| 98 return false; | |
| 99 } | |
| 100 // Send the message with FIN == true, which signals to the remote peer that | |
| 101 // there is no more data after this message. | |
| 102 rtc::StreamResult result = stream->Write(data, len, true); | |
| 103 if (result == rtc::SR_SUCCESS) { | |
| 104 // The message is sent and we don't need this QUIC stream. | |
| 105 LOG(INFO) << "Stream " << stream->id() | |
| 106 << " successfully wrote data for QUIC data channel " << id_; | |
| 107 stream->Close(); | |
| 108 return true; | |
| 109 } else if (result == rtc::SR_BLOCK) { | |
| 110 // The QUIC stream is write blocked, so the message will be queued by the | |
| 111 // QUIC session. It might be due to the QUIC transport not being writable, | |
| 112 // or it may be due to exceeding the QUIC flow control limit. | |
| 113 LOG(LS_WARNING) << "Stream " << stream->id() | |
| 114 << " is write blocked for QUIC data channel " << id_; | |
| 115 if (observer_ != nullptr) { | |
| 116 observer_->OnBufferedAmountChange(buffered_amount_); | |
| 117 buffered_amount_ += len; | |
|
Taylor Brandstetter
2016/04/01 23:23:41
I don't see buffered_amount_ ever decrease. Also w
mikescarlett
2016/04/05 19:58:49
I implemented this by adding a new ReliableQuicStr
| |
| 118 } | |
| 119 } | |
| 120 return false; | |
|
Taylor Brandstetter
2016/04/01 23:23:41
If the data is buffered successfully, this method
mikescarlett
2016/04/05 19:58:50
Okay buffering returns true now.
| |
| 121 } | |
| 122 | |
| 123 void QuicDataChannel::SendOpenMessage_w() { | |
| 124 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 125 // Not implemented. | |
| 126 } | |
| 127 | |
| 128 void QuicDataChannel::Close() { | |
| 129 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 130 if (state_ == kClosed) { | |
| 131 return; | |
| 132 } | |
| 133 LOG(LS_INFO) << "Closing QUIC data channel."; | |
| 134 SetState(kClosing); | |
| 135 for (auto& kv : incoming_quic_streams_) { | |
| 136 cricket::ReliableQuicStream* stream = kv.second; | |
| 137 stream->Close(); | |
| 138 } | |
| 139 queued_data_.clear(); | |
| 140 SetState(kClosed); | |
| 141 } | |
| 142 | |
| 143 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { | |
| 144 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 145 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; | |
| 146 incoming_quic_streams_.erase(stream_id); | |
| 147 } | |
| 148 | |
| 149 void QuicDataChannel::OnIncomingStream(cricket::ReliableQuicStream* stream, | |
| 150 rtc::ByteBuffer* remaining_bytes) { | |
|
pthatcher1
2016/03/30 20:34:48
The order should be remaining_bytes, stream, since
mikescarlett
2016/04/05 19:58:50
Done.
| |
| 151 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 152 // The data channel ID has already been read from the byte buffer, so we can | |
| 153 // skip to the message id. | |
| 154 uint64_t message_id; | |
| 155 remaining_bytes->ReadVarint(&message_id); | |
|
Taylor Brandstetter
2016/04/01 23:23:41
Should probably handle failures of ReadVarint.
mikescarlett
2016/04/05 19:58:49
Done. ReadVarint is in QuicDataTransport now.
| |
| 156 | |
| 157 if (stream->fin_received()) { | |
| 158 LOG(LS_INFO) << "Stream " << stream->id() | |
| 159 << " has finished receiving data for QUIC data channel " | |
| 160 << id_; | |
| 161 // A FIN is received if the message fits into a single QUIC stream frame and | |
| 162 // the remote peer is done sending. In this case, propagate the data to the | |
| 163 // observer and close the stream. | |
| 164 if (observer_ != nullptr) { | |
| 165 DataBuffer message(rtc::CopyOnWriteBuffer(remaining_bytes->Data(), | |
| 166 remaining_bytes->Length()), | |
| 167 false); | |
| 168 signaling_thread_->Invoke<void>( | |
|
pthatcher1
2016/03/30 20:34:48
These needs to be an AsyncInvoke
mikescarlett
2016/04/05 19:58:49
Done.
| |
| 169 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); | |
| 170 OnMessage_s(message); | |
|
pthatcher1
2016/03/30 20:34:48
Why do we hop to the signalling thread and then ca
mikescarlett
2016/04/05 19:58:50
That clearly shouldn't be there. Fixed.
| |
| 171 } else { | |
| 172 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
| 173 << " received a message but has no observer."; | |
| 174 } | |
| 175 stream->Close(); | |
| 176 } else { | |
| 177 // Otherwise the message is divided across multiple QUIC stream frames, so | |
| 178 // queue the data. OnDataReceived() will be called each time the remaining | |
| 179 // QUIC stream frames arrive. | |
| 180 LOG(LS_INFO) << "QUIC data channel " << id_ | |
| 181 << " is queuing incoming data for stream " << stream->id(); | |
| 182 incoming_quic_streams_[stream->id()] = stream; | |
| 183 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream->id()]; | |
|
pthatcher1
2016/03/30 20:34:48
When does the buffer get added to the queud_data_?
pthatcher1
2016/03/30 20:34:48
I think we just want a normal rtc::Buffer, not an
Taylor Brandstetter
2016/04/01 23:23:41
Do you mean, "when does the object get constructed
mikescarlett
2016/04/05 19:58:49
I changed it to not use operator[] to create the b
mikescarlett
2016/04/05 19:58:49
It needs to be rtc::CopyOnWriteBuffer due to this
| |
| 184 received_data.AppendData(remaining_bytes->Data(), | |
| 185 remaining_bytes->Length()); | |
| 186 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); | |
| 187 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); | |
| 188 } | |
|
pthatcher1
2016/03/30 20:34:48
Can you use an early return in this code? Perhaps
mikescarlett
2016/04/05 19:58:49
Done.
| |
| 189 } | |
| 190 | |
| 191 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | |
| 192 const char* data, | |
| 193 size_t len) { | |
| 194 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 195 cricket::ReliableQuicStream* stream = incoming_quic_streams_[stream_id]; | |
| 196 // True if the QUIC stream has reeived a FIN, which indicates the remote | |
| 197 // peer is finished receiving data. | |
| 198 bool finished = stream->fin_received(); | |
| 199 // Lookup or create an rtc::CopyOnWriteBuffer. | |
| 200 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream_id]; | |
|
pthatcher1
2016/03/30 20:34:48
We should use .find() and check to see if it's in
Taylor Brandstetter
2016/04/01 23:23:41
It always *should* be. So if you do that, put it i
mikescarlett
2016/04/05 19:58:49
I put an RTC_DCHECK. OnDataReceived is only called
| |
| 201 | |
| 202 if (finished) { | |
| 203 LOG(LS_INFO) << "Stream " << stream_id | |
| 204 << " has finished receiving data for QUIC data channel " | |
| 205 << id_; | |
| 206 received_data.AppendData(data, len); | |
| 207 DataBuffer message(received_data, false); | |
|
pthatcher1
2016/03/30 20:34:48
We should probably use a map of rtc::Buffer* so th
mikescarlett
2016/04/05 19:58:49
What about using std::move to avoid copying for Da
| |
| 208 signaling_thread_->Invoke<void>( | |
| 209 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); | |
| 210 OnMessage_s(message); | |
|
Taylor Brandstetter
2016/04/01 23:23:41
Again, OnMessage_s shouldn't be called twice.
mikescarlett
2016/04/05 19:58:49
Removed.
| |
| 211 queued_data_.erase(stream_id); | |
| 212 stream->Close(); | |
| 213 } else { | |
| 214 received_data.AppendData(data, len); | |
| 215 } | |
|
pthatcher1
2016/03/30 20:34:48
Same here with early returns. Something like this
mikescarlett
2016/04/05 19:58:49
Done.
| |
| 216 } | |
|
pthatcher1
2016/03/30 20:34:48
Now that I see this more completely, I think that
Taylor Brandstetter
2016/04/01 23:23:41
+1
mikescarlett
2016/04/05 19:58:49
I revised it. QuicDataChannelMessage is a struct a
| |
| 217 | |
| 218 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { | |
| 219 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 220 RTC_DCHECK(channel == quic_transport_channel_); | |
| 221 state_ = kOpen; | |
| 222 if (send_open_message_) { | |
| 223 SendOpenMessage_w(); | |
|
pthatcher1
2016/03/30 20:34:48
I though we weren't doing the open message in this
mikescarlett
2016/04/05 19:58:49
Right I'm not implementing it. Fixed
| |
| 224 } | |
| 225 } | |
| 226 | |
| 227 void QuicDataChannel::OnConnectionClosed() { | |
| 228 RTC_DCHECK(worker_thread_->IsCurrent()); | |
| 229 SetState(kClosed); | |
| 230 } | |
| 231 | |
| 232 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | |
| 233 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
| 234 if (observer_ != nullptr) { | |
|
Taylor Brandstetter
2016/04/01 23:23:41
If it's OnMessage_s that checks for an observer, y
mikescarlett
2016/04/05 19:58:49
Replaced with RTC_DCHECK.
| |
| 235 observer_->OnMessage(received_data); | |
| 236 } else { | |
| 237 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
| 238 << " received a message but has no observer."; | |
| 239 } | |
|
pthatcher1
2016/03/30 20:34:48
Early return here please.
mikescarlett
2016/04/05 19:58:49
Done.
| |
| 240 } | |
| 241 | |
| 242 void QuicDataChannel::SetState(DataState state) { | |
| 243 if (state_ == state) { | |
| 244 return; | |
| 245 } | |
| 246 state_ = state; | |
| 247 if (observer_) { | |
| 248 observer_->OnStateChange(); | |
| 249 } | |
| 250 if (state_ == kClosed) { | |
| 251 SignalClosed(this); | |
| 252 } | |
| 253 } | |
| 254 | |
| 255 } // namespace webrtc | |
| OLD | NEW |