OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/bidirectional_stream_spdy_job.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/memory/scoped_ptr.h" | |
mmenke
2015/12/11 22:19:58
Nit: This should be in the header.
xunjieli
2015/12/11 23:48:40
Done.
| |
9 #include "base/time/time.h" | |
10 #include "base/timer/timer.h" | |
11 #include "net/base/request_priority.h" | |
12 #include "net/spdy/spdy_buffer.h" | |
13 #include "net/spdy/spdy_header_block.h" | |
14 #include "net/spdy/spdy_http_utils.h" | |
15 #include "net/spdy/spdy_stream.h" | |
16 | |
17 namespace net { | |
18 | |
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
mmenke
2015/12/11 22:19:59
global must be POD types. Standard way to do this
xunjieli
2015/12/11 23:48:40
Done.
| |
20 | |
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob( | |
22 const base::WeakPtr<SpdySession>& spdy_session, | |
23 scoped_ptr<base::Timer> timer) | |
24 : spdy_session_(spdy_session), | |
25 timer_(timer.release()), | |
mmenke
2015/12/11 22:19:58
std::move(timer)
xunjieli
2015/12/11 23:48:40
Done.
| |
26 stream_closed_(false), | |
27 closed_stream_status_(ERR_FAILED), | |
28 more_read_data_pending_(false), | |
29 negotiated_protocol_(kProtoUnknown), | |
30 closed_stream_received_bytes_(0), | |
31 closed_stream_sent_bytes_(0), | |
32 weak_factory_(this) {} | |
33 | |
34 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob( | |
mmenke
2015/12/11 22:19:59
definition order should match delcaration order.
xunjieli
2015/12/11 23:48:40
Done.
| |
35 const base::WeakPtr<SpdySession>& spdy_session) | |
36 : BidirectionalStreamSpdyJob( | |
37 spdy_session, | |
38 make_scoped_ptr(new base::Timer(false, false))) {} | |
39 | |
40 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() { | |
41 if (stream_.get()) { | |
mmenke
2015/12/11 22:19:59
nit: .get() not needed.
xunjieli
2015/12/11 23:48:40
Done.
| |
42 stream_->DetachDelegate(); | |
43 DCHECK(!stream_.get()); | |
mmenke
2015/12/11 22:19:58
Am I missing something, or can this not currently
mmenke
2015/12/11 22:19:58
nit: .get() not needed.
xunjieli
2015/12/11 23:48:40
Why is it unsafe? The delegate can't delete the jo
xunjieli
2015/12/11 23:48:40
Done.
mmenke
2015/12/14 19:48:37
Sorry, I wasn't remotely clear there.
See:
https:
mmenke
2015/12/14 19:52:34
And I think we should have tests for destruction d
xunjieli
2015/12/14 21:03:05
Done. Thanks for pointing me to the code. I have d
| |
44 } | |
45 } | |
46 | |
47 void BidirectionalStreamSpdyJob::Start( | |
48 const HttpRequestInfo& request_info, | |
49 RequestPriority priority, | |
50 const BoundNetLog& net_log, | |
51 BidirectionalStreamJob::Delegate* delegate) { | |
52 delegate_ = delegate; | |
53 DCHECK(!stream_); | |
54 if (!spdy_session_) { | |
55 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
56 return; | |
57 } | |
58 | |
59 request_info_ = request_info; | |
60 | |
61 int rv = stream_request_.StartRequest( | |
62 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority, | |
63 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized, | |
64 weak_factory_.GetWeakPtr())); | |
65 if (rv != ERR_IO_PENDING) | |
66 OnStreamInitialized(rv); | |
67 } | |
68 | |
69 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) { | |
70 if (stream_.get()) | |
71 CHECK(!stream_->IsIdle()); | |
72 | |
73 CHECK(buf); | |
74 CHECK(buf_len); | |
75 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight"; | |
76 | |
77 if (!stream_closed_) | |
78 CHECK(stream_); | |
mmenke
2015/12/11 22:19:58
All these CHECKs should be DCHECKs.
xunjieli
2015/12/11 23:48:40
Done.
| |
79 | |
80 // If there is data buffered, complete the IO immediately. | |
81 if (!data_queue_.IsEmpty()) { | |
82 return data_queue_.Dequeue(buf->data(), buf_len); | |
83 } else if (stream_closed_) { | |
84 return closed_stream_status_; | |
85 } | |
86 // Read will complete asynchronously and Delegate::OnReadCompleted will be | |
87 // called upon completion. | |
88 user_buffer_ = buf; | |
89 user_buffer_len_ = buf_len; | |
90 return ERR_IO_PENDING; | |
91 } | |
92 | |
93 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data, | |
94 int length, | |
95 bool end_stream) { | |
96 CHECK(!stream_closed_); | |
97 CHECK(stream_); | |
98 | |
99 stream_->SendData(data, length, | |
100 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
101 } | |
102 | |
103 void BidirectionalStreamSpdyJob::Cancel() { | |
104 if (!stream_) | |
105 return; | |
106 // Cancels the stream and detaches the delegate so it doesn't get called back. | |
107 stream_->DetachDelegate(); | |
108 DCHECK(!stream_); | |
109 } | |
110 | |
111 NextProto BidirectionalStreamSpdyJob::GetProtocol() const { | |
112 return negotiated_protocol_; | |
113 } | |
114 | |
115 int64_t BidirectionalStreamSpdyJob::GetTotalReceivedBytes() const { | |
116 if (stream_closed_) | |
117 return closed_stream_received_bytes_; | |
118 | |
119 if (!stream_) | |
120 return 0; | |
121 | |
122 return stream_->raw_received_bytes(); | |
123 } | |
124 | |
125 int64_t BidirectionalStreamSpdyJob::GetTotalSentBytes() const { | |
126 if (stream_closed_) | |
127 return closed_stream_sent_bytes_; | |
128 | |
129 if (!stream_) | |
130 return 0; | |
131 | |
132 return stream_->raw_sent_bytes(); | |
133 } | |
134 | |
135 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() { | |
136 DCHECK(stream_); | |
137 | |
138 delegate_->OnHeadersSent(); | |
139 } | |
140 | |
141 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated( | |
142 const SpdyHeaderBlock& response_headers) { | |
143 DCHECK(stream_); | |
144 | |
145 negotiated_protocol_ = stream_->GetProtocol(); | |
146 delegate_->OnHeadersReceived(response_headers); | |
147 return RESPONSE_HEADERS_ARE_COMPLETE; | |
148 } | |
149 | |
150 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
151 DCHECK(stream_); | |
152 DCHECK(!stream_closed_); | |
153 | |
154 if (buffer) { | |
mmenke
2015/12/11 22:19:58
Maybe early return if NULL?
xunjieli
2015/12/11 23:48:40
Done.
| |
155 data_queue_.Enqueue(buffer.Pass()); | |
mmenke
2015/12/11 22:19:59
std::move
mmenke
2015/12/11 22:19:59
What about flow control?
I'm not familiar with th
xunjieli
2015/12/11 23:48:40
Done.
xunjieli
2015/12/11 23:48:40
Good question. I added comment. I believe that Spd
mmenke
2015/12/14 19:48:37
I'll dig into this a bit before responding.
xunjieli
2015/12/14 21:03:05
Acknowledged.
| |
156 if (user_buffer_) { | |
157 // Handing small chunks of data to the caller creates measurable overhead. | |
158 // So buffer data in short time-spans and send a single read notification. | |
159 ScheduleBufferedRead(); | |
mmenke
2015/12/11 22:19:58
Can we just not make this call if data_queue_ was
xunjieli
2015/12/11 23:48:40
There is one case where we still need to invoke On
mmenke
2015/12/14 19:48:37
Think it would actually be simpler to just have On
xunjieli
2015/12/14 21:03:05
We are having OnClose calling into Delegate via De
| |
160 } | |
161 } | |
162 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by | |
163 // SpdyStream to indicate the end of stream. | |
164 } | |
165 | |
166 void BidirectionalStreamSpdyJob::OnDataSent() { | |
167 DCHECK(stream_); | |
168 DCHECK(!stream_closed_); | |
169 | |
170 delegate_->OnDataSent(); | |
171 } | |
172 | |
173 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) { | |
174 DCHECK(stream_); | |
175 DCHECK(!stream_closed_); | |
176 | |
177 delegate_->OnTrailersReceived(trailers); | |
178 } | |
179 | |
180 void BidirectionalStreamSpdyJob::OnClose(int status) { | |
181 DCHECK(stream_); | |
182 | |
183 stream_closed_ = true; | |
184 closed_stream_status_ = status; | |
185 closed_stream_received_bytes_ = stream_->raw_received_bytes(); | |
186 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); | |
187 stream_.reset(); | |
188 | |
189 if (status != OK) { | |
190 delegate_->OnFailed(status); | |
191 return; | |
192 } | |
193 // Complete any remaining read, as all data has been buffered. | |
194 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will | |
195 // do nothing. | |
196 DCHECK_EQ(OK, status); | |
197 timer_->Stop(); | |
198 DoBufferedRead(); | |
199 } | |
200 | |
201 void BidirectionalStreamSpdyJob::SendRequestHeaders() { | |
202 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
203 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers, | |
204 stream_->GetProtocolVersion(), true, | |
205 headers.get()); | |
206 stream_->SendRequestHeaders(headers.Pass(), MORE_DATA_TO_SEND); | |
mmenke
2015/12/11 22:19:58
std::move
xunjieli
2015/12/11 23:48:40
Done.
| |
207 } | |
208 | |
209 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) { | |
210 DCHECK_NE(ERR_IO_PENDING, rv); | |
mmenke
2015/12/11 22:19:58
nit: include base/logging.h
xunjieli
2015/12/11 23:48:40
Done.
| |
211 if (rv == OK) { | |
212 stream_ = stream_request_.ReleaseStream(); | |
213 stream_->SetDelegate(this); | |
214 SendRequestHeaders(); | |
215 return; | |
216 } | |
217 delegate_->OnFailed(static_cast<Error>(rv)); | |
218 } | |
219 | |
220 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() { | |
221 // If there is already a scheduled DoBufferedRead, don't issue | |
222 // another one. Mark that we have received more data and return. | |
223 if (timer_->IsRunning()) { | |
224 more_read_data_pending_ = true; | |
225 return; | |
226 } | |
227 | |
228 more_read_data_pending_ = false; | |
229 timer_->Start(FROM_HERE, kBufferTime, | |
mmenke
2015/12/11 22:19:58
nit: Include base/location.h for FROM_HERE.
xunjieli
2015/12/11 23:48:40
Done.
| |
230 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead, | |
231 weak_factory_.GetWeakPtr())); | |
232 } | |
233 | |
234 void BidirectionalStreamSpdyJob::DoBufferedRead() { | |
235 DCHECK(!timer_->IsRunning()); | |
236 // If the stream errored out, do not complete the read. | |
mmenke
2015/12/11 22:19:59
Should we just DCHECK on this, and not allow it, i
xunjieli
2015/12/11 23:48:40
Done.
| |
237 if (!stream_ && !stream_closed_) | |
238 return; | |
239 if (stream_closed_ && closed_stream_status_ != OK) | |
240 return; | |
241 | |
242 // When |more_read_data_pending_| is true, it means that more data has arrived | |
243 // since started waiting. Wait a little longer and continue to buffer. | |
244 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
mmenke
2015/12/11 22:19:58
Why do we need more_read_data_pending_? Can't we
xunjieli
2015/12/11 23:48:41
I was copying SpdyHttpStream's logic. The ShouldWa
mmenke
2015/12/14 19:48:37
Hrm...I think the extra variable here makes this c
xunjieli
2015/12/14 21:03:05
I don't think ShouldWaitForMoreBufferedData is eno
| |
245 ScheduleBufferedRead(); | |
246 return; | |
247 } | |
248 | |
249 int rv = 0; | |
250 if (user_buffer_) { | |
251 rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
252 DCHECK_NE(ERR_IO_PENDING, rv); | |
253 user_buffer_ = nullptr; | |
254 user_buffer_len_ = 0; | |
255 delegate_->OnDataRead(rv); | |
256 } | |
257 } | |
258 | |
259 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const { | |
260 if (stream_closed_) | |
261 return false; | |
262 DCHECK_GT(user_buffer_len_, 0); | |
263 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
264 } | |
265 | |
266 } // namespace net | |
OLD | NEW |