Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(335)

Side by Side Diff: webrtc/api/quicdatachannel.cc

Issue 1844803002: Modify PeerConnection for end-to-end QuicDataChannel usage (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/api/quicdatachannel.h"
12
13 #include "webrtc/base/bind.h"
14 #include "webrtc/base/bytebuffer.h"
15 #include "webrtc/base/logging.h"
16 #include "webrtc/p2p/quic/quictransportchannel.h"
17 #include "webrtc/p2p/quic/reliablequicstream.h"
18
19 namespace webrtc {
20
21 QuicDataChannel::QuicDataChannel(
22 cricket::QuicTransportChannel* quic_transport_channel,
23 rtc::Thread* signaling_thread,
24 rtc::Thread* worker_thread,
25 const std::string& label,
26 const DataChannelInit* config)
Taylor Brandstetter 2016/04/01 23:23:41 What should happen if the data channel init is som
mikescarlett 2016/04/05 19:58:50 By error do you mean something like "RTC_DCHECK(co
Taylor Brandstetter 2016/04/05 22:31:17 It definitely shouldn't crash, but the API should
27 : quic_transport_channel_(quic_transport_channel),
28 signaling_thread_(signaling_thread),
29 worker_thread_(worker_thread),
30 observer_(nullptr),
31 id_(config->id),
32 state_(quic_transport_channel_->quic_state() ==
33 cricket::QUIC_TRANSPORT_CONNECTED
34 ? kOpen
35 : kConnecting),
36 buffered_amount_(0),
37 num_sent_messages_(0),
38 label_(label),
39 protocol_(config->protocol),
40 send_open_message_(false) {
41 quic_transport_channel_->SignalWritableState.connect(
42 this, &QuicDataChannel::OnWritableState);
43 quic_transport_channel_->SignalClosed.connect(
44 this, &QuicDataChannel::OnConnectionClosed);
45 if (state_ == kOpen && send_open_message_) {
Taylor Brandstetter 2016/04/01 23:23:41 If send_open_message_ is always false I'd just rem
mikescarlett 2016/04/05 19:58:49 Removed.
46 worker_thread_->Invoke<void>(
47 rtc::Bind(&QuicDataChannel::SendOpenMessage_w, this));
48 }
49 }
50
51 QuicDataChannel::~QuicDataChannel() {}
52
53 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
54 RTC_DCHECK(signaling_thread_->IsCurrent());
55 observer_ = observer;
56 }
57 void QuicDataChannel::UnregisterObserver() {
58 RTC_DCHECK(signaling_thread_->IsCurrent());
59 observer_ = nullptr;
60 }
61
62 bool QuicDataChannel::Send(const DataBuffer& buffer) {
63 RTC_DCHECK(signaling_thread_->IsCurrent());
64 return worker_thread_->Invoke<bool>(
65 rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
66 }
67
68 bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
69 RTC_DCHECK(worker_thread_->IsCurrent());
70 if (state_ != kOpen) {
71 LOG(LS_ERROR) << "QUIC data channel " << id_
72 << " is not open so cannot send.";
73 return false;
74 }
75 if (buffer.size() == 0) {
76 LOG(LS_WARNING) << "QUIC data channel " << id_
77 << " refuses to send an empty message.";
78 return true;
79 }
80 size_t max_length = buffer.size() + 15;
81 rtc::ByteBuffer byte_buffer(nullptr, max_length,
82 rtc::ByteBuffer::ByteOrder::ORDER_HOST);
83 byte_buffer.WriteVarint(id_);
84 byte_buffer.WriteVarint(++num_sent_messages_);
85 byte_buffer.WriteBytes(buffer.data.data<char>(), buffer.size());
86 return WriteData_w(byte_buffer.Data(), byte_buffer.Length());
87 }
88
89 bool QuicDataChannel::WriteData_w(const char* data, size_t len) {
90 RTC_DCHECK(worker_thread_->IsCurrent());
91 cricket::ReliableQuicStream* stream =
92 quic_transport_channel_->CreateQuicStream();
93 // The stream should not be NULL. Otherwise an internal error is preventing
94 // QUIC streams from being created or the data channel state is invalid.
Taylor Brandstetter 2016/04/01 23:23:41 If this can only occur due to an internal error, w
mikescarlett 2016/04/05 19:58:50 I agree now. A minor issue is this will crash if a
95 if (stream == nullptr) {
96 LOG(LS_ERROR) << "QUIC data channel " << id_
97 << " is open but failed to allocate a QUIC stream.";
98 return false;
99 }
100 // Send the message with FIN == true, which signals to the remote peer that
101 // there is no more data after this message.
102 rtc::StreamResult result = stream->Write(data, len, true);
103 if (result == rtc::SR_SUCCESS) {
104 // The message is sent and we don't need this QUIC stream.
105 LOG(INFO) << "Stream " << stream->id()
106 << " successfully wrote data for QUIC data channel " << id_;
107 stream->Close();
108 return true;
109 } else if (result == rtc::SR_BLOCK) {
110 // The QUIC stream is write blocked, so the message will be queued by the
111 // QUIC session. It might be due to the QUIC transport not being writable,
112 // or it may be due to exceeding the QUIC flow control limit.
113 LOG(LS_WARNING) << "Stream " << stream->id()
114 << " is write blocked for QUIC data channel " << id_;
115 if (observer_ != nullptr) {
116 observer_->OnBufferedAmountChange(buffered_amount_);
117 buffered_amount_ += len;
Taylor Brandstetter 2016/04/01 23:23:41 I don't see buffered_amount_ ever decrease. Also w
mikescarlett 2016/04/05 19:58:49 I implemented this by adding a new ReliableQuicStr
118 }
119 }
120 return false;
Taylor Brandstetter 2016/04/01 23:23:41 If the data is buffered successfully, this method
mikescarlett 2016/04/05 19:58:50 Okay buffering returns true now.
121 }
122
123 void QuicDataChannel::SendOpenMessage_w() {
124 RTC_DCHECK(worker_thread_->IsCurrent());
125 // Not implemented.
126 }
127
128 void QuicDataChannel::Close() {
129 RTC_DCHECK(signaling_thread_->IsCurrent());
130 if (state_ == kClosed) {
131 return;
132 }
133 LOG(LS_INFO) << "Closing QUIC data channel.";
134 SetState(kClosing);
135 for (auto& kv : incoming_quic_streams_) {
136 cricket::ReliableQuicStream* stream = kv.second;
137 stream->Close();
138 }
139 queued_data_.clear();
140 SetState(kClosed);
141 }
142
143 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) {
144 RTC_DCHECK(worker_thread_->IsCurrent());
145 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed.";
146 incoming_quic_streams_.erase(stream_id);
147 }
148
149 void QuicDataChannel::OnIncomingStream(cricket::ReliableQuicStream* stream,
150 rtc::ByteBuffer* remaining_bytes) {
pthatcher1 2016/03/30 20:34:48 The order should be remaining_bytes, stream, since
mikescarlett 2016/04/05 19:58:50 Done.
151 RTC_DCHECK(worker_thread_->IsCurrent());
152 // The data channel ID has already been read from the byte buffer, so we can
153 // skip to the message id.
154 uint64_t message_id;
155 remaining_bytes->ReadVarint(&message_id);
Taylor Brandstetter 2016/04/01 23:23:41 Should probably handle failures of ReadVarint.
mikescarlett 2016/04/05 19:58:49 Done. ReadVarint is in QuicDataTransport now.
156
157 if (stream->fin_received()) {
158 LOG(LS_INFO) << "Stream " << stream->id()
159 << " has finished receiving data for QUIC data channel "
160 << id_;
161 // A FIN is received if the message fits into a single QUIC stream frame and
162 // the remote peer is done sending. In this case, propagate the data to the
163 // observer and close the stream.
164 if (observer_ != nullptr) {
165 DataBuffer message(rtc::CopyOnWriteBuffer(remaining_bytes->Data(),
166 remaining_bytes->Length()),
167 false);
168 signaling_thread_->Invoke<void>(
pthatcher1 2016/03/30 20:34:48 These needs to be an AsyncInvoke
mikescarlett 2016/04/05 19:58:49 Done.
169 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message));
170 OnMessage_s(message);
pthatcher1 2016/03/30 20:34:48 Why do we hop to the signalling thread and then ca
mikescarlett 2016/04/05 19:58:50 That clearly shouldn't be there. Fixed.
171 } else {
172 LOG(LS_WARNING) << "QUIC data channel " << id_
173 << " received a message but has no observer.";
174 }
175 stream->Close();
176 } else {
177 // Otherwise the message is divided across multiple QUIC stream frames, so
178 // queue the data. OnDataReceived() will be called each time the remaining
179 // QUIC stream frames arrive.
180 LOG(LS_INFO) << "QUIC data channel " << id_
181 << " is queuing incoming data for stream " << stream->id();
182 incoming_quic_streams_[stream->id()] = stream;
183 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream->id()];
pthatcher1 2016/03/30 20:34:48 When does the buffer get added to the queud_data_?
pthatcher1 2016/03/30 20:34:48 I think we just want a normal rtc::Buffer, not an
Taylor Brandstetter 2016/04/01 23:23:41 Do you mean, "when does the object get constructed
mikescarlett 2016/04/05 19:58:49 I changed it to not use operator[] to create the b
mikescarlett 2016/04/05 19:58:49 It needs to be rtc::CopyOnWriteBuffer due to this
184 received_data.AppendData(remaining_bytes->Data(),
185 remaining_bytes->Length());
186 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived);
187 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed);
188 }
pthatcher1 2016/03/30 20:34:48 Can you use an early return in this code? Perhaps
mikescarlett 2016/04/05 19:58:49 Done.
189 }
190
191 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
192 const char* data,
193 size_t len) {
194 RTC_DCHECK(worker_thread_->IsCurrent());
195 cricket::ReliableQuicStream* stream = incoming_quic_streams_[stream_id];
196 // True if the QUIC stream has reeived a FIN, which indicates the remote
197 // peer is finished receiving data.
198 bool finished = stream->fin_received();
199 // Lookup or create an rtc::CopyOnWriteBuffer.
200 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream_id];
pthatcher1 2016/03/30 20:34:48 We should use .find() and check to see if it's in
Taylor Brandstetter 2016/04/01 23:23:41 It always *should* be. So if you do that, put it i
mikescarlett 2016/04/05 19:58:49 I put an RTC_DCHECK. OnDataReceived is only called
201
202 if (finished) {
203 LOG(LS_INFO) << "Stream " << stream_id
204 << " has finished receiving data for QUIC data channel "
205 << id_;
206 received_data.AppendData(data, len);
207 DataBuffer message(received_data, false);
pthatcher1 2016/03/30 20:34:48 We should probably use a map of rtc::Buffer* so th
mikescarlett 2016/04/05 19:58:49 What about using std::move to avoid copying for Da
208 signaling_thread_->Invoke<void>(
209 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message));
210 OnMessage_s(message);
Taylor Brandstetter 2016/04/01 23:23:41 Again, OnMessage_s shouldn't be called twice.
mikescarlett 2016/04/05 19:58:49 Removed.
211 queued_data_.erase(stream_id);
212 stream->Close();
213 } else {
214 received_data.AppendData(data, len);
215 }
pthatcher1 2016/03/30 20:34:48 Same here with early returns. Something like this
mikescarlett 2016/04/05 19:58:49 Done.
216 }
pthatcher1 2016/03/30 20:34:48 Now that I see this more completely, I think that
Taylor Brandstetter 2016/04/01 23:23:41 +1
mikescarlett 2016/04/05 19:58:49 I revised it. QuicDataChannelMessage is a struct a
217
218 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) {
219 RTC_DCHECK(worker_thread_->IsCurrent());
220 RTC_DCHECK(channel == quic_transport_channel_);
221 state_ = kOpen;
222 if (send_open_message_) {
223 SendOpenMessage_w();
pthatcher1 2016/03/30 20:34:48 I though we weren't doing the open message in this
mikescarlett 2016/04/05 19:58:49 Right I'm not implementing it. Fixed
224 }
225 }
226
227 void QuicDataChannel::OnConnectionClosed() {
228 RTC_DCHECK(worker_thread_->IsCurrent());
229 SetState(kClosed);
230 }
231
232 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
233 RTC_DCHECK(signaling_thread_->IsCurrent());
234 if (observer_ != nullptr) {
Taylor Brandstetter 2016/04/01 23:23:41 If it's OnMessage_s that checks for an observer, y
mikescarlett 2016/04/05 19:58:49 Replaced with RTC_DCHECK.
235 observer_->OnMessage(received_data);
236 } else {
237 LOG(LS_WARNING) << "QUIC data channel " << id_
238 << " received a message but has no observer.";
239 }
pthatcher1 2016/03/30 20:34:48 Early return here please.
mikescarlett 2016/04/05 19:58:49 Done.
240 }
241
242 void QuicDataChannel::SetState(DataState state) {
243 if (state_ == state) {
244 return;
245 }
246 state_ = state;
247 if (observer_) {
248 observer_->OnStateChange();
249 }
250 if (state_ == kClosed) {
251 SignalClosed(this);
252 }
253 }
254
255 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698