Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Unified Diff: webrtc/api/quicdatachannel.cc

Issue 1844803002: Modify PeerConnection for end-to-end QuicDataChannel usage (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Remove webrtcsdp.cc from this CL Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/api/quicdatachannel.cc
diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..5dce606460c48c32132092c8ac4861cd2065f2de
--- /dev/null
+++ b/webrtc/api/quicdatachannel.cc
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/api/quicdatachannel.h"
+
+#include "webrtc/base/bind.h"
+#include "webrtc/base/bytebuffer.h"
+#include "webrtc/base/copyonwritebuffer.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/p2p/quic/quictransportchannel.h"
+#include "webrtc/p2p/quic/reliablequicstream.h"
+
+namespace webrtc {
+
+QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread,
+ rtc::Thread* worker_thread,
+ const std::string& label,
+ const DataChannelInit* config,
+ const MessageHelper& message_helper)
+ : signaling_thread_(signaling_thread),
+ worker_thread_(worker_thread),
+ message_helper_(message_helper),
+ observer_(nullptr),
+ id_(config->id),
+ state_(kConnecting),
+ buffered_amount_(0),
+ num_sent_messages_(0),
+ label_(label),
+ protocol_(config->protocol) {}
+
+QuicDataChannel::~QuicDataChannel() {}
+
+void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ observer_ = observer;
+}
+void QuicDataChannel::UnregisterObserver() {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ observer_ = nullptr;
+}
+
+bool QuicDataChannel::Send(const DataBuffer& buffer) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ return worker_thread_->Invoke<bool>(
+ rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
+}
+
+bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ if (state_ != kOpen) {
pthatcher1 2016/04/12 00:58:38 Since state_ is set on the signaling thread, I thi
pthatcher1 2016/04/12 00:58:38 Since the state is set on the signaling, I think w
mikescarlett 2016/04/13 16:53:37 Ok done.
+ LOG(LS_ERROR) << "QUIC data channel " << id_
+ << " is not open so cannot send.";
+ return false;
+ }
+ if (buffer.size() == 0) {
+ LOG(LS_WARNING) << "QUIC data channel " << id_
+ << " refuses to send an empty message.";
+ return true;
+ }
pthatcher1 2016/04/12 00:58:38 Why? If we prepend the message with the data chan
mikescarlett 2016/04/13 16:53:37 I wanted to be consistent with the SCTP data chann
+ RTC_DCHECK(quic_transport_channel_ != nullptr);
+ rtc::CopyOnWriteBuffer payload;
+ message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload);
pthatcher1 2016/04/12 00:58:38 Instead of copying the whole message (which may be
mikescarlett 2016/04/13 16:53:37 Yes that's fine.
+ return WriteData_w(payload.data<char>(), payload.size());
pthatcher1 2016/04/12 00:58:38 Instead of making WriteData_w its own method, can
pthatcher1 2016/04/12 00:58:39 Rather than having WriteData_w be a separate metho
mikescarlett 2016/04/13 16:53:37 I'll combine them.
+}
+
+// TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at
+// the stream and connection level, so that peers can set the maximum size of a
+// message. Currently QUIC defaults to 16KB per stream and session and will
+// block bytes exceeding this limit.
pthatcher1 2016/04/12 00:58:38 So we can't send data messages larger than 16KB?
mikescarlett 2016/04/13 16:53:37 I meant to say that the session buffers messages l
+bool QuicDataChannel::WriteData_w(const char* data, size_t len) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ cricket::ReliableQuicStream* stream =
+ quic_transport_channel_->CreateQuicStream();
+ RTC_DCHECK(stream != nullptr);
+ // Send the message with FIN == true, which signals to the remote peer that
+ // there is no more data after this message.
+ uint64_t old_queued_bytes = stream->queued_data_bytes();
+ rtc::StreamResult result = stream->Write(data, len, true);
+ if (result == rtc::SR_SUCCESS) {
+ // The message is sent and we don't need this QUIC stream.
+ LOG(LS_INFO) << "Stream " << stream->id()
+ << " successfully wrote data for QUIC data channel " << id_;
+ stream->Close();
+ return true;
+ }
+ if (result == rtc::SR_BLOCK) {
+ // The QUIC stream is write blocked, so the message will be queued by the
+ // QUIC session. It might be due to the QUIC transport not being writable,
+ // or it may be due to exceeding the QUIC flow control limit.
+ LOG(LS_WARNING) << "Stream " << stream->id()
+ << " is write blocked for QUIC data channel " << id_;
+ if (observer_ != nullptr) {
+ observer_->OnBufferedAmountChange(buffered_amount_);
+ }
+ buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes;
+ stream->SignalQueuedBytesWritten.connect(
+ this, &QuicDataChannel::OnQueuedBytesWritten);
+ // Once the stream has stopped writing, it will be closed.
+ stream->StopReading();
+ write_blocked_quic_streams_[stream->id()] = stream;
+ return true;
+ }
+ return false;
pthatcher1 2016/04/12 00:58:39 We should probably log what the result was, since
mikescarlett 2016/04/13 16:53:37 Done.
+}
+
+void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id,
+ uint64_t queued_bytes_written) {
+ if (observer_ != nullptr) {
+ observer_->OnBufferedAmountChange(buffered_amount_);
pthatcher1 2016/04/12 00:58:39 What thread is this called on? Because I'm pretty
mikescarlett 2016/04/13 16:53:37 OnQueuedBytesWritten is on the worker thread. I fi
+ }
+ buffered_amount_ -= queued_bytes_written;
+}
+
+void QuicDataChannel::SetTransportChannel(
+ cricket::QuicTransportChannel* channel) {
+ RTC_DCHECK(channel != nullptr);
pthatcher1 2016/04/12 00:58:38 RTC_DCHECK(channel) is sufficient.
mikescarlett 2016/04/13 16:53:37 Done.
+ LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_;
+ quic_transport_channel_ = channel;
+ quic_transport_channel_->SignalWritableState.connect(
+ this, &QuicDataChannel::OnWritableState);
+ quic_transport_channel_->SignalClosed.connect(
+ this, &QuicDataChannel::OnConnectionClosed);
pthatcher1 2016/04/12 00:58:39 We also need to disconnect from the previous quic_
mikescarlett 2016/04/13 16:53:37 This function should only be called once. I added
+ if (quic_transport_channel_->writable()) {
+ SetState(kOpen);
+ }
+}
+
+void QuicDataChannel::OnIncomingStream(uint64_t message_id,
+ const char* first_bytes,
+ size_t len,
+ cricket::ReliableQuicStream* stream) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ RTC_DCHECK(first_bytes != nullptr);
+ RTC_DCHECK(stream != nullptr);
+ if (!observer_) {
+ LOG(LS_WARNING) << "QUIC data channel " << id_
+ << " received a message but has no observer.";
+ stream->Close();
+ return;
+ }
+ // A FIN is received if the message fits into a single QUIC stream frame and
+ // the remote peer is done sending.
+ if (!stream->fin_received()) {
+ // If FIN is not received, the message is divided across multiple QUIC
+ // stream frames, so queue the data. OnDataReceived() will be called each
+ // time the remaining QUIC stream frames arrive.
+ LOG(LS_INFO) << "QUIC data channel " << id_
+ << " is queuing incoming data for stream " << stream->id();
+ rtc::CopyOnWriteBuffer received_data;
+ received_data.AppendData(first_bytes, len);
+ Message message;
+ message.stream = stream;
+ message.buffer = std::move(received_data);
+ incoming_quic_streams_[stream->id()] = std::move(message);
+ stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived);
+ stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed);
+ return;
+ }
+ // Otherwise propagate the message to the observer.
+ LOG(LS_INFO) << "Stream " << stream->id()
+ << " has finished receiving data for QUIC data channel " << id_;
+ DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false);
+ invoker_.AsyncInvoke<void>(
+ signaling_thread_,
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
+ stream->Close();
pthatcher1 2016/04/12 00:58:39 I think flipping this around to if (stream->fin_r
mikescarlett 2016/04/13 16:53:37 Done.
+}
+
+void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
+ const char* data,
+ size_t len) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ RTC_DCHECK(data != nullptr);
+ const auto& kv = incoming_quic_streams_.find(stream_id);
+ RTC_DCHECK(kv != incoming_quic_streams_.end());
+ Message& message = kv->second;
+ cricket::ReliableQuicStream* stream = message.stream;
+ rtc::CopyOnWriteBuffer& received_data = message.buffer;
+ // If the QUIC stream has not received a FIN, then the remote peer is not
+ // finished sending data.
+ if (!stream->fin_received()) {
+ received_data.AppendData(data, len);
+ return;
+ }
+ // Otherwise we are done receiving and can provide the data channel observer
+ // with the message.
+ LOG(LS_INFO) << "Stream " << stream_id
+ << " has finished receiving data for QUIC data channel " << id_;
+ received_data.AppendData(data, len);
+ DataBuffer final_message(std::move(received_data), false);
+ invoker_.AsyncInvoke<void>(
+ signaling_thread_,
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
+ // Once the stream is closed, OnDataReceived should not be fired.
+ stream->Close();
+ incoming_quic_streams_.erase(stream_id);
+}
+
+void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK(observer_ != nullptr);
+ observer_->OnMessage(received_data);
+}
+
+void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ RTC_DCHECK(channel == quic_transport_channel_);
+ LOG(LS_INFO) << "QuicTransportChannel is writable";
+ SetState(kOpen);
pthatcher1 2016/04/12 00:58:38 Actually, you need to check channel->writable().
mikescarlett 2016/04/13 16:53:37 Done.
+}
+
+void QuicDataChannel::Close() {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ if (state_ == kClosed) {
+ return;
+ }
+ LOG(LS_INFO) << "Closing QUIC data channel.";
+ SetState(kClosing);
+
+ for (auto& kv : incoming_quic_streams_) {
+ Message& message = kv.second;
+ cricket::ReliableQuicStream* stream = message.stream;
+ stream->Close();
+ }
+ incoming_quic_streams_.clear();
+
+ for (auto& kv : write_blocked_quic_streams_) {
+ cricket::ReliableQuicStream* stream = kv.second;
+ stream->Close();
+ }
+ write_blocked_quic_streams_.clear();
+
+ SetState(kClosed);
+}
+
+void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed.";
+ incoming_quic_streams_.erase(stream_id);
+}
+
+void QuicDataChannel::OnConnectionClosed() {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ SetState(kClosed);
+}
+
+void QuicDataChannel::SetState(DataState state) {
+ if (state_ == state) {
+ return;
+ }
+ LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel "
+ << id_;
+ state_ = state;
+ if (observer_) {
+ observer_->OnStateChange();
+ }
+ if (state_ == kClosed) {
+ SignalClosed(this);
+ }
+}
+
+} // namespace webrtc

Powered by Google App Engine
This is Rietveld 408576698