Index: webrtc/api/quicdatachannel.cc |
diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5dce606460c48c32132092c8ac4861cd2065f2de |
--- /dev/null |
+++ b/webrtc/api/quicdatachannel.cc |
@@ -0,0 +1,269 @@ |
+/* |
+ * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
+ * |
+ * Use of this source code is governed by a BSD-style license |
+ * that can be found in the LICENSE file in the root of the source |
+ * tree. An additional intellectual property rights grant can be found |
+ * in the file PATENTS. All contributing project authors may |
+ * be found in the AUTHORS file in the root of the source tree. |
+ */ |
+ |
+#include "webrtc/api/quicdatachannel.h" |
+ |
+#include "webrtc/base/bind.h" |
+#include "webrtc/base/bytebuffer.h" |
+#include "webrtc/base/copyonwritebuffer.h" |
+#include "webrtc/base/logging.h" |
+#include "webrtc/p2p/quic/quictransportchannel.h" |
+#include "webrtc/p2p/quic/reliablequicstream.h" |
+ |
+namespace webrtc { |
+ |
+QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, |
+ rtc::Thread* worker_thread, |
+ const std::string& label, |
+ const DataChannelInit* config, |
+ const MessageHelper& message_helper) |
+ : signaling_thread_(signaling_thread), |
+ worker_thread_(worker_thread), |
+ message_helper_(message_helper), |
+ observer_(nullptr), |
+ id_(config->id), |
+ state_(kConnecting), |
+ buffered_amount_(0), |
+ num_sent_messages_(0), |
+ label_(label), |
+ protocol_(config->protocol) {} |
+ |
+QuicDataChannel::~QuicDataChannel() {} |
+ |
+void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ observer_ = observer; |
+} |
+void QuicDataChannel::UnregisterObserver() { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ observer_ = nullptr; |
+} |
+ |
+bool QuicDataChannel::Send(const DataBuffer& buffer) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
+} |
+ |
+bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ if (state_ != kOpen) { |
pthatcher1
2016/04/12 00:58:38
Since state_ is set on the signaling thread, I thi
pthatcher1
2016/04/12 00:58:38
Since the state is set on the signaling, I think w
mikescarlett
2016/04/13 16:53:37
Ok done.
|
+ LOG(LS_ERROR) << "QUIC data channel " << id_ |
+ << " is not open so cannot send."; |
+ return false; |
+ } |
+ if (buffer.size() == 0) { |
+ LOG(LS_WARNING) << "QUIC data channel " << id_ |
+ << " refuses to send an empty message."; |
+ return true; |
+ } |
pthatcher1
2016/04/12 00:58:38
Why? If we prepend the message with the data chan
mikescarlett
2016/04/13 16:53:37
I wanted to be consistent with the SCTP data chann
|
+ RTC_DCHECK(quic_transport_channel_ != nullptr); |
+ rtc::CopyOnWriteBuffer payload; |
+ message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload); |
pthatcher1
2016/04/12 00:58:38
Instead of copying the whole message (which may be
mikescarlett
2016/04/13 16:53:37
Yes that's fine.
|
+ return WriteData_w(payload.data<char>(), payload.size()); |
pthatcher1
2016/04/12 00:58:38
Instead of making WriteData_w its own method, can
pthatcher1
2016/04/12 00:58:39
Rather than having WriteData_w be a separate metho
mikescarlett
2016/04/13 16:53:37
I'll combine them.
|
+} |
+ |
+// TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at |
+// the stream and connection level, so that peers can set the maximum size of a |
+// message. Currently QUIC defaults to 16KB per stream and session and will |
+// block bytes exceeding this limit. |
pthatcher1
2016/04/12 00:58:38
So we can't send data messages larger than 16KB?
mikescarlett
2016/04/13 16:53:37
I meant to say that the session buffers messages l
|
+bool QuicDataChannel::WriteData_w(const char* data, size_t len) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ cricket::ReliableQuicStream* stream = |
+ quic_transport_channel_->CreateQuicStream(); |
+ RTC_DCHECK(stream != nullptr); |
+ // Send the message with FIN == true, which signals to the remote peer that |
+ // there is no more data after this message. |
+ uint64_t old_queued_bytes = stream->queued_data_bytes(); |
+ rtc::StreamResult result = stream->Write(data, len, true); |
+ if (result == rtc::SR_SUCCESS) { |
+ // The message is sent and we don't need this QUIC stream. |
+ LOG(LS_INFO) << "Stream " << stream->id() |
+ << " successfully wrote data for QUIC data channel " << id_; |
+ stream->Close(); |
+ return true; |
+ } |
+ if (result == rtc::SR_BLOCK) { |
+ // The QUIC stream is write blocked, so the message will be queued by the |
+ // QUIC session. It might be due to the QUIC transport not being writable, |
+ // or it may be due to exceeding the QUIC flow control limit. |
+ LOG(LS_WARNING) << "Stream " << stream->id() |
+ << " is write blocked for QUIC data channel " << id_; |
+ if (observer_ != nullptr) { |
+ observer_->OnBufferedAmountChange(buffered_amount_); |
+ } |
+ buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes; |
+ stream->SignalQueuedBytesWritten.connect( |
+ this, &QuicDataChannel::OnQueuedBytesWritten); |
+ // Once the stream has stopped writing, it will be closed. |
+ stream->StopReading(); |
+ write_blocked_quic_streams_[stream->id()] = stream; |
+ return true; |
+ } |
+ return false; |
pthatcher1
2016/04/12 00:58:39
We should probably log what the result was, since
mikescarlett
2016/04/13 16:53:37
Done.
|
+} |
+ |
+void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, |
+ uint64_t queued_bytes_written) { |
+ if (observer_ != nullptr) { |
+ observer_->OnBufferedAmountChange(buffered_amount_); |
pthatcher1
2016/04/12 00:58:39
What thread is this called on? Because I'm pretty
mikescarlett
2016/04/13 16:53:37
OnQueuedBytesWritten is on the worker thread. I fi
|
+ } |
+ buffered_amount_ -= queued_bytes_written; |
+} |
+ |
+void QuicDataChannel::SetTransportChannel( |
+ cricket::QuicTransportChannel* channel) { |
+ RTC_DCHECK(channel != nullptr); |
pthatcher1
2016/04/12 00:58:38
RTC_DCHECK(channel) is sufficient.
mikescarlett
2016/04/13 16:53:37
Done.
|
+ LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; |
+ quic_transport_channel_ = channel; |
+ quic_transport_channel_->SignalWritableState.connect( |
+ this, &QuicDataChannel::OnWritableState); |
+ quic_transport_channel_->SignalClosed.connect( |
+ this, &QuicDataChannel::OnConnectionClosed); |
pthatcher1
2016/04/12 00:58:39
We also need to disconnect from the previous quic_
mikescarlett
2016/04/13 16:53:37
This function should only be called once. I added
|
+ if (quic_transport_channel_->writable()) { |
+ SetState(kOpen); |
+ } |
+} |
+ |
+void QuicDataChannel::OnIncomingStream(uint64_t message_id, |
+ const char* first_bytes, |
+ size_t len, |
+ cricket::ReliableQuicStream* stream) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ RTC_DCHECK(first_bytes != nullptr); |
+ RTC_DCHECK(stream != nullptr); |
+ if (!observer_) { |
+ LOG(LS_WARNING) << "QUIC data channel " << id_ |
+ << " received a message but has no observer."; |
+ stream->Close(); |
+ return; |
+ } |
+ // A FIN is received if the message fits into a single QUIC stream frame and |
+ // the remote peer is done sending. |
+ if (!stream->fin_received()) { |
+ // If FIN is not received, the message is divided across multiple QUIC |
+ // stream frames, so queue the data. OnDataReceived() will be called each |
+ // time the remaining QUIC stream frames arrive. |
+ LOG(LS_INFO) << "QUIC data channel " << id_ |
+ << " is queuing incoming data for stream " << stream->id(); |
+ rtc::CopyOnWriteBuffer received_data; |
+ received_data.AppendData(first_bytes, len); |
+ Message message; |
+ message.stream = stream; |
+ message.buffer = std::move(received_data); |
+ incoming_quic_streams_[stream->id()] = std::move(message); |
+ stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); |
+ stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); |
+ return; |
+ } |
+ // Otherwise propagate the message to the observer. |
+ LOG(LS_INFO) << "Stream " << stream->id() |
+ << " has finished receiving data for QUIC data channel " << id_; |
+ DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false); |
+ invoker_.AsyncInvoke<void>( |
+ signaling_thread_, |
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
+ stream->Close(); |
pthatcher1
2016/04/12 00:58:39
I think flipping this around to
if (stream->fin_r
mikescarlett
2016/04/13 16:53:37
Done.
|
+} |
+ |
+void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, |
+ const char* data, |
+ size_t len) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ RTC_DCHECK(data != nullptr); |
+ const auto& kv = incoming_quic_streams_.find(stream_id); |
+ RTC_DCHECK(kv != incoming_quic_streams_.end()); |
+ Message& message = kv->second; |
+ cricket::ReliableQuicStream* stream = message.stream; |
+ rtc::CopyOnWriteBuffer& received_data = message.buffer; |
+ // If the QUIC stream has not received a FIN, then the remote peer is not |
+ // finished sending data. |
+ if (!stream->fin_received()) { |
+ received_data.AppendData(data, len); |
+ return; |
+ } |
+ // Otherwise we are done receiving and can provide the data channel observer |
+ // with the message. |
+ LOG(LS_INFO) << "Stream " << stream_id |
+ << " has finished receiving data for QUIC data channel " << id_; |
+ received_data.AppendData(data, len); |
+ DataBuffer final_message(std::move(received_data), false); |
+ invoker_.AsyncInvoke<void>( |
+ signaling_thread_, |
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
+ // Once the stream is closed, OnDataReceived should not be fired. |
+ stream->Close(); |
+ incoming_quic_streams_.erase(stream_id); |
+} |
+ |
+void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ RTC_DCHECK(observer_ != nullptr); |
+ observer_->OnMessage(received_data); |
+} |
+ |
+void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ RTC_DCHECK(channel == quic_transport_channel_); |
+ LOG(LS_INFO) << "QuicTransportChannel is writable"; |
+ SetState(kOpen); |
pthatcher1
2016/04/12 00:58:38
Actually, you need to check channel->writable().
mikescarlett
2016/04/13 16:53:37
Done.
|
+} |
+ |
+void QuicDataChannel::Close() { |
+ RTC_DCHECK(signaling_thread_->IsCurrent()); |
+ if (state_ == kClosed) { |
+ return; |
+ } |
+ LOG(LS_INFO) << "Closing QUIC data channel."; |
+ SetState(kClosing); |
+ |
+ for (auto& kv : incoming_quic_streams_) { |
+ Message& message = kv.second; |
+ cricket::ReliableQuicStream* stream = message.stream; |
+ stream->Close(); |
+ } |
+ incoming_quic_streams_.clear(); |
+ |
+ for (auto& kv : write_blocked_quic_streams_) { |
+ cricket::ReliableQuicStream* stream = kv.second; |
+ stream->Close(); |
+ } |
+ write_blocked_quic_streams_.clear(); |
+ |
+ SetState(kClosed); |
+} |
+ |
+void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; |
+ incoming_quic_streams_.erase(stream_id); |
+} |
+ |
+void QuicDataChannel::OnConnectionClosed() { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ SetState(kClosed); |
+} |
+ |
+void QuicDataChannel::SetState(DataState state) { |
+ if (state_ == state) { |
+ return; |
+ } |
+ LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " |
+ << id_; |
+ state_ = state; |
+ if (observer_) { |
+ observer_->OnStateChange(); |
+ } |
+ if (state_ == kClosed) { |
+ SignalClosed(this); |
+ } |
+} |
+ |
+} // namespace webrtc |