Index: webrtc/api/quicdatachannel.cc |
diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..32984b5746a930c208ad79a99364dae86596800e |
--- /dev/null |
+++ b/webrtc/api/quicdatachannel.cc |
@@ -0,0 +1,255 @@ |
+/* |
+ * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
+ * |
+ * Use of this source code is governed by a BSD-style license |
+ * that can be found in the LICENSE file in the root of the source |
+ * tree. An additional intellectual property rights grant can be found |
+ * in the file PATENTS. All contributing project authors may |
+ * be found in the AUTHORS file in the root of the source tree. |
+ */ |
+ |
+#include "webrtc/api/quicdatachannel.h" |
+ |
+#include "webrtc/base/bind.h" |
+#include "webrtc/base/bytebuffer.h" |
+#include "webrtc/base/logging.h" |
+#include "webrtc/p2p/quic/quictransportchannel.h" |
+#include "webrtc/p2p/quic/reliablequicstream.h" |
+ |
+namespace webrtc { |
+ |
+QuicDataChannel::QuicDataChannel( |
+ cricket::QuicTransportChannel* quic_transport_channel, |
+ rtc::Thread* signaling_thread, |
+ rtc::Thread* worker_thread, |
+ const std::string& label, |
+ const DataChannelInit* config) |
Taylor Brandstetter
2016/04/01 23:23:41
What should happen if the data channel init is som
mikescarlett
2016/04/05 19:58:50
By error do you mean something like "RTC_DCHECK(co
Taylor Brandstetter
2016/04/05 22:31:17
It definitely shouldn't crash, but the API should
|
+ : quic_transport_channel_(quic_transport_channel), |
+ signaling_thread_(signaling_thread), |
+ worker_thread_(worker_thread), |
+ observer_(nullptr), |
+ id_(config->id), |
+ state_(quic_transport_channel_->quic_state() == |
+ cricket::QUIC_TRANSPORT_CONNECTED |
+ ? kOpen |
+ : kConnecting), |
+ buffered_amount_(0), |
+ num_sent_messages_(0), |
+ label_(label), |
+ protocol_(config->protocol), |
+ send_open_message_(false) { |
+ quic_transport_channel_->SignalWritableState.connect( |
+ this, &QuicDataChannel::OnWritableState); |
+ quic_transport_channel_->SignalClosed.connect( |
+ this, &QuicDataChannel::OnConnectionClosed); |
+ if (state_ == kOpen && send_open_message_) { |
Taylor Brandstetter
2016/04/01 23:23:41
If send_open_message_ is always false I'd just rem
mikescarlett
2016/04/05 19:58:49
Removed.
|
+ worker_thread_->Invoke<void>( |
+ rtc::Bind(&QuicDataChannel::SendOpenMessage_w, this)); |
+ } |
+} |
+ |
+QuicDataChannel::~QuicDataChannel() {} |
+ |
+void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ observer_ = observer; |
+} |
+void QuicDataChannel::UnregisterObserver() { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ observer_ = nullptr; |
+} |
+ |
+bool QuicDataChannel::Send(const DataBuffer& buffer) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
+} |
+ |
+bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ if (state_ != kOpen) { |
+ LOG(LS_ERROR) << "QUIC data channel " << id_ |
+ << " is not open so cannot send."; |
+ return false; |
+ } |
+ if (buffer.size() == 0) { |
+ LOG(LS_WARNING) << "QUIC data channel " << id_ |
+ << " refuses to send an empty message."; |
+ return true; |
+ } |
+ size_t max_length = buffer.size() + 15; |
+ rtc::ByteBuffer byte_buffer(nullptr, max_length, |
+ rtc::ByteBuffer::ByteOrder::ORDER_HOST); |
+ byte_buffer.WriteVarint(id_); |
+ byte_buffer.WriteVarint(++num_sent_messages_); |
+ byte_buffer.WriteBytes(buffer.data.data<char>(), buffer.size()); |
+ return WriteData_w(byte_buffer.Data(), byte_buffer.Length()); |
+} |
+ |
+bool QuicDataChannel::WriteData_w(const char* data, size_t len) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ cricket::ReliableQuicStream* stream = |
+ quic_transport_channel_->CreateQuicStream(); |
+ // The stream should not be NULL. Otherwise an internal error is preventing |
+ // QUIC streams from being created or the data channel state is invalid. |
Taylor Brandstetter
2016/04/01 23:23:41
If this can only occur due to an internal error, w
mikescarlett
2016/04/05 19:58:50
I agree now. A minor issue is this will crash if a
|
+ if (stream == nullptr) { |
+ LOG(LS_ERROR) << "QUIC data channel " << id_ |
+ << " is open but failed to allocate a QUIC stream."; |
+ return false; |
+ } |
+ // Send the message with FIN == true, which signals to the remote peer that |
+ // there is no more data after this message. |
+ rtc::StreamResult result = stream->Write(data, len, true); |
+ if (result == rtc::SR_SUCCESS) { |
+ // The message is sent and we don't need this QUIC stream. |
+ LOG(INFO) << "Stream " << stream->id() |
+ << " successfully wrote data for QUIC data channel " << id_; |
+ stream->Close(); |
+ return true; |
+ } else if (result == rtc::SR_BLOCK) { |
+ // The QUIC stream is write blocked, so the message will be queued by the |
+ // QUIC session. It might be due to the QUIC transport not being writable, |
+ // or it may be due to exceeding the QUIC flow control limit. |
+ LOG(LS_WARNING) << "Stream " << stream->id() |
+ << " is write blocked for QUIC data channel " << id_; |
+ if (observer_ != nullptr) { |
+ observer_->OnBufferedAmountChange(buffered_amount_); |
+ buffered_amount_ += len; |
Taylor Brandstetter
2016/04/01 23:23:41
I don't see buffered_amount_ ever decrease. Also w
mikescarlett
2016/04/05 19:58:49
I implemented this by adding a new ReliableQuicStr
|
+ } |
+ } |
+ return false; |
Taylor Brandstetter
2016/04/01 23:23:41
If the data is buffered successfully, this method
mikescarlett
2016/04/05 19:58:50
Okay buffering returns true now.
|
+} |
+ |
+void QuicDataChannel::SendOpenMessage_w() { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ // Not implemented. |
+} |
+ |
+void QuicDataChannel::Close() { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ if (state_ == kClosed) { |
+ return; |
+ } |
+ LOG(LS_INFO) << "Closing QUIC data channel."; |
+ SetState(kClosing); |
+ for (auto& kv : incoming_quic_streams_) { |
+ cricket::ReliableQuicStream* stream = kv.second; |
+ stream->Close(); |
+ } |
+ queued_data_.clear(); |
+ SetState(kClosed); |
+} |
+ |
+void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; |
+ incoming_quic_streams_.erase(stream_id); |
+} |
+ |
+void QuicDataChannel::OnIncomingStream(cricket::ReliableQuicStream* stream, |
+ rtc::ByteBuffer* remaining_bytes) { |
pthatcher1
2016/03/30 20:34:48
The order should be remaining_bytes, stream, since
mikescarlett
2016/04/05 19:58:50
Done.
|
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ // The data channel ID has already been read from the byte buffer, so we can |
+ // skip to the message id. |
+ uint64_t message_id; |
+ remaining_bytes->ReadVarint(&message_id); |
Taylor Brandstetter
2016/04/01 23:23:41
Should probably handle failures of ReadVarint.
mikescarlett
2016/04/05 19:58:49
Done. ReadVarint is in QuicDataTransport now.
|
+ |
+ if (stream->fin_received()) { |
+ LOG(LS_INFO) << "Stream " << stream->id() |
+ << " has finished receiving data for QUIC data channel " |
+ << id_; |
+ // A FIN is received if the message fits into a single QUIC stream frame and |
+ // the remote peer is done sending. In this case, propagate the data to the |
+ // observer and close the stream. |
+ if (observer_ != nullptr) { |
+ DataBuffer message(rtc::CopyOnWriteBuffer(remaining_bytes->Data(), |
+ remaining_bytes->Length()), |
+ false); |
+ signaling_thread_->Invoke<void>( |
pthatcher1
2016/03/30 20:34:48
These needs to be an AsyncInvoke
mikescarlett
2016/04/05 19:58:49
Done.
|
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); |
+ OnMessage_s(message); |
pthatcher1
2016/03/30 20:34:48
Why do we hop to the signalling thread and then ca
mikescarlett
2016/04/05 19:58:50
That clearly shouldn't be there. Fixed.
|
+ } else { |
+ LOG(LS_WARNING) << "QUIC data channel " << id_ |
+ << " received a message but has no observer."; |
+ } |
+ stream->Close(); |
+ } else { |
+ // Otherwise the message is divided across multiple QUIC stream frames, so |
+ // queue the data. OnDataReceived() will be called each time the remaining |
+ // QUIC stream frames arrive. |
+ LOG(LS_INFO) << "QUIC data channel " << id_ |
+ << " is queuing incoming data for stream " << stream->id(); |
+ incoming_quic_streams_[stream->id()] = stream; |
+ rtc::CopyOnWriteBuffer& received_data = queued_data_[stream->id()]; |
pthatcher1
2016/03/30 20:34:48
When does the buffer get added to the queud_data_?
pthatcher1
2016/03/30 20:34:48
I think we just want a normal rtc::Buffer, not an
Taylor Brandstetter
2016/04/01 23:23:41
Do you mean, "when does the object get constructed
mikescarlett
2016/04/05 19:58:49
I changed it to not use operator[] to create the b
mikescarlett
2016/04/05 19:58:49
It needs to be rtc::CopyOnWriteBuffer due to this
|
+ received_data.AppendData(remaining_bytes->Data(), |
+ remaining_bytes->Length()); |
+ stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); |
+ stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); |
+ } |
pthatcher1
2016/03/30 20:34:48
Can you use an early return in this code? Perhaps
mikescarlett
2016/04/05 19:58:49
Done.
|
+} |
+ |
+void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, |
+ const char* data, |
+ size_t len) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ cricket::ReliableQuicStream* stream = incoming_quic_streams_[stream_id]; |
+ // True if the QUIC stream has reeived a FIN, which indicates the remote |
+ // peer is finished receiving data. |
+ bool finished = stream->fin_received(); |
+ // Lookup or create an rtc::CopyOnWriteBuffer. |
+ rtc::CopyOnWriteBuffer& received_data = queued_data_[stream_id]; |
pthatcher1
2016/03/30 20:34:48
We should use .find() and check to see if it's in
Taylor Brandstetter
2016/04/01 23:23:41
It always *should* be. So if you do that, put it i
mikescarlett
2016/04/05 19:58:49
I put an RTC_DCHECK. OnDataReceived is only called
|
+ |
+ if (finished) { |
+ LOG(LS_INFO) << "Stream " << stream_id |
+ << " has finished receiving data for QUIC data channel " |
+ << id_; |
+ received_data.AppendData(data, len); |
+ DataBuffer message(received_data, false); |
pthatcher1
2016/03/30 20:34:48
We should probably use a map of rtc::Buffer* so th
mikescarlett
2016/04/05 19:58:49
What about using std::move to avoid copying for Da
|
+ signaling_thread_->Invoke<void>( |
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); |
+ OnMessage_s(message); |
Taylor Brandstetter
2016/04/01 23:23:41
Again, OnMessage_s shouldn't be called twice.
mikescarlett
2016/04/05 19:58:49
Removed.
|
+ queued_data_.erase(stream_id); |
+ stream->Close(); |
+ } else { |
+ received_data.AppendData(data, len); |
+ } |
pthatcher1
2016/03/30 20:34:48
Same here with early returns. Something like this
mikescarlett
2016/04/05 19:58:49
Done.
|
+} |
pthatcher1
2016/03/30 20:34:48
Now that I see this more completely, I think that
Taylor Brandstetter
2016/04/01 23:23:41
+1
mikescarlett
2016/04/05 19:58:49
I revised it. QuicDataChannelMessage is a struct a
|
+ |
+void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ RTC_DCHECK(channel == quic_transport_channel_); |
+ state_ = kOpen; |
+ if (send_open_message_) { |
+ SendOpenMessage_w(); |
pthatcher1
2016/03/30 20:34:48
I though we weren't doing the open message in this
mikescarlett
2016/04/05 19:58:49
Right I'm not implementing it. Fixed
|
+ } |
+} |
+ |
+void QuicDataChannel::OnConnectionClosed() { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ SetState(kClosed); |
+} |
+ |
+void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ if (observer_ != nullptr) { |
Taylor Brandstetter
2016/04/01 23:23:41
If it's OnMessage_s that checks for an observer, y
mikescarlett
2016/04/05 19:58:49
Replaced with RTC_DCHECK.
|
+ observer_->OnMessage(received_data); |
+ } else { |
+ LOG(LS_WARNING) << "QUIC data channel " << id_ |
+ << " received a message but has no observer."; |
+ } |
pthatcher1
2016/03/30 20:34:48
Early return here please.
mikescarlett
2016/04/05 19:58:49
Done.
|
+} |
+ |
+void QuicDataChannel::SetState(DataState state) { |
+ if (state_ == state) { |
+ return; |
+ } |
+ state_ = state; |
+ if (observer_) { |
+ observer_->OnStateChange(); |
+ } |
+ if (state_ == kClosed) { |
+ SignalClosed(this); |
+ } |
+} |
+ |
+} // namespace webrtc |