Chromium Code Reviews| Index: webrtc/api/quicdatachannel.cc |
| diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..5dce606460c48c32132092c8ac4861cd2065f2de |
| --- /dev/null |
| +++ b/webrtc/api/quicdatachannel.cc |
| @@ -0,0 +1,269 @@ |
| +/* |
| + * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
| + * |
| + * Use of this source code is governed by a BSD-style license |
| + * that can be found in the LICENSE file in the root of the source |
| + * tree. An additional intellectual property rights grant can be found |
| + * in the file PATENTS. All contributing project authors may |
| + * be found in the AUTHORS file in the root of the source tree. |
| + */ |
| + |
| +#include "webrtc/api/quicdatachannel.h" |
| + |
| +#include "webrtc/base/bind.h" |
| +#include "webrtc/base/bytebuffer.h" |
| +#include "webrtc/base/copyonwritebuffer.h" |
| +#include "webrtc/base/logging.h" |
| +#include "webrtc/p2p/quic/quictransportchannel.h" |
| +#include "webrtc/p2p/quic/reliablequicstream.h" |
| + |
| +namespace webrtc { |
| + |
| +QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, |
| + rtc::Thread* worker_thread, |
| + const std::string& label, |
| + const DataChannelInit* config, |
| + const MessageHelper& message_helper) |
| + : signaling_thread_(signaling_thread), |
| + worker_thread_(worker_thread), |
| + message_helper_(message_helper), |
| + observer_(nullptr), |
| + id_(config->id), |
| + state_(kConnecting), |
| + buffered_amount_(0), |
| + num_sent_messages_(0), |
| + label_(label), |
| + protocol_(config->protocol) {} |
| + |
| +QuicDataChannel::~QuicDataChannel() {} |
| + |
| +void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { |
| + RTC_DCHECK(signaling_thread_->IsCurrent()); |
| + observer_ = observer; |
| +} |
| +void QuicDataChannel::UnregisterObserver() { |
| + RTC_DCHECK(signaling_thread_->IsCurrent()); |
| + observer_ = nullptr; |
| +} |
| + |
| +bool QuicDataChannel::Send(const DataBuffer& buffer) { |
| + RTC_DCHECK(signaling_thread_->IsCurrent()); |
| + return worker_thread_->Invoke<bool>( |
| + rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
| +} |
| + |
| +bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + if (state_ != kOpen) { |
|
pthatcher1
2016/04/12 00:58:38
Since state_ is set on the signaling thread, I thi
pthatcher1
2016/04/12 00:58:38
Since the state is set on the signaling, I think w
mikescarlett
2016/04/13 16:53:37
Ok done.
|
| + LOG(LS_ERROR) << "QUIC data channel " << id_ |
| + << " is not open so cannot send."; |
| + return false; |
| + } |
| + if (buffer.size() == 0) { |
| + LOG(LS_WARNING) << "QUIC data channel " << id_ |
| + << " refuses to send an empty message."; |
| + return true; |
| + } |
|
pthatcher1
2016/04/12 00:58:38
Why? If we prepend the message with the data chan
mikescarlett
2016/04/13 16:53:37
I wanted to be consistent with the SCTP data chann
|
| + RTC_DCHECK(quic_transport_channel_ != nullptr); |
| + rtc::CopyOnWriteBuffer payload; |
| + message_helper_.Encode(buffer, id_, ++num_sent_messages_, &payload); |
|
pthatcher1
2016/04/12 00:58:38
Instead of copying the whole message (which may be
mikescarlett
2016/04/13 16:53:37
Yes that's fine.
|
| + return WriteData_w(payload.data<char>(), payload.size()); |
|
pthatcher1
2016/04/12 00:58:38
Instead of making WriteData_w its own method, can
pthatcher1
2016/04/12 00:58:39
Rather than having WriteData_w be a separate metho
mikescarlett
2016/04/13 16:53:37
I'll combine them.
|
| +} |
| + |
| +// TODO(mikescarlett): Allow peers to negotiate QUIC flow control window size at |
| +// the stream and connection level, so that peers can set the maximum size of a |
| +// message. Currently QUIC defaults to 16KB per stream and session and will |
| +// block bytes exceeding this limit. |
|
pthatcher1
2016/04/12 00:58:38
So we can't send data messages larger than 16KB?
mikescarlett
2016/04/13 16:53:37
I meant to say that the session buffers messages l
|
| +bool QuicDataChannel::WriteData_w(const char* data, size_t len) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + cricket::ReliableQuicStream* stream = |
| + quic_transport_channel_->CreateQuicStream(); |
| + RTC_DCHECK(stream != nullptr); |
| + // Send the message with FIN == true, which signals to the remote peer that |
| + // there is no more data after this message. |
| + uint64_t old_queued_bytes = stream->queued_data_bytes(); |
| + rtc::StreamResult result = stream->Write(data, len, true); |
| + if (result == rtc::SR_SUCCESS) { |
| + // The message is sent and we don't need this QUIC stream. |
| + LOG(LS_INFO) << "Stream " << stream->id() |
| + << " successfully wrote data for QUIC data channel " << id_; |
| + stream->Close(); |
| + return true; |
| + } |
| + if (result == rtc::SR_BLOCK) { |
| + // The QUIC stream is write blocked, so the message will be queued by the |
| + // QUIC session. It might be due to the QUIC transport not being writable, |
| + // or it may be due to exceeding the QUIC flow control limit. |
| + LOG(LS_WARNING) << "Stream " << stream->id() |
| + << " is write blocked for QUIC data channel " << id_; |
| + if (observer_ != nullptr) { |
| + observer_->OnBufferedAmountChange(buffered_amount_); |
| + } |
| + buffered_amount_ += stream->queued_data_bytes() - old_queued_bytes; |
| + stream->SignalQueuedBytesWritten.connect( |
| + this, &QuicDataChannel::OnQueuedBytesWritten); |
| + // Once the stream has stopped writing, it will be closed. |
| + stream->StopReading(); |
| + write_blocked_quic_streams_[stream->id()] = stream; |
| + return true; |
| + } |
| + return false; |
|
pthatcher1
2016/04/12 00:58:39
We should probably log what the result was, since
mikescarlett
2016/04/13 16:53:37
Done.
|
| +} |
| + |
| +void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, |
| + uint64_t queued_bytes_written) { |
| + if (observer_ != nullptr) { |
| + observer_->OnBufferedAmountChange(buffered_amount_); |
|
pthatcher1
2016/04/12 00:58:39
What thread is this called on? Because I'm pretty
mikescarlett
2016/04/13 16:53:37
OnQueuedBytesWritten is on the worker thread. I fi
|
| + } |
| + buffered_amount_ -= queued_bytes_written; |
| +} |
| + |
| +void QuicDataChannel::SetTransportChannel( |
| + cricket::QuicTransportChannel* channel) { |
| + RTC_DCHECK(channel != nullptr); |
|
pthatcher1
2016/04/12 00:58:38
RTC_DCHECK(channel) is sufficient.
mikescarlett
2016/04/13 16:53:37
Done.
|
| + LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; |
| + quic_transport_channel_ = channel; |
| + quic_transport_channel_->SignalWritableState.connect( |
| + this, &QuicDataChannel::OnWritableState); |
| + quic_transport_channel_->SignalClosed.connect( |
| + this, &QuicDataChannel::OnConnectionClosed); |
|
pthatcher1
2016/04/12 00:58:39
We also need to disconnect from the previous quic_
mikescarlett
2016/04/13 16:53:37
This function should only be called once. I added
|
| + if (quic_transport_channel_->writable()) { |
| + SetState(kOpen); |
| + } |
| +} |
| + |
| +void QuicDataChannel::OnIncomingStream(uint64_t message_id, |
| + const char* first_bytes, |
| + size_t len, |
| + cricket::ReliableQuicStream* stream) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + RTC_DCHECK(first_bytes != nullptr); |
| + RTC_DCHECK(stream != nullptr); |
| + if (!observer_) { |
| + LOG(LS_WARNING) << "QUIC data channel " << id_ |
| + << " received a message but has no observer."; |
| + stream->Close(); |
| + return; |
| + } |
| + // A FIN is received if the message fits into a single QUIC stream frame and |
| + // the remote peer is done sending. |
| + if (!stream->fin_received()) { |
| + // If FIN is not received, the message is divided across multiple QUIC |
| + // stream frames, so queue the data. OnDataReceived() will be called each |
| + // time the remaining QUIC stream frames arrive. |
| + LOG(LS_INFO) << "QUIC data channel " << id_ |
| + << " is queuing incoming data for stream " << stream->id(); |
| + rtc::CopyOnWriteBuffer received_data; |
| + received_data.AppendData(first_bytes, len); |
| + Message message; |
| + message.stream = stream; |
| + message.buffer = std::move(received_data); |
| + incoming_quic_streams_[stream->id()] = std::move(message); |
| + stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); |
| + stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); |
| + return; |
| + } |
| + // Otherwise propagate the message to the observer. |
| + LOG(LS_INFO) << "Stream " << stream->id() |
| + << " has finished receiving data for QUIC data channel " << id_; |
| + DataBuffer final_message(rtc::CopyOnWriteBuffer(first_bytes, len), false); |
| + invoker_.AsyncInvoke<void>( |
| + signaling_thread_, |
| + rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
| + stream->Close(); |
|
pthatcher1
2016/04/12 00:58:39
I think flipping this around to
if (stream->fin_r
mikescarlett
2016/04/13 16:53:37
Done.
|
| +} |
| + |
| +void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, |
| + const char* data, |
| + size_t len) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + RTC_DCHECK(data != nullptr); |
| + const auto& kv = incoming_quic_streams_.find(stream_id); |
| + RTC_DCHECK(kv != incoming_quic_streams_.end()); |
| + Message& message = kv->second; |
| + cricket::ReliableQuicStream* stream = message.stream; |
| + rtc::CopyOnWriteBuffer& received_data = message.buffer; |
| + // If the QUIC stream has not received a FIN, then the remote peer is not |
| + // finished sending data. |
| + if (!stream->fin_received()) { |
| + received_data.AppendData(data, len); |
| + return; |
| + } |
| + // Otherwise we are done receiving and can provide the data channel observer |
| + // with the message. |
| + LOG(LS_INFO) << "Stream " << stream_id |
| + << " has finished receiving data for QUIC data channel " << id_; |
| + received_data.AppendData(data, len); |
| + DataBuffer final_message(std::move(received_data), false); |
| + invoker_.AsyncInvoke<void>( |
| + signaling_thread_, |
| + rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
| + // Once the stream is closed, OnDataReceived should not be fired. |
| + stream->Close(); |
| + incoming_quic_streams_.erase(stream_id); |
| +} |
| + |
| +void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { |
| + RTC_DCHECK(signaling_thread_->IsCurrent()); |
| + RTC_DCHECK(observer_ != nullptr); |
| + observer_->OnMessage(received_data); |
| +} |
| + |
| +void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + RTC_DCHECK(channel == quic_transport_channel_); |
| + LOG(LS_INFO) << "QuicTransportChannel is writable"; |
| + SetState(kOpen); |
|
pthatcher1
2016/04/12 00:58:38
Actually, you need to check channel->writable().
mikescarlett
2016/04/13 16:53:37
Done.
|
| +} |
| + |
| +void QuicDataChannel::Close() { |
| + RTC_DCHECK(signaling_thread_->IsCurrent()); |
| + if (state_ == kClosed) { |
| + return; |
| + } |
| + LOG(LS_INFO) << "Closing QUIC data channel."; |
| + SetState(kClosing); |
| + |
| + for (auto& kv : incoming_quic_streams_) { |
| + Message& message = kv.second; |
| + cricket::ReliableQuicStream* stream = message.stream; |
| + stream->Close(); |
| + } |
| + incoming_quic_streams_.clear(); |
| + |
| + for (auto& kv : write_blocked_quic_streams_) { |
| + cricket::ReliableQuicStream* stream = kv.second; |
| + stream->Close(); |
| + } |
| + write_blocked_quic_streams_.clear(); |
| + |
| + SetState(kClosed); |
| +} |
| + |
| +void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; |
| + incoming_quic_streams_.erase(stream_id); |
| +} |
| + |
| +void QuicDataChannel::OnConnectionClosed() { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + SetState(kClosed); |
| +} |
| + |
| +void QuicDataChannel::SetState(DataState state) { |
| + if (state_ == state) { |
| + return; |
| + } |
| + LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " |
| + << id_; |
| + state_ = state; |
| + if (observer_) { |
| + observer_->OnStateChange(); |
| + } |
| + if (state_ == kClosed) { |
| + SignalClosed(this); |
| + } |
| +} |
| + |
| +} // namespace webrtc |