Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: net/spdy/spdy_stream.cc

Issue 652209: Modify the SPDY stream to be buffered.... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 10 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_stream.h" 5 #include "net/spdy/spdy_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/message_loop.h"
8 #include "net/http/http_request_info.h" 9 #include "net/http/http_request_info.h"
9 #include "net/http/http_response_info.h" 10 #include "net/http/http_response_info.h"
10 #include "net/spdy/spdy_session.h" 11 #include "net/spdy/spdy_session.h"
11 12
12 namespace net { 13 namespace net {
13 14
14 SpdyStream::SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id, 15 SpdyStream::SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id,
15 bool pushed, LoadLog* log) 16 bool pushed, LoadLog* log)
16 : stream_id_(stream_id), 17 : stream_id_(stream_id),
17 priority_(0), 18 priority_(0),
18 pushed_(pushed), 19 pushed_(pushed),
19 download_finished_(false), 20 download_finished_(false),
20 metrics_(Singleton<BandwidthMetrics>::get()), 21 metrics_(Singleton<BandwidthMetrics>::get()),
21 session_(session), 22 session_(session),
22 response_(NULL), 23 response_(NULL),
23 request_body_stream_(NULL), 24 request_body_stream_(NULL),
24 response_complete_(false), 25 response_complete_(false),
25 io_state_(STATE_NONE), 26 io_state_(STATE_NONE),
26 response_status_(OK), 27 response_status_(OK),
27 user_callback_(NULL), 28 user_callback_(NULL),
28 user_buffer_(NULL), 29 user_buffer_(NULL),
29 user_buffer_len_(0), 30 user_buffer_len_(0),
30 cancelled_(false), 31 cancelled_(false),
31 load_log_(log), 32 load_log_(log),
32 send_bytes_(0), 33 send_bytes_(0),
33 recv_bytes_(0), 34 recv_bytes_(0),
34 histograms_recorded_(false) {} 35 histograms_recorded_(false),
36 buffered_read_callback_pending_(false),
37 more_read_data_pending_(false) {}
35 38
36 SpdyStream::~SpdyStream() { 39 SpdyStream::~SpdyStream() {
37 DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_; 40 DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_;
38 41
39 // TODO(willchan): We're still calling CancelStream() too many times, because 42 // TODO(willchan): We're still calling CancelStream() too many times, because
40 // inactive pending/pushed streams will still have stream_id_ set. 43 // inactive pending/pushed streams will still have stream_id_ set.
41 if (stream_id_) { 44 if (stream_id_) {
42 session_->CancelStream(stream_id_); 45 session_->CancelStream(stream_id_);
43 } else if (!response_complete_) { 46 } else if (!response_complete_) {
44 NOTREACHED(); 47 NOTREACHED();
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after
194 197
195 if (user_callback_) 198 if (user_callback_)
196 DoCallback(rv); 199 DoCallback(rv);
197 } 200 }
198 201
199 bool SpdyStream::OnDataReceived(const char* data, int length) { 202 bool SpdyStream::OnDataReceived(const char* data, int length) {
200 DCHECK_GE(length, 0); 203 DCHECK_GE(length, 0);
201 LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for " 204 LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for "
202 << stream_id_; 205 << stream_id_;
203 206
207 CHECK(!response_complete_);
208
204 // If we don't have a response, then the SYN_REPLY did not come through. 209 // If we don't have a response, then the SYN_REPLY did not come through.
205 // We cannot pass data up to the caller unless the reply headers have been 210 // We cannot pass data up to the caller unless the reply headers have been
206 // received. 211 // received.
207 if (!response_->headers) { 212 if (!response_->headers) {
208 OnClose(ERR_SYN_REPLY_NOT_RECEIVED); 213 OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
209 return false; 214 return false;
210 } 215 }
211 216
212 if (length > 0)
213 recv_bytes_ += length;
214 recv_last_byte_time_ = base::TimeTicks::Now();
215
216 // A zero-length read means that the stream is being closed. 217 // A zero-length read means that the stream is being closed.
217 if (!length) { 218 if (!length) {
218 metrics_.StopStream(); 219 metrics_.StopStream();
219 download_finished_ = true; 220 download_finished_ = true;
221 response_complete_ = true;
222
223 // We need to complete any pending buffered read now.
224 DoBufferedReadCallback();
225
220 OnClose(net::OK); 226 OnClose(net::OK);
221 return true; 227 return true;
222 } 228 }
223 229
224 // Track our bandwidth. 230 // Track our bandwidth.
225 metrics_.RecordBytes(length); 231 metrics_.RecordBytes(length);
232 recv_bytes_ += length;
233 recv_last_byte_time_ = base::TimeTicks::Now();
226 234
227 if (length > 0) { 235 // Save the received data.
228 // TODO(mbelshe): If read is pending, we should copy the data straight into 236 IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
229 // the read buffer here. For now, we'll queue it always. 237 memcpy(io_buffer->data(), data, length);
230 // TODO(mbelshe): We need to have some throttling on this. We shouldn't 238 response_body_.push_back(io_buffer);
231 // buffer an infinite amount of data.
232
233 IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
234 memcpy(io_buffer->data(), data, length);
235
236 response_body_.push_back(io_buffer);
237 }
238 239
239 // Note that data may be received for a SpdyStream prior to the user calling 240 // Note that data may be received for a SpdyStream prior to the user calling
wtc 2010/02/25 20:04:49 Please update this comment. It refers to the test
Mike Belshe 2010/02/25 23:30:17 Done.
240 // ReadResponseBody(), therefore user_callback_ may be NULL. This may often 241 // ReadResponseBody(), therefore user_callback_ may be NULL. This may often
241 // happen for server initiated streams. 242 // happen for server initiated streams.
242 if (user_callback_) { 243 if (user_buffer_)
243 int rv; 244 ScheduleBufferedReadCallback();
244 if (user_buffer_) {
245 rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
246 CHECK(rv != ERR_IO_PENDING);
247 user_buffer_ = NULL;
248 user_buffer_len_ = 0;
249 } else {
250 rv = OK;
251 }
252 DoCallback(rv);
253 }
254 245
255 return true; 246 return true;
256 } 247 }
257 248
258 void SpdyStream::OnWriteComplete(int status) { 249 void SpdyStream::OnWriteComplete(int status) {
259 // TODO(mbelshe): Check for cancellation here. If we're cancelled, we 250 // TODO(mbelshe): Check for cancellation here. If we're cancelled, we
260 // should discontinue the DoLoop. 251 // should discontinue the DoLoop.
261 252
262 if (status > 0) 253 if (status > 0)
263 send_bytes_ += status; 254 send_bytes_ += status;
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 break; 326 break;
336 default: 327 default:
337 NOTREACHED(); 328 NOTREACHED();
338 break; 329 break;
339 } 330 }
340 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE); 331 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);
341 332
342 return result; 333 return result;
343 } 334 }
344 335
336 void SpdyStream::ScheduleBufferedReadCallback() {
337 // If there is already a scheduled DoBufferedReadCallback, don't issue
338 // another one. Mark that we have received more data and return.
339 if (buffered_read_callback_pending_) {
340 more_read_data_pending_ = true;
341 return;
342 }
343
344 more_read_data_pending_ = false;
345 buffered_read_callback_pending_ = true;
346 const int kBufferTimeMs = 1;
347 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(
348 this, &SpdyStream::DoBufferedReadCallback), kBufferTimeMs);
349 }
350
351 // Checks to see if we should wait for more buffered data before notifying
352 // the caller. Returns true if we should wait, false otherwise.
353 bool SpdyStream::ShouldWaitForMoreBufferedData() {
willchan no longer on Chromium 2010/02/25 20:47:35 I think this member function can be const
Mike Belshe 2010/02/25 23:30:17 Done.
354 // If the response is complete, there is no point in waiting.
355 if (response_complete_)
356 return false;
357
358 int bytes_buffered = 0;
359 std::list<scoped_refptr<IOBufferWithSize> >::iterator it;
360 for (it = response_body_.begin();
361 it != response_body_.end() && bytes_buffered < user_buffer_len_;
362 ++it)
363 bytes_buffered += (*it)->size();
364
365 return bytes_buffered < user_buffer_len_;
366 }
367
368 void SpdyStream::DoBufferedReadCallback() {
369 buffered_read_callback_pending_ = false;
370
371 // When more_read_data_pending_ is true, it means that more data has
372 // arrived since we started waiting. Wait a little longer and continue
373 // to buffer.
374 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
375 ScheduleBufferedReadCallback();
376 return;
377 }
378
379 int rv = 0;
380 if (user_buffer_) {
381 rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
382 CHECK(rv != ERR_IO_PENDING);
383 user_buffer_ = NULL;
384 user_buffer_len_ = 0;
385 }
386
387 if (user_callback_)
388 DoCallback(rv);
389 }
390
345 void SpdyStream::DoCallback(int rv) { 391 void SpdyStream::DoCallback(int rv) {
346 CHECK(rv != ERR_IO_PENDING); 392 CHECK(rv != ERR_IO_PENDING);
347 CHECK(user_callback_); 393 CHECK(user_callback_);
348 394
349 // Since Run may result in being called back, clear user_callback_ in advance. 395 // Since Run may result in being called back, clear user_callback_ in advance.
350 CompletionCallback* c = user_callback_; 396 CompletionCallback* c = user_callback_;
351 user_callback_ = NULL; 397 user_callback_ = NULL;
352 c->Run(rv); 398 c->Run(rv);
353 } 399 }
354 400
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
447 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 493 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
448 recv_last_byte_time_ - recv_first_byte_time_); 494 recv_last_byte_time_ - recv_first_byte_time_);
449 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 495 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
450 recv_last_byte_time_ - send_time_); 496 recv_last_byte_time_ - send_time_);
451 497
452 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 498 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
453 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 499 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
454 } 500 }
455 501
456 } // namespace net 502 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698