Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Matt's comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
mmenke 2015/12/11 22:19:58 Nit: This should be in the header.
xunjieli 2015/12/11 23:48:40 Done.
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/request_priority.h"
12 #include "net/spdy/spdy_buffer.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15 #include "net/spdy/spdy_stream.h"
16
17 namespace net {
18
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
mmenke 2015/12/11 22:19:59 global must be POD types. Standard way to do this
xunjieli 2015/12/11 23:48:40 Done.
20
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
22 const base::WeakPtr<SpdySession>& spdy_session,
23 scoped_ptr<base::Timer> timer)
24 : spdy_session_(spdy_session),
25 timer_(timer.release()),
mmenke 2015/12/11 22:19:58 std::move(timer)
xunjieli 2015/12/11 23:48:40 Done.
26 stream_closed_(false),
27 closed_stream_status_(ERR_FAILED),
28 more_read_data_pending_(false),
29 negotiated_protocol_(kProtoUnknown),
30 closed_stream_received_bytes_(0),
31 closed_stream_sent_bytes_(0),
32 weak_factory_(this) {}
33
34 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
mmenke 2015/12/11 22:19:59 definition order should match delcaration order.
xunjieli 2015/12/11 23:48:40 Done.
35 const base::WeakPtr<SpdySession>& spdy_session)
36 : BidirectionalStreamSpdyJob(
37 spdy_session,
38 make_scoped_ptr(new base::Timer(false, false))) {}
39
40 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
41 if (stream_.get()) {
mmenke 2015/12/11 22:19:59 nit: .get() not needed.
xunjieli 2015/12/11 23:48:40 Done.
42 stream_->DetachDelegate();
43 DCHECK(!stream_.get());
mmenke 2015/12/11 22:19:58 Am I missing something, or can this not currently
mmenke 2015/12/11 22:19:58 nit: .get() not needed.
xunjieli 2015/12/11 23:48:40 Why is it unsafe? The delegate can't delete the jo
xunjieli 2015/12/11 23:48:40 Done.
mmenke 2015/12/14 19:48:37 Sorry, I wasn't remotely clear there. See: https:
mmenke 2015/12/14 19:52:34 And I think we should have tests for destruction d
xunjieli 2015/12/14 21:03:05 Done. Thanks for pointing me to the code. I have d
44 }
45 }
46
47 void BidirectionalStreamSpdyJob::Start(
48 const HttpRequestInfo& request_info,
49 RequestPriority priority,
50 const BoundNetLog& net_log,
51 BidirectionalStreamJob::Delegate* delegate) {
52 delegate_ = delegate;
53 DCHECK(!stream_);
54 if (!spdy_session_) {
55 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
56 return;
57 }
58
59 request_info_ = request_info;
60
61 int rv = stream_request_.StartRequest(
62 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority,
63 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
64 weak_factory_.GetWeakPtr()));
65 if (rv != ERR_IO_PENDING)
66 OnStreamInitialized(rv);
67 }
68
69 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
70 if (stream_.get())
71 CHECK(!stream_->IsIdle());
72
73 CHECK(buf);
74 CHECK(buf_len);
75 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
76
77 if (!stream_closed_)
78 CHECK(stream_);
mmenke 2015/12/11 22:19:58 All these CHECKs should be DCHECKs.
xunjieli 2015/12/11 23:48:40 Done.
79
80 // If there is data buffered, complete the IO immediately.
81 if (!data_queue_.IsEmpty()) {
82 return data_queue_.Dequeue(buf->data(), buf_len);
83 } else if (stream_closed_) {
84 return closed_stream_status_;
85 }
86 // Read will complete asynchronously and Delegate::OnReadCompleted will be
87 // called upon completion.
88 user_buffer_ = buf;
89 user_buffer_len_ = buf_len;
90 return ERR_IO_PENDING;
91 }
92
93 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
94 int length,
95 bool end_stream) {
96 CHECK(!stream_closed_);
97 CHECK(stream_);
98
99 stream_->SendData(data, length,
100 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
101 }
102
103 void BidirectionalStreamSpdyJob::Cancel() {
104 if (!stream_)
105 return;
106 // Cancels the stream and detaches the delegate so it doesn't get called back.
107 stream_->DetachDelegate();
108 DCHECK(!stream_);
109 }
110
111 NextProto BidirectionalStreamSpdyJob::GetProtocol() const {
112 return negotiated_protocol_;
113 }
114
115 int64_t BidirectionalStreamSpdyJob::GetTotalReceivedBytes() const {
116 if (stream_closed_)
117 return closed_stream_received_bytes_;
118
119 if (!stream_)
120 return 0;
121
122 return stream_->raw_received_bytes();
123 }
124
125 int64_t BidirectionalStreamSpdyJob::GetTotalSentBytes() const {
126 if (stream_closed_)
127 return closed_stream_sent_bytes_;
128
129 if (!stream_)
130 return 0;
131
132 return stream_->raw_sent_bytes();
133 }
134
135 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
136 DCHECK(stream_);
137
138 delegate_->OnHeadersSent();
139 }
140
141 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
142 const SpdyHeaderBlock& response_headers) {
143 DCHECK(stream_);
144
145 negotiated_protocol_ = stream_->GetProtocol();
146 delegate_->OnHeadersReceived(response_headers);
147 return RESPONSE_HEADERS_ARE_COMPLETE;
148 }
149
150 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
151 DCHECK(stream_);
152 DCHECK(!stream_closed_);
153
154 if (buffer) {
mmenke 2015/12/11 22:19:58 Maybe early return if NULL?
xunjieli 2015/12/11 23:48:40 Done.
155 data_queue_.Enqueue(buffer.Pass());
mmenke 2015/12/11 22:19:59 std::move
mmenke 2015/12/11 22:19:59 What about flow control? I'm not familiar with th
xunjieli 2015/12/11 23:48:40 Done.
xunjieli 2015/12/11 23:48:40 Good question. I added comment. I believe that Spd
mmenke 2015/12/14 19:48:37 I'll dig into this a bit before responding.
xunjieli 2015/12/14 21:03:05 Acknowledged.
156 if (user_buffer_) {
157 // Handing small chunks of data to the caller creates measurable overhead.
158 // So buffer data in short time-spans and send a single read notification.
159 ScheduleBufferedRead();
mmenke 2015/12/11 22:19:58 Can we just not make this call if data_queue_ was
xunjieli 2015/12/11 23:48:40 There is one case where we still need to invoke On
mmenke 2015/12/14 19:48:37 Think it would actually be simpler to just have On
xunjieli 2015/12/14 21:03:05 We are having OnClose calling into Delegate via De
160 }
161 }
162 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
163 // SpdyStream to indicate the end of stream.
164 }
165
166 void BidirectionalStreamSpdyJob::OnDataSent() {
167 DCHECK(stream_);
168 DCHECK(!stream_closed_);
169
170 delegate_->OnDataSent();
171 }
172
173 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
174 DCHECK(stream_);
175 DCHECK(!stream_closed_);
176
177 delegate_->OnTrailersReceived(trailers);
178 }
179
180 void BidirectionalStreamSpdyJob::OnClose(int status) {
181 DCHECK(stream_);
182
183 stream_closed_ = true;
184 closed_stream_status_ = status;
185 closed_stream_received_bytes_ = stream_->raw_received_bytes();
186 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
187 stream_.reset();
188
189 if (status != OK) {
190 delegate_->OnFailed(status);
191 return;
192 }
193 // Complete any remaining read, as all data has been buffered.
194 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will
195 // do nothing.
196 DCHECK_EQ(OK, status);
197 timer_->Stop();
198 DoBufferedRead();
199 }
200
201 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
202 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
203 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
204 stream_->GetProtocolVersion(), true,
205 headers.get());
206 stream_->SendRequestHeaders(headers.Pass(), MORE_DATA_TO_SEND);
mmenke 2015/12/11 22:19:58 std::move
xunjieli 2015/12/11 23:48:40 Done.
207 }
208
209 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
210 DCHECK_NE(ERR_IO_PENDING, rv);
mmenke 2015/12/11 22:19:58 nit: include base/logging.h
xunjieli 2015/12/11 23:48:40 Done.
211 if (rv == OK) {
212 stream_ = stream_request_.ReleaseStream();
213 stream_->SetDelegate(this);
214 SendRequestHeaders();
215 return;
216 }
217 delegate_->OnFailed(static_cast<Error>(rv));
218 }
219
220 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
221 // If there is already a scheduled DoBufferedRead, don't issue
222 // another one. Mark that we have received more data and return.
223 if (timer_->IsRunning()) {
224 more_read_data_pending_ = true;
225 return;
226 }
227
228 more_read_data_pending_ = false;
229 timer_->Start(FROM_HERE, kBufferTime,
mmenke 2015/12/11 22:19:58 nit: Include base/location.h for FROM_HERE.
xunjieli 2015/12/11 23:48:40 Done.
230 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
231 weak_factory_.GetWeakPtr()));
232 }
233
234 void BidirectionalStreamSpdyJob::DoBufferedRead() {
235 DCHECK(!timer_->IsRunning());
236 // If the stream errored out, do not complete the read.
mmenke 2015/12/11 22:19:59 Should we just DCHECK on this, and not allow it, i
xunjieli 2015/12/11 23:48:40 Done.
237 if (!stream_ && !stream_closed_)
238 return;
239 if (stream_closed_ && closed_stream_status_ != OK)
240 return;
241
242 // When |more_read_data_pending_| is true, it means that more data has arrived
243 // since started waiting. Wait a little longer and continue to buffer.
244 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
mmenke 2015/12/11 22:19:58 Why do we need more_read_data_pending_? Can't we
xunjieli 2015/12/11 23:48:41 I was copying SpdyHttpStream's logic. The ShouldWa
mmenke 2015/12/14 19:48:37 Hrm...I think the extra variable here makes this c
xunjieli 2015/12/14 21:03:05 I don't think ShouldWaitForMoreBufferedData is eno
245 ScheduleBufferedRead();
246 return;
247 }
248
249 int rv = 0;
250 if (user_buffer_) {
251 rv = ReadData(user_buffer_.get(), user_buffer_len_);
252 DCHECK_NE(ERR_IO_PENDING, rv);
253 user_buffer_ = nullptr;
254 user_buffer_len_ = 0;
255 delegate_->OnDataRead(rv);
256 }
257 }
258
259 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
260 if (stream_closed_)
261 return false;
262 DCHECK_GT(user_buffer_len_, 0);
263 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
264 }
265
266 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698