Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(15)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Get rid of DeterministicSocketData Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/logging.h"
10 #include "base/time/time.h"
11 #include "base/timer/timer.h"
12 #include "net/base/request_priority.h"
13 #include "net/http/bidirectional_stream_request_info.h"
14 #include "net/spdy/spdy_buffer.h"
15 #include "net/spdy/spdy_header_block.h"
16 #include "net/spdy/spdy_http_utils.h"
17 #include "net/spdy/spdy_stream.h"
18
19 namespace net {
20
21 const int64 kBufferTimeMs = 1;
mef 2015/12/15 22:59:40 would be nice to have comment. Also consider movin
xunjieli 2015/12/16 00:26:09 Done.
22
23 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
24 const base::WeakPtr<SpdySession>& spdy_session)
25 : spdy_session_(spdy_session),
26 stream_closed_(false),
27 closed_stream_status_(ERR_FAILED),
28 more_read_data_pending_(false),
mef 2015/12/15 22:59:40 initialize all fields, e.g. |read_buffer_len_| is
xunjieli 2015/12/16 00:26:09 Done. Good catch!
29 negotiated_protocol_(kProtoUnknown),
30 closed_stream_received_bytes_(0),
31 closed_stream_sent_bytes_(0),
32 weak_factory_(this) {}
33
34 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
35 if (stream_) {
36 stream_->DetachDelegate();
37 DCHECK(!stream_);
38 }
39 }
40
41 void BidirectionalStreamSpdyJob::Start(
42 const BidirectionalStreamRequestInfo& request_info,
43 RequestPriority priority,
44 const BoundNetLog& net_log,
45 BidirectionalStreamJob::Delegate* delegate,
46 scoped_ptr<base::Timer> timer) {
47 DCHECK(!stream_);
48 DCHECK(timer);
49
50 delegate_ = delegate;
51 timer_ = std::move(timer);
52
53 if (!spdy_session_) {
54 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
55 return;
56 }
57
58 request_info_ = request_info;
59
60 int rv = stream_request_.StartRequest(
61 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority,
62 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
63 weak_factory_.GetWeakPtr()));
64 if (rv != ERR_IO_PENDING)
65 OnStreamInitialized(rv);
66 }
67
68 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
69 if (stream_)
70 DCHECK(!stream_->IsIdle());
71
72 DCHECK(buf);
73 DCHECK(buf_len);
74 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
75
76 if (!stream_closed_)
mef 2015/12/15 22:59:40 FWIW it seems that |stream_closed_| is equivalent
xunjieli 2015/12/16 00:26:09 Done. I got rid of this check. But |stream_closed|
77 DCHECK(stream_);
78
79 // If there is data buffered, complete the IO immediately.
80 if (!data_queue_.IsEmpty()) {
81 return data_queue_.Dequeue(buf->data(), buf_len);
82 } else if (stream_closed_) {
83 return closed_stream_status_;
84 }
85 // Read will complete asynchronously and Delegate::OnReadCompleted will be
86 // called upon completion.
87 read_buffer_ = buf;
88 read_buffer_len_ = buf_len;
89 return ERR_IO_PENDING;
90 }
91
92 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
93 int length,
94 bool end_stream) {
95 DCHECK(!stream_closed_);
96 DCHECK(stream_);
97
98 stream_->SendData(data, length,
99 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
100 }
101
102 void BidirectionalStreamSpdyJob::Cancel() {
103 if (!stream_)
104 return;
105 // Cancels the stream and detaches the delegate so it doesn't get called back.
106 stream_->DetachDelegate();
107 DCHECK(!stream_);
108 }
109
110 NextProto BidirectionalStreamSpdyJob::GetProtocol() const {
111 return negotiated_protocol_;
112 }
113
114 int64_t BidirectionalStreamSpdyJob::GetTotalReceivedBytes() const {
115 if (stream_closed_)
116 return closed_stream_received_bytes_;
117
118 if (!stream_)
119 return 0;
120
121 return stream_->raw_received_bytes();
122 }
123
124 int64_t BidirectionalStreamSpdyJob::GetTotalSentBytes() const {
125 if (stream_closed_)
126 return closed_stream_sent_bytes_;
127
128 if (!stream_)
129 return 0;
130
131 return stream_->raw_sent_bytes();
132 }
133
134 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
135 DCHECK(stream_);
136
137 negotiated_protocol_ = stream_->GetProtocol();
138 delegate_->OnHeadersSent();
139 }
140
141 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
142 const SpdyHeaderBlock& response_headers) {
143 DCHECK(stream_);
144
145 delegate_->OnHeadersReceived(response_headers);
146 return RESPONSE_HEADERS_ARE_COMPLETE;
147 }
148
149 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
150 DCHECK(stream_);
151 DCHECK(!stream_closed_);
152
153 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
154 // SpdyStream to indicate the end of stream.
155 if (!buffer)
156 return;
157
158 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
159 // recv window size accordingly.
160 data_queue_.Enqueue(std::move(buffer));
161 if (read_buffer_) {
162 // Handing small chunks of data to the caller creates measurable overhead.
163 // So buffer data in short time-spans and send a single read notification.
164 ScheduleBufferedRead();
165 }
166 }
167
168 void BidirectionalStreamSpdyJob::OnDataSent() {
169 DCHECK(stream_);
170 DCHECK(!stream_closed_);
171
172 delegate_->OnDataSent();
173 }
174
175 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
176 DCHECK(stream_);
177 DCHECK(!stream_closed_);
178
179 delegate_->OnTrailersReceived(trailers);
180 }
181
182 void BidirectionalStreamSpdyJob::OnClose(int status) {
183 DCHECK(stream_);
184
185 stream_closed_ = true;
186 closed_stream_status_ = status;
187 closed_stream_received_bytes_ = stream_->raw_received_bytes();
188 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
189 stream_.reset();
190
191 if (status != OK) {
192 delegate_->OnFailed(status);
193 return;
194 }
195 // Complete any remaining read, as all data has been buffered.
196 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
197 // do nothing.
198 DCHECK_EQ(OK, status);
mef 2015/12/15 23:14:28 this seems unneeded due to line 191.
xunjieli 2015/12/16 00:26:09 Done.
199 timer_->Stop();
200 DoBufferedRead();
201 }
202
203 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
204 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
205 HttpRequestInfo http_request_info;
206 http_request_info.url = request_info_.url;
207 http_request_info.method = request_info_.method;
208 http_request_info.extra_headers = request_info_.extra_headers;
209
210 CreateSpdyHeadersFromHttpRequest(
211 http_request_info, http_request_info.extra_headers,
212 stream_->GetProtocolVersion(), true, headers.get());
213 stream_->SendRequestHeaders(std::move(headers),
214 request_info_.end_stream_on_headers
215 ? NO_MORE_DATA_TO_SEND
216 : MORE_DATA_TO_SEND);
217 }
218
219 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
220 DCHECK_NE(ERR_IO_PENDING, rv);
221 if (rv == OK) {
222 stream_ = stream_request_.ReleaseStream();
223 stream_->SetDelegate(this);
224 SendRequestHeaders();
225 return;
226 }
227 delegate_->OnFailed(static_cast<Error>(rv));
mef 2015/12/15 23:14:28 I think delegate_->OnFailed now takes int.
xunjieli 2015/12/16 00:26:09 Done.
228 }
229
230 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
231 // If there is already a scheduled DoBufferedRead, don't issue
232 // another one. Mark that we have received more data and return.
233 if (timer_->IsRunning()) {
234 more_read_data_pending_ = true;
235 return;
236 }
237
238 more_read_data_pending_ = false;
239 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs),
240 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
241 weak_factory_.GetWeakPtr()));
242 }
243
244 void BidirectionalStreamSpdyJob::DoBufferedRead() {
245 DCHECK(!timer_->IsRunning());
246 // Check to see that the stream has not errored out.
247 DCHECK(stream_ || stream_closed_);
248 DCHECK(!stream_closed_ || closed_stream_status_ == OK);
249
250 // When |more_read_data_pending_| is true, it means that more data has arrived
251 // since started waiting. Wait a little longer and continue to buffer.
252 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
253 ScheduleBufferedRead();
254 return;
255 }
256
257 int rv = 0;
258 if (read_buffer_) {
259 rv = ReadData(read_buffer_.get(), read_buffer_len_);
260 DCHECK_NE(ERR_IO_PENDING, rv);
261 read_buffer_ = nullptr;
262 read_buffer_len_ = 0;
263 delegate_->OnDataRead(rv);
264 }
265 }
266
267 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
268 if (stream_closed_)
269 return false;
270 DCHECK_GT(read_buffer_len_, 0);
271 return data_queue_.GetTotalSize() < static_cast<size_t>(read_buffer_len_);
272 }
273
274 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698