Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: net/quic/bidirectional_stream_quic_impl.cc

Issue 1744693002: Implement QUIC-based net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@basecl
Patch Set: Address Ryan's comments Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/bidirectional_stream_quic_impl.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/logging.h"
10 #include "base/timer/timer.h"
11 #include "net/http/bidirectional_stream_request_info.h"
12 #include "net/socket/next_proto.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15
16 namespace net {
17
18 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
19 const base::WeakPtr<QuicChromiumClientSession>& session)
20 : session_(session),
21 was_handshake_confirmed_(session->IsCryptoHandshakeConfirmed()),
22 stream_(nullptr),
23 request_info_(nullptr),
24 delegate_(nullptr),
25 response_status_(OK),
26 negotiated_protocol_(kProtoUnknown),
27 read_buffer_len_(0),
28 headers_bytes_received_(0),
29 headers_bytes_sent_(0),
30 closed_stream_received_bytes_(0),
31 closed_stream_sent_bytes_(0),
32 has_sent_headers_(false),
33 has_received_headers_(false),
34 weak_factory_(this) {
35 DCHECK(session_);
36 session_->AddObserver(this);
37 }
38
39 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
40 Cancel();
41 if (session_)
42 session_->RemoveObserver(this);
43 }
44
45 void BidirectionalStreamQuicImpl::Start(
46 const BidirectionalStreamRequestInfo* request_info,
47 const BoundNetLog& net_log,
48 BidirectionalStreamJob::Delegate* delegate,
49 scoped_ptr<base::Timer> /* timer */) {
50 DCHECK(!stream_);
51
52 if (!session_) {
53 response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
54 : ERR_QUIC_HANDSHAKE_FAILED;
55 NotifyError(response_status_);
56 return;
57 }
58
59 delegate_ = delegate;
60 request_info_ = request_info;
61
62 int rv = stream_request_.StartRequest(
63 session_, &stream_,
64 base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
65 weak_factory_.GetWeakPtr()));
66 if (rv == OK) {
67 OnStreamReady(rv);
68 } else if (!was_handshake_confirmed_) {
69 response_status_ = ERR_QUIC_HANDSHAKE_FAILED;
70 NotifyError(response_status_);
71 }
72 }
73
74 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buf, int buf_len) {
75 DCHECK(buf);
76 DCHECK(buf_len);
77
78 if (!stream_) {
79 // If the stream is already closed, there is no body to read.
80 return response_status_;
81 }
82 int rv = stream_->Read(buf, buf_len);
83 if (rv != ERR_IO_PENDING) {
84 if (stream_->IsDoneReading()) {
85 stream_->SetDelegate(nullptr);
86 stream_->OnFinRead(); // If write side is close, will call OnClose.
87 ResetStream();
88 }
89 return rv;
90 }
91 // Read will complete asynchronously and Delegate::OnReadCompleted will be
92 // called upon completion.
93 read_buffer_ = buf;
94 read_buffer_len_ = buf_len;
95 return ERR_IO_PENDING;
96 }
97
98 void BidirectionalStreamQuicImpl::SendData(IOBuffer* data,
99 int length,
100 bool end_stream) {
101 DCHECK(stream_);
102
103 if (length > 0 || end_stream) {
104 base::StringPiece string_data(data->data(), length);
105 int rv = stream_->WriteStreamData(
106 string_data, end_stream,
107 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
108 weak_factory_.GetWeakPtr()));
109 DCHECK(rv == OK || rv == ERR_IO_PENDING);
110 if (rv == OK) {
111 base::ThreadTaskRunnerHandle::Get()->PostTask(
112 FROM_HERE,
113 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
114 weak_factory_.GetWeakPtr(), OK));
115 }
116 }
117 }
118
119 void BidirectionalStreamQuicImpl::Cancel() {
120 if (stream_) {
121 stream_->SetDelegate(nullptr);
122 stream_->Reset(QUIC_STREAM_CANCELLED);
123 ResetStream();
124 }
125 }
126
127 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
128 return negotiated_protocol_;
129 }
130
131 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
132 int64_t total_received_bytes = headers_bytes_received_;
133 if (stream_) {
134 total_received_bytes += stream_->stream_bytes_read();
mef 2016/02/29 16:26:52 Could just "return headers_bytes_received_ + strea
xunjieli 2016/02/29 16:34:09 Done.
135 } else {
136 total_received_bytes += closed_stream_received_bytes_;
137 }
138 return total_received_bytes;
139 }
140
141 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
142 int64_t total_sent_bytes = headers_bytes_sent_;
143 if (stream_) {
144 total_sent_bytes += stream_->stream_bytes_written();
145 } else {
146 total_sent_bytes += closed_stream_sent_bytes_;
147 }
148 return total_sent_bytes;
149 }
150
151 void BidirectionalStreamQuicImpl::OnHeadersAvailable(
152 const SpdyHeaderBlock& headers,
153 size_t frame_len) {
154 headers_bytes_received_ += frame_len;
155 negotiated_protocol_ = kProtoQUIC1SPDY3;
156 if (!has_received_headers_) {
157 has_received_headers_ = true;
158 delegate_->OnHeadersReceived(headers);
159 } else {
160 if (stream_->IsDoneReading()) {
161 stream_->SetDelegate(nullptr);
162 stream_->OnFinRead(); // If write side is close, will call OnClose
163 ResetStream();
164 }
165 delegate_->OnTrailersReceived(headers);
166 }
167 }
168
169 void BidirectionalStreamQuicImpl::OnDataAvailable() {
170 // Return early if ReadData has not been called.
171 if (!read_buffer_)
172 return;
173
174 CHECK(read_buffer_);
175 CHECK_NE(0, read_buffer_len_);
176 int rv = ReadData(read_buffer_.get(), read_buffer_len_);
177 if (rv == ERR_IO_PENDING) {
178 // Spurrious notification. Wait for the next one.
179 return;
180 }
181 read_buffer_ = nullptr;
182 read_buffer_len_ = 0;
183 delegate_->OnDataRead(rv);
184 }
185
186 void BidirectionalStreamQuicImpl::OnClose(QuicErrorCode error) {
187 DCHECK(stream_);
188 if (error == QUIC_NO_ERROR &&
189 stream_->stream_error() == QUIC_STREAM_NO_ERROR) {
190 ResetStream();
191 return;
192 }
193 response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
194 : ERR_QUIC_HANDSHAKE_FAILED;
195 ResetStream();
196 NotifyError(response_status_);
197 }
198
199 void BidirectionalStreamQuicImpl::OnError(int error) {
200 NotifyError(error);
201 }
202
203 bool BidirectionalStreamQuicImpl::HasSendHeadersComplete() {
204 return has_sent_headers_;
205 }
206
207 void BidirectionalStreamQuicImpl::OnCryptoHandshakeConfirmed() {
208 was_handshake_confirmed_ = true;
209 }
210
211 void BidirectionalStreamQuicImpl::OnSessionClosed(int error) {
212 DCHECK_NE(OK, error);
213 session_.reset();
214 NotifyError(error);
215 }
216
217 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
218 DCHECK_NE(ERR_IO_PENDING, rv);
219 DCHECK(rv == OK || !stream_);
220 if (rv == OK) {
221 stream_->SetDelegate(this);
222 SendRequestHeaders();
223 } else {
224 response_status_ = rv;
225 NotifyError(response_status_);
226 }
227 }
228
229 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
230 DCHECK(rv == OK || !stream_);
231 if (rv == OK) {
232 delegate_->OnDataSent();
233 } else {
234 NotifyError(rv);
235 }
236 }
237
238 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
239 DCHECK(!has_sent_headers_);
240 DCHECK(stream_);
241
242 SpdyHeaderBlock headers;
243 HttpRequestInfo http_request_info;
244 http_request_info.url = request_info_->url;
245 http_request_info.method = request_info_->method;
246 http_request_info.extra_headers = request_info_->extra_headers;
247
248 CreateSpdyHeadersFromHttpRequest(http_request_info,
249 http_request_info.extra_headers, HTTP2, true,
250 &headers);
251 size_t frame_len = stream_->WriteHeaders(
252 headers, request_info_->end_stream_on_headers, nullptr);
253 headers_bytes_sent_ += frame_len;
254 has_sent_headers_ = true;
255 delegate_->OnHeadersSent();
256 }
257
258 void BidirectionalStreamQuicImpl::NotifyError(int error) {
259 DCHECK_NE(OK, error);
260 DCHECK_NE(ERR_IO_PENDING, error);
261
262 ResetStream();
263 delegate_->OnFailed(error);
264 }
265
266 void BidirectionalStreamQuicImpl::ResetStream() {
267 if (!stream_)
268 return;
269 closed_stream_received_bytes_ = stream_->stream_bytes_read();
270 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
271 stream_ = nullptr;
272 }
273
274 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698