Index: net/spdy/spdy_stream.cc |
=================================================================== |
--- net/spdy/spdy_stream.cc (revision 39786) |
+++ net/spdy/spdy_stream.cc (working copy) |
@@ -5,6 +5,7 @@ |
#include "net/spdy/spdy_stream.h" |
#include "base/logging.h" |
+#include "base/message_loop.h" |
#include "net/http/http_request_info.h" |
#include "net/http/http_response_info.h" |
#include "net/spdy/spdy_session.h" |
@@ -31,7 +32,9 @@ |
load_log_(log), |
send_bytes_(0), |
recv_bytes_(0), |
- histograms_recorded_(false) {} |
+ histograms_recorded_(false), |
+ buffered_read_callback_pending_(false), |
+ more_read_data_pending_(false) {} |
SpdyStream::~SpdyStream() { |
DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_; |
@@ -201,6 +204,8 @@ |
LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for " |
<< stream_id_; |
+ CHECK(!response_complete_); |
+ |
// If we don't have a response, then the SYN_REPLY did not come through. |
// We cannot pass data up to the caller unless the reply headers have been |
// received. |
@@ -209,48 +214,34 @@ |
return false; |
} |
- if (length > 0) |
- recv_bytes_ += length; |
- recv_last_byte_time_ = base::TimeTicks::Now(); |
- |
// A zero-length read means that the stream is being closed. |
if (!length) { |
metrics_.StopStream(); |
download_finished_ = true; |
+ response_complete_ = true; |
+ |
+ // We need to complete any pending buffered read now. |
+ DoBufferedReadCallback(); |
+ |
OnClose(net::OK); |
return true; |
} |
// Track our bandwidth. |
metrics_.RecordBytes(length); |
+ recv_bytes_ += length; |
+ recv_last_byte_time_ = base::TimeTicks::Now(); |
- if (length > 0) { |
- // TODO(mbelshe): If read is pending, we should copy the data straight into |
- // the read buffer here. For now, we'll queue it always. |
- // TODO(mbelshe): We need to have some throttling on this. We shouldn't |
- // buffer an infinite amount of data. |
+ // Save the received data. |
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length); |
+ memcpy(io_buffer->data(), data, length); |
+ response_body_.push_back(io_buffer); |
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length); |
- memcpy(io_buffer->data(), data, length); |
- |
- response_body_.push_back(io_buffer); |
- } |
- |
// Note that data may be received for a SpdyStream prior to the user calling |
wtc
2010/02/25 20:04:49
Please update this comment. It refers to the test
Mike Belshe
2010/02/25 23:30:17
Done.
|
// ReadResponseBody(), therefore user_callback_ may be NULL. This may often |
// happen for server initiated streams. |
- if (user_callback_) { |
- int rv; |
- if (user_buffer_) { |
- rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); |
- CHECK(rv != ERR_IO_PENDING); |
- user_buffer_ = NULL; |
- user_buffer_len_ = 0; |
- } else { |
- rv = OK; |
- } |
- DoCallback(rv); |
- } |
+ if (user_buffer_) |
+ ScheduleBufferedReadCallback(); |
return true; |
} |
@@ -342,6 +333,61 @@ |
return result; |
} |
+void SpdyStream::ScheduleBufferedReadCallback() { |
+ // If there is already a scheduled DoBufferedReadCallback, don't issue |
+ // another one. Mark that we have received more data and return. |
+ if (buffered_read_callback_pending_) { |
+ more_read_data_pending_ = true; |
+ return; |
+ } |
+ |
+ more_read_data_pending_ = false; |
+ buffered_read_callback_pending_ = true; |
+ const int kBufferTimeMs = 1; |
+ MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod( |
+ this, &SpdyStream::DoBufferedReadCallback), kBufferTimeMs); |
+} |
+ |
+// Checks to see if we should wait for more buffered data before notifying |
+// the caller. Returns true if we should wait, false otherwise. |
+bool SpdyStream::ShouldWaitForMoreBufferedData() { |
willchan no longer on Chromium
2010/02/25 20:47:35
I think this member function can be const
Mike Belshe
2010/02/25 23:30:17
Done.
|
+ // If the response is complete, there is no point in waiting. |
+ if (response_complete_) |
+ return false; |
+ |
+ int bytes_buffered = 0; |
+ std::list<scoped_refptr<IOBufferWithSize> >::iterator it; |
+ for (it = response_body_.begin(); |
+ it != response_body_.end() && bytes_buffered < user_buffer_len_; |
+ ++it) |
+ bytes_buffered += (*it)->size(); |
+ |
+ return bytes_buffered < user_buffer_len_; |
+} |
+ |
+void SpdyStream::DoBufferedReadCallback() { |
+ buffered_read_callback_pending_ = false; |
+ |
+ // When more_read_data_pending_ is true, it means that more data has |
+ // arrived since we started waiting. Wait a little longer and continue |
+ // to buffer. |
+ if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { |
+ ScheduleBufferedReadCallback(); |
+ return; |
+ } |
+ |
+ int rv = 0; |
+ if (user_buffer_) { |
+ rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); |
+ CHECK(rv != ERR_IO_PENDING); |
+ user_buffer_ = NULL; |
+ user_buffer_len_ = 0; |
+ } |
+ |
+ if (user_callback_) |
+ DoCallback(rv); |
+} |
+ |
void SpdyStream::DoCallback(int rv) { |
CHECK(rv != ERR_IO_PENDING); |
CHECK(user_callback_); |