Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(916)

Side by Side Diff: net/quic/bidirectional_stream_quic_job.cc

Issue 1744693002: Implement QUIC-based net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@basecl
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
Ryan Hamilton 2016/02/27 00:21:14 It looks like this class has a lot of duplicate co
xunjieli 2016/02/29 15:21:37 I'd like to think this is a thin wrapper on top of
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/bidirectional_stream_quic_job.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/logging.h"
10 #include "base/timer/timer.h"
11 #include "net/http/bidirectional_stream_request_info.h"
12 #include "net/socket/next_proto.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15
16 namespace net {
17
18 BidirectionalStreamQuicJob::BidirectionalStreamQuicJob(
19 const base::WeakPtr<QuicChromiumClientSession>& session)
20 : session_(session),
21 was_handshake_confirmed_(session->IsCryptoHandshakeConfirmed()),
22 stream_(nullptr),
23 request_info_(nullptr),
24 delegate_(nullptr),
25 response_status_(OK),
26 negotiated_protocol_(kProtoUnknown),
27 read_buffer_len_(0),
28 headers_bytes_received_(0),
29 headers_bytes_sent_(0),
30 closed_stream_received_bytes_(0),
31 closed_stream_sent_bytes_(0),
32 has_sent_headers_(false),
33 has_received_headers_(false),
34 weak_factory_(this) {
35 DCHECK(session_);
36 session_->AddObserver(this);
37 }
38
39 BidirectionalStreamQuicJob::~BidirectionalStreamQuicJob() {
40 Cancel();
41 if (session_)
42 session_->RemoveObserver(this);
43 }
44
45 void BidirectionalStreamQuicJob::Start(
46 const BidirectionalStreamRequestInfo* request_info,
47 const BoundNetLog& net_log,
48 BidirectionalStreamJob::Delegate* delegate,
49 scoped_ptr<base::Timer> /* timer */) {
50 DCHECK(!stream_);
51
52 if (!session_) {
53 response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
54 : ERR_QUIC_HANDSHAKE_FAILED;
55 NotifyError(response_status_);
56 return;
57 }
58
59 delegate_ = delegate;
60 request_info_ = request_info;
61
62 int rv = stream_request_.StartRequest(
63 session_, &stream_, base::Bind(&BidirectionalStreamQuicJob::OnStreamReady,
64 weak_factory_.GetWeakPtr()));
65 if (rv == OK) {
66 OnStreamReady(rv);
67 } else if (!was_handshake_confirmed_) {
68 response_status_ = ERR_QUIC_HANDSHAKE_FAILED;
69 NotifyError(response_status_);
70 }
71 }
72
73 int BidirectionalStreamQuicJob::ReadData(IOBuffer* buf, int buf_len) {
74 DCHECK(buf);
75 DCHECK(buf_len);
76
77 if (!stream_) {
78 // If the stream is already closed, there is no body to read.
79 return response_status_;
80 }
81 int rv = stream_->Read(buf, buf_len);
82 if (rv != ERR_IO_PENDING) {
83 if (stream_->IsDoneReading()) {
84 stream_->SetDelegate(nullptr);
85 stream_->OnFinRead(); // If write side is close, will call OnClose.
86 ResetStream();
87 }
88 return rv;
89 }
90 // Read will complete asynchronously and Delegate::OnReadCompleted will be
91 // called upon completion.
92 read_buffer_ = buf;
93 read_buffer_len_ = buf_len;
94 return ERR_IO_PENDING;
95 }
96
97 void BidirectionalStreamQuicJob::SendData(IOBuffer* data,
98 int length,
99 bool end_stream) {
100 DCHECK(stream_);
101
102 if (length > 0 || end_stream) {
103 base::StringPiece string_data(data->data(), length);
104 int rv = stream_->WriteStreamData(
105 string_data, end_stream,
106 base::Bind(&BidirectionalStreamQuicJob::OnSendDataComplete,
107 weak_factory_.GetWeakPtr()));
108 DCHECK(rv == OK || rv == ERR_IO_PENDING);
109 if (rv == OK) {
110 base::ThreadTaskRunnerHandle::Get()->PostTask(
111 FROM_HERE, base::Bind(&BidirectionalStreamQuicJob::OnSendDataComplete,
112 weak_factory_.GetWeakPtr(), OK));
113 }
114 }
115 }
116
117 void BidirectionalStreamQuicJob::Cancel() {
118 if (stream_) {
119 stream_->SetDelegate(nullptr);
120 stream_->Reset(QUIC_STREAM_CANCELLED);
121 ResetStream();
122 }
123 }
124
125 NextProto BidirectionalStreamQuicJob::GetProtocol() const {
126 return negotiated_protocol_;
127 }
128
129 int64_t BidirectionalStreamQuicJob::GetTotalReceivedBytes() const {
130 int64_t total_received_bytes = headers_bytes_received_;
131 if (stream_) {
132 total_received_bytes += stream_->stream_bytes_read();
133 } else {
134 total_received_bytes += closed_stream_received_bytes_;
135 }
136 return total_received_bytes;
137 }
138
139 int64_t BidirectionalStreamQuicJob::GetTotalSentBytes() const {
140 int64_t total_sent_bytes = headers_bytes_sent_;
141 if (stream_) {
142 total_sent_bytes += stream_->stream_bytes_written();
143 } else {
144 total_sent_bytes += closed_stream_sent_bytes_;
145 }
146 return total_sent_bytes;
147 }
148
149 void BidirectionalStreamQuicJob::OnHeadersAvailable(
150 const SpdyHeaderBlock& headers,
151 size_t frame_len) {
152 headers_bytes_received_ += frame_len;
153 negotiated_protocol_ = kProtoQUIC1SPDY3;
154 if (!has_received_headers_) {
155 has_received_headers_ = true;
156 delegate_->OnHeadersReceived(headers);
157 } else {
158 if (stream_->IsDoneReading()) {
159 stream_->SetDelegate(nullptr);
160 stream_->OnFinRead(); // If write side is close, will call OnClose
161 ResetStream();
162 }
163 delegate_->OnTrailersReceived(headers);
164 }
165 }
166
167 void BidirectionalStreamQuicJob::OnDataAvailable() {
168 // Return early if ReadData has not been called.
169 if (!read_buffer_)
170 return;
171
172 CHECK(read_buffer_);
173 CHECK_NE(0, read_buffer_len_);
174 int rv = ReadData(read_buffer_.get(), read_buffer_len_);
175 if (rv == ERR_IO_PENDING) {
176 // Spurrious notification. Wait for the next one.
177 return;
178 }
179 read_buffer_ = nullptr;
180 read_buffer_len_ = 0;
181 delegate_->OnDataRead(rv);
182 }
183
184 void BidirectionalStreamQuicJob::OnClose(QuicErrorCode error) {
185 DCHECK(stream_);
186 if (error == QUIC_NO_ERROR &&
187 stream_->stream_error() == QUIC_STREAM_NO_ERROR) {
188 ResetStream();
189 return;
190 }
191 response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
192 : ERR_QUIC_HANDSHAKE_FAILED;
193 ResetStream();
194 NotifyError(response_status_);
195 }
196
197 void BidirectionalStreamQuicJob::OnError(int error) {
198 NotifyError(error);
199 }
200
201 bool BidirectionalStreamQuicJob::HasSendHeadersComplete() {
202 return has_sent_headers_;
203 }
204
205 void BidirectionalStreamQuicJob::OnCryptoHandshakeConfirmed() {
206 was_handshake_confirmed_ = true;
207 }
208
209 void BidirectionalStreamQuicJob::OnSessionClosed(int error) {
210 DCHECK_NE(OK, error);
211 session_.reset();
212 NotifyError(error);
213 }
214
215 void BidirectionalStreamQuicJob::OnStreamReady(int rv) {
216 DCHECK_NE(ERR_IO_PENDING, rv);
217 DCHECK(rv == OK || !stream_);
218 if (rv == OK) {
219 stream_->SetDelegate(this);
220 SendRequestHeaders();
221 } else {
222 response_status_ = rv;
223 NotifyError(response_status_);
224 }
225 }
226
227 void BidirectionalStreamQuicJob::OnSendDataComplete(int rv) {
228 DCHECK(rv == OK || !stream_);
229 if (rv == OK) {
230 delegate_->OnDataSent();
231 } else {
232 NotifyError(rv);
233 }
234 }
235
236 void BidirectionalStreamQuicJob::SendRequestHeaders() {
237 DCHECK(!has_sent_headers_);
238 DCHECK(stream_);
239
240 SpdyHeaderBlock headers;
241 HttpRequestInfo http_request_info;
242 http_request_info.url = request_info_->url;
243 http_request_info.method = request_info_->method;
244 http_request_info.extra_headers = request_info_->extra_headers;
245
246 CreateSpdyHeadersFromHttpRequest(http_request_info,
247 http_request_info.extra_headers, HTTP2, true,
248 &headers);
249 size_t frame_len = stream_->WriteHeaders(
250 headers, request_info_->end_stream_on_headers, nullptr);
251 headers_bytes_sent_ += frame_len;
252 has_sent_headers_ = true;
253 delegate_->OnHeadersSent();
254 }
255
256 void BidirectionalStreamQuicJob::NotifyError(int error) {
257 DCHECK_NE(OK, error);
258 DCHECK_NE(ERR_IO_PENDING, error);
259
260 ResetStream();
261 delegate_->OnFailed(error);
262 }
263
264 void BidirectionalStreamQuicJob::ResetStream() {
265 if (!stream_)
266 return;
267 closed_stream_received_bytes_ = stream_->stream_bytes_read();
268 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
269 stream_ = nullptr;
270 }
271
272 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698