Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(177)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_impl.cc

Issue 1812823010: Rename net::BidirectionalStream*Job to net::BidirectionalStream*Impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address comments Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/spdy/bidirectional_stream_spdy_impl.h ('k') | net/spdy/bidirectional_stream_spdy_job.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/bidirectional_stream_spdy_job.h" 5 #include "net/spdy/bidirectional_stream_spdy_impl.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/time/time.h" 10 #include "base/time/time.h"
11 #include "base/timer/timer.h" 11 #include "base/timer/timer.h"
12 #include "net/http/bidirectional_stream_request_info.h" 12 #include "net/http/bidirectional_stream_request_info.h"
13 #include "net/spdy/spdy_buffer.h" 13 #include "net/spdy/spdy_buffer.h"
14 #include "net/spdy/spdy_header_block.h" 14 #include "net/spdy/spdy_header_block.h"
15 #include "net/spdy/spdy_http_utils.h" 15 #include "net/spdy/spdy_http_utils.h"
16 #include "net/spdy/spdy_stream.h" 16 #include "net/spdy/spdy_stream.h"
17 17
18 namespace net { 18 namespace net {
19 19
20 namespace { 20 namespace {
21 21
22 // Time to wait in millisecond to notify |delegate_| of data received. 22 // Time to wait in millisecond to notify |delegate_| of data received.
23 // Handing small chunks of data to the caller creates measurable overhead. 23 // Handing small chunks of data to the caller creates measurable overhead.
24 // So buffer data in short time-spans and send a single read notification. 24 // So buffer data in short time-spans and send a single read notification.
25 const int kBufferTimeMs = 1; 25 const int kBufferTimeMs = 1;
26 26
27 } // namespace 27 } // namespace
28 28
29 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob( 29 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
30 const base::WeakPtr<SpdySession>& spdy_session) 30 const base::WeakPtr<SpdySession>& spdy_session)
31 : spdy_session_(spdy_session), 31 : spdy_session_(spdy_session),
32 request_info_(nullptr), 32 request_info_(nullptr),
33 delegate_(nullptr), 33 delegate_(nullptr),
34 negotiated_protocol_(kProtoUnknown), 34 negotiated_protocol_(kProtoUnknown),
35 more_read_data_pending_(false), 35 more_read_data_pending_(false),
36 read_buffer_len_(0), 36 read_buffer_len_(0),
37 stream_closed_(false), 37 stream_closed_(false),
38 closed_stream_status_(ERR_FAILED), 38 closed_stream_status_(ERR_FAILED),
39 closed_stream_received_bytes_(0), 39 closed_stream_received_bytes_(0),
40 closed_stream_sent_bytes_(0), 40 closed_stream_sent_bytes_(0),
41 weak_factory_(this) {} 41 weak_factory_(this) {}
42 42
43 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() { 43 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
44 if (stream_) { 44 if (stream_) {
45 stream_->DetachDelegate(); 45 stream_->DetachDelegate();
46 DCHECK(!stream_); 46 DCHECK(!stream_);
47 } 47 }
48 } 48 }
49 49
50 void BidirectionalStreamSpdyJob::Start( 50 void BidirectionalStreamSpdyImpl::Start(
51 const BidirectionalStreamRequestInfo* request_info, 51 const BidirectionalStreamRequestInfo* request_info,
52 const BoundNetLog& net_log, 52 const BoundNetLog& net_log,
53 BidirectionalStreamJob::Delegate* delegate, 53 BidirectionalStreamImpl::Delegate* delegate,
54 scoped_ptr<base::Timer> timer) { 54 scoped_ptr<base::Timer> timer) {
55 DCHECK(!stream_); 55 DCHECK(!stream_);
56 DCHECK(timer); 56 DCHECK(timer);
57 57
58 delegate_ = delegate; 58 delegate_ = delegate;
59 timer_ = std::move(timer); 59 timer_ = std::move(timer);
60 60
61 if (!spdy_session_) { 61 if (!spdy_session_) {
62 delegate_->OnFailed(ERR_CONNECTION_CLOSED); 62 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
63 return; 63 return;
64 } 64 }
65 65
66 request_info_ = request_info; 66 request_info_ = request_info;
67 67
68 int rv = stream_request_.StartRequest( 68 int rv = stream_request_.StartRequest(
69 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, 69 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
70 request_info_->priority, net_log, 70 request_info_->priority, net_log,
71 base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized, 71 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
72 weak_factory_.GetWeakPtr())); 72 weak_factory_.GetWeakPtr()));
73 if (rv != ERR_IO_PENDING) 73 if (rv != ERR_IO_PENDING)
74 OnStreamInitialized(rv); 74 OnStreamInitialized(rv);
75 } 75 }
76 76
77 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) { 77 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
78 if (stream_) 78 if (stream_)
79 DCHECK(!stream_->IsIdle()); 79 DCHECK(!stream_->IsIdle());
80 80
81 DCHECK(buf); 81 DCHECK(buf);
82 DCHECK(buf_len); 82 DCHECK(buf_len);
83 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight"; 83 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
84 84
85 // If there is data buffered, complete the IO immediately. 85 // If there is data buffered, complete the IO immediately.
86 if (!read_data_queue_.IsEmpty()) { 86 if (!read_data_queue_.IsEmpty()) {
87 return read_data_queue_.Dequeue(buf->data(), buf_len); 87 return read_data_queue_.Dequeue(buf->data(), buf_len);
88 } else if (stream_closed_) { 88 } else if (stream_closed_) {
89 return closed_stream_status_; 89 return closed_stream_status_;
90 } 90 }
91 // Read will complete asynchronously and Delegate::OnReadCompleted will be 91 // Read will complete asynchronously and Delegate::OnReadCompleted will be
92 // called upon completion. 92 // called upon completion.
93 read_buffer_ = buf; 93 read_buffer_ = buf;
94 read_buffer_len_ = buf_len; 94 read_buffer_len_ = buf_len;
95 return ERR_IO_PENDING; 95 return ERR_IO_PENDING;
96 } 96 }
97 97
98 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data, 98 void BidirectionalStreamSpdyImpl::SendData(IOBuffer* data,
99 int length, 99 int length,
100 bool end_stream) { 100 bool end_stream) {
101 DCHECK(!stream_closed_); 101 DCHECK(!stream_closed_);
102 DCHECK(stream_); 102 DCHECK(stream_);
103 103
104 stream_->SendData(data, length, 104 stream_->SendData(data, length,
105 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); 105 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
106 } 106 }
107 107
108 void BidirectionalStreamSpdyJob::Cancel() { 108 void BidirectionalStreamSpdyImpl::Cancel() {
109 if (!stream_) 109 if (!stream_)
110 return; 110 return;
111 // Cancels the stream and detaches the delegate so it doesn't get called back. 111 // Cancels the stream and detaches the delegate so it doesn't get called back.
112 stream_->DetachDelegate(); 112 stream_->DetachDelegate();
113 DCHECK(!stream_); 113 DCHECK(!stream_);
114 } 114 }
115 115
116 NextProto BidirectionalStreamSpdyJob::GetProtocol() const { 116 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
117 return negotiated_protocol_; 117 return negotiated_protocol_;
118 } 118 }
119 119
120 int64_t BidirectionalStreamSpdyJob::GetTotalReceivedBytes() const { 120 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
121 if (stream_closed_) 121 if (stream_closed_)
122 return closed_stream_received_bytes_; 122 return closed_stream_received_bytes_;
123 123
124 if (!stream_) 124 if (!stream_)
125 return 0; 125 return 0;
126 126
127 return stream_->raw_received_bytes(); 127 return stream_->raw_received_bytes();
128 } 128 }
129 129
130 int64_t BidirectionalStreamSpdyJob::GetTotalSentBytes() const { 130 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
131 if (stream_closed_) 131 if (stream_closed_)
132 return closed_stream_sent_bytes_; 132 return closed_stream_sent_bytes_;
133 133
134 if (!stream_) 134 if (!stream_)
135 return 0; 135 return 0;
136 136
137 return stream_->raw_sent_bytes(); 137 return stream_->raw_sent_bytes();
138 } 138 }
139 139
140 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() { 140 void BidirectionalStreamSpdyImpl::OnRequestHeadersSent() {
141 DCHECK(stream_); 141 DCHECK(stream_);
142 142
143 negotiated_protocol_ = stream_->GetProtocol(); 143 negotiated_protocol_ = stream_->GetProtocol();
144 delegate_->OnHeadersSent(); 144 delegate_->OnHeadersSent();
145 } 145 }
146 146
147 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated( 147 SpdyResponseHeadersStatus BidirectionalStreamSpdyImpl::OnResponseHeadersUpdated(
148 const SpdyHeaderBlock& response_headers) { 148 const SpdyHeaderBlock& response_headers) {
149 DCHECK(stream_); 149 DCHECK(stream_);
150 150
151 delegate_->OnHeadersReceived(response_headers); 151 delegate_->OnHeadersReceived(response_headers);
152 return RESPONSE_HEADERS_ARE_COMPLETE; 152 return RESPONSE_HEADERS_ARE_COMPLETE;
153 } 153 }
154 154
155 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { 155 void BidirectionalStreamSpdyImpl::OnDataReceived(
156 scoped_ptr<SpdyBuffer> buffer) {
156 DCHECK(stream_); 157 DCHECK(stream_);
157 DCHECK(!stream_closed_); 158 DCHECK(!stream_closed_);
158 159
159 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by 160 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
160 // SpdyStream to indicate the end of stream. 161 // by SpdyStream to indicate the end of stream.
161 if (!buffer) 162 if (!buffer)
162 return; 163 return;
163 164
164 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust 165 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
165 // recv window size accordingly. 166 // recv window size accordingly.
166 read_data_queue_.Enqueue(std::move(buffer)); 167 read_data_queue_.Enqueue(std::move(buffer));
167 if (read_buffer_) { 168 if (read_buffer_) {
168 // Handing small chunks of data to the caller creates measurable overhead. 169 // Handing small chunks of data to the caller creates measurable overhead.
169 // So buffer data in short time-spans and send a single read notification. 170 // So buffer data in short time-spans and send a single read notification.
170 ScheduleBufferedRead(); 171 ScheduleBufferedRead();
171 } 172 }
172 } 173 }
173 174
174 void BidirectionalStreamSpdyJob::OnDataSent() { 175 void BidirectionalStreamSpdyImpl::OnDataSent() {
175 DCHECK(stream_); 176 DCHECK(stream_);
176 DCHECK(!stream_closed_); 177 DCHECK(!stream_closed_);
177 178
178 delegate_->OnDataSent(); 179 delegate_->OnDataSent();
179 } 180 }
180 181
181 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) { 182 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) {
182 DCHECK(stream_); 183 DCHECK(stream_);
183 DCHECK(!stream_closed_); 184 DCHECK(!stream_closed_);
184 185
185 delegate_->OnTrailersReceived(trailers); 186 delegate_->OnTrailersReceived(trailers);
186 } 187 }
187 188
188 void BidirectionalStreamSpdyJob::OnClose(int status) { 189 void BidirectionalStreamSpdyImpl::OnClose(int status) {
189 DCHECK(stream_); 190 DCHECK(stream_);
190 191
191 stream_closed_ = true; 192 stream_closed_ = true;
192 closed_stream_status_ = status; 193 closed_stream_status_ = status;
193 closed_stream_received_bytes_ = stream_->raw_received_bytes(); 194 closed_stream_received_bytes_ = stream_->raw_received_bytes();
194 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); 195 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
195 stream_.reset(); 196 stream_.reset();
196 197
197 if (status != OK) { 198 if (status != OK) {
198 delegate_->OnFailed(status); 199 delegate_->OnFailed(status);
199 return; 200 return;
200 } 201 }
201 // Complete any remaining read, as all data has been buffered. 202 // Complete any remaining read, as all data has been buffered.
202 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will 203 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
203 // do nothing. 204 // do nothing.
204 timer_->Stop(); 205 timer_->Stop();
205 DoBufferedRead(); 206 DoBufferedRead();
206 } 207 }
207 208
208 void BidirectionalStreamSpdyJob::SendRequestHeaders() { 209 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
209 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); 210 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
210 HttpRequestInfo http_request_info; 211 HttpRequestInfo http_request_info;
211 http_request_info.url = request_info_->url; 212 http_request_info.url = request_info_->url;
212 http_request_info.method = request_info_->method; 213 http_request_info.method = request_info_->method;
213 http_request_info.extra_headers = request_info_->extra_headers; 214 http_request_info.extra_headers = request_info_->extra_headers;
214 215
215 CreateSpdyHeadersFromHttpRequest( 216 CreateSpdyHeadersFromHttpRequest(
216 http_request_info, http_request_info.extra_headers, 217 http_request_info, http_request_info.extra_headers,
217 stream_->GetProtocolVersion(), true, headers.get()); 218 stream_->GetProtocolVersion(), true, headers.get());
218 stream_->SendRequestHeaders(std::move(headers), 219 stream_->SendRequestHeaders(std::move(headers),
219 request_info_->end_stream_on_headers 220 request_info_->end_stream_on_headers
220 ? NO_MORE_DATA_TO_SEND 221 ? NO_MORE_DATA_TO_SEND
221 : MORE_DATA_TO_SEND); 222 : MORE_DATA_TO_SEND);
222 } 223 }
223 224
224 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) { 225 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
225 DCHECK_NE(ERR_IO_PENDING, rv); 226 DCHECK_NE(ERR_IO_PENDING, rv);
226 if (rv == OK) { 227 if (rv == OK) {
227 stream_ = stream_request_.ReleaseStream(); 228 stream_ = stream_request_.ReleaseStream();
228 stream_->SetDelegate(this); 229 stream_->SetDelegate(this);
229 SendRequestHeaders(); 230 SendRequestHeaders();
230 return; 231 return;
231 } 232 }
232 delegate_->OnFailed(rv); 233 delegate_->OnFailed(rv);
233 } 234 }
234 235
235 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() { 236 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
236 // If there is already a scheduled DoBufferedRead, don't issue 237 // If there is already a scheduled DoBufferedRead, don't issue
237 // another one. Mark that we have received more data and return. 238 // another one. Mark that we have received more data and return.
238 if (timer_->IsRunning()) { 239 if (timer_->IsRunning()) {
239 more_read_data_pending_ = true; 240 more_read_data_pending_ = true;
240 return; 241 return;
241 } 242 }
242 243
243 more_read_data_pending_ = false; 244 more_read_data_pending_ = false;
244 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs), 245 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs),
245 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead, 246 base::Bind(&BidirectionalStreamSpdyImpl::DoBufferedRead,
246 weak_factory_.GetWeakPtr())); 247 weak_factory_.GetWeakPtr()));
247 } 248 }
248 249
249 void BidirectionalStreamSpdyJob::DoBufferedRead() { 250 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
250 DCHECK(!timer_->IsRunning()); 251 DCHECK(!timer_->IsRunning());
251 // Check to see that the stream has not errored out. 252 // Check to see that the stream has not errored out.
252 DCHECK(stream_ || stream_closed_); 253 DCHECK(stream_ || stream_closed_);
253 DCHECK(!stream_closed_ || closed_stream_status_ == OK); 254 DCHECK(!stream_closed_ || closed_stream_status_ == OK);
254 255
255 // When |more_read_data_pending_| is true, it means that more data has arrived 256 // When |more_read_data_pending_| is true, it means that more data has arrived
256 // since started waiting. Wait a little longer and continue to buffer. 257 // since started waiting. Wait a little longer and continue to buffer.
257 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { 258 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
258 ScheduleBufferedRead(); 259 ScheduleBufferedRead();
259 return; 260 return;
260 } 261 }
261 262
262 int rv = 0; 263 int rv = 0;
263 if (read_buffer_) { 264 if (read_buffer_) {
264 rv = ReadData(read_buffer_.get(), read_buffer_len_); 265 rv = ReadData(read_buffer_.get(), read_buffer_len_);
265 DCHECK_NE(ERR_IO_PENDING, rv); 266 DCHECK_NE(ERR_IO_PENDING, rv);
266 read_buffer_ = nullptr; 267 read_buffer_ = nullptr;
267 read_buffer_len_ = 0; 268 read_buffer_len_ = 0;
268 delegate_->OnDataRead(rv); 269 delegate_->OnDataRead(rv);
269 } 270 }
270 } 271 }
271 272
272 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const { 273 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
273 if (stream_closed_) 274 if (stream_closed_)
274 return false; 275 return false;
275 DCHECK_GT(read_buffer_len_, 0); 276 DCHECK_GT(read_buffer_len_, 0);
276 return read_data_queue_.GetTotalSize() < 277 return read_data_queue_.GetTotalSize() <
277 static_cast<size_t>(read_buffer_len_); 278 static_cast<size_t>(read_buffer_len_);
278 } 279 }
279 280
280 } // namespace net 281 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/bidirectional_stream_spdy_impl.h ('k') | net/spdy/bidirectional_stream_spdy_job.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698