OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/api/quicdatachannel.h" | |
12 | |
13 #include "webrtc/base/bind.h" | |
14 #include "webrtc/base/bytebuffer.h" | |
15 #include "webrtc/base/logging.h" | |
16 #include "webrtc/p2p/quic/quictransportchannel.h" | |
17 #include "webrtc/p2p/quic/reliablequicstream.h" | |
18 | |
19 namespace webrtc { | |
20 | |
21 QuicDataChannel::QuicDataChannel( | |
22 cricket::QuicTransportChannel* quic_transport_channel, | |
23 rtc::Thread* signaling_thread, | |
24 rtc::Thread* worker_thread, | |
25 const std::string& label, | |
26 const DataChannelInit* config) | |
Taylor Brandstetter
2016/04/01 23:23:41
What should happen if the data channel init is som
mikescarlett
2016/04/05 19:58:50
By error do you mean something like "RTC_DCHECK(co
Taylor Brandstetter
2016/04/05 22:31:17
It definitely shouldn't crash, but the API should
| |
27 : quic_transport_channel_(quic_transport_channel), | |
28 signaling_thread_(signaling_thread), | |
29 worker_thread_(worker_thread), | |
30 observer_(nullptr), | |
31 id_(config->id), | |
32 state_(quic_transport_channel_->quic_state() == | |
33 cricket::QUIC_TRANSPORT_CONNECTED | |
34 ? kOpen | |
35 : kConnecting), | |
36 buffered_amount_(0), | |
37 num_sent_messages_(0), | |
38 label_(label), | |
39 protocol_(config->protocol), | |
40 send_open_message_(false) { | |
41 quic_transport_channel_->SignalWritableState.connect( | |
42 this, &QuicDataChannel::OnWritableState); | |
43 quic_transport_channel_->SignalClosed.connect( | |
44 this, &QuicDataChannel::OnConnectionClosed); | |
45 if (state_ == kOpen && send_open_message_) { | |
Taylor Brandstetter
2016/04/01 23:23:41
If send_open_message_ is always false I'd just rem
mikescarlett
2016/04/05 19:58:49
Removed.
| |
46 worker_thread_->Invoke<void>( | |
47 rtc::Bind(&QuicDataChannel::SendOpenMessage_w, this)); | |
48 } | |
49 } | |
50 | |
51 QuicDataChannel::~QuicDataChannel() {} | |
52 | |
53 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | |
54 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
55 observer_ = observer; | |
56 } | |
57 void QuicDataChannel::UnregisterObserver() { | |
58 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
59 observer_ = nullptr; | |
60 } | |
61 | |
62 bool QuicDataChannel::Send(const DataBuffer& buffer) { | |
63 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
64 return worker_thread_->Invoke<bool>( | |
65 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | |
66 } | |
67 | |
68 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | |
69 RTC_DCHECK(worker_thread_->IsCurrent()); | |
70 if (state_ != kOpen) { | |
71 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
72 << " is not open so cannot send."; | |
73 return false; | |
74 } | |
75 if (buffer.size() == 0) { | |
76 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
77 << " refuses to send an empty message."; | |
78 return true; | |
79 } | |
80 size_t max_length = buffer.size() + 15; | |
81 rtc::ByteBuffer byte_buffer(nullptr, max_length, | |
82 rtc::ByteBuffer::ByteOrder::ORDER_HOST); | |
83 byte_buffer.WriteVarint(id_); | |
84 byte_buffer.WriteVarint(++num_sent_messages_); | |
85 byte_buffer.WriteBytes(buffer.data.data<char>(), buffer.size()); | |
86 return WriteData_w(byte_buffer.Data(), byte_buffer.Length()); | |
87 } | |
88 | |
89 bool QuicDataChannel::WriteData_w(const char* data, size_t len) { | |
90 RTC_DCHECK(worker_thread_->IsCurrent()); | |
91 cricket::ReliableQuicStream* stream = | |
92 quic_transport_channel_->CreateQuicStream(); | |
93 // The stream should not be NULL. Otherwise an internal error is preventing | |
94 // QUIC streams from being created or the data channel state is invalid. | |
Taylor Brandstetter
2016/04/01 23:23:41
If this can only occur due to an internal error, w
mikescarlett
2016/04/05 19:58:50
I agree now. A minor issue is this will crash if a
| |
95 if (stream == nullptr) { | |
96 LOG(LS_ERROR) << "QUIC data channel " << id_ | |
97 << " is open but failed to allocate a QUIC stream."; | |
98 return false; | |
99 } | |
100 // Send the message with FIN == true, which signals to the remote peer that | |
101 // there is no more data after this message. | |
102 rtc::StreamResult result = stream->Write(data, len, true); | |
103 if (result == rtc::SR_SUCCESS) { | |
104 // The message is sent and we don't need this QUIC stream. | |
105 LOG(INFO) << "Stream " << stream->id() | |
106 << " successfully wrote data for QUIC data channel " << id_; | |
107 stream->Close(); | |
108 return true; | |
109 } else if (result == rtc::SR_BLOCK) { | |
110 // The QUIC stream is write blocked, so the message will be queued by the | |
111 // QUIC session. It might be due to the QUIC transport not being writable, | |
112 // or it may be due to exceeding the QUIC flow control limit. | |
113 LOG(LS_WARNING) << "Stream " << stream->id() | |
114 << " is write blocked for QUIC data channel " << id_; | |
115 if (observer_ != nullptr) { | |
116 observer_->OnBufferedAmountChange(buffered_amount_); | |
117 buffered_amount_ += len; | |
Taylor Brandstetter
2016/04/01 23:23:41
I don't see buffered_amount_ ever decrease. Also w
mikescarlett
2016/04/05 19:58:49
I implemented this by adding a new ReliableQuicStr
| |
118 } | |
119 } | |
120 return false; | |
Taylor Brandstetter
2016/04/01 23:23:41
If the data is buffered successfully, this method
mikescarlett
2016/04/05 19:58:50
Okay buffering returns true now.
| |
121 } | |
122 | |
123 void QuicDataChannel::SendOpenMessage_w() { | |
124 RTC_DCHECK(worker_thread_->IsCurrent()); | |
125 // Not implemented. | |
126 } | |
127 | |
128 void QuicDataChannel::Close() { | |
129 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
130 if (state_ == kClosed) { | |
131 return; | |
132 } | |
133 LOG(LS_INFO) << "Closing QUIC data channel."; | |
134 SetState(kClosing); | |
135 for (auto& kv : incoming_quic_streams_) { | |
136 cricket::ReliableQuicStream* stream = kv.second; | |
137 stream->Close(); | |
138 } | |
139 queued_data_.clear(); | |
140 SetState(kClosed); | |
141 } | |
142 | |
143 void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) { | |
144 RTC_DCHECK(worker_thread_->IsCurrent()); | |
145 LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed."; | |
146 incoming_quic_streams_.erase(stream_id); | |
147 } | |
148 | |
149 void QuicDataChannel::OnIncomingStream(cricket::ReliableQuicStream* stream, | |
150 rtc::ByteBuffer* remaining_bytes) { | |
pthatcher1
2016/03/30 20:34:48
The order should be remaining_bytes, stream, since
mikescarlett
2016/04/05 19:58:50
Done.
| |
151 RTC_DCHECK(worker_thread_->IsCurrent()); | |
152 // The data channel ID has already been read from the byte buffer, so we can | |
153 // skip to the message id. | |
154 uint64_t message_id; | |
155 remaining_bytes->ReadVarint(&message_id); | |
Taylor Brandstetter
2016/04/01 23:23:41
Should probably handle failures of ReadVarint.
mikescarlett
2016/04/05 19:58:49
Done. ReadVarint is in QuicDataTransport now.
| |
156 | |
157 if (stream->fin_received()) { | |
158 LOG(LS_INFO) << "Stream " << stream->id() | |
159 << " has finished receiving data for QUIC data channel " | |
160 << id_; | |
161 // A FIN is received if the message fits into a single QUIC stream frame and | |
162 // the remote peer is done sending. In this case, propagate the data to the | |
163 // observer and close the stream. | |
164 if (observer_ != nullptr) { | |
165 DataBuffer message(rtc::CopyOnWriteBuffer(remaining_bytes->Data(), | |
166 remaining_bytes->Length()), | |
167 false); | |
168 signaling_thread_->Invoke<void>( | |
pthatcher1
2016/03/30 20:34:48
These needs to be an AsyncInvoke
mikescarlett
2016/04/05 19:58:49
Done.
| |
169 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); | |
170 OnMessage_s(message); | |
pthatcher1
2016/03/30 20:34:48
Why do we hop to the signalling thread and then ca
mikescarlett
2016/04/05 19:58:50
That clearly shouldn't be there. Fixed.
| |
171 } else { | |
172 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
173 << " received a message but has no observer."; | |
174 } | |
175 stream->Close(); | |
176 } else { | |
177 // Otherwise the message is divided across multiple QUIC stream frames, so | |
178 // queue the data. OnDataReceived() will be called each time the remaining | |
179 // QUIC stream frames arrive. | |
180 LOG(LS_INFO) << "QUIC data channel " << id_ | |
181 << " is queuing incoming data for stream " << stream->id(); | |
182 incoming_quic_streams_[stream->id()] = stream; | |
183 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream->id()]; | |
pthatcher1
2016/03/30 20:34:48
When does the buffer get added to the queud_data_?
pthatcher1
2016/03/30 20:34:48
I think we just want a normal rtc::Buffer, not an
Taylor Brandstetter
2016/04/01 23:23:41
Do you mean, "when does the object get constructed
mikescarlett
2016/04/05 19:58:49
I changed it to not use operator[] to create the b
mikescarlett
2016/04/05 19:58:49
It needs to be rtc::CopyOnWriteBuffer due to this
| |
184 received_data.AppendData(remaining_bytes->Data(), | |
185 remaining_bytes->Length()); | |
186 stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived); | |
187 stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed); | |
188 } | |
pthatcher1
2016/03/30 20:34:48
Can you use an early return in this code? Perhaps
mikescarlett
2016/04/05 19:58:49
Done.
| |
189 } | |
190 | |
191 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | |
192 const char* data, | |
193 size_t len) { | |
194 RTC_DCHECK(worker_thread_->IsCurrent()); | |
195 cricket::ReliableQuicStream* stream = incoming_quic_streams_[stream_id]; | |
196 // True if the QUIC stream has reeived a FIN, which indicates the remote | |
197 // peer is finished receiving data. | |
198 bool finished = stream->fin_received(); | |
199 // Lookup or create an rtc::CopyOnWriteBuffer. | |
200 rtc::CopyOnWriteBuffer& received_data = queued_data_[stream_id]; | |
pthatcher1
2016/03/30 20:34:48
We should use .find() and check to see if it's in
Taylor Brandstetter
2016/04/01 23:23:41
It always *should* be. So if you do that, put it i
mikescarlett
2016/04/05 19:58:49
I put an RTC_DCHECK. OnDataReceived is only called
| |
201 | |
202 if (finished) { | |
203 LOG(LS_INFO) << "Stream " << stream_id | |
204 << " has finished receiving data for QUIC data channel " | |
205 << id_; | |
206 received_data.AppendData(data, len); | |
207 DataBuffer message(received_data, false); | |
pthatcher1
2016/03/30 20:34:48
We should probably use a map of rtc::Buffer* so th
mikescarlett
2016/04/05 19:58:49
What about using std::move to avoid copying for Da
| |
208 signaling_thread_->Invoke<void>( | |
209 rtc::Bind(&QuicDataChannel::OnMessage_s, this, message)); | |
210 OnMessage_s(message); | |
Taylor Brandstetter
2016/04/01 23:23:41
Again, OnMessage_s shouldn't be called twice.
mikescarlett
2016/04/05 19:58:49
Removed.
| |
211 queued_data_.erase(stream_id); | |
212 stream->Close(); | |
213 } else { | |
214 received_data.AppendData(data, len); | |
215 } | |
pthatcher1
2016/03/30 20:34:48
Same here with early returns. Something like this
mikescarlett
2016/04/05 19:58:49
Done.
| |
216 } | |
pthatcher1
2016/03/30 20:34:48
Now that I see this more completely, I think that
Taylor Brandstetter
2016/04/01 23:23:41
+1
mikescarlett
2016/04/05 19:58:49
I revised it. QuicDataChannelMessage is a struct a
| |
217 | |
218 void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) { | |
219 RTC_DCHECK(worker_thread_->IsCurrent()); | |
220 RTC_DCHECK(channel == quic_transport_channel_); | |
221 state_ = kOpen; | |
222 if (send_open_message_) { | |
223 SendOpenMessage_w(); | |
pthatcher1
2016/03/30 20:34:48
I though we weren't doing the open message in this
mikescarlett
2016/04/05 19:58:49
Right I'm not implementing it. Fixed
| |
224 } | |
225 } | |
226 | |
227 void QuicDataChannel::OnConnectionClosed() { | |
228 RTC_DCHECK(worker_thread_->IsCurrent()); | |
229 SetState(kClosed); | |
230 } | |
231 | |
232 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | |
233 RTC_DCHECK(signaling_thread_->IsCurrent()); | |
234 if (observer_ != nullptr) { | |
Taylor Brandstetter
2016/04/01 23:23:41
If it's OnMessage_s that checks for an observer, y
mikescarlett
2016/04/05 19:58:49
Replaced with RTC_DCHECK.
| |
235 observer_->OnMessage(received_data); | |
236 } else { | |
237 LOG(LS_WARNING) << "QUIC data channel " << id_ | |
238 << " received a message but has no observer."; | |
239 } | |
pthatcher1
2016/03/30 20:34:48
Early return here please.
mikescarlett
2016/04/05 19:58:49
Done.
| |
240 } | |
241 | |
242 void QuicDataChannel::SetState(DataState state) { | |
243 if (state_ == state) { | |
244 return; | |
245 } | |
246 state_ = state; | |
247 if (observer_) { | |
248 observer_->OnStateChange(); | |
249 } | |
250 if (state_ == kClosed) { | |
251 SignalClosed(this); | |
252 } | |
253 } | |
254 | |
255 } // namespace webrtc | |
OLD | NEW |